diff options
author | Brian S. O'Neill <bronee@gmail.com> | 2008-09-26 18:31:47 +0000 |
---|---|---|
committer | Brian S. O'Neill <bronee@gmail.com> | 2008-09-26 18:31:47 +0000 |
commit | 6e6c3ff6bfd2893df3215160dfa2029b01f2906e (patch) | |
tree | 0e8252921df1a6f7d452b9a09dcaaf04a3c7a09b /src | |
parent | 0b9a4d866166ac0c555f0a3a4b44d1804ea145fb (diff) |
Resync listener is now a trigger in order to capture more information.
Diffstat (limited to 'src')
3 files changed, 110 insertions, 71 deletions
diff --git a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java index 337afb0..33efca7 100644 --- a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java +++ b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java @@ -18,9 +18,11 @@ package com.amazon.carbonado.capability;
+import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Repository;
import com.amazon.carbonado.RepositoryException;
import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Trigger;
/**
* Capability of replicating repositories for re-synchronizing to the master
@@ -70,8 +72,8 @@ public interface ResyncCapability extends Capability { Repository getMasterRepository();
/**
- * Defines callbacks which are invoked as storables get re-sync'd. The
- * callback is invoked in the scope of the resync transaction. If any
+ * Trigger which is invoked as storables get re-sync'd. Callbacks are
+ * invoked in the scope of the resync transaction. If any unchecked
* exception is thrown, the immediate changes are rolled back and the
* entire repository resync operation is aborted.
*
@@ -81,28 +83,18 @@ public interface ResyncCapability extends Capability { * queue. A separate thread can then perform the repairs outside the resync
* transaction.
*/
- public static interface Listener<S> {
+ public static class Listener<S> extends Trigger<S> {
/**
- * Called when a storable was inserted as part of a resync.
+ * Overloaded version of beforeUpdate method which is passed the
+ * storable in it's out-of-sync and sync'd states. The default
+ * implementation calls the inherited beforeUpdate method, only passing
+ * the newly sync'd storable.
*
- * @param newStorable storable which was inserted, never null
+ * @param oldStorable storable prior to being sync'd
+ * @param newStorable sync'd storable
*/
- void inserted(S newStorable);
-
- /**
- * Called when a storable was updated as part of a resync. Both old and
- * new storables have a matching primary key.
- *
- * @param oldStorable storable which was deleted, never null
- * @param newStorable storable which was inserted, never null
- */
- void updated(S oldStorable, S newStorable);
-
- /**
- * Called when a storable was deleted as part of a resync.
- *
- * @param oldStorable storable which was deleted, never null
- */
- void deleted(S oldStorable);
+ public Object beforeUpdate(S oldStorable, S newStorable) throws PersistException {
+ return beforeUpdate(newStorable);
+ }
}
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 51dbba8..8f0e45d 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -493,6 +493,9 @@ class ReplicatedRepository while (replicaCursor.hasNext()) {
try {
replicaEntry = replicaCursor.next();
+ if (listener != null) {
+ listener.afterLoad(replicaEntry);
+ }
if (skippedCount > 0) {
if (skippedCount == 1) {
log.warn("Skipped corrupt replica entry before this one: " +
@@ -546,6 +549,9 @@ class ReplicatedRepository if (replicaWithKeyOnly != null) {
// Try to update entry which could not be deleted.
replicaEntry = replicaWithKeyOnly;
+ if (listener != null) {
+ listener.afterLoad(replicaEntry);
+ }
break;
}
} catch (FetchException e2) {
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index 29af101..dd1d78b 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -37,6 +37,8 @@ import com.amazon.carbonado.capability.ResyncCapability; import com.amazon.carbonado.spi.RepairExecutor;
import com.amazon.carbonado.spi.TriggerManager;
+import com.amazon.carbonado.util.ThrowUnchecked;
+
/**
* All inserts/updates/deletes are first committed to the master storage, then
* duplicated and committed to the replica.
@@ -228,67 +230,86 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { try {
Transaction txn = mRepository.enterTransaction();
try {
- S newReplicaEntry = null;
-
- // Delete old entry.
- if (replicaEntry != null) {
- if (masterEntry == null) {
- log.info("Deleting bogus replica entry: " + replicaEntry);
- }
- try {
- replicaEntry.tryDelete();
- } catch (PersistException e) {
- log.error("Unable to delete replica entry: " + replicaEntry, e);
- if (masterEntry != null) {
- // Try to update instead.
- newReplicaEntry = mReplicaStorage.prepare();
- transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
- log.info("Replacing corrupt replica entry with: " + newReplicaEntry);
- try {
- newReplicaEntry.update();
- // This disables the insert step, which is not needed now.
- masterEntry = null;
- } catch (PersistException e2) {
- log.error("Unable to update replica entry: " + replicaEntry, e2);
- return;
- }
- }
+ final S newReplicaEntry;
+ if (replicaEntry == null) {
+ newReplicaEntry = mReplicaStorage.prepare();
+ masterEntry.copyAllProperties(newReplicaEntry);
+ log.info("Inserting missing replica entry: " + newReplicaEntry);
+ } else if (masterEntry != null) {
+ if (replicaEntry.equalProperties(masterEntry)) {
+ return;
}
+ newReplicaEntry = mReplicaStorage.prepare();
+ transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
+ log.info("Updating stale replica entry with: " + newReplicaEntry);
+ } else {
+ newReplicaEntry = null;
+ log.info("Deleting bogus replica entry: " + replicaEntry);
}
- // Insert new entry.
- if (masterEntry != null) {
- newReplicaEntry = mReplicaStorage.prepare();
+ final Object state;
+ if (listener == null) {
+ state = null;
+ } else {
if (replicaEntry == null) {
- masterEntry.copyAllProperties(newReplicaEntry);
- log.info("Adding missing replica entry: " + newReplicaEntry);
+ state = listener.beforeInsert(newReplicaEntry);
+ } else if (masterEntry != null) {
+ state = listener.beforeUpdate(replicaEntry, newReplicaEntry);
} else {
- if (replicaEntry.equalProperties(masterEntry)) {
- return;
+ state = listener.beforeDelete(replicaEntry);
+ }
+ }
+
+ try {
+ // Delete old entry.
+ if (replicaEntry != null) {
+ try {
+ replicaEntry.tryDelete();
+ } catch (PersistException e) {
+ log.error("Unable to delete replica entry: " + replicaEntry, e);
+ if (masterEntry != null) {
+ // Try to update instead.
+ log.info("Updating corrupt replica entry with: " +
+ newReplicaEntry);
+ try {
+ newReplicaEntry.update();
+ // This disables the insert step, which is not needed now.
+ masterEntry = null;
+ } catch (PersistException e2) {
+ log.error("Unable to update replica entry: " +
+ replicaEntry, e2);
+ resyncFailed(listener, replicaEntry, masterEntry,
+ newReplicaEntry, state);
+ return;
+ }
+ }
}
- transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
- log.info("Replacing stale replica entry with: " + newReplicaEntry);
}
- if (!newReplicaEntry.tryInsert()) {
- // Try to correct bizarre corruption.
- newReplicaEntry.tryDelete();
- newReplicaEntry.tryInsert();
+
+ // Insert new entry.
+ if (masterEntry != null && newReplicaEntry != null) {
+ if (!newReplicaEntry.tryInsert()) {
+ // Try to correct bizarre corruption.
+ newReplicaEntry.tryDelete();
+ newReplicaEntry.tryInsert();
+ }
}
- }
- if (listener != null) {
- if (replicaEntry == null) {
- if (newReplicaEntry != null) {
- listener.inserted(newReplicaEntry);
+ if (listener != null) {
+ if (replicaEntry == null) {
+ listener.afterInsert(newReplicaEntry, state);
+ } else if (masterEntry != null) {
+ listener.afterUpdate(newReplicaEntry, state);
+ } else {
+ listener.afterDelete(replicaEntry, state);
}
- } else if (newReplicaEntry != null) {
- listener.updated(replicaEntry, newReplicaEntry);
- } else {
- listener.deleted(replicaEntry);
}
- }
- txn.commit();
+ txn.commit();
+ } catch (Throwable e) {
+ resyncFailed(listener, replicaEntry, masterEntry, newReplicaEntry, state);
+ ThrowUnchecked.fire(e);
+ }
} finally {
txn.exit();
}
@@ -297,6 +318,26 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> { }
}
+ private void resyncFailed(ResyncCapability.Listener<? super S> listener,
+ S replicaEntry, S masterEntry,
+ S newReplicaEntry, Object state)
+ {
+ if (listener != null) {
+ try {
+ if (replicaEntry == null) {
+ listener.failedInsert(newReplicaEntry, state);
+ } else if (masterEntry != null) {
+ listener.failedUpdate(newReplicaEntry, state);
+ } else {
+ listener.failedDelete(replicaEntry, state);
+ }
+ } catch (Throwable e2) {
+ Thread t = Thread.currentThread();
+ t.getUncaughtExceptionHandler().uncaughtException(t, e2);
+ }
+ }
+ }
+
private void transferToReplicaEntry(S replicaEntry, S masterEntry, S newReplicaEntry) {
// First copy from old replica to preserve values of any independent
// properties. Be sure not to copy nulls from old replica to new
|