summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo/replicated
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo/replicated')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java57
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java26
2 files changed, 57 insertions, 26 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
index fa64533..0c30ce0 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -422,6 +422,8 @@ class ReplicatedRepository
replicaCursor = replicaQuery.fetch();
masterCursor = masterQuery.fetch();
+ S lastReplicaEntry = null;
+ S lastMasterEntry = null;
S replicaEntry = null;
S masterEntry = null;
@@ -453,32 +455,43 @@ class ReplicatedRepository
}
break;
} catch (CorruptEncodingException e) {
- replicaEntry = null;
- Storable withKey = e.getStorableWithPrimaryKey();
+ // Exception forces cursor to close. Close again to be sure.
+ replicaCursor.close();
+ replicaCursor = null;
+
+ skipCorruption: {
+ Storable withKey = e.getStorableWithPrimaryKey();
+
+ if (withKey != null &&
+ withKey.storableType() == replicaStorage.getStorableType())
+ {
+ // Delete corrupt replica entry.
+ try {
+ trigger.deleteReplica(withKey);
+ log.info("Deleted corrupt replica entry: " +
+ withKey.toStringKeyOnly(), e);
+ break skipCorruption;
+ } catch (PersistException e2) {
+ log.warn("Unable to delete corrupt replica entry: " +
+ withKey.toStringKeyOnly(), e2);
+ }
+ }
- if (withKey != null &&
- withKey.storableType() == replicaStorage.getStorableType())
- {
- // Delete corrupt replica entry.
+ // Just skip it.
try {
- withKey.delete();
- log.info("Deleted corrupt replica entry: " +
- withKey.toStringKeyOnly(), e);
- continue;
- } catch (PersistException e2) {
- log.warn("Unable to delete corrupt replica entry: " +
- withKey.toStringKeyOnly(), e2);
+ skippedCount += replicaCursor.skipNext(1);
+ log.info("Skipped corrupt replica entry", e);
+ } catch (FetchException e2) {
+ log.error("Unable to skip past corrupt replica entry", e2);
+ throw e;
}
}
- // Just skip it.
- try {
- skippedCount += replicaCursor.skipNext(1);
- log.info("Skipped corrupt replica entry", e);
- } catch (FetchException e2) {
- log.error("Unable to skip past corrupt replica entry", e2);
- throw e;
+ // Re-open (if we can)
+ if (lastReplicaEntry == null) {
+ break;
}
+ replicaCursor = replicaQuery.fetchAfter(lastReplicaEntry);
}
}
}
@@ -512,11 +525,13 @@ class ReplicatedRepository
if (replicaCursor == null) {
replicaCursor = replicaQuery.fetchAfter(replicaEntry);
}
+ lastReplicaEntry = replicaEntry;
replicaEntry = null;
} else if (compare > 0) {
// Replica cursor is missing an entry so copy it.
resyncTask = prepareResyncTask(trigger, null, masterEntry);
// Allow master to advance.
+ lastMasterEntry = masterEntry;
masterEntry = null;
} else {
if (replicaEntry == null && masterEntry == null) {
@@ -533,6 +548,8 @@ class ReplicatedRepository
if (replicaCursor == null) {
replicaCursor = replicaQuery.fetchAfter(replicaEntry);
}
+ lastReplicaEntry = replicaEntry;
+ lastMasterEntry = masterEntry;
replicaEntry = null;
masterEntry = null;
}
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
index 7ac60a6..479f870 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java
@@ -149,7 +149,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
if (!master.tryLoad()) {
// Master record does not exist. To ensure consistency,
// delete record from replica.
- deleteReplica(replica);
+ tryDeleteReplica(replica);
throw abortTry();
}
} else {
@@ -158,7 +158,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
} catch (FetchNoneException e) {
// Master record does not exist. To ensure consistency,
// delete record from replica.
- deleteReplica(replica);
+ tryDeleteReplica(replica);
throw e;
}
}
@@ -175,7 +175,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
if (!master.tryUpdate()) {
// Master record does not exist. To ensure consistency,
// delete record from replica.
- deleteReplica(replica);
+ tryDeleteReplica(replica);
throw abortTry();
}
} else {
@@ -184,7 +184,7 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
} catch (PersistNoneException e) {
// Master record does not exist. To ensure consistency,
// delete record from replica.
- deleteReplica(replica);
+ tryDeleteReplica(replica);
throw e;
}
}
@@ -350,12 +350,26 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {
/**
* Deletes the replica entry with replication disabled.
*/
- private void deleteReplica(S replica) throws PersistException {
+ boolean tryDeleteReplica(Storable replica) throws PersistException {
// Disable replication to prevent trigger from being invoked by
// deleting replica.
setReplicationDisabled(true);
try {
- replica.tryDelete();
+ return replica.tryDelete();
+ } finally {
+ setReplicationDisabled(false);
+ }
+ }
+
+ /**
+ * Deletes the replica entry with replication disabled.
+ */
+ void deleteReplica(Storable replica) throws PersistException {
+ // Disable replication to prevent trigger from being invoked by
+ // deleting replica.
+ setReplicationDisabled(true);
+ try {
+ replica.delete();
} finally {
setReplicationDisabled(false);
}