From a6037f0de6b010b8970ac55f19dd2a054134b509 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Tue, 31 Oct 2006 18:31:16 +0000 Subject: Implement resync without using a separate thread and queue, avoiding deadlock conditions. --- .../repo/replicated/ReplicatedRepository.java | 221 ++++++++------------- 1 file changed, 85 insertions(+), 136 deletions(-) (limited to 'src') diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 16a33bd..11212c4 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -21,10 +21,6 @@ import java.util.Comparator; import java.util.LinkedHashSet; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - import org.apache.commons.logging.LogFactory; import org.cojen.util.BeanComparator; @@ -70,9 +66,12 @@ class ReplicatedRepository ShutdownCapability, StorableInfoCapability { - // Constants used by resync method. - private static final int RESYNC_QUEUE_SIZE = 1000; - private static final long RESYNC_QUEUE_TIMEOUT_MS = 30000; + // Maximum number of resync updates to replica per transaction. + private static final int RESYNC_BATCH_SIZE = 10; + + // Commit resync replica transaction early if this many records + // scanned. Otherwise, write locks may be held for a very long time. + private static final int RESYNC_WATERMARK = 100; /** * Utility method to select the natural ordering of a storage, by looking @@ -354,9 +353,11 @@ class ReplicatedRepository } // Order both queries the same so that they can be run in parallel. - String[] orderBy = selectNaturalOrder(mMasterRepository, type); + // Favor natural order of replica, since cursors may be opened and + // re-opened on it. + String[] orderBy = selectNaturalOrder(mReplicaRepository, type); if (orderBy == null) { - orderBy = selectNaturalOrder(mReplicaRepository, type); + orderBy = selectNaturalOrder(mMasterRepository, type); if (orderBy == null) { Set pkSet = StorableIntrospector.examine(type).getPrimaryKeyProperties().keySet(); @@ -384,42 +385,41 @@ class ReplicatedRepository throttle = new Throttle(50); } - Cursor replicaCursor = replicaQuery.fetch(); + Transaction replicaTxn = mReplicaRepository.enterTransaction(); try { - Cursor masterCursor = masterQuery.fetch(); - try { - resync(trigger, - replicaCursor, - masterCursor, - throttle, desiredSpeed, - bc); - } finally { - masterCursor.close(); - } + replicaTxn.setForUpdate(true); + + resync(trigger, + replicaStorage, replicaQuery, + masterStorage, masterQuery, + throttle, desiredSpeed, + bc, replicaTxn); + + replicaTxn.commit(); } finally { - replicaCursor.close(); + replicaTxn.exit(); } } @SuppressWarnings("unchecked") private void resync(ReplicationTrigger trigger, - Cursor replicaCursor, - Cursor masterCursor, + Storage replicaStorage, Query replicaQuery, + Storage masterStorage, Query masterQuery, Throttle throttle, double desiredSpeed, - Comparator comparator) + Comparator comparator, Transaction replicaTxn) throws RepositoryException { - // Enqueue resyncs to a separate thread since open cursors hold locks - // on currently referenced entries. - BlockingQueue resyncQueue = - new ArrayBlockingQueue(RESYNC_QUEUE_SIZE, true); - ResyncThread resyncThread = new ResyncThread(resyncQueue); - resyncThread.start(); + Cursor replicaCursor = null; + Cursor masterCursor = null; try { + replicaCursor = replicaQuery.fetch(); + masterCursor = masterQuery.fetch(); + S replicaEntry = null; S masterEntry = null; - + + int count = 0, txnCount = 0; while (true) { if (throttle != null) { try { @@ -430,25 +430,43 @@ class ReplicatedRepository } } - if (replicaEntry == null && replicaCursor.hasNext()) { + if (replicaEntry == null && replicaCursor != null && replicaCursor.hasNext()) { replicaEntry = replicaCursor.next(); } + + if (count++ >= RESYNC_WATERMARK || txnCount >= RESYNC_BATCH_SIZE) { + replicaTxn.commit(); + if (replicaCursor != null) { + // Cursor should auto-close after txn commit, but force + // a close anyhow. Cursor is re-opened when it is + // allowed to advance. + replicaCursor.close(); + replicaCursor = null; + } + count = 0; + txnCount = 0; + } if (masterEntry == null && masterCursor.hasNext()) { masterEntry = masterCursor.next(); } + Runnable resyncTask = null; + // Comparator should treat null as high. int compare = comparator.compare(replicaEntry, masterEntry); if (compare < 0) { - // Bogus exists only in replica so delete it. - resyncThread.addResyncTask(trigger, replicaEntry, null); + // Bogus record exists only in replica so delete it. + resyncTask = prepareResyncTask(trigger, replicaEntry, null); // Allow replica to advance. + if (replicaCursor == null) { + replicaCursor = replicaQuery.fetchAfter(replicaEntry); + } replicaEntry = null; } else if (compare > 0) { // Replica cursor is missing an entry so copy it. - resyncThread.addResyncTask(trigger, null, masterEntry); + resyncTask = prepareResyncTask(trigger, null, masterEntry); // Allow master to advance. masterEntry = null; } else { @@ -459,126 +477,57 @@ class ReplicatedRepository if (!replicaEntry.equalProperties(masterEntry)) { // Replica is stale. - resyncThread.addResyncTask(trigger, replicaEntry, masterEntry); + resyncTask = prepareResyncTask(trigger, replicaEntry, masterEntry); } // Entries are synchronized so allow both cursors to advance. + if (replicaCursor == null) { + replicaCursor = replicaQuery.fetchAfter(replicaEntry); + } replicaEntry = null; masterEntry = null; } + + if (resyncTask != null) { + txnCount++; + resyncTask.run(); + } } } finally { - resyncThread.waitUntilDone(); - } - } - - // TODO: Use TaskQueueThread - - private static class ResyncThread extends Thread { - private static final int - STATE_RUNNING = 0, - STATE_SHOULD_STOP = 1, - STATE_STOPPED = 2; - - private static final Runnable STOP_TASK = new Runnable() {public void run() {}}; - - private final BlockingQueue mQueue; - - private int mState = STATE_RUNNING; - - ResyncThread(BlockingQueue queue) { - super("ReplicatedRepository Resync"); - mQueue = queue; - } - - public void run() { try { - while (true) { - boolean isStopping; - synchronized (this) { - isStopping = mState != STATE_RUNNING; - } - - Runnable task; - if (isStopping) { - // Poll the queue so this thread doesn't block when it - // should be stopping. - task = mQueue.poll(); - } else { - try { - task = mQueue.take(); - } catch (InterruptedException e) { - break; - } - } - - if (task == null || task == STOP_TASK) { - // Marker to indicate we should stop. - break; - } - - task.run(); + if (masterCursor != null) { + masterCursor.close(); } } finally { - synchronized (this) { - mState = STATE_STOPPED; - notifyAll(); + if (replicaCursor != null) { + replicaCursor.close(); } } } + } - void addResyncTask(final ReplicationTrigger trigger, - final S replicaEntry, - final S masterEntry) - throws RepositoryException - { - if (replicaEntry == null && masterEntry == null) { - // If both are null, then there's nothing to do, is there? - // Note: Caller shouldn't have passed double nulls to - // addResyncTask in the first place. - return; - } - - Runnable task = new Runnable() { - public void run() { - try { - trigger.resyncEntries(replicaEntry, masterEntry); - } catch (Exception e) { - LogFactory.getLog(ReplicatedRepository.class).error(null, e); - } - } - }; - - addResyncTask(task); + private Runnable prepareResyncTask(final ReplicationTrigger trigger, + final S replicaEntry, + final S masterEntry) + throws RepositoryException + { + if (replicaEntry == null && masterEntry == null) { + // If both are null, then there's nothing to do, is there? + // Note: Caller shouldn't have passed double nulls to + // prepareResyncTask in the first place. + return null; } - - void addResyncTask(Runnable task) - throws RepositoryException - { - try { - if (!mQueue.offer(task, RESYNC_QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { - throw new RepositoryException("Unable to enqueue resync task"); + Runnable task = new Runnable() { + public void run() { + try { + trigger.resyncEntries(replicaEntry, masterEntry); + } catch (Exception e) { + LogFactory.getLog(ReplicatedRepository.class).error(null, e); } - } catch (InterruptedException e) { - throw new RepositoryException(e); } - } + }; - synchronized void waitUntilDone() throws RepositoryException { - if (mState == STATE_STOPPED) { - return; - } - mState = STATE_SHOULD_STOP; - try { - // Inject stop task into the queue so it knows to stop. - mQueue.offer(STOP_TASK, RESYNC_QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS); - while (mState != STATE_STOPPED) { - wait(); - } - } catch (InterruptedException e) { - throw new RepositoryException(e); - } - } + return task; } } -- cgit v1.2.3