diff options
Diffstat (limited to 'src/main/java')
3 files changed, 110 insertions, 71 deletions
| diff --git a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java index 337afb0..33efca7 100644 --- a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java +++ b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java @@ -18,9 +18,11 @@  package com.amazon.carbonado.capability;
 +import com.amazon.carbonado.PersistException;
  import com.amazon.carbonado.Repository;
  import com.amazon.carbonado.RepositoryException;
  import com.amazon.carbonado.Storable;
 +import com.amazon.carbonado.Trigger;
  /**
   * Capability of replicating repositories for re-synchronizing to the master
 @@ -70,8 +72,8 @@ public interface ResyncCapability extends Capability {      Repository getMasterRepository();
      /**
 -     * Defines callbacks which are invoked as storables get re-sync'd. The
 -     * callback is invoked in the scope of the resync transaction. If any
 +     * Trigger which is invoked as storables get re-sync'd. Callbacks are
 +     * invoked in the scope of the resync transaction. If any unchecked
       * exception is thrown, the immediate changes are rolled back and the
       * entire repository resync operation is aborted.
       *
 @@ -81,28 +83,18 @@ public interface ResyncCapability extends Capability {       * queue. A separate thread can then perform the repairs outside the resync
       * transaction.
       */
 -    public static interface Listener<S> {
 +    public static class Listener<S> extends Trigger<S> {
          /**
 -         * Called when a storable was inserted as part of a resync.
 +         * Overloaded version of beforeUpdate method which is passed the
 +         * storable in it's out-of-sync and sync'd states. The default
 +         * implementation calls the inherited beforeUpdate method, only passing
 +         * the newly sync'd storable.
           *
 -         * @param newStorable storable which was inserted, never null
 +         * @param oldStorable storable prior to being sync'd
 +         * @param newStorable sync'd storable
           */
 -        void inserted(S newStorable);
 -
 -        /**
 -         * Called when a storable was updated as part of a resync. Both old and
 -         * new storables have a matching primary key.
 -         *
 -         * @param oldStorable storable which was deleted, never null
 -         * @param newStorable storable which was inserted, never null
 -         */
 -        void updated(S oldStorable, S newStorable);
 -
 -        /**
 -         * Called when a storable was deleted as part of a resync.
 -         *
 -         * @param oldStorable storable which was deleted, never null
 -         */
 -        void deleted(S oldStorable);
 +        public Object beforeUpdate(S oldStorable, S newStorable) throws PersistException {
 +            return beforeUpdate(newStorable);
 +        }
      }
  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 51dbba8..8f0e45d 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -493,6 +493,9 @@ class ReplicatedRepository                      while (replicaCursor.hasNext()) {
                          try {
                              replicaEntry = replicaCursor.next();
 +                            if (listener != null) {
 +                                listener.afterLoad(replicaEntry);
 +                            }
                              if (skippedCount > 0) {
                                  if (skippedCount == 1) {
                                      log.warn("Skipped corrupt replica entry before this one: " +
 @@ -546,6 +549,9 @@ class ReplicatedRepository                                      if (replicaWithKeyOnly != null) {
                                          // Try to update entry which could not be deleted.
                                          replicaEntry = replicaWithKeyOnly;
 +                                        if (listener != null) {
 +                                            listener.afterLoad(replicaEntry);
 +                                        }
                                          break;
                                      }
                                  } catch (FetchException e2) {
 diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index 29af101..dd1d78b 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -37,6 +37,8 @@ import com.amazon.carbonado.capability.ResyncCapability;  import com.amazon.carbonado.spi.RepairExecutor;
  import com.amazon.carbonado.spi.TriggerManager;
 +import com.amazon.carbonado.util.ThrowUnchecked;
 +
  /**
   * All inserts/updates/deletes are first committed to the master storage, then
   * duplicated and committed to the replica.
 @@ -228,67 +230,86 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {          try {
              Transaction txn = mRepository.enterTransaction();
              try {
 -                S newReplicaEntry = null;
 -
 -                // Delete old entry.
 -                if (replicaEntry != null) {
 -                    if (masterEntry == null) {
 -                        log.info("Deleting bogus replica entry: " + replicaEntry);
 -                    }
 -                    try {
 -                        replicaEntry.tryDelete();
 -                    } catch (PersistException e) {
 -                        log.error("Unable to delete replica entry: " + replicaEntry, e);
 -                        if (masterEntry != null) {
 -                            // Try to update instead.
 -                            newReplicaEntry = mReplicaStorage.prepare();
 -                            transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
 -                            log.info("Replacing corrupt replica entry with: " + newReplicaEntry);
 -                            try {
 -                                newReplicaEntry.update();
 -                                // This disables the insert step, which is not needed now.
 -                                masterEntry = null;
 -                            } catch (PersistException e2) {
 -                                log.error("Unable to update replica entry: " + replicaEntry, e2);
 -                                return;
 -                            }
 -                        }
 +                final S newReplicaEntry;
 +                if (replicaEntry == null) {
 +                    newReplicaEntry = mReplicaStorage.prepare();
 +                    masterEntry.copyAllProperties(newReplicaEntry);
 +                    log.info("Inserting missing replica entry: " + newReplicaEntry);
 +                } else if (masterEntry != null) {
 +                    if (replicaEntry.equalProperties(masterEntry)) {
 +                        return;
                      }
 +                    newReplicaEntry = mReplicaStorage.prepare();
 +                    transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
 +                    log.info("Updating stale replica entry with: " + newReplicaEntry);
 +                } else {
 +                    newReplicaEntry = null;
 +                    log.info("Deleting bogus replica entry: " + replicaEntry);
                  }
 -                // Insert new entry.
 -                if (masterEntry != null) {
 -                    newReplicaEntry = mReplicaStorage.prepare();
 +                final Object state;
 +                if (listener == null) {
 +                    state = null;
 +                } else {
                      if (replicaEntry == null) {
 -                        masterEntry.copyAllProperties(newReplicaEntry);
 -                        log.info("Adding missing replica entry: " + newReplicaEntry);
 +                        state = listener.beforeInsert(newReplicaEntry);
 +                    } else if (masterEntry != null) {
 +                        state = listener.beforeUpdate(replicaEntry, newReplicaEntry);
                      } else {
 -                        if (replicaEntry.equalProperties(masterEntry)) {
 -                            return;
 +                        state = listener.beforeDelete(replicaEntry);
 +                    }
 +                }
 +
 +                try {
 +                    // Delete old entry.
 +                    if (replicaEntry != null) {
 +                        try {
 +                            replicaEntry.tryDelete();
 +                        } catch (PersistException e) {
 +                            log.error("Unable to delete replica entry: " + replicaEntry, e);
 +                            if (masterEntry != null) {
 +                                // Try to update instead.
 +                                log.info("Updating corrupt replica entry with: " +
 +                                         newReplicaEntry);
 +                                try {
 +                                    newReplicaEntry.update();
 +                                    // This disables the insert step, which is not needed now.
 +                                    masterEntry = null;
 +                                } catch (PersistException e2) {
 +                                    log.error("Unable to update replica entry: " +
 +                                              replicaEntry, e2);
 +                                    resyncFailed(listener, replicaEntry, masterEntry,
 +                                                 newReplicaEntry, state);
 +                                    return;
 +                                }
 +                            }
                          }
 -                        transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
 -                        log.info("Replacing stale replica entry with: " + newReplicaEntry);
                      }
 -                    if (!newReplicaEntry.tryInsert()) {
 -                        // Try to correct bizarre corruption.
 -                        newReplicaEntry.tryDelete();
 -                        newReplicaEntry.tryInsert();
 +
 +                    // Insert new entry.
 +                    if (masterEntry != null && newReplicaEntry != null) {
 +                        if (!newReplicaEntry.tryInsert()) {
 +                            // Try to correct bizarre corruption.
 +                            newReplicaEntry.tryDelete();
 +                            newReplicaEntry.tryInsert();
 +                        }
                      }
 -                }
 -                if (listener != null) {
 -                    if (replicaEntry == null) {
 -                        if (newReplicaEntry != null) {
 -                            listener.inserted(newReplicaEntry);
 +                    if (listener != null) {
 +                        if (replicaEntry == null) {
 +                            listener.afterInsert(newReplicaEntry, state);
 +                        } else if (masterEntry != null) {
 +                            listener.afterUpdate(newReplicaEntry, state);
 +                        } else {
 +                            listener.afterDelete(replicaEntry, state);
                          }
 -                    } else if (newReplicaEntry != null) {
 -                        listener.updated(replicaEntry, newReplicaEntry);
 -                    } else {
 -                        listener.deleted(replicaEntry);
                      }
 -                }
 -                txn.commit();
 +                    txn.commit();
 +                } catch (Throwable e) {
 +                    resyncFailed(listener, replicaEntry, masterEntry, newReplicaEntry, state);
 +                    ThrowUnchecked.fire(e);
 +                }
              } finally {
                  txn.exit();
              }
 @@ -297,6 +318,26 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {          }
      }
 +    private void resyncFailed(ResyncCapability.Listener<? super S> listener,
 +                              S replicaEntry, S masterEntry,
 +                              S newReplicaEntry, Object state)
 +    {
 +        if (listener != null) {
 +            try {
 +                if (replicaEntry == null) {
 +                    listener.failedInsert(newReplicaEntry, state);
 +                } else if (masterEntry != null) {
 +                    listener.failedUpdate(newReplicaEntry, state);
 +                } else {
 +                    listener.failedDelete(replicaEntry, state);
 +                }
 +            } catch (Throwable e2) {
 +                Thread t = Thread.currentThread();
 +                t.getUncaughtExceptionHandler().uncaughtException(t, e2);
 +            }
 +        }
 +    }
 +
      private void transferToReplicaEntry(S replicaEntry, S masterEntry, S newReplicaEntry) {
          // First copy from old replica to preserve values of any independent
          // properties. Be sure not to copy nulls from old replica to new
 | 
