summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon
diff options
context:
space:
mode:
authorBrian S. O'Neill <bronee@gmail.com>2006-10-31 18:31:16 +0000
committerBrian S. O'Neill <bronee@gmail.com>2006-10-31 18:31:16 +0000
commita6037f0de6b010b8970ac55f19dd2a054134b509 (patch)
tree46f2df64395ae4109755fb8e6bd7f1dec76135a8 /src/main/java/com/amazon
parent332cccc96e2a9014cbc1d614fedc35048315b53a (diff)
Implement resync without using a separate thread and queue, avoiding deadlock conditions.
Diffstat (limited to 'src/main/java/com/amazon')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java221
1 files changed, 85 insertions, 136 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
index 16a33bd..11212c4 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -21,10 +21,6 @@ import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
import org.apache.commons.logging.LogFactory;
import org.cojen.util.BeanComparator;
@@ -70,9 +66,12 @@ class ReplicatedRepository
ShutdownCapability,
StorableInfoCapability
{
- // Constants used by resync method.
- private static final int RESYNC_QUEUE_SIZE = 1000;
- private static final long RESYNC_QUEUE_TIMEOUT_MS = 30000;
+ // Maximum number of resync updates to replica per transaction.
+ private static final int RESYNC_BATCH_SIZE = 10;
+
+ // Commit resync replica transaction early if this many records
+ // scanned. Otherwise, write locks may be held for a very long time.
+ private static final int RESYNC_WATERMARK = 100;
/**
* Utility method to select the natural ordering of a storage, by looking
@@ -354,9 +353,11 @@ class ReplicatedRepository
}
// Order both queries the same so that they can be run in parallel.
- String[] orderBy = selectNaturalOrder(mMasterRepository, type);
+ // Favor natural order of replica, since cursors may be opened and
+ // re-opened on it.
+ String[] orderBy = selectNaturalOrder(mReplicaRepository, type);
if (orderBy == null) {
- orderBy = selectNaturalOrder(mReplicaRepository, type);
+ orderBy = selectNaturalOrder(mMasterRepository, type);
if (orderBy == null) {
Set<String> pkSet =
StorableIntrospector.examine(type).getPrimaryKeyProperties().keySet();
@@ -384,42 +385,41 @@ class ReplicatedRepository
throttle = new Throttle(50);
}
- Cursor<S> replicaCursor = replicaQuery.fetch();
+ Transaction replicaTxn = mReplicaRepository.enterTransaction();
try {
- Cursor<S> masterCursor = masterQuery.fetch();
- try {
- resync(trigger,
- replicaCursor,
- masterCursor,
- throttle, desiredSpeed,
- bc);
- } finally {
- masterCursor.close();
- }
+ replicaTxn.setForUpdate(true);
+
+ resync(trigger,
+ replicaStorage, replicaQuery,
+ masterStorage, masterQuery,
+ throttle, desiredSpeed,
+ bc, replicaTxn);
+
+ replicaTxn.commit();
} finally {
- replicaCursor.close();
+ replicaTxn.exit();
}
}
@SuppressWarnings("unchecked")
private <S extends Storable> void resync(ReplicationTrigger<S> trigger,
- Cursor<S> replicaCursor,
- Cursor<S> masterCursor,
+ Storage<S> replicaStorage, Query<S> replicaQuery,
+ Storage<S> masterStorage, Query<S> masterQuery,
Throttle throttle, double desiredSpeed,
- Comparator comparator)
+ Comparator comparator, Transaction replicaTxn)
throws RepositoryException
{
- // Enqueue resyncs to a separate thread since open cursors hold locks
- // on currently referenced entries.
- BlockingQueue<Runnable> resyncQueue =
- new ArrayBlockingQueue<Runnable>(RESYNC_QUEUE_SIZE, true);
- ResyncThread resyncThread = new ResyncThread(resyncQueue);
- resyncThread.start();
+ Cursor<S> replicaCursor = null;
+ Cursor<S> masterCursor = null;
try {
+ replicaCursor = replicaQuery.fetch();
+ masterCursor = masterQuery.fetch();
+
S replicaEntry = null;
S masterEntry = null;
-
+
+ int count = 0, txnCount = 0;
while (true) {
if (throttle != null) {
try {
@@ -430,25 +430,43 @@ class ReplicatedRepository
}
}
- if (replicaEntry == null && replicaCursor.hasNext()) {
+ if (replicaEntry == null && replicaCursor != null && replicaCursor.hasNext()) {
replicaEntry = replicaCursor.next();
}
+
+ if (count++ >= RESYNC_WATERMARK || txnCount >= RESYNC_BATCH_SIZE) {
+ replicaTxn.commit();
+ if (replicaCursor != null) {
+ // Cursor should auto-close after txn commit, but force
+ // a close anyhow. Cursor is re-opened when it is
+ // allowed to advance.
+ replicaCursor.close();
+ replicaCursor = null;
+ }
+ count = 0;
+ txnCount = 0;
+ }
if (masterEntry == null && masterCursor.hasNext()) {
masterEntry = masterCursor.next();
}
+ Runnable resyncTask = null;
+
// Comparator should treat null as high.
int compare = comparator.compare(replicaEntry, masterEntry);
if (compare < 0) {
- // Bogus exists only in replica so delete it.
- resyncThread.addResyncTask(trigger, replicaEntry, null);
+ // Bogus record exists only in replica so delete it.
+ resyncTask = prepareResyncTask(trigger, replicaEntry, null);
// Allow replica to advance.
+ if (replicaCursor == null) {
+ replicaCursor = replicaQuery.fetchAfter(replicaEntry);
+ }
replicaEntry = null;
} else if (compare > 0) {
// Replica cursor is missing an entry so copy it.
- resyncThread.addResyncTask(trigger, null, masterEntry);
+ resyncTask = prepareResyncTask(trigger, null, masterEntry);
// Allow master to advance.
masterEntry = null;
} else {
@@ -459,126 +477,57 @@ class ReplicatedRepository
if (!replicaEntry.equalProperties(masterEntry)) {
// Replica is stale.
- resyncThread.addResyncTask(trigger, replicaEntry, masterEntry);
+ resyncTask = prepareResyncTask(trigger, replicaEntry, masterEntry);
}
// Entries are synchronized so allow both cursors to advance.
+ if (replicaCursor == null) {
+ replicaCursor = replicaQuery.fetchAfter(replicaEntry);
+ }
replicaEntry = null;
masterEntry = null;
}
+
+ if (resyncTask != null) {
+ txnCount++;
+ resyncTask.run();
+ }
}
} finally {
- resyncThread.waitUntilDone();
- }
- }
-
- // TODO: Use TaskQueueThread
-
- private static class ResyncThread extends Thread {
- private static final int
- STATE_RUNNING = 0,
- STATE_SHOULD_STOP = 1,
- STATE_STOPPED = 2;
-
- private static final Runnable STOP_TASK = new Runnable() {public void run() {}};
-
- private final BlockingQueue<Runnable> mQueue;
-
- private int mState = STATE_RUNNING;
-
- ResyncThread(BlockingQueue<Runnable> queue) {
- super("ReplicatedRepository Resync");
- mQueue = queue;
- }
-
- public void run() {
try {
- while (true) {
- boolean isStopping;
- synchronized (this) {
- isStopping = mState != STATE_RUNNING;
- }
-
- Runnable task;
- if (isStopping) {
- // Poll the queue so this thread doesn't block when it
- // should be stopping.
- task = mQueue.poll();
- } else {
- try {
- task = mQueue.take();
- } catch (InterruptedException e) {
- break;
- }
- }
-
- if (task == null || task == STOP_TASK) {
- // Marker to indicate we should stop.
- break;
- }
-
- task.run();
+ if (masterCursor != null) {
+ masterCursor.close();
}
} finally {
- synchronized (this) {
- mState = STATE_STOPPED;
- notifyAll();
+ if (replicaCursor != null) {
+ replicaCursor.close();
}
}
}
+ }
- <S extends Storable> void addResyncTask(final ReplicationTrigger<S> trigger,
- final S replicaEntry,
- final S masterEntry)
- throws RepositoryException
- {
- if (replicaEntry == null && masterEntry == null) {
- // If both are null, then there's nothing to do, is there?
- // Note: Caller shouldn't have passed double nulls to
- // addResyncTask in the first place.
- return;
- }
-
- Runnable task = new Runnable() {
- public void run() {
- try {
- trigger.resyncEntries(replicaEntry, masterEntry);
- } catch (Exception e) {
- LogFactory.getLog(ReplicatedRepository.class).error(null, e);
- }
- }
- };
-
- addResyncTask(task);
+ private <S extends Storable> Runnable prepareResyncTask(final ReplicationTrigger<S> trigger,
+ final S replicaEntry,
+ final S masterEntry)
+ throws RepositoryException
+ {
+ if (replicaEntry == null && masterEntry == null) {
+ // If both are null, then there's nothing to do, is there?
+ // Note: Caller shouldn't have passed double nulls to
+ // prepareResyncTask in the first place.
+ return null;
}
-
- <S extends Storable> void addResyncTask(Runnable task)
- throws RepositoryException
- {
- try {
- if (!mQueue.offer(task, RESYNC_QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
- throw new RepositoryException("Unable to enqueue resync task");
+ Runnable task = new Runnable() {
+ public void run() {
+ try {
+ trigger.resyncEntries(replicaEntry, masterEntry);
+ } catch (Exception e) {
+ LogFactory.getLog(ReplicatedRepository.class).error(null, e);
}
- } catch (InterruptedException e) {
- throw new RepositoryException(e);
}
- }
+ };
- synchronized void waitUntilDone() throws RepositoryException {
- if (mState == STATE_STOPPED) {
- return;
- }
- mState = STATE_SHOULD_STOP;
- try {
- // Inject stop task into the queue so it knows to stop.
- mQueue.offer(STOP_TASK, RESYNC_QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- while (mState != STATE_STOPPED) {
- wait();
- }
- } catch (InterruptedException e) {
- throw new RepositoryException(e);
- }
- }
+ return task;
}
}