From a6aa4d5279c2a7ab9aa87e80488e90cafae69e69 Mon Sep 17 00:00:00 2001
From: "Brian S. O'Neill" <bronee@gmail.com>
Date: Fri, 6 Jun 2008 00:18:14 +0000
Subject: Added listener callback for replicated repository resync.

---
 .../carbonado/capability/ResyncCapability.java     | 55 ++++++++++++++++++++++
 .../repo/replicated/ReplicatedRepository.java      | 35 ++++++++++++--
 .../repo/replicated/ReplicationTrigger.java        | 38 ++++++++++++---
 3 files changed, 118 insertions(+), 10 deletions(-)

(limited to 'src')

diff --git a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java
index 502fc5c..337afb0 100644
--- a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java
+++ b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java
@@ -45,9 +45,64 @@ public interface ResyncCapability extends Capability {
                                      Object... filterValues)
         throws RepositoryException;
 
+    /**
+     * Re-synchronizes replicated storables against the master repository.
+     *
+     * @param type type of storable to re-sync
+     * @param listener optional listener which gets notified as storables are re-sync'd
+     * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half
+     * speed, 0.1 = one-tenth speed, etc
+     * @param filter optional query filter to limit which objects get re-sync'ed
+     * @param filterValues filter values for optional filter
+     * @since 1.2
+     */
+    <S extends Storable> void resync(Class<S> type,
+                                     Listener<? super S> listener,
+                                     double desiredSpeed,
+                                     String filter,
+                                     Object... filterValues)
+        throws RepositoryException;
+
     /**
      * Returns the immediate master Repository, for manual comparison. Direct
      * updates to the master will likely create inconsistencies.
      */
     Repository getMasterRepository();
+
+    /**
+     * Defines callbacks which are invoked as storables get re-sync'd. The
+     * callback is invoked in the scope of the resync transaction. If any
+     * exception is thrown, the immediate changes are rolled back and the
+     * entire repository resync operation is aborted.
+     *
+     * <p>The listener implementation should return quickly from the callback
+     * methods, to avoid lingering transactions. If the listener is used to
+     * invoke special repair operations, they should be placed into a task
+     * queue. A separate thread can then perform the repairs outside the resync
+     * transaction.
+     */
+    public static interface Listener<S> {
+        /**
+         * Called when a storable was inserted as part of a resync.
+         *
+         * @param newStorable storable which was inserted, never null
+         */
+        void inserted(S newStorable);
+
+        /**
+         * Called when a storable was updated as part of a resync. Both old and
+         * new storables have a matching primary key.
+         *
+         * @param oldStorable storable which was deleted, never null
+         * @param newStorable storable which was inserted, never null
+         */
+        void updated(S oldStorable, S newStorable);
+
+        /**
+         * Called when a storable was deleted as part of a resync.
+         *
+         * @param oldStorable storable which was deleted, never null
+         */
+        void deleted(S oldStorable);
+    }
 }
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
index 55a1710..5aa4243 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -365,6 +365,28 @@ class ReplicatedRepository
                                             String filter,
                                             Object... filterValues)
         throws RepositoryException
+    {
+        resync(type, null, desiredSpeed, filter, filterValues);
+    }
+
+    /**
+     * Repairs replicated storables by synchronizing the replica repository
+     * against the master repository.
+     *
+     * @param type type of storable to re-sync
+     * @param listener optional listener which gets notified as storables are re-sync'd
+     * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half
+     * speed, 0.1 = one-tenth speed, etc
+     * @param filter optional query filter to limit which objects get re-sync'ed
+     * @param filterValues filter values for optional filter
+     * @since 1.2
+     */
+    public <S extends Storable> void resync(Class<S> type,
+                                            ResyncCapability.Listener<? super S> listener,
+                                            double desiredSpeed,
+                                            String filter,
+                                            Object... filterValues)
+        throws RepositoryException
     {
         ReplicationTrigger<S> replicationTrigger;
         if (storageFor(type) instanceof ReplicatedStorage) {
@@ -422,6 +444,7 @@ class ReplicatedRepository
             resync(replicationTrigger,
                    replicaStorage, replicaQuery,
                    masterStorage, masterQuery,
+                   listener,
                    throttle, desiredSpeed,
                    comparator, replicaTxn);
 
@@ -435,6 +458,7 @@ class ReplicatedRepository
     private <S extends Storable> void resync(ReplicationTrigger<S> replicationTrigger,
                                              Storage<S> replicaStorage, Query<S> replicaQuery,
                                              Storage<S> masterStorage, Query<S> masterQuery,
+                                             ResyncCapability.Listener<? super S> listener,
                                              Throttle throttle, double desiredSpeed,
                                              Comparator comparator, Transaction replicaTxn)
         throws RepositoryException
@@ -556,7 +580,8 @@ class ReplicatedRepository
 
                 if (compare < 0) {
                     // Bogus record exists only in replica so delete it.
-                    resyncTask = prepareResyncTask(replicationTrigger, replicaEntry, null);
+                    resyncTask = prepareResyncTask
+                        (replicationTrigger, listener, replicaEntry, null);
                     // Allow replica to advance.
                     if (replicaCursor == null) {
                         replicaCursor = replicaQuery.fetchAfter(replicaEntry);
@@ -565,7 +590,8 @@ class ReplicatedRepository
                     replicaEntry = null;
                 } else if (compare > 0) {
                     // Replica cursor is missing an entry so copy it.
-                    resyncTask = prepareResyncTask(replicationTrigger, null, masterEntry);
+                    resyncTask = prepareResyncTask
+                        (replicationTrigger, listener, null, masterEntry);
                     // Allow master to advance.
                     masterEntry = null;
                 } else {
@@ -581,7 +607,7 @@ class ReplicatedRepository
                     if (!replicaEntry.equalProperties(masterEntry)) {
                         // Replica is stale.
                         resyncTask = prepareResyncTask
-                            (replicationTrigger, replicaEntry, masterEntry);
+                            (replicationTrigger, listener, replicaEntry, masterEntry);
                     }
 
                     // Entries are synchronized so allow both cursors to advance.
@@ -613,6 +639,7 @@ class ReplicatedRepository
 
     private <S extends Storable> Runnable prepareResyncTask
                        (final ReplicationTrigger<S> replicationTrigger,
+                        final ResyncCapability.Listener<? super S> listener,
                         final S replicaEntry,
                         final S masterEntry)
         throws RepositoryException
@@ -627,7 +654,7 @@ class ReplicatedRepository
         Runnable task = new Runnable() {
             public void run() {
                 try {
-                    replicationTrigger.resyncEntries(replicaEntry, masterEntry);
+                    replicationTrigger.resyncEntries(listener, replicaEntry, masterEntry);
                 } catch (Exception e) {
                     LogFactory.getLog(ReplicatedRepository.class).error(null, e);
                 }
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
index bdacca4..8cf7007 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
@@ -32,6 +32,8 @@ import com.amazon.carbonado.Transaction;
 import com.amazon.carbonado.Trigger;
 import com.amazon.carbonado.UniqueConstraintException;
 
+import com.amazon.carbonado.capability.ResyncCapability;
+
 import com.amazon.carbonado.spi.RepairExecutor;
 import com.amazon.carbonado.spi.TriggerManager;
 
@@ -198,10 +200,14 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
      * Re-sync the replica to the master. The primary keys of both entries are
      * assumed to match.
      *
+     * @param listener optional
      * @param replicaEntry current replica entry, or null if none
      * @param masterEntry current master entry, or null if none
      */
-    void resyncEntries(S replicaEntry, S masterEntry) throws FetchException, PersistException {
+    void resyncEntries(ResyncCapability.Listener<? super S> listener,
+                       S replicaEntry, S masterEntry)
+        throws FetchException, PersistException
+    {
         if (replicaEntry == null && masterEntry == null) {
             return;
         }
@@ -212,6 +218,9 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
         try {
             Transaction txn = mRepository.enterTransaction();
             try {
+                S newReplicaEntry = null;
+
+                // Delete old entry.
                 if (replicaEntry != null) {
                     if (masterEntry == null) {
                         log.info("Deleting bogus replica entry: " + replicaEntry);
@@ -222,11 +231,13 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
                         log.error("Unable to delete replica entry: " + replicaEntry, e);
                         if (masterEntry != null) {
                             // Try to update instead.
-                            S newReplicaEntry = mReplicaStorage.prepare();
+                            newReplicaEntry = mReplicaStorage.prepare();
                             transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
                             log.info("Replacing corrupt replica entry with: " + newReplicaEntry);
                             try {
                                 newReplicaEntry.update();
+                                // This disables the insert step, which is not needed now.
+                                masterEntry = null;
                             } catch (PersistException e2) {
                                 log.error("Unable to update replica entry: " + replicaEntry, e2);
                                 return;
@@ -234,8 +245,10 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
                         }
                     }
                 }
+
+                // Insert new entry.
                 if (masterEntry != null) {
-                    S newReplicaEntry = mReplicaStorage.prepare();
+                    newReplicaEntry = mReplicaStorage.prepare();
                     if (replicaEntry == null) {
                         masterEntry.copyAllProperties(newReplicaEntry);
                         log.info("Adding missing replica entry: " + newReplicaEntry);
@@ -252,6 +265,19 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
                         newReplicaEntry.tryInsert();
                     }
                 }
+
+                if (listener != null) {
+                    if (replicaEntry == null) {
+                        if (newReplicaEntry != null) {
+                            listener.inserted(newReplicaEntry);
+                        }
+                    } else if (newReplicaEntry != null) {
+                        listener.updated(replicaEntry, newReplicaEntry);
+                    } else {
+                        listener.deleted(replicaEntry);
+                    }
+                }
+
                 txn.commit();
             } finally {
                 txn.exit();
@@ -319,12 +345,12 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
                         txn.setForUpdate(true);
                         if (finalReplica.tryLoad()) {
                             if (finalMaster.tryLoad()) {
-                                resyncEntries(finalReplica, finalMaster);
+                                resyncEntries(null, finalReplica, finalMaster);
                             } else {
-                                resyncEntries(finalReplica, null);
+                                resyncEntries(null, finalReplica, null);
                             }
                         } else if (finalMaster.tryLoad()) {
-                            resyncEntries(null, finalMaster);
+                            resyncEntries(null, null, finalMaster);
                         }
                         txn.commit();
                     } finally {
-- 
cgit v1.2.3