summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo/replicated
diff options
context:
space:
mode:
authorBrian S. O'Neill <bronee@gmail.com>2006-08-30 02:24:36 +0000
committerBrian S. O'Neill <bronee@gmail.com>2006-08-30 02:24:36 +0000
commit5a2aeb3ab59f286a6d2a5d8b7d62f4b17132b2b7 (patch)
tree6901a95bdd325cabfc3613fb11dbcc6afbccddac /src/main/java/com/amazon/carbonado/repo/replicated
parent8f88478b4be9c3165d678c43640052c8fc7d8943 (diff)
Add core repository implementations
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo/replicated')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java583
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepositoryBuilder.java161
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java121
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java398
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/package-info.java27
5 files changed, 1290 insertions, 0 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
new file mode 100644
index 0000000..5003053
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -0,0 +1,583 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.amazon.carbonado.repo.replicated;
+
+import java.util.Comparator;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.cojen.util.BeanComparator;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchInterruptedException;
+import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.MalformedTypeException;
+import com.amazon.carbonado.Query;
+import com.amazon.carbonado.Repository;
+import com.amazon.carbonado.RepositoryException;
+import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
+import com.amazon.carbonado.SupportException;
+import com.amazon.carbonado.Transaction;
+
+import com.amazon.carbonado.capability.Capability;
+import com.amazon.carbonado.capability.IndexInfo;
+import com.amazon.carbonado.capability.IndexInfoCapability;
+import com.amazon.carbonado.capability.ResyncCapability;
+import com.amazon.carbonado.capability.ShutdownCapability;
+import com.amazon.carbonado.capability.StorableInfoCapability;
+
+import com.amazon.carbonado.info.Direction;
+import com.amazon.carbonado.info.StorableIntrospector;
+
+import com.amazon.carbonado.spi.TransactionPair;
+
+import com.amazon.carbonado.util.Throttle;
+
+/**
+ * A ReplicatedRepository binds two repositories together. One will be used
+ * for reading (the replica), and the other will be used for writing; changes
+ * to the master repository will be copied to the replica.
+ *
+ * @author Don Schneider
+ */
+class ReplicatedRepository
+ implements Repository,
+ ResyncCapability,
+ ShutdownCapability,
+ StorableInfoCapability
+{
+ // Constants used by resync method.
+ private static final int RESYNC_QUEUE_SIZE = 1000;
+ private static final long RESYNC_QUEUE_TIMEOUT_MS = 30000;
+
+ /**
+ * Utility method to select the natural ordering of a storage, by looking
+ * for a clustered index on the primary key. Returns null if no clustered
+ * index was found.
+ *
+ * TODO: Try to incorporate this into standard storage interface somehow.
+ */
+ private static String[] selectNaturalOrder(Repository repo, Class<? extends Storable> type)
+ throws RepositoryException
+ {
+ IndexInfoCapability capability = repo.getCapability(IndexInfoCapability.class);
+ if (capability == null) {
+ return null;
+ }
+ IndexInfo info = null;
+ for (IndexInfo candidate : capability.getIndexInfo(type)) {
+ if (candidate.isClustered()) {
+ info = candidate;
+ break;
+ }
+ }
+ if (info == null) {
+ return null;
+ }
+
+ // Verify index is part of primary key.
+ Set<String> pkSet = StorableIntrospector.examine(type).getPrimaryKeyProperties().keySet();
+
+ String[] propNames = info.getPropertyNames();
+ for (String prop : propNames) {
+ if (!pkSet.contains(prop)) {
+ return null;
+ }
+ }
+
+ String[] orderBy = new String[pkSet.size()];
+
+ Direction[] directions = info.getPropertyDirections();
+
+ // Clone to remove elements.
+ pkSet = new LinkedHashSet<String>(pkSet);
+
+ int i;
+ for (i=0; i<propNames.length; i++) {
+ orderBy[i] = ((directions[i] == Direction.DESCENDING) ? "-" : "+") + propNames[i];
+ pkSet.remove(propNames[i]);
+ }
+
+ // Append any remaining pk properties, to ensure complete ordering.
+ if (pkSet.size() > 0) {
+ for (String prop : pkSet) {
+ orderBy[i++] = prop;
+ }
+ }
+
+ return orderBy;
+ }
+
+ private String mName;
+ private Repository mReplicaRepository;
+ private Repository mMasterRepository;
+
+ // Map of storages by storable class
+ private final Map<Class<?>, ReplicatedStorage<?>> mStorages;
+
+ ReplicatedRepository(String aName,
+ Repository aReplicaRepository,
+ Repository aMasterRepository) {
+ mName = aName;
+ mReplicaRepository = aReplicaRepository;
+ mMasterRepository = aMasterRepository;
+
+ mStorages = new IdentityHashMap<Class<?>, ReplicatedStorage<?>>();
+ }
+
+ public String getName() {
+ return mName;
+ }
+
+ Repository getReplicaRepository() {
+ return mReplicaRepository;
+ }
+
+ Repository getMasterRepository() {
+ return mMasterRepository;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <S extends Storable> Storage<S> storageFor(Class<S> type)
+ throws MalformedTypeException, SupportException, RepositoryException
+ {
+ synchronized (mStorages) {
+ ReplicatedStorage storage = mStorages.get(type);
+ if (storage == null) {
+ // Examine and throw exception if there is a problem.
+ StorableIntrospector.examine(type);
+
+ storage = createStorage(type);
+ mStorages.put(type, storage);
+ }
+ return storage;
+ }
+ }
+
+ private <S extends Storable> ReplicatedStorage<S> createStorage(Class<S> type)
+ throws SupportException, RepositoryException
+ {
+ return new ReplicatedStorage<S>(this, type);
+ }
+
+ public Transaction enterTransaction() {
+ return new TransactionPair(mMasterRepository.enterTransaction(),
+ mReplicaRepository.enterTransaction());
+ }
+
+ public Transaction enterTransaction(IsolationLevel level) {
+ return new TransactionPair(mMasterRepository.enterTransaction(level),
+ mReplicaRepository.enterTransaction(level));
+ }
+
+ public Transaction enterTopTransaction(IsolationLevel level) {
+ return new TransactionPair(mMasterRepository.enterTopTransaction(level),
+ mReplicaRepository.enterTopTransaction(level));
+ }
+
+ public IsolationLevel getTransactionIsolationLevel() {
+ IsolationLevel replicaLevel = mReplicaRepository.getTransactionIsolationLevel();
+ if (replicaLevel == null) {
+ return null;
+ }
+ IsolationLevel masterLevel = mMasterRepository.getTransactionIsolationLevel();
+ if (masterLevel == null) {
+ return null;
+ }
+ return replicaLevel.lowestCommon(masterLevel);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <C extends Capability> C getCapability(Class<C> capabilityType) {
+ if (capabilityType.isInstance(this)) {
+ if (ShutdownCapability.class.isAssignableFrom(capabilityType)) {
+ if (mReplicaRepository.getCapability(capabilityType) == null &&
+ mMasterRepository.getCapability(capabilityType) == null) {
+
+ return null;
+ }
+ }
+ return (C) this;
+ }
+
+ C cap = mMasterRepository.getCapability(capabilityType);
+ if (cap == null) {
+ cap = mReplicaRepository.getCapability(capabilityType);
+ }
+
+ return cap;
+ }
+
+ public void close() {
+ mReplicaRepository.close();
+ mMasterRepository.close();
+ }
+
+ public String[] getUserStorableTypeNames() throws RepositoryException {
+ StorableInfoCapability replicaCap =
+ mReplicaRepository.getCapability(StorableInfoCapability.class);
+ StorableInfoCapability masterCap =
+ mMasterRepository.getCapability(StorableInfoCapability.class);
+
+ if (replicaCap == null) {
+ if (masterCap == null) {
+ return new String[0];
+ }
+ return masterCap.getUserStorableTypeNames();
+ } else if (masterCap == null) {
+ return replicaCap.getUserStorableTypeNames();
+ }
+
+ // Merge the two sets together.
+ Set<String> names = new LinkedHashSet<String>();
+ for (String name : replicaCap.getUserStorableTypeNames()) {
+ names.add(name);
+ }
+ for (String name : masterCap.getUserStorableTypeNames()) {
+ names.add(name);
+ }
+
+ return names.toArray(new String[names.size()]);
+ }
+
+ public boolean isSupported(Class<Storable> type) {
+ StorableInfoCapability replicaCap =
+ mReplicaRepository.getCapability(StorableInfoCapability.class);
+ StorableInfoCapability masterCap =
+ mMasterRepository.getCapability(StorableInfoCapability.class);
+
+ return (masterCap == null || masterCap.isSupported(type))
+ && (replicaCap == null || replicaCap.isSupported(type));
+ }
+
+ public boolean isPropertySupported(Class<Storable> type, String name) {
+ StorableInfoCapability replicaCap =
+ mReplicaRepository.getCapability(StorableInfoCapability.class);
+ StorableInfoCapability masterCap =
+ mMasterRepository.getCapability(StorableInfoCapability.class);
+
+ return (masterCap == null || masterCap.isPropertySupported(type, name))
+ && (replicaCap == null || replicaCap.isPropertySupported(type, name));
+ }
+
+ public boolean isAutoShutdownEnabled() {
+ ShutdownCapability cap = mReplicaRepository.getCapability(ShutdownCapability.class);
+ if (cap != null && cap.isAutoShutdownEnabled()) {
+ return true;
+ }
+ cap = mMasterRepository.getCapability(ShutdownCapability.class);
+ if (cap != null && cap.isAutoShutdownEnabled()) {
+ return true;
+ }
+ return false;
+ }
+
+ public void setAutoShutdownEnabled(boolean enabled) {
+ ShutdownCapability cap = mReplicaRepository.getCapability(ShutdownCapability.class);
+ if (cap != null) {
+ cap.setAutoShutdownEnabled(enabled);
+ }
+ cap = mMasterRepository.getCapability(ShutdownCapability.class);
+ if (cap != null) {
+ cap.setAutoShutdownEnabled(enabled);
+ }
+ }
+
+ public void shutdown() {
+ ShutdownCapability cap = mReplicaRepository.getCapability(ShutdownCapability.class);
+ if (cap != null) {
+ cap.shutdown();
+ } else {
+ mReplicaRepository.close();
+ }
+ cap = mMasterRepository.getCapability(ShutdownCapability.class);
+ if (cap != null) {
+ cap.shutdown();
+ } else {
+ mMasterRepository.close();
+ }
+ }
+
+ /**
+ * Repairs replicated storables by synchronizing the replica repository
+ * against the master repository.
+ *
+ * @param type type of storable to re-sync
+ * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half
+ * speed, 0.1 = one-tenth speed, etc
+ * @param filter optional query filter to limit which objects get re-sync'ed
+ * @param filterValues filter values for optional filter
+ */
+ public <S extends Storable> void resync(Class<S> type,
+ double desiredSpeed,
+ String filter,
+ Object... filterValues)
+ throws RepositoryException
+ {
+ Storage<S> replicaStorage, masterStorage;
+ replicaStorage = mReplicaRepository.storageFor(type);
+ masterStorage = mMasterRepository.storageFor(type);
+
+ Query<S> replicaQuery, masterQuery;
+ if (filter == null) {
+ replicaQuery = replicaStorage.query();
+ masterQuery = masterStorage.query();
+ } else {
+ replicaQuery = replicaStorage.query(filter).withValues(filterValues);
+ masterQuery = masterStorage.query(filter).withValues(filterValues);
+ }
+
+ // Order both queries the same so that they can be run in parallel.
+ String[] orderBy = selectNaturalOrder(mMasterRepository, type);
+ if (orderBy == null) {
+ orderBy = selectNaturalOrder(mReplicaRepository, type);
+ if (orderBy == null) {
+ Set<String> pkSet =
+ StorableIntrospector.examine(type).getPrimaryKeyProperties().keySet();
+ orderBy = pkSet.toArray(new String[0]);
+ }
+ }
+
+ BeanComparator bc = BeanComparator.forClass(type);
+ for (String order : orderBy) {
+ bc = bc.orderBy(order);
+ bc = bc.caseSensitive();
+ }
+
+ replicaQuery = replicaQuery.orderBy(orderBy);
+ masterQuery = masterQuery.orderBy(orderBy);
+
+ Throttle throttle;
+ if (desiredSpeed >= 1.0) {
+ throttle = null;
+ } else {
+ if (desiredSpeed < 0.0) {
+ desiredSpeed = 0.0;
+ }
+ // 50 samples
+ throttle = new Throttle(50);
+ }
+
+ Cursor<S> replicaCursor = replicaQuery.fetch();
+ try {
+ Cursor<S> masterCursor = masterQuery.fetch();
+ try {
+ resync(((ReplicatedStorage) storageFor(type)).getTrigger(),
+ replicaCursor,
+ masterCursor,
+ throttle, desiredSpeed,
+ bc);
+ } finally {
+ masterCursor.close();
+ }
+ } finally {
+ replicaCursor.close();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private <S extends Storable> void resync(ReplicationTrigger<S> trigger,
+ Cursor<S> replicaCursor,
+ Cursor<S> masterCursor,
+ Throttle throttle, double desiredSpeed,
+ Comparator comparator)
+ throws RepositoryException
+ {
+ // Enqueue resyncs to a separate thread since open cursors hold locks
+ // on currently referenced entries.
+ BlockingQueue<Runnable> resyncQueue =
+ new ArrayBlockingQueue<Runnable>(RESYNC_QUEUE_SIZE, true);
+ ResyncThread resyncThread = new ResyncThread(resyncQueue);
+ resyncThread.start();
+
+ try {
+ S replicaEntry = null;
+ S masterEntry = null;
+
+ while (true) {
+ if (throttle != null) {
+ try {
+ // 100 millisecond clock precision
+ throttle.throttle(desiredSpeed, 100);
+ } catch (InterruptedException e) {
+ throw new FetchInterruptedException(e);
+ }
+ }
+
+ if (replicaEntry == null && replicaCursor.hasNext()) {
+ replicaEntry = replicaCursor.next();
+ }
+
+ if (masterEntry == null && masterCursor.hasNext()) {
+ masterEntry = masterCursor.next();
+ }
+
+ // Comparator should treat null as high.
+ int compare = comparator.compare(replicaEntry, masterEntry);
+
+ if (compare < 0) {
+ // Bogus exists only in replica so delete it.
+ resyncThread.addResyncTask(trigger, replicaEntry, null);
+ // Allow replica to advance.
+ replicaEntry = null;
+ } else if (compare > 0) {
+ // Replica cursor is missing an entry so copy it.
+ resyncThread.addResyncTask(trigger, null, masterEntry);
+ // Allow master to advance.
+ masterEntry = null;
+ } else {
+ if (replicaEntry == null && masterEntry == null) {
+ // Both cursors exhausted -- resync is complete.
+ break;
+ }
+
+ if (!replicaEntry.equalProperties(masterEntry)) {
+ // Replica is stale.
+ resyncThread.addResyncTask(trigger, replicaEntry, masterEntry);
+ }
+
+ // Entries are synchronized so allow both cursors to advance.
+ replicaEntry = null;
+ masterEntry = null;
+ }
+ }
+ } finally {
+ resyncThread.waitUntilDone();
+ }
+ }
+
+ // TODO: Use TaskQueueThread
+
+ private static class ResyncThread extends Thread {
+ private static final int
+ STATE_RUNNING = 0,
+ STATE_SHOULD_STOP = 1,
+ STATE_STOPPED = 2;
+
+ private static final Runnable STOP_TASK = new Runnable() {public void run() {}};
+
+ private final BlockingQueue<Runnable> mQueue;
+
+ private int mState = STATE_RUNNING;
+
+ ResyncThread(BlockingQueue<Runnable> queue) {
+ super("ReplicatedRepository Resync");
+ mQueue = queue;
+ }
+
+ public void run() {
+ try {
+ while (true) {
+ boolean isStopping;
+ synchronized (this) {
+ isStopping = mState != STATE_RUNNING;
+ }
+
+ Runnable task;
+ if (isStopping) {
+ // Poll the queue so this thread doesn't block when it
+ // should be stopping.
+ task = mQueue.poll();
+ } else {
+ try {
+ task = mQueue.take();
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+
+ if (task == null || task == STOP_TASK) {
+ // Marker to indicate we should stop.
+ break;
+ }
+
+ task.run();
+ }
+ } finally {
+ synchronized (this) {
+ mState = STATE_STOPPED;
+ notifyAll();
+ }
+ }
+ }
+
+ <S extends Storable> void addResyncTask(final ReplicationTrigger<S> trigger,
+ final S replicaEntry,
+ final S masterEntry)
+ throws RepositoryException
+ {
+ if (replicaEntry == null && masterEntry == null) {
+ // If both are null, then there's nothing to do, is there?
+ // Note: Caller shouldn't have passed double nulls to
+ // addResyncTask in the first place.
+ return;
+ }
+
+ Runnable task = new Runnable() {
+ public void run() {
+ try {
+ trigger.resyncEntries(replicaEntry, masterEntry);
+ } catch (Exception e) {
+ LogFactory.getLog(ReplicatedRepository.class).error(null, e);
+ }
+ }
+ };
+
+ addResyncTask(task);
+ }
+
+
+ <S extends Storable> void addResyncTask(Runnable task)
+ throws RepositoryException
+ {
+ try {
+ if (!mQueue.offer(task, RESYNC_QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+ throw new RepositoryException("Unable to enqueue resync task");
+ }
+ } catch (InterruptedException e) {
+ throw new RepositoryException(e);
+ }
+ }
+
+ synchronized void waitUntilDone() throws RepositoryException {
+ if (mState == STATE_STOPPED) {
+ return;
+ }
+ mState = STATE_SHOULD_STOP;
+ try {
+ // Inject stop task into the queue so it knows to stop.
+ mQueue.offer(STOP_TASK, RESYNC_QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ while (mState != STATE_STOPPED) {
+ wait();
+ }
+ } catch (InterruptedException e) {
+ throw new RepositoryException(e);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepositoryBuilder.java
new file mode 100644
index 0000000..8d5cc75
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepositoryBuilder.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.amazon.carbonado.repo.replicated;
+
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.amazon.carbonado.ConfigurationException;
+import com.amazon.carbonado.Repository;
+import com.amazon.carbonado.RepositoryBuilder;
+import com.amazon.carbonado.RepositoryException;
+
+import com.amazon.carbonado.spi.AbstractRepositoryBuilder;
+import com.amazon.carbonado.spi.BelatedRepositoryCreator;
+
+/**
+ * Repository builder for the replicated repository.
+ * <p>
+ * The following extra capabilities are supported:
+ * <ul>
+ * <li>{@link com.amazon.carbonado.capability.ResyncCapability ResyncCapability}
+ * </ul>
+ *
+ * @author Don Schneider
+ * @author Brian S O'Neill
+ */
+public class ReplicatedRepositoryBuilder extends AbstractRepositoryBuilder {
+ static final int DEFAULT_MASTER_TIMEOUT_MILLIS = 15000;
+ static final int DEFAULT_RETRY_MILLIS = 30000;
+
+ private String mName;
+ private boolean mIsMaster = true;
+ private RepositoryBuilder mReplicaRepositoryBuilder;
+ private RepositoryBuilder mMasterRepositoryBuilder;
+
+ public ReplicatedRepositoryBuilder() {
+ }
+
+ public Repository build(RepositoryReference rootRef) throws RepositoryException {
+ assertReady();
+
+ Repository replica, master;
+
+ {
+ boolean originalOption = mReplicaRepositoryBuilder.isMaster();
+ try {
+ mReplicaRepositoryBuilder.setMaster(false);
+ replica = mReplicaRepositoryBuilder.build(rootRef);
+ } finally {
+ mReplicaRepositoryBuilder.setMaster(originalOption);
+ }
+ }
+
+ {
+ // Create master using BelatedRepositoryCreator such that we can
+ // start up and read from replica even if master is down.
+
+ final boolean originalOption = mMasterRepositoryBuilder.isMaster();
+ mMasterRepositoryBuilder.setMaster(mIsMaster);
+
+ Log log = LogFactory.getLog(ReplicatedRepositoryBuilder.class);
+ BelatedRepositoryCreator creator = new BelatedRepositoryCreator
+ (log, mMasterRepositoryBuilder, rootRef, DEFAULT_RETRY_MILLIS) {
+
+ protected void createdNotification(Repository repo) {
+ // Don't need builder any more so restore it.
+ mMasterRepositoryBuilder.setMaster(originalOption);
+ }
+ };
+
+ master = creator.get(DEFAULT_MASTER_TIMEOUT_MILLIS);
+ }
+
+ Repository repo = new ReplicatedRepository(getName(), replica, master);
+ rootRef.set(repo);
+ return repo;
+ }
+
+ public String getName() {
+ String name = mName;
+ if (name == null) {
+ if (mReplicaRepositoryBuilder != null && mReplicaRepositoryBuilder.getName() != null) {
+ name = mReplicaRepositoryBuilder.getName();
+ } else if (mMasterRepositoryBuilder != null) {
+ name = mMasterRepositoryBuilder.getName();
+ }
+ }
+ return name;
+ }
+
+ public void setName(String name) {
+ mName = name;
+ }
+
+ public boolean isMaster() {
+ return mIsMaster;
+ }
+
+ public void setMaster(boolean b) {
+ mIsMaster = b;
+ }
+
+ /**
+ * @return "replica" respository to replicate to.
+ */
+ public RepositoryBuilder getReplicaRepositoryBuilder() {
+ return mReplicaRepositoryBuilder;
+ }
+
+ /**
+ * Set "replica" respository to replicate to, which is required. This builder
+ * automatically sets the master option of the given repository builder to
+ * false.
+ */
+ public void setReplicaRepositoryBuilder(RepositoryBuilder replicaRepositoryBuilder) {
+ mReplicaRepositoryBuilder = replicaRepositoryBuilder;
+ }
+
+ /**
+ * @return "master" respository to replicate from.
+ */
+ public RepositoryBuilder getMasterRepositoryBuilder() {
+ return mMasterRepositoryBuilder;
+ }
+
+ /**
+ * Set "master" respository to replicate from, which is required. This
+ * builder automatically sets the master option of the given repository to
+ * true.
+ */
+ public void setMasterRepositoryBuilder(RepositoryBuilder masterRepositoryBuilder) {
+ mMasterRepositoryBuilder = masterRepositoryBuilder;
+ }
+
+ public void errorCheck(Collection<String> messages) throws ConfigurationException {
+ super.errorCheck(messages);
+ if (null == getReplicaRepositoryBuilder()) {
+ messages.add("replicaRepositoryBuilder missing");
+ }
+ if (null == getMasterRepositoryBuilder()) {
+ messages.add("masterRepositoryBuilder missing");
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java
new file mode 100644
index 0000000..cf30e5d
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedStorage.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.amazon.carbonado.repo.replicated;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.OptimisticLockException;
+import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.PersistNoneException;
+import com.amazon.carbonado.Query;
+import com.amazon.carbonado.Storage;
+import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Repository;
+import com.amazon.carbonado.SupportException;
+import com.amazon.carbonado.RepositoryException;
+import com.amazon.carbonado.Transaction;
+import com.amazon.carbonado.Trigger;
+import com.amazon.carbonado.UniqueConstraintException;
+import com.amazon.carbonado.UnsupportedTypeException;
+
+import com.amazon.carbonado.filter.Filter;
+
+import com.amazon.carbonado.spi.BelatedStorageCreator;
+import com.amazon.carbonado.spi.RepairExecutor;
+
+/**
+ * ReplicatedStorage
+ *
+ * @author Don Schneider
+ * @author Brian S O'Neill
+ */
+class ReplicatedStorage<S extends Storable> implements Storage<S> {
+ final Storage<S> mReplicaStorage;
+ final ReplicationTrigger<S> mTrigger;
+
+ public ReplicatedStorage(ReplicatedRepository aRepository, Class<S> aType)
+ throws SupportException, RepositoryException
+ {
+ mReplicaStorage = aRepository.getReplicaRepository().storageFor(aType);
+
+ // Create master using BelatedStorageCreator such that we can start up
+ // and read from replica even if master is down.
+
+ Log log = LogFactory.getLog(getClass());
+ BelatedStorageCreator<S> creator = new BelatedStorageCreator<S>
+ (log, aRepository.getMasterRepository(), aType,
+ ReplicatedRepositoryBuilder.DEFAULT_RETRY_MILLIS);
+
+ Storage<S> masterStorage;
+ try {
+ masterStorage = creator.get(ReplicatedRepositoryBuilder.DEFAULT_MASTER_TIMEOUT_MILLIS);
+ } catch (UnsupportedTypeException e) {
+ // Master doesn't support Storable, but it is marked as Independent.
+ masterStorage = null;
+ }
+
+ mTrigger = new ReplicationTrigger<S>(aRepository, mReplicaStorage, masterStorage);
+ addTrigger(mTrigger);
+ }
+
+ /**
+ * For testing only.
+ */
+ ReplicatedStorage(Repository aRepository,
+ Storage<S> replicaStorage,
+ Storage<S> masterStorage)
+ {
+ mReplicaStorage = replicaStorage;
+ mTrigger = new ReplicationTrigger<S>(aRepository, mReplicaStorage, masterStorage);
+ addTrigger(mTrigger);
+ }
+
+ public Class<S> getStorableType() {
+ return mReplicaStorage.getStorableType();
+ }
+
+ public S prepare() {
+ return mReplicaStorage.prepare();
+ }
+
+ public Query<S> query() throws FetchException {
+ return mReplicaStorage.query();
+ }
+
+ public Query<S> query(String filter) throws FetchException {
+ return mReplicaStorage.query(filter);
+ }
+
+ public Query<S> query(Filter<S> filter) throws FetchException {
+ return mReplicaStorage.query(filter);
+ }
+
+ public boolean addTrigger(Trigger<? super S> trigger) {
+ return mReplicaStorage.addTrigger(trigger);
+ }
+
+ public boolean removeTrigger(Trigger<? super S> trigger) {
+ return mReplicaStorage.removeTrigger(trigger);
+ }
+
+ ReplicationTrigger<S> getTrigger() {
+ return mTrigger;
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
new file mode 100644
index 0000000..19915b7
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
@@ -0,0 +1,398 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.repo.replicated;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.amazon.carbonado.FetchDeadlockException;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchNoneException;
+import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.OptimisticLockException;
+import com.amazon.carbonado.PersistDeadlockException;
+import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.PersistNoneException;
+import com.amazon.carbonado.Repository;
+import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
+import com.amazon.carbonado.Transaction;
+import com.amazon.carbonado.Trigger;
+import com.amazon.carbonado.UniqueConstraintException;
+
+import com.amazon.carbonado.spi.RepairExecutor;
+
+/**
+ * All inserts/updates/deletes are first committed to the master storage, then
+ * duplicated and committed to the replica.
+ *
+ * @author Don Schneider
+ * @author Brian S O'Neill
+ */
+class ReplicationTrigger<S extends Storable> extends Trigger<S> {
+ private final Repository mRepository;
+ private final Storage<S> mReplicaStorage;
+ private final Storage<S> mMasterStorage;
+
+ private final ThreadLocal<AtomicInteger> mDisabled = new ThreadLocal<AtomicInteger>();
+
+ ReplicationTrigger(Repository repository,
+ Storage<S> replicaStorage,
+ Storage<S> masterStorage)
+ {
+ mRepository = repository;
+ mReplicaStorage = replicaStorage;
+ mMasterStorage = masterStorage;
+ }
+
+ @Override
+ public Object beforeInsert(S replica) throws PersistException {
+ return beforeInsert(replica, false);
+ }
+
+ @Override
+ public Object beforeTryInsert(S replica) throws PersistException {
+ return beforeInsert(replica, true);
+ }
+
+ private Object beforeInsert(S replica, boolean forTry) throws PersistException {
+ if (isReplicationDisabled()) {
+ return null;
+ }
+
+ final S master = mMasterStorage.prepare();
+ replica.copyAllProperties(master);
+
+ try {
+ if (forTry) {
+ if (!master.tryInsert()) {
+ throw abortTry();
+ }
+ } else {
+ master.insert();
+ }
+ } catch (UniqueConstraintException e) {
+ // This may be caused by an inconsistency between replica and
+ // master. Here's one scenerio: user called tryLoad and saw the
+ // entry does not exist. So instead of calling update, he/she calls
+ // insert. If the master entry exists, then there is an
+ // inconsistency. The code below checks for this specific kind of
+ // error and repairs it by inserting a record in the replica.
+
+ // Here's another scenerio: Unique constraint was caused by an
+ // inconsistency with the values of the alternate keys. User
+ // expected alternate keys to have unique values, as indicated by
+ // replica.
+
+ repair(replica);
+
+ // Throw exception since we don't know what the user's intentions
+ // really are.
+ throw e;
+ }
+
+ // Master may have applied sequences to unitialized primary keys, so
+ // copy primary keys to replica. Mark properties as dirty to allow
+ // primary key to be changed.
+ replica.markPropertiesDirty();
+
+ // Copy all properties in order to trigger constraints that
+ // master should have resolved.
+ master.copyAllProperties(replica);
+
+ return null;
+ }
+
+ @Override
+ public Object beforeUpdate(S replica) throws PersistException {
+ return beforeUpdate(replica, false);
+ }
+
+ @Override
+ public Object beforeTryUpdate(S replica) throws PersistException {
+ return beforeUpdate(replica, true);
+ }
+
+ private Object beforeUpdate(S replica, boolean forTry) throws PersistException {
+ if (isReplicationDisabled()) {
+ return null;
+ }
+
+ final S master = mMasterStorage.prepare();
+ replica.copyPrimaryKeyProperties(master);
+
+ if (!replica.hasDirtyProperties()) {
+ // Nothing to update, but must load from master anyhow, since
+ // update must always perform a fresh load as a side-effect. We
+ // cannot simply call update on the master, since it may need a
+ // version property to be set. Setting the version has the
+ // side-effect of making the storable look dirty, so the master
+ // will perform an update. This in turn causes the version to
+ // increase for no reason.
+ try {
+ if (forTry) {
+ if (!master.tryLoad()) {
+ // Master record does not exist. To ensure consistency,
+ // delete record from replica.
+ deleteReplica(replica);
+ throw abortTry();
+ }
+ } else {
+ try {
+ master.load();
+ } catch (FetchNoneException e) {
+ // Master record does not exist. To ensure consistency,
+ // delete record from replica.
+ deleteReplica(replica);
+ throw e;
+ }
+ }
+ } catch (FetchException e) {
+ throw e.toPersistException
+ ("Could not load master object for update: " + master.toStringKeyOnly());
+ }
+ } else {
+ replica.copyVersionProperty(master);
+ replica.copyDirtyProperties(master);
+
+ try {
+ if (forTry) {
+ if (!master.tryUpdate()) {
+ // Master record does not exist. To ensure consistency,
+ // delete record from replica.
+ deleteReplica(replica);
+ throw abortTry();
+ }
+ } else {
+ try {
+ master.update();
+ } catch (PersistNoneException e) {
+ // Master record does not exist. To ensure consistency,
+ // delete record from replica.
+ deleteReplica(replica);
+ throw e;
+ }
+ }
+ } catch (OptimisticLockException e) {
+ // This may be caused by an inconsistency between replica and
+ // master.
+
+ repair(replica);
+
+ // Throw original exception since we don't know what the user's
+ // intentions really are.
+ throw e;
+ }
+ }
+
+ // Copy master properties back, since its repository may have
+ // altered property values as a side effect.
+ master.copyUnequalProperties(replica);
+
+ return null;
+ }
+
+ @Override
+ public Object beforeDelete(S replica) throws PersistException {
+ if (isReplicationDisabled()) {
+ return null;
+ }
+
+ S master = mMasterStorage.prepare();
+ replica.copyPrimaryKeyProperties(master);
+
+ // If this fails to delete anything, don't care. Any delete failure
+ // will be detected when the replica is deleted. If there was an
+ // inconsistency, it is resolved after the replica is deleted.
+ master.tryDelete();
+
+ return null;
+ }
+
+ /**
+ * Re-sync the replica to the master. The primary keys of both entries are
+ * assumed to match.
+ *
+ * @param replicaEntry current replica entry, or null if none
+ * @param masterEntry current master entry, or null if none
+ */
+ void resyncEntries(S replicaEntry, S masterEntry) throws FetchException, PersistException {
+ if (replicaEntry == null && masterEntry == null) {
+ return;
+ }
+
+ Log log = LogFactory.getLog(ReplicatedRepository.class);
+
+ setReplicationDisabled(true);
+ try {
+ Transaction txn = mRepository.enterTransaction();
+ try {
+ if (replicaEntry != null) {
+ if (masterEntry == null) {
+ log.info("Deleting bogus entry: " + replicaEntry);
+ }
+ replicaEntry.tryDelete();
+ }
+ if (masterEntry != null) {
+ Storable newReplicaEntry = mReplicaStorage.prepare();
+ if (replicaEntry == null) {
+ masterEntry.copyAllProperties(newReplicaEntry);
+ log.info("Adding missing entry: " + newReplicaEntry);
+ } else {
+ if (replicaEntry.equalProperties(masterEntry)) {
+ return;
+ }
+ // First copy from old replica to preserve values of
+ // any independent properties. Be sure not to copy
+ // nulls from old replica to new replica, in case new
+ // non-nullable properties have been added. This is why
+ // copyUnequalProperties is called instead of
+ // copyAllProperties.
+ replicaEntry.copyUnequalProperties(newReplicaEntry);
+ // Calling copyAllProperties will skip unsupported
+ // independent properties in master, thus preserving
+ // old independent property values.
+ masterEntry.copyAllProperties(newReplicaEntry);
+ log.info("Replacing stale entry with: " + newReplicaEntry);
+ }
+ if (!newReplicaEntry.tryInsert()) {
+ // Try to correct bizarre corruption.
+ newReplicaEntry.tryDelete();
+ newReplicaEntry.tryInsert();
+ }
+ }
+ txn.commit();
+ } finally {
+ txn.exit();
+ }
+ } finally {
+ setReplicationDisabled(false);
+ }
+ }
+
+ /**
+ * Runs a repair in a background thread. This is done for two reasons: It
+ * allows repair to not be hindered by locks acquired by transactions and
+ * repairs don't get rolled back when culprit exception is thrown. Culprit
+ * may be UniqueConstraintException or OptimisticLockException.
+ */
+ private void repair(S replica) throws PersistException {
+ replica = (S) replica.copy();
+ S master = mMasterStorage.prepare();
+ replica.copyPrimaryKeyProperties(master);
+
+ try {
+ if (replica.tryLoad()) {
+ if (master.tryLoad()) {
+ if (replica.equalProperties(master)) {
+ // Both are equal -- no repair needed.
+ return;
+ }
+ }
+ } else if (!master.tryLoad()) {
+ // Both are missing -- no repair needed.
+ return;
+ }
+ } catch (FetchException e) {
+ throw e.toPersistException();
+ }
+
+ final S finalReplica = replica;
+ final S finalMaster = master;
+
+ RepairExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ Transaction txn = mRepository.enterTransaction();
+ try {
+ txn.setForUpdate(true);
+ if (finalReplica.tryLoad()) {
+ if (finalMaster.tryLoad()) {
+ resyncEntries(finalReplica, finalMaster);
+ } else {
+ resyncEntries(finalReplica, null);
+ }
+ } else if (finalMaster.tryLoad()) {
+ resyncEntries(null, finalMaster);
+ }
+ txn.commit();
+ } finally {
+ txn.exit();
+ }
+ } catch (FetchException fe) {
+ Log log = LogFactory.getLog(ReplicatedRepository.class);
+ log.warn("Unable to check if repair is required for " +
+ finalReplica.toStringKeyOnly(), fe);
+ } catch (PersistException pe) {
+ Log log = LogFactory.getLog(ReplicatedRepository.class);
+ log.error("Unable to repair entry " +
+ finalReplica.toStringKeyOnly(), pe);
+ }
+ }
+ });
+ }
+
+ /**
+ * Deletes the replica entry with replication disabled.
+ */
+ private void deleteReplica(S replica) throws PersistException {
+ // Disable replication to prevent trigger from being invoked by
+ // deleting replica.
+ setReplicationDisabled(true);
+ try {
+ replica.tryDelete();
+ } finally {
+ setReplicationDisabled(false);
+ }
+ }
+
+ /**
+ * Returns true if replication is disabled for the current thread.
+ */
+ private boolean isReplicationDisabled() {
+ // Count indicates how many times disabled (nested)
+ AtomicInteger i = mDisabled.get();
+ return i != null && i.get() > 0;
+ }
+
+ /**
+ * By default, replication is enabled for the current thread. Pass true to
+ * disable during re-sync operations.
+ */
+ private void setReplicationDisabled(boolean disabled) {
+ // Using a count allows this method call to be nested. Based on the
+ // current implementation, it should never be nested, so this extra
+ // work is just a safeguard.
+ AtomicInteger i = mDisabled.get();
+ if (disabled) {
+ if (i == null) {
+ i = new AtomicInteger(1);
+ mDisabled.set(i);
+ } else {
+ i.incrementAndGet();
+ }
+ } else {
+ if (i != null) {
+ i.decrementAndGet();
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/package-info.java b/src/main/java/com/amazon/carbonado/repo/replicated/package-info.java
new file mode 100644
index 0000000..44d8124
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2006 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Repository implementation that supports replication between two
+ * repositories. One repository is the replica, and the other is the
+ * master. Read operations are served by the replica, and the master is
+ * consulted when writing. Changes to the master are copied to the replica.
+ *
+ * @see com.amazon.carbonado.repo.replicated.ReplicatedRepositoryBuilder
+ */
+package com.amazon.carbonado.repo.replicated;