diff options
Diffstat (limited to 'src/main/java/com/amazon')
| -rw-r--r-- | src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java | 221 | 
1 files changed, 85 insertions, 136 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 16a33bd..11212c4 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -21,10 +21,6 @@ import java.util.Comparator;  import java.util.LinkedHashSet;
  import java.util.Set;
 -import java.util.concurrent.ArrayBlockingQueue;
 -import java.util.concurrent.BlockingQueue;
 -import java.util.concurrent.TimeUnit;
 -
  import org.apache.commons.logging.LogFactory;
  import org.cojen.util.BeanComparator;
 @@ -70,9 +66,12 @@ class ReplicatedRepository                 ShutdownCapability,
                 StorableInfoCapability
  {
 -    // Constants used by resync method.
 -    private static final int RESYNC_QUEUE_SIZE = 1000;
 -    private static final long RESYNC_QUEUE_TIMEOUT_MS = 30000;
 +    // Maximum number of resync updates to replica per transaction.
 +    private static final int RESYNC_BATCH_SIZE = 10;
 +
 +    // Commit resync replica transaction early if this many records
 +    // scanned. Otherwise, write locks may be held for a very long time.
 +    private static final int RESYNC_WATERMARK = 100;
      /**
       * Utility method to select the natural ordering of a storage, by looking
 @@ -354,9 +353,11 @@ class ReplicatedRepository          }
          // Order both queries the same so that they can be run in parallel.
 -        String[] orderBy = selectNaturalOrder(mMasterRepository, type);
 +        // Favor natural order of replica, since cursors may be opened and
 +        // re-opened on it.
 +        String[] orderBy = selectNaturalOrder(mReplicaRepository, type);
          if (orderBy == null) {
 -            orderBy = selectNaturalOrder(mReplicaRepository, type);
 +            orderBy = selectNaturalOrder(mMasterRepository, type);
              if (orderBy == null) {
                  Set<String> pkSet =
                      StorableIntrospector.examine(type).getPrimaryKeyProperties().keySet();
 @@ -384,42 +385,41 @@ class ReplicatedRepository              throttle = new Throttle(50);
          }
 -        Cursor<S> replicaCursor = replicaQuery.fetch();
 +        Transaction replicaTxn = mReplicaRepository.enterTransaction();
          try {
 -            Cursor<S> masterCursor = masterQuery.fetch();
 -            try {
 -                resync(trigger,
 -                       replicaCursor,
 -                       masterCursor,
 -                       throttle, desiredSpeed,
 -                       bc);
 -            } finally {
 -                masterCursor.close();
 -            }
 +            replicaTxn.setForUpdate(true);
 +
 +            resync(trigger,
 +                   replicaStorage, replicaQuery,
 +                   masterStorage, masterQuery,
 +                   throttle, desiredSpeed,
 +                   bc, replicaTxn);
 +
 +            replicaTxn.commit();
          } finally {
 -            replicaCursor.close();
 +            replicaTxn.exit();
          }
      }
      @SuppressWarnings("unchecked")
      private <S extends Storable> void resync(ReplicationTrigger<S> trigger,
 -                                             Cursor<S> replicaCursor,
 -                                             Cursor<S> masterCursor,
 +                                             Storage<S> replicaStorage, Query<S> replicaQuery,
 +                                             Storage<S> masterStorage, Query<S> masterQuery,
                                               Throttle throttle, double desiredSpeed,
 -                                             Comparator comparator)
 +                                             Comparator comparator, Transaction replicaTxn)
          throws RepositoryException
      {
 -        // Enqueue resyncs to a separate thread since open cursors hold locks
 -        // on currently referenced entries.
 -        BlockingQueue<Runnable> resyncQueue =
 -            new ArrayBlockingQueue<Runnable>(RESYNC_QUEUE_SIZE, true);
 -        ResyncThread resyncThread = new ResyncThread(resyncQueue);
 -        resyncThread.start();
 +        Cursor<S> replicaCursor = null;
 +        Cursor<S> masterCursor = null;
          try {
 +            replicaCursor = replicaQuery.fetch();
 +            masterCursor = masterQuery.fetch();
 +
              S replicaEntry = null;
              S masterEntry = null;
 -
 +            
 +            int count = 0, txnCount = 0;
              while (true) {
                  if (throttle != null) {
                      try {
 @@ -430,25 +430,43 @@ class ReplicatedRepository                      }
                  }
 -                if (replicaEntry == null && replicaCursor.hasNext()) {
 +                if (replicaEntry == null && replicaCursor != null && replicaCursor.hasNext()) {
                      replicaEntry = replicaCursor.next();
                  }
 +                
 +                if (count++ >= RESYNC_WATERMARK || txnCount >= RESYNC_BATCH_SIZE) {
 +                    replicaTxn.commit();
 +                    if (replicaCursor != null) {
 +                        // Cursor should auto-close after txn commit, but force
 +                        // a close anyhow. Cursor is re-opened when it is
 +                        // allowed to advance.
 +                        replicaCursor.close();
 +                        replicaCursor = null;
 +                    }
 +                    count = 0;
 +                    txnCount = 0;
 +                }
                  if (masterEntry == null && masterCursor.hasNext()) {
                      masterEntry = masterCursor.next();
                  }
 +                Runnable resyncTask = null;
 +
                  // Comparator should treat null as high.
                  int compare = comparator.compare(replicaEntry, masterEntry);
                  if (compare < 0) {
 -                    // Bogus exists only in replica so delete it.
 -                    resyncThread.addResyncTask(trigger, replicaEntry, null);
 +                    // Bogus record exists only in replica so delete it.
 +                    resyncTask = prepareResyncTask(trigger, replicaEntry, null);
                      // Allow replica to advance.
 +                    if (replicaCursor == null) {
 +                        replicaCursor = replicaQuery.fetchAfter(replicaEntry);
 +                    }
                      replicaEntry = null;
                  } else if (compare > 0) {
                      // Replica cursor is missing an entry so copy it.
 -                    resyncThread.addResyncTask(trigger, null, masterEntry);
 +                    resyncTask = prepareResyncTask(trigger, null, masterEntry);
                      // Allow master to advance.
                      masterEntry = null;
                  } else {
 @@ -459,126 +477,57 @@ class ReplicatedRepository                      if (!replicaEntry.equalProperties(masterEntry)) {
                          // Replica is stale.
 -                        resyncThread.addResyncTask(trigger, replicaEntry, masterEntry);
 +                        resyncTask = prepareResyncTask(trigger, replicaEntry, masterEntry);
                      }
                      // Entries are synchronized so allow both cursors to advance.
 +                    if (replicaCursor == null) {
 +                        replicaCursor = replicaQuery.fetchAfter(replicaEntry);
 +                    }
                      replicaEntry = null;
                      masterEntry = null;
                  }
 +
 +                if (resyncTask != null) {
 +                    txnCount++;
 +                    resyncTask.run();
 +                }
              }
          } finally {
 -            resyncThread.waitUntilDone();
 -        }
 -    }
 -
 -    // TODO: Use TaskQueueThread
 -
 -    private static class ResyncThread extends Thread {
 -        private static final int
 -            STATE_RUNNING = 0,
 -            STATE_SHOULD_STOP = 1,
 -            STATE_STOPPED = 2;
 -
 -        private static final Runnable STOP_TASK = new Runnable() {public void run() {}};
 -
 -        private final BlockingQueue<Runnable> mQueue;
 -
 -        private int mState = STATE_RUNNING;
 -
 -        ResyncThread(BlockingQueue<Runnable> queue) {
 -            super("ReplicatedRepository Resync");
 -            mQueue = queue;
 -        }
 -
 -        public void run() {
              try {
 -                while (true) {
 -                    boolean isStopping;
 -                    synchronized (this) {
 -                        isStopping = mState != STATE_RUNNING;
 -                    }
 -
 -                    Runnable task;
 -                    if (isStopping) {
 -                        // Poll the queue so this thread doesn't block when it
 -                        // should be stopping.
 -                        task = mQueue.poll();
 -                    } else {
 -                        try {
 -                            task = mQueue.take();
 -                        } catch (InterruptedException e) {
 -                            break;
 -                        }
 -                    }
 -
 -                    if (task == null || task == STOP_TASK) {
 -                        // Marker to indicate we should stop.
 -                        break;
 -                    }
 -
 -                    task.run();
 +                if (masterCursor != null) {
 +                    masterCursor.close();
                  }
              } finally {
 -                synchronized (this) {
 -                    mState = STATE_STOPPED;
 -                    notifyAll();
 +                if (replicaCursor != null) {
 +                    replicaCursor.close();
                  }
              }
          }
 +    }
 -        <S extends Storable> void addResyncTask(final ReplicationTrigger<S> trigger,
 -                                                final S replicaEntry,
 -                                                final S masterEntry)
 -            throws RepositoryException
 -        {
 -            if (replicaEntry == null && masterEntry == null) {
 -                // If both are null, then there's nothing to do, is there?
 -                // Note: Caller shouldn't have passed double nulls to
 -                // addResyncTask in the first place.
 -                return;
 -            }
 -
 -            Runnable task = new Runnable() {
 -                public void run() {
 -                    try {
 -                        trigger.resyncEntries(replicaEntry, masterEntry);
 -                    } catch (Exception e) {
 -                        LogFactory.getLog(ReplicatedRepository.class).error(null, e);
 -                    }
 -                }
 -            };
 -
 -            addResyncTask(task);
 +    private <S extends Storable> Runnable prepareResyncTask(final ReplicationTrigger<S> trigger,
 +                                                            final S replicaEntry,
 +                                                            final S masterEntry)
 +        throws RepositoryException
 +    {
 +        if (replicaEntry == null && masterEntry == null) {
 +            // If both are null, then there's nothing to do, is there?
 +            // Note: Caller shouldn't have passed double nulls to
 +            // prepareResyncTask in the first place.
 +            return null;
          }
 -
 -        <S extends Storable> void addResyncTask(Runnable task)
 -            throws RepositoryException
 -        {
 -            try {
 -                if (!mQueue.offer(task, RESYNC_QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
 -                    throw new RepositoryException("Unable to enqueue resync task");
 +        Runnable task = new Runnable() {
 +            public void run() {
 +                try {
 +                    trigger.resyncEntries(replicaEntry, masterEntry);
 +                } catch (Exception e) {
 +                    LogFactory.getLog(ReplicatedRepository.class).error(null, e);
                  }
 -            } catch (InterruptedException e) {
 -                throw new RepositoryException(e);
              }
 -        }
 +        };
 -        synchronized void waitUntilDone() throws RepositoryException {
 -            if (mState == STATE_STOPPED) {
 -                return;
 -            }
 -            mState = STATE_SHOULD_STOP;
 -            try {
 -                // Inject stop task into the queue so it knows to stop.
 -                mQueue.offer(STOP_TASK, RESYNC_QUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
 -                while (mState != STATE_STOPPED) {
 -                    wait();
 -                }
 -            } catch (InterruptedException e) {
 -                throw new RepositoryException(e);
 -            }
 -        }
 +        return task;
      }
  }
 | 
