diff options
| author | Brian S. O'Neill <bronee@gmail.com> | 2008-06-06 00:18:14 +0000 | 
|---|---|---|
| committer | Brian S. O'Neill <bronee@gmail.com> | 2008-06-06 00:18:14 +0000 | 
| commit | a6aa4d5279c2a7ab9aa87e80488e90cafae69e69 (patch) | |
| tree | 7206329c3038a929fcc77b862a4e3c4a05cf1f5b /src/main | |
| parent | 25dad57d68a1f6fb56bac9f3afe166409cf96cff (diff) | |
Added listener callback for replicated repository resync.
Diffstat (limited to 'src/main')
3 files changed, 118 insertions, 10 deletions
| diff --git a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java index 502fc5c..337afb0 100644 --- a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java +++ b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java @@ -46,8 +46,63 @@ public interface ResyncCapability extends Capability {          throws RepositoryException;
      /**
 +     * Re-synchronizes replicated storables against the master repository.
 +     *
 +     * @param type type of storable to re-sync
 +     * @param listener optional listener which gets notified as storables are re-sync'd
 +     * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half
 +     * speed, 0.1 = one-tenth speed, etc
 +     * @param filter optional query filter to limit which objects get re-sync'ed
 +     * @param filterValues filter values for optional filter
 +     * @since 1.2
 +     */
 +    <S extends Storable> void resync(Class<S> type,
 +                                     Listener<? super S> listener,
 +                                     double desiredSpeed,
 +                                     String filter,
 +                                     Object... filterValues)
 +        throws RepositoryException;
 +
 +    /**
       * Returns the immediate master Repository, for manual comparison. Direct
       * updates to the master will likely create inconsistencies.
       */
      Repository getMasterRepository();
 +
 +    /**
 +     * Defines callbacks which are invoked as storables get re-sync'd. The
 +     * callback is invoked in the scope of the resync transaction. If any
 +     * exception is thrown, the immediate changes are rolled back and the
 +     * entire repository resync operation is aborted.
 +     *
 +     * <p>The listener implementation should return quickly from the callback
 +     * methods, to avoid lingering transactions. If the listener is used to
 +     * invoke special repair operations, they should be placed into a task
 +     * queue. A separate thread can then perform the repairs outside the resync
 +     * transaction.
 +     */
 +    public static interface Listener<S> {
 +        /**
 +         * Called when a storable was inserted as part of a resync.
 +         *
 +         * @param newStorable storable which was inserted, never null
 +         */
 +        void inserted(S newStorable);
 +
 +        /**
 +         * Called when a storable was updated as part of a resync. Both old and
 +         * new storables have a matching primary key.
 +         *
 +         * @param oldStorable storable which was deleted, never null
 +         * @param newStorable storable which was inserted, never null
 +         */
 +        void updated(S oldStorable, S newStorable);
 +
 +        /**
 +         * Called when a storable was deleted as part of a resync.
 +         *
 +         * @param oldStorable storable which was deleted, never null
 +         */
 +        void deleted(S oldStorable);
 +    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 55a1710..5aa4243 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -366,6 +366,28 @@ class ReplicatedRepository                                              Object... filterValues)
          throws RepositoryException
      {
 +        resync(type, null, desiredSpeed, filter, filterValues);
 +    }
 +
 +    /**
 +     * Repairs replicated storables by synchronizing the replica repository
 +     * against the master repository.
 +     *
 +     * @param type type of storable to re-sync
 +     * @param listener optional listener which gets notified as storables are re-sync'd
 +     * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half
 +     * speed, 0.1 = one-tenth speed, etc
 +     * @param filter optional query filter to limit which objects get re-sync'ed
 +     * @param filterValues filter values for optional filter
 +     * @since 1.2
 +     */
 +    public <S extends Storable> void resync(Class<S> type,
 +                                            ResyncCapability.Listener<? super S> listener,
 +                                            double desiredSpeed,
 +                                            String filter,
 +                                            Object... filterValues)
 +        throws RepositoryException
 +    {
          ReplicationTrigger<S> replicationTrigger;
          if (storageFor(type) instanceof ReplicatedStorage) {
              replicationTrigger = ((ReplicatedStorage) storageFor(type)).getReplicationTrigger();
 @@ -422,6 +444,7 @@ class ReplicatedRepository              resync(replicationTrigger,
                     replicaStorage, replicaQuery,
                     masterStorage, masterQuery,
 +                   listener,
                     throttle, desiredSpeed,
                     comparator, replicaTxn);
 @@ -435,6 +458,7 @@ class ReplicatedRepository      private <S extends Storable> void resync(ReplicationTrigger<S> replicationTrigger,
                                               Storage<S> replicaStorage, Query<S> replicaQuery,
                                               Storage<S> masterStorage, Query<S> masterQuery,
 +                                             ResyncCapability.Listener<? super S> listener,
                                               Throttle throttle, double desiredSpeed,
                                               Comparator comparator, Transaction replicaTxn)
          throws RepositoryException
 @@ -556,7 +580,8 @@ class ReplicatedRepository                  if (compare < 0) {
                      // Bogus record exists only in replica so delete it.
 -                    resyncTask = prepareResyncTask(replicationTrigger, replicaEntry, null);
 +                    resyncTask = prepareResyncTask
 +                        (replicationTrigger, listener, replicaEntry, null);
                      // Allow replica to advance.
                      if (replicaCursor == null) {
                          replicaCursor = replicaQuery.fetchAfter(replicaEntry);
 @@ -565,7 +590,8 @@ class ReplicatedRepository                      replicaEntry = null;
                  } else if (compare > 0) {
                      // Replica cursor is missing an entry so copy it.
 -                    resyncTask = prepareResyncTask(replicationTrigger, null, masterEntry);
 +                    resyncTask = prepareResyncTask
 +                        (replicationTrigger, listener, null, masterEntry);
                      // Allow master to advance.
                      masterEntry = null;
                  } else {
 @@ -581,7 +607,7 @@ class ReplicatedRepository                      if (!replicaEntry.equalProperties(masterEntry)) {
                          // Replica is stale.
                          resyncTask = prepareResyncTask
 -                            (replicationTrigger, replicaEntry, masterEntry);
 +                            (replicationTrigger, listener, replicaEntry, masterEntry);
                      }
                      // Entries are synchronized so allow both cursors to advance.
 @@ -613,6 +639,7 @@ class ReplicatedRepository      private <S extends Storable> Runnable prepareResyncTask
                         (final ReplicationTrigger<S> replicationTrigger,
 +                        final ResyncCapability.Listener<? super S> listener,
                          final S replicaEntry,
                          final S masterEntry)
          throws RepositoryException
 @@ -627,7 +654,7 @@ class ReplicatedRepository          Runnable task = new Runnable() {
              public void run() {
                  try {
 -                    replicationTrigger.resyncEntries(replicaEntry, masterEntry);
 +                    replicationTrigger.resyncEntries(listener, replicaEntry, masterEntry);
                  } catch (Exception e) {
                      LogFactory.getLog(ReplicatedRepository.class).error(null, e);
                  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index bdacca4..8cf7007 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -32,6 +32,8 @@ import com.amazon.carbonado.Transaction;  import com.amazon.carbonado.Trigger;
  import com.amazon.carbonado.UniqueConstraintException;
 +import com.amazon.carbonado.capability.ResyncCapability;
 +
  import com.amazon.carbonado.spi.RepairExecutor;
  import com.amazon.carbonado.spi.TriggerManager;
 @@ -198,10 +200,14 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {       * Re-sync the replica to the master. The primary keys of both entries are
       * assumed to match.
       *
 +     * @param listener optional
       * @param replicaEntry current replica entry, or null if none
       * @param masterEntry current master entry, or null if none
       */
 -    void resyncEntries(S replicaEntry, S masterEntry) throws FetchException, PersistException {
 +    void resyncEntries(ResyncCapability.Listener<? super S> listener,
 +                       S replicaEntry, S masterEntry)
 +        throws FetchException, PersistException
 +    {
          if (replicaEntry == null && masterEntry == null) {
              return;
          }
 @@ -212,6 +218,9 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {          try {
              Transaction txn = mRepository.enterTransaction();
              try {
 +                S newReplicaEntry = null;
 +
 +                // Delete old entry.
                  if (replicaEntry != null) {
                      if (masterEntry == null) {
                          log.info("Deleting bogus replica entry: " + replicaEntry);
 @@ -222,11 +231,13 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {                          log.error("Unable to delete replica entry: " + replicaEntry, e);
                          if (masterEntry != null) {
                              // Try to update instead.
 -                            S newReplicaEntry = mReplicaStorage.prepare();
 +                            newReplicaEntry = mReplicaStorage.prepare();
                              transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
                              log.info("Replacing corrupt replica entry with: " + newReplicaEntry);
                              try {
                                  newReplicaEntry.update();
 +                                // This disables the insert step, which is not needed now.
 +                                masterEntry = null;
                              } catch (PersistException e2) {
                                  log.error("Unable to update replica entry: " + replicaEntry, e2);
                                  return;
 @@ -234,8 +245,10 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {                          }
                      }
                  }
 +
 +                // Insert new entry.
                  if (masterEntry != null) {
 -                    S newReplicaEntry = mReplicaStorage.prepare();
 +                    newReplicaEntry = mReplicaStorage.prepare();
                      if (replicaEntry == null) {
                          masterEntry.copyAllProperties(newReplicaEntry);
                          log.info("Adding missing replica entry: " + newReplicaEntry);
 @@ -252,6 +265,19 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {                          newReplicaEntry.tryInsert();
                      }
                  }
 +
 +                if (listener != null) {
 +                    if (replicaEntry == null) {
 +                        if (newReplicaEntry != null) {
 +                            listener.inserted(newReplicaEntry);
 +                        }
 +                    } else if (newReplicaEntry != null) {
 +                        listener.updated(replicaEntry, newReplicaEntry);
 +                    } else {
 +                        listener.deleted(replicaEntry);
 +                    }
 +                }
 +
                  txn.commit();
              } finally {
                  txn.exit();
 @@ -319,12 +345,12 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {                          txn.setForUpdate(true);
                          if (finalReplica.tryLoad()) {
                              if (finalMaster.tryLoad()) {
 -                                resyncEntries(finalReplica, finalMaster);
 +                                resyncEntries(null, finalReplica, finalMaster);
                              } else {
 -                                resyncEntries(finalReplica, null);
 +                                resyncEntries(null, finalReplica, null);
                              }
                          } else if (finalMaster.tryLoad()) {
 -                            resyncEntries(null, finalMaster);
 +                            resyncEntries(null, null, finalMaster);
                          }
                          txn.commit();
                      } finally {
 | 
