From a6aa4d5279c2a7ab9aa87e80488e90cafae69e69 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Fri, 6 Jun 2008 00:18:14 +0000 Subject: Added listener callback for replicated repository resync. --- .../carbonado/capability/ResyncCapability.java | 55 ++++++++++++++++++++++ .../repo/replicated/ReplicatedRepository.java | 35 ++++++++++++-- .../repo/replicated/ReplicationTrigger.java | 38 ++++++++++++--- 3 files changed, 118 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java index 502fc5c..337afb0 100644 --- a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java +++ b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java @@ -45,9 +45,64 @@ public interface ResyncCapability extends Capability { Object... filterValues) throws RepositoryException; + /** + * Re-synchronizes replicated storables against the master repository. + * + * @param type type of storable to re-sync + * @param listener optional listener which gets notified as storables are re-sync'd + * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half + * speed, 0.1 = one-tenth speed, etc + * @param filter optional query filter to limit which objects get re-sync'ed + * @param filterValues filter values for optional filter + * @since 1.2 + */ + void resync(Class type, + Listener listener, + double desiredSpeed, + String filter, + Object... filterValues) + throws RepositoryException; + /** * Returns the immediate master Repository, for manual comparison. Direct * updates to the master will likely create inconsistencies. */ Repository getMasterRepository(); + + /** + * Defines callbacks which are invoked as storables get re-sync'd. The + * callback is invoked in the scope of the resync transaction. If any + * exception is thrown, the immediate changes are rolled back and the + * entire repository resync operation is aborted. + * + *

The listener implementation should return quickly from the callback + * methods, to avoid lingering transactions. If the listener is used to + * invoke special repair operations, they should be placed into a task + * queue. A separate thread can then perform the repairs outside the resync + * transaction. + */ + public static interface Listener { + /** + * Called when a storable was inserted as part of a resync. + * + * @param newStorable storable which was inserted, never null + */ + void inserted(S newStorable); + + /** + * Called when a storable was updated as part of a resync. Both old and + * new storables have a matching primary key. + * + * @param oldStorable storable which was deleted, never null + * @param newStorable storable which was inserted, never null + */ + void updated(S oldStorable, S newStorable); + + /** + * Called when a storable was deleted as part of a resync. + * + * @param oldStorable storable which was deleted, never null + */ + void deleted(S oldStorable); + } } diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 55a1710..5aa4243 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -365,6 +365,28 @@ class ReplicatedRepository String filter, Object... filterValues) throws RepositoryException + { + resync(type, null, desiredSpeed, filter, filterValues); + } + + /** + * Repairs replicated storables by synchronizing the replica repository + * against the master repository. + * + * @param type type of storable to re-sync + * @param listener optional listener which gets notified as storables are re-sync'd + * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half + * speed, 0.1 = one-tenth speed, etc + * @param filter optional query filter to limit which objects get re-sync'ed + * @param filterValues filter values for optional filter + * @since 1.2 + */ + public void resync(Class type, + ResyncCapability.Listener listener, + double desiredSpeed, + String filter, + Object... filterValues) + throws RepositoryException { ReplicationTrigger replicationTrigger; if (storageFor(type) instanceof ReplicatedStorage) { @@ -422,6 +444,7 @@ class ReplicatedRepository resync(replicationTrigger, replicaStorage, replicaQuery, masterStorage, masterQuery, + listener, throttle, desiredSpeed, comparator, replicaTxn); @@ -435,6 +458,7 @@ class ReplicatedRepository private void resync(ReplicationTrigger replicationTrigger, Storage replicaStorage, Query replicaQuery, Storage masterStorage, Query masterQuery, + ResyncCapability.Listener listener, Throttle throttle, double desiredSpeed, Comparator comparator, Transaction replicaTxn) throws RepositoryException @@ -556,7 +580,8 @@ class ReplicatedRepository if (compare < 0) { // Bogus record exists only in replica so delete it. - resyncTask = prepareResyncTask(replicationTrigger, replicaEntry, null); + resyncTask = prepareResyncTask + (replicationTrigger, listener, replicaEntry, null); // Allow replica to advance. if (replicaCursor == null) { replicaCursor = replicaQuery.fetchAfter(replicaEntry); @@ -565,7 +590,8 @@ class ReplicatedRepository replicaEntry = null; } else if (compare > 0) { // Replica cursor is missing an entry so copy it. - resyncTask = prepareResyncTask(replicationTrigger, null, masterEntry); + resyncTask = prepareResyncTask + (replicationTrigger, listener, null, masterEntry); // Allow master to advance. masterEntry = null; } else { @@ -581,7 +607,7 @@ class ReplicatedRepository if (!replicaEntry.equalProperties(masterEntry)) { // Replica is stale. resyncTask = prepareResyncTask - (replicationTrigger, replicaEntry, masterEntry); + (replicationTrigger, listener, replicaEntry, masterEntry); } // Entries are synchronized so allow both cursors to advance. @@ -613,6 +639,7 @@ class ReplicatedRepository private Runnable prepareResyncTask (final ReplicationTrigger replicationTrigger, + final ResyncCapability.Listener listener, final S replicaEntry, final S masterEntry) throws RepositoryException @@ -627,7 +654,7 @@ class ReplicatedRepository Runnable task = new Runnable() { public void run() { try { - replicationTrigger.resyncEntries(replicaEntry, masterEntry); + replicationTrigger.resyncEntries(listener, replicaEntry, masterEntry); } catch (Exception e) { LogFactory.getLog(ReplicatedRepository.class).error(null, e); } diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index bdacca4..8cf7007 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -32,6 +32,8 @@ import com.amazon.carbonado.Transaction; import com.amazon.carbonado.Trigger; import com.amazon.carbonado.UniqueConstraintException; +import com.amazon.carbonado.capability.ResyncCapability; + import com.amazon.carbonado.spi.RepairExecutor; import com.amazon.carbonado.spi.TriggerManager; @@ -198,10 +200,14 @@ class ReplicationTrigger extends Trigger { * Re-sync the replica to the master. The primary keys of both entries are * assumed to match. * + * @param listener optional * @param replicaEntry current replica entry, or null if none * @param masterEntry current master entry, or null if none */ - void resyncEntries(S replicaEntry, S masterEntry) throws FetchException, PersistException { + void resyncEntries(ResyncCapability.Listener listener, + S replicaEntry, S masterEntry) + throws FetchException, PersistException + { if (replicaEntry == null && masterEntry == null) { return; } @@ -212,6 +218,9 @@ class ReplicationTrigger extends Trigger { try { Transaction txn = mRepository.enterTransaction(); try { + S newReplicaEntry = null; + + // Delete old entry. if (replicaEntry != null) { if (masterEntry == null) { log.info("Deleting bogus replica entry: " + replicaEntry); @@ -222,11 +231,13 @@ class ReplicationTrigger extends Trigger { log.error("Unable to delete replica entry: " + replicaEntry, e); if (masterEntry != null) { // Try to update instead. - S newReplicaEntry = mReplicaStorage.prepare(); + newReplicaEntry = mReplicaStorage.prepare(); transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry); log.info("Replacing corrupt replica entry with: " + newReplicaEntry); try { newReplicaEntry.update(); + // This disables the insert step, which is not needed now. + masterEntry = null; } catch (PersistException e2) { log.error("Unable to update replica entry: " + replicaEntry, e2); return; @@ -234,8 +245,10 @@ class ReplicationTrigger extends Trigger { } } } + + // Insert new entry. if (masterEntry != null) { - S newReplicaEntry = mReplicaStorage.prepare(); + newReplicaEntry = mReplicaStorage.prepare(); if (replicaEntry == null) { masterEntry.copyAllProperties(newReplicaEntry); log.info("Adding missing replica entry: " + newReplicaEntry); @@ -252,6 +265,19 @@ class ReplicationTrigger extends Trigger { newReplicaEntry.tryInsert(); } } + + if (listener != null) { + if (replicaEntry == null) { + if (newReplicaEntry != null) { + listener.inserted(newReplicaEntry); + } + } else if (newReplicaEntry != null) { + listener.updated(replicaEntry, newReplicaEntry); + } else { + listener.deleted(replicaEntry); + } + } + txn.commit(); } finally { txn.exit(); @@ -319,12 +345,12 @@ class ReplicationTrigger extends Trigger { txn.setForUpdate(true); if (finalReplica.tryLoad()) { if (finalMaster.tryLoad()) { - resyncEntries(finalReplica, finalMaster); + resyncEntries(null, finalReplica, finalMaster); } else { - resyncEntries(finalReplica, null); + resyncEntries(null, finalReplica, null); } } else if (finalMaster.tryLoad()) { - resyncEntries(null, finalMaster); + resyncEntries(null, null, finalMaster); } txn.commit(); } finally { -- cgit v1.2.3