summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java6
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java139
2 files changed, 96 insertions, 49 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
index 51dbba8..8f0e45d 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -493,6 +493,9 @@ class ReplicatedRepository
while (replicaCursor.hasNext()) {
try {
replicaEntry = replicaCursor.next();
+ if (listener != null) {
+ listener.afterLoad(replicaEntry);
+ }
if (skippedCount > 0) {
if (skippedCount == 1) {
log.warn("Skipped corrupt replica entry before this one: " +
@@ -546,6 +549,9 @@ class ReplicatedRepository
if (replicaWithKeyOnly != null) {
// Try to update entry which could not be deleted.
replicaEntry = replicaWithKeyOnly;
+ if (listener != null) {
+ listener.afterLoad(replicaEntry);
+ }
break;
}
} catch (FetchException e2) {
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
index 29af101..dd1d78b 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
@@ -37,6 +37,8 @@ import com.amazon.carbonado.capability.ResyncCapability;
import com.amazon.carbonado.spi.RepairExecutor;
import com.amazon.carbonado.spi.TriggerManager;
+import com.amazon.carbonado.util.ThrowUnchecked;
+
/**
* All inserts/updates/deletes are first committed to the master storage, then
* duplicated and committed to the replica.
@@ -228,67 +230,86 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
try {
Transaction txn = mRepository.enterTransaction();
try {
- S newReplicaEntry = null;
-
- // Delete old entry.
- if (replicaEntry != null) {
- if (masterEntry == null) {
- log.info("Deleting bogus replica entry: " + replicaEntry);
- }
- try {
- replicaEntry.tryDelete();
- } catch (PersistException e) {
- log.error("Unable to delete replica entry: " + replicaEntry, e);
- if (masterEntry != null) {
- // Try to update instead.
- newReplicaEntry = mReplicaStorage.prepare();
- transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
- log.info("Replacing corrupt replica entry with: " + newReplicaEntry);
- try {
- newReplicaEntry.update();
- // This disables the insert step, which is not needed now.
- masterEntry = null;
- } catch (PersistException e2) {
- log.error("Unable to update replica entry: " + replicaEntry, e2);
- return;
- }
- }
+ final S newReplicaEntry;
+ if (replicaEntry == null) {
+ newReplicaEntry = mReplicaStorage.prepare();
+ masterEntry.copyAllProperties(newReplicaEntry);
+ log.info("Inserting missing replica entry: " + newReplicaEntry);
+ } else if (masterEntry != null) {
+ if (replicaEntry.equalProperties(masterEntry)) {
+ return;
}
+ newReplicaEntry = mReplicaStorage.prepare();
+ transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
+ log.info("Updating stale replica entry with: " + newReplicaEntry);
+ } else {
+ newReplicaEntry = null;
+ log.info("Deleting bogus replica entry: " + replicaEntry);
}
- // Insert new entry.
- if (masterEntry != null) {
- newReplicaEntry = mReplicaStorage.prepare();
+ final Object state;
+ if (listener == null) {
+ state = null;
+ } else {
if (replicaEntry == null) {
- masterEntry.copyAllProperties(newReplicaEntry);
- log.info("Adding missing replica entry: " + newReplicaEntry);
+ state = listener.beforeInsert(newReplicaEntry);
+ } else if (masterEntry != null) {
+ state = listener.beforeUpdate(replicaEntry, newReplicaEntry);
} else {
- if (replicaEntry.equalProperties(masterEntry)) {
- return;
+ state = listener.beforeDelete(replicaEntry);
+ }
+ }
+
+ try {
+ // Delete old entry.
+ if (replicaEntry != null) {
+ try {
+ replicaEntry.tryDelete();
+ } catch (PersistException e) {
+ log.error("Unable to delete replica entry: " + replicaEntry, e);
+ if (masterEntry != null) {
+ // Try to update instead.
+ log.info("Updating corrupt replica entry with: " +
+ newReplicaEntry);
+ try {
+ newReplicaEntry.update();
+ // This disables the insert step, which is not needed now.
+ masterEntry = null;
+ } catch (PersistException e2) {
+ log.error("Unable to update replica entry: " +
+ replicaEntry, e2);
+ resyncFailed(listener, replicaEntry, masterEntry,
+ newReplicaEntry, state);
+ return;
+ }
+ }
}
- transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
- log.info("Replacing stale replica entry with: " + newReplicaEntry);
}
- if (!newReplicaEntry.tryInsert()) {
- // Try to correct bizarre corruption.
- newReplicaEntry.tryDelete();
- newReplicaEntry.tryInsert();
+
+ // Insert new entry.
+ if (masterEntry != null && newReplicaEntry != null) {
+ if (!newReplicaEntry.tryInsert()) {
+ // Try to correct bizarre corruption.
+ newReplicaEntry.tryDelete();
+ newReplicaEntry.tryInsert();
+ }
}
- }
- if (listener != null) {
- if (replicaEntry == null) {
- if (newReplicaEntry != null) {
- listener.inserted(newReplicaEntry);
+ if (listener != null) {
+ if (replicaEntry == null) {
+ listener.afterInsert(newReplicaEntry, state);
+ } else if (masterEntry != null) {
+ listener.afterUpdate(newReplicaEntry, state);
+ } else {
+ listener.afterDelete(replicaEntry, state);
}
- } else if (newReplicaEntry != null) {
- listener.updated(replicaEntry, newReplicaEntry);
- } else {
- listener.deleted(replicaEntry);
}
- }
- txn.commit();
+ txn.commit();
+ } catch (Throwable e) {
+ resyncFailed(listener, replicaEntry, masterEntry, newReplicaEntry, state);
+ ThrowUnchecked.fire(e);
+ }
} finally {
txn.exit();
}
@@ -297,6 +318,26 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
}
}
+ private void resyncFailed(ResyncCapability.Listener<? super S> listener,
+ S replicaEntry, S masterEntry,
+ S newReplicaEntry, Object state)
+ {
+ if (listener != null) {
+ try {
+ if (replicaEntry == null) {
+ listener.failedInsert(newReplicaEntry, state);
+ } else if (masterEntry != null) {
+ listener.failedUpdate(newReplicaEntry, state);
+ } else {
+ listener.failedDelete(replicaEntry, state);
+ }
+ } catch (Throwable e2) {
+ Thread t = Thread.currentThread();
+ t.getUncaughtExceptionHandler().uncaughtException(t, e2);
+ }
+ }
+ }
+
private void transferToReplicaEntry(S replicaEntry, S masterEntry, S newReplicaEntry) {
// First copy from old replica to preserve values of any independent
// properties. Be sure not to copy nulls from old replica to new