summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java59
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java49
2 files changed, 68 insertions, 40 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
index 0c30ce0..e76ee9a 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -439,7 +439,7 @@ class ReplicatedRepository
}
if (replicaEntry == null && replicaCursor != null) {
- long skippedCount = 0;
+ int skippedCount = 0;
while (replicaCursor.hasNext()) {
try {
replicaEntry = replicaCursor.next();
@@ -459,31 +459,27 @@ class ReplicatedRepository
replicaCursor.close();
replicaCursor = null;
- skipCorruption: {
- Storable withKey = e.getStorableWithPrimaryKey();
-
- if (withKey != null &&
- withKey.storableType() == replicaStorage.getStorableType())
- {
- // Delete corrupt replica entry.
- try {
- trigger.deleteReplica(withKey);
- log.info("Deleted corrupt replica entry: " +
- withKey.toStringKeyOnly(), e);
- break skipCorruption;
- } catch (PersistException e2) {
- log.warn("Unable to delete corrupt replica entry: " +
- withKey.toStringKeyOnly(), e2);
- }
- }
+ boolean skip = true;
+
+ Storable withKey = e.getStorableWithPrimaryKey();
+ S replicaWithKeyOnly = null;
+
+ if (withKey != null &&
+ withKey.storableType() == replicaStorage.getStorableType())
+ {
+ replicaWithKeyOnly = (S) withKey;
+ }
- // Just skip it.
+ if (replicaWithKeyOnly != null) {
+ // Delete corrupt replica entry.
try {
- skippedCount += replicaCursor.skipNext(1);
- log.info("Skipped corrupt replica entry", e);
- } catch (FetchException e2) {
- log.error("Unable to skip past corrupt replica entry", e2);
- throw e;
+ trigger.deleteReplica(replicaWithKeyOnly);
+ log.info("Deleted corrupt replica entry: " +
+ replicaWithKeyOnly.toStringKeyOnly(), e);
+ skip = false;
+ } catch (PersistException e2) {
+ log.warn("Unable to delete corrupt replica entry: " +
+ replicaWithKeyOnly.toStringKeyOnly(), e2);
}
}
@@ -492,6 +488,21 @@ class ReplicatedRepository
break;
}
replicaCursor = replicaQuery.fetchAfter(lastReplicaEntry);
+
+ if (skip) {
+ try {
+ skippedCount = replicaCursor.skipNext(++skippedCount);
+ log.info("Skipped corrupt replica entry", e);
+ if (replicaWithKeyOnly != null) {
+ // Try to update entry which could not be deleted.
+ replicaEntry = replicaWithKeyOnly;
+ break;
+ }
+ } catch (FetchException e2) {
+ log.error("Unable to skip past corrupt replica entry", e2);
+ throw e;
+ }
+ }
}
}
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
index 479f870..ebce796 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
@@ -244,31 +244,37 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
try {
if (replicaEntry != null) {
if (masterEntry == null) {
- log.info("Deleting bogus entry: " + replicaEntry);
+ log.info("Deleting bogus replica entry: " + replicaEntry);
+ }
+ try {
+ replicaEntry.tryDelete();
+ } catch (PersistException e) {
+ log.error("Unable to delete replica entry: " + replicaEntry, e);
+ if (masterEntry != null) {
+ // Try to update instead.
+ S newReplicaEntry = mReplicaStorage.prepare();
+ transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
+ log.info("Replacing corrupt replica entry with: " + newReplicaEntry);
+ try {
+ newReplicaEntry.update();
+ } catch (PersistException e2) {
+ log.error("Unable to update replica entry: " + replicaEntry, e2);
+ return;
+ }
+ }
}
- replicaEntry.tryDelete();
}
if (masterEntry != null) {
- Storable newReplicaEntry = mReplicaStorage.prepare();
+ S newReplicaEntry = mReplicaStorage.prepare();
if (replicaEntry == null) {
masterEntry.copyAllProperties(newReplicaEntry);
- log.info("Adding missing entry: " + newReplicaEntry);
+ log.info("Adding missing replica entry: " + newReplicaEntry);
} else {
if (replicaEntry.equalProperties(masterEntry)) {
return;
}
- // First copy from old replica to preserve values of
- // any independent properties. Be sure not to copy
- // nulls from old replica to new replica, in case new
- // non-nullable properties have been added. This is why
- // copyUnequalProperties is called instead of
- // copyAllProperties.
- replicaEntry.copyUnequalProperties(newReplicaEntry);
- // Calling copyAllProperties will skip unsupported
- // independent properties in master, thus preserving
- // old independent property values.
- masterEntry.copyAllProperties(newReplicaEntry);
- log.info("Replacing stale entry with: " + newReplicaEntry);
+ transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
+ log.info("Replacing stale replica entry with: " + newReplicaEntry);
}
if (!newReplicaEntry.tryInsert()) {
// Try to correct bizarre corruption.
@@ -285,6 +291,17 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
}
}
+ private void transferToReplicaEntry(S replicaEntry, S masterEntry, S newReplicaEntry) {
+ // First copy from old replica to preserve values of any independent
+ // properties. Be sure not to copy nulls from old replica to new
+ // replica, in case new non-nullable properties have been added. This
+ // is why copyUnequalProperties is called instead of copyAllProperties.
+ replicaEntry.copyUnequalProperties(newReplicaEntry);
+ // Calling copyAllProperties will skip unsupported independent
+ // properties in master, thus preserving old independent property values.
+ masterEntry.copyAllProperties(newReplicaEntry);
+ }
+
/**
* Runs a repair in a background thread. This is done for two reasons: It
* allows repair to not be hindered by locks acquired by transactions and