summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java76
1 files changed, 52 insertions, 24 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
index 8f0e45d..d3414af 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -470,7 +470,18 @@ class ReplicatedRepository
Cursor<S> masterCursor = null;
try {
- replicaCursor = replicaQuery.fetch();
+ while (replicaCursor == null) {
+ try {
+ replicaCursor = replicaQuery.fetch();
+ } catch (CorruptEncodingException e) {
+ S replicaWithKeyOnly = recoverReplicaKey(replicaStorage, e);
+ if (!deleteCorruptEntry(replicationTrigger, replicaWithKeyOnly, e)) {
+ // Cannot delete it, so just give up.
+ throw e;
+ }
+ }
+ }
+
masterCursor = masterQuery.fetch();
S lastReplicaEntry = null;
@@ -512,29 +523,10 @@ class ReplicatedRepository
replicaCursor.close();
replicaCursor = null;
- boolean skip = true;
-
- Storable withKey = e.getStorableWithPrimaryKey();
- S replicaWithKeyOnly = null;
+ S replicaWithKeyOnly = recoverReplicaKey(replicaStorage, e);
- if (withKey != null &&
- withKey.storableType() == replicaStorage.getStorableType())
- {
- replicaWithKeyOnly = (S) withKey;
- }
-
- if (replicaWithKeyOnly != null) {
- // Delete corrupt replica entry.
- try {
- replicationTrigger.deleteReplica(replicaWithKeyOnly);
- log.info("Deleted corrupt replica entry: " +
- replicaWithKeyOnly.toStringKeyOnly(), e);
- skip = false;
- } catch (PersistException e2) {
- log.warn("Unable to delete corrupt replica entry: " +
- replicaWithKeyOnly.toStringKeyOnly(), e2);
- }
- }
+ boolean deleted = deleteCorruptEntry
+ (replicationTrigger, replicaWithKeyOnly, e);
// Re-open (if we can)
if (lastReplicaEntry == null) {
@@ -542,7 +534,8 @@ class ReplicatedRepository
}
replicaCursor = replicaQuery.fetchAfter(lastReplicaEntry);
- if (skip) {
+ if (deleted) {
+ // Skip if it cannot be deleted.
try {
skippedCount = replicaCursor.skipNext(++skippedCount);
log.info("Skipped corrupt replica entry", e);
@@ -644,6 +637,41 @@ class ReplicatedRepository
}
}
+ private <S extends Storable> S recoverReplicaKey(Storage<S> replicaStorage,
+ CorruptEncodingException e)
+ {
+ Storable withKey = e.getStorableWithPrimaryKey();
+ if (withKey != null && withKey.storableType() == replicaStorage.getStorableType()) {
+ return (S) withKey;
+ }
+ return null;
+ }
+
+ private <S extends Storable> boolean deleteCorruptEntry
+ (ReplicationTrigger<S> replicationTrigger,
+ S replicaWithKeyOnly,
+ CorruptEncodingException e)
+ throws RepositoryException
+ {
+ if (replicaWithKeyOnly == null) {
+ return false;
+ }
+
+ final Log log = LogFactory.getLog(ReplicatedRepository.class);
+
+ // Delete corrupt replica entry.
+ try {
+ replicationTrigger.deleteReplica(replicaWithKeyOnly);
+ log.info("Deleted corrupt replica entry: " +
+ replicaWithKeyOnly.toStringKeyOnly(), e);
+ return true;
+ } catch (PersistException e2) {
+ log.warn("Unable to delete corrupt replica entry: " +
+ replicaWithKeyOnly.toStringKeyOnly(), e2);
+ return false;
+ }
+ }
+
private <S extends Storable> Runnable prepareResyncTask
(final ReplicationTrigger<S> replicationTrigger,
final ResyncCapability.Listener<? super S> listener,