From 6e6c3ff6bfd2893df3215160dfa2029b01f2906e Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Fri, 26 Sep 2008 18:31:47 +0000 Subject: Resync listener is now a trigger in order to capture more information. --- .../repo/replicated/ReplicatedRepository.java | 6 + .../repo/replicated/ReplicationTrigger.java | 139 +++++++++++++-------- 2 files changed, 96 insertions(+), 49 deletions(-) (limited to 'src/main/java/com/amazon/carbonado/repo/replicated') diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 51dbba8..8f0e45d 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -493,6 +493,9 @@ class ReplicatedRepository while (replicaCursor.hasNext()) { try { replicaEntry = replicaCursor.next(); + if (listener != null) { + listener.afterLoad(replicaEntry); + } if (skippedCount > 0) { if (skippedCount == 1) { log.warn("Skipped corrupt replica entry before this one: " + @@ -546,6 +549,9 @@ class ReplicatedRepository if (replicaWithKeyOnly != null) { // Try to update entry which could not be deleted. replicaEntry = replicaWithKeyOnly; + if (listener != null) { + listener.afterLoad(replicaEntry); + } break; } } catch (FetchException e2) { diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index 29af101..dd1d78b 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -37,6 +37,8 @@ import com.amazon.carbonado.capability.ResyncCapability; import com.amazon.carbonado.spi.RepairExecutor; import com.amazon.carbonado.spi.TriggerManager; +import com.amazon.carbonado.util.ThrowUnchecked; + /** * All inserts/updates/deletes are first committed to the master storage, then * duplicated and committed to the replica. @@ -228,67 +230,86 @@ class ReplicationTrigger extends Trigger { try { Transaction txn = mRepository.enterTransaction(); try { - S newReplicaEntry = null; - - // Delete old entry. - if (replicaEntry != null) { - if (masterEntry == null) { - log.info("Deleting bogus replica entry: " + replicaEntry); - } - try { - replicaEntry.tryDelete(); - } catch (PersistException e) { - log.error("Unable to delete replica entry: " + replicaEntry, e); - if (masterEntry != null) { - // Try to update instead. - newReplicaEntry = mReplicaStorage.prepare(); - transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry); - log.info("Replacing corrupt replica entry with: " + newReplicaEntry); - try { - newReplicaEntry.update(); - // This disables the insert step, which is not needed now. - masterEntry = null; - } catch (PersistException e2) { - log.error("Unable to update replica entry: " + replicaEntry, e2); - return; - } - } + final S newReplicaEntry; + if (replicaEntry == null) { + newReplicaEntry = mReplicaStorage.prepare(); + masterEntry.copyAllProperties(newReplicaEntry); + log.info("Inserting missing replica entry: " + newReplicaEntry); + } else if (masterEntry != null) { + if (replicaEntry.equalProperties(masterEntry)) { + return; } + newReplicaEntry = mReplicaStorage.prepare(); + transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry); + log.info("Updating stale replica entry with: " + newReplicaEntry); + } else { + newReplicaEntry = null; + log.info("Deleting bogus replica entry: " + replicaEntry); } - // Insert new entry. - if (masterEntry != null) { - newReplicaEntry = mReplicaStorage.prepare(); + final Object state; + if (listener == null) { + state = null; + } else { if (replicaEntry == null) { - masterEntry.copyAllProperties(newReplicaEntry); - log.info("Adding missing replica entry: " + newReplicaEntry); + state = listener.beforeInsert(newReplicaEntry); + } else if (masterEntry != null) { + state = listener.beforeUpdate(replicaEntry, newReplicaEntry); } else { - if (replicaEntry.equalProperties(masterEntry)) { - return; + state = listener.beforeDelete(replicaEntry); + } + } + + try { + // Delete old entry. + if (replicaEntry != null) { + try { + replicaEntry.tryDelete(); + } catch (PersistException e) { + log.error("Unable to delete replica entry: " + replicaEntry, e); + if (masterEntry != null) { + // Try to update instead. + log.info("Updating corrupt replica entry with: " + + newReplicaEntry); + try { + newReplicaEntry.update(); + // This disables the insert step, which is not needed now. + masterEntry = null; + } catch (PersistException e2) { + log.error("Unable to update replica entry: " + + replicaEntry, e2); + resyncFailed(listener, replicaEntry, masterEntry, + newReplicaEntry, state); + return; + } + } } - transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry); - log.info("Replacing stale replica entry with: " + newReplicaEntry); } - if (!newReplicaEntry.tryInsert()) { - // Try to correct bizarre corruption. - newReplicaEntry.tryDelete(); - newReplicaEntry.tryInsert(); + + // Insert new entry. + if (masterEntry != null && newReplicaEntry != null) { + if (!newReplicaEntry.tryInsert()) { + // Try to correct bizarre corruption. + newReplicaEntry.tryDelete(); + newReplicaEntry.tryInsert(); + } } - } - if (listener != null) { - if (replicaEntry == null) { - if (newReplicaEntry != null) { - listener.inserted(newReplicaEntry); + if (listener != null) { + if (replicaEntry == null) { + listener.afterInsert(newReplicaEntry, state); + } else if (masterEntry != null) { + listener.afterUpdate(newReplicaEntry, state); + } else { + listener.afterDelete(replicaEntry, state); } - } else if (newReplicaEntry != null) { - listener.updated(replicaEntry, newReplicaEntry); - } else { - listener.deleted(replicaEntry); } - } - txn.commit(); + txn.commit(); + } catch (Throwable e) { + resyncFailed(listener, replicaEntry, masterEntry, newReplicaEntry, state); + ThrowUnchecked.fire(e); + } } finally { txn.exit(); } @@ -297,6 +318,26 @@ class ReplicationTrigger extends Trigger { } } + private void resyncFailed(ResyncCapability.Listener listener, + S replicaEntry, S masterEntry, + S newReplicaEntry, Object state) + { + if (listener != null) { + try { + if (replicaEntry == null) { + listener.failedInsert(newReplicaEntry, state); + } else if (masterEntry != null) { + listener.failedUpdate(newReplicaEntry, state); + } else { + listener.failedDelete(replicaEntry, state); + } + } catch (Throwable e2) { + Thread t = Thread.currentThread(); + t.getUncaughtExceptionHandler().uncaughtException(t, e2); + } + } + } + private void transferToReplicaEntry(S replicaEntry, S masterEntry, S newReplicaEntry) { // First copy from old replica to preserve values of any independent // properties. Be sure not to copy nulls from old replica to new -- cgit v1.2.3