summaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/com/amazon/carbonado/capability/ResyncCapability.java36
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java6
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java139
3 files changed, 110 insertions, 71 deletions
diff --git a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java
index 337afb0..33efca7 100644
--- a/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java
+++ b/src/main/java/com/amazon/carbonado/capability/ResyncCapability.java
@@ -18,9 +18,11 @@
package com.amazon.carbonado.capability;
+import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Repository;
import com.amazon.carbonado.RepositoryException;
import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Trigger;
/**
* Capability of replicating repositories for re-synchronizing to the master
@@ -70,8 +72,8 @@ public interface ResyncCapability extends Capability {
Repository getMasterRepository();
/**
- * Defines callbacks which are invoked as storables get re-sync'd. The
- * callback is invoked in the scope of the resync transaction. If any
+ * Trigger which is invoked as storables get re-sync'd. Callbacks are
+ * invoked in the scope of the resync transaction. If any unchecked
* exception is thrown, the immediate changes are rolled back and the
* entire repository resync operation is aborted.
*
@@ -81,28 +83,18 @@ public interface ResyncCapability extends Capability {
* queue. A separate thread can then perform the repairs outside the resync
* transaction.
*/
- public static interface Listener<S> {
+ public static class Listener<S> extends Trigger<S> {
/**
- * Called when a storable was inserted as part of a resync.
+ * Overloaded version of beforeUpdate method which is passed the
+ * storable in it's out-of-sync and sync'd states. The default
+ * implementation calls the inherited beforeUpdate method, only passing
+ * the newly sync'd storable.
*
- * @param newStorable storable which was inserted, never null
+ * @param oldStorable storable prior to being sync'd
+ * @param newStorable sync'd storable
*/
- void inserted(S newStorable);
-
- /**
- * Called when a storable was updated as part of a resync. Both old and
- * new storables have a matching primary key.
- *
- * @param oldStorable storable which was deleted, never null
- * @param newStorable storable which was inserted, never null
- */
- void updated(S oldStorable, S newStorable);
-
- /**
- * Called when a storable was deleted as part of a resync.
- *
- * @param oldStorable storable which was deleted, never null
- */
- void deleted(S oldStorable);
+ public Object beforeUpdate(S oldStorable, S newStorable) throws PersistException {
+ return beforeUpdate(newStorable);
+ }
}
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
index 51dbba8..8f0e45d 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -493,6 +493,9 @@ class ReplicatedRepository
while (replicaCursor.hasNext()) {
try {
replicaEntry = replicaCursor.next();
+ if (listener != null) {
+ listener.afterLoad(replicaEntry);
+ }
if (skippedCount > 0) {
if (skippedCount == 1) {
log.warn("Skipped corrupt replica entry before this one: " +
@@ -546,6 +549,9 @@ class ReplicatedRepository
if (replicaWithKeyOnly != null) {
// Try to update entry which could not be deleted.
replicaEntry = replicaWithKeyOnly;
+ if (listener != null) {
+ listener.afterLoad(replicaEntry);
+ }
break;
}
} catch (FetchException e2) {
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
index 29af101..dd1d78b 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
@@ -37,6 +37,8 @@ import com.amazon.carbonado.capability.ResyncCapability;
import com.amazon.carbonado.spi.RepairExecutor;
import com.amazon.carbonado.spi.TriggerManager;
+import com.amazon.carbonado.util.ThrowUnchecked;
+
/**
* All inserts/updates/deletes are first committed to the master storage, then
* duplicated and committed to the replica.
@@ -228,67 +230,86 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
try {
Transaction txn = mRepository.enterTransaction();
try {
- S newReplicaEntry = null;
-
- // Delete old entry.
- if (replicaEntry != null) {
- if (masterEntry == null) {
- log.info("Deleting bogus replica entry: " + replicaEntry);
- }
- try {
- replicaEntry.tryDelete();
- } catch (PersistException e) {
- log.error("Unable to delete replica entry: " + replicaEntry, e);
- if (masterEntry != null) {
- // Try to update instead.
- newReplicaEntry = mReplicaStorage.prepare();
- transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
- log.info("Replacing corrupt replica entry with: " + newReplicaEntry);
- try {
- newReplicaEntry.update();
- // This disables the insert step, which is not needed now.
- masterEntry = null;
- } catch (PersistException e2) {
- log.error("Unable to update replica entry: " + replicaEntry, e2);
- return;
- }
- }
+ final S newReplicaEntry;
+ if (replicaEntry == null) {
+ newReplicaEntry = mReplicaStorage.prepare();
+ masterEntry.copyAllProperties(newReplicaEntry);
+ log.info("Inserting missing replica entry: " + newReplicaEntry);
+ } else if (masterEntry != null) {
+ if (replicaEntry.equalProperties(masterEntry)) {
+ return;
}
+ newReplicaEntry = mReplicaStorage.prepare();
+ transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
+ log.info("Updating stale replica entry with: " + newReplicaEntry);
+ } else {
+ newReplicaEntry = null;
+ log.info("Deleting bogus replica entry: " + replicaEntry);
}
- // Insert new entry.
- if (masterEntry != null) {
- newReplicaEntry = mReplicaStorage.prepare();
+ final Object state;
+ if (listener == null) {
+ state = null;
+ } else {
if (replicaEntry == null) {
- masterEntry.copyAllProperties(newReplicaEntry);
- log.info("Adding missing replica entry: " + newReplicaEntry);
+ state = listener.beforeInsert(newReplicaEntry);
+ } else if (masterEntry != null) {
+ state = listener.beforeUpdate(replicaEntry, newReplicaEntry);
} else {
- if (replicaEntry.equalProperties(masterEntry)) {
- return;
+ state = listener.beforeDelete(replicaEntry);
+ }
+ }
+
+ try {
+ // Delete old entry.
+ if (replicaEntry != null) {
+ try {
+ replicaEntry.tryDelete();
+ } catch (PersistException e) {
+ log.error("Unable to delete replica entry: " + replicaEntry, e);
+ if (masterEntry != null) {
+ // Try to update instead.
+ log.info("Updating corrupt replica entry with: " +
+ newReplicaEntry);
+ try {
+ newReplicaEntry.update();
+ // This disables the insert step, which is not needed now.
+ masterEntry = null;
+ } catch (PersistException e2) {
+ log.error("Unable to update replica entry: " +
+ replicaEntry, e2);
+ resyncFailed(listener, replicaEntry, masterEntry,
+ newReplicaEntry, state);
+ return;
+ }
+ }
}
- transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
- log.info("Replacing stale replica entry with: " + newReplicaEntry);
}
- if (!newReplicaEntry.tryInsert()) {
- // Try to correct bizarre corruption.
- newReplicaEntry.tryDelete();
- newReplicaEntry.tryInsert();
+
+ // Insert new entry.
+ if (masterEntry != null && newReplicaEntry != null) {
+ if (!newReplicaEntry.tryInsert()) {
+ // Try to correct bizarre corruption.
+ newReplicaEntry.tryDelete();
+ newReplicaEntry.tryInsert();
+ }
}
- }
- if (listener != null) {
- if (replicaEntry == null) {
- if (newReplicaEntry != null) {
- listener.inserted(newReplicaEntry);
+ if (listener != null) {
+ if (replicaEntry == null) {
+ listener.afterInsert(newReplicaEntry, state);
+ } else if (masterEntry != null) {
+ listener.afterUpdate(newReplicaEntry, state);
+ } else {
+ listener.afterDelete(replicaEntry, state);
}
- } else if (newReplicaEntry != null) {
- listener.updated(replicaEntry, newReplicaEntry);
- } else {
- listener.deleted(replicaEntry);
}
- }
- txn.commit();
+ txn.commit();
+ } catch (Throwable e) {
+ resyncFailed(listener, replicaEntry, masterEntry, newReplicaEntry, state);
+ ThrowUnchecked.fire(e);
+ }
} finally {
txn.exit();
}
@@ -297,6 +318,26 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
}
}
+ private void resyncFailed(ResyncCapability.Listener<? super S> listener,
+ S replicaEntry, S masterEntry,
+ S newReplicaEntry, Object state)
+ {
+ if (listener != null) {
+ try {
+ if (replicaEntry == null) {
+ listener.failedInsert(newReplicaEntry, state);
+ } else if (masterEntry != null) {
+ listener.failedUpdate(newReplicaEntry, state);
+ } else {
+ listener.failedDelete(replicaEntry, state);
+ }
+ } catch (Throwable e2) {
+ Thread t = Thread.currentThread();
+ t.getUncaughtExceptionHandler().uncaughtException(t, e2);
+ }
+ }
+ }
+
private void transferToReplicaEntry(S replicaEntry, S masterEntry, S newReplicaEntry) {
// First copy from old replica to preserve values of any independent
// properties. Be sure not to copy nulls from old replica to new