diff options
author | Brian S. O'Neill <bronee@gmail.com> | 2008-11-19 00:27:14 +0000 |
---|---|---|
committer | Brian S. O'Neill <bronee@gmail.com> | 2008-11-19 00:27:14 +0000 |
commit | 98c76da614b649ad53201476c332c91be5e7bb06 (patch) | |
tree | 121e30e5a244848584db88441056c567a9b4e042 | |
parent | cf570d55bf09d7aa7af196fafe90db2200002aa5 (diff) |
Fix resync when query is known to return one entry.
-rw-r--r-- | src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java | 76 |
1 files changed, 52 insertions, 24 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 8f0e45d..d3414af 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -470,7 +470,18 @@ class ReplicatedRepository Cursor<S> masterCursor = null;
try {
- replicaCursor = replicaQuery.fetch();
+ while (replicaCursor == null) {
+ try {
+ replicaCursor = replicaQuery.fetch();
+ } catch (CorruptEncodingException e) {
+ S replicaWithKeyOnly = recoverReplicaKey(replicaStorage, e);
+ if (!deleteCorruptEntry(replicationTrigger, replicaWithKeyOnly, e)) {
+ // Cannot delete it, so just give up.
+ throw e;
+ }
+ }
+ }
+
masterCursor = masterQuery.fetch();
S lastReplicaEntry = null;
@@ -512,29 +523,10 @@ class ReplicatedRepository replicaCursor.close();
replicaCursor = null;
- boolean skip = true;
-
- Storable withKey = e.getStorableWithPrimaryKey();
- S replicaWithKeyOnly = null;
+ S replicaWithKeyOnly = recoverReplicaKey(replicaStorage, e);
- if (withKey != null &&
- withKey.storableType() == replicaStorage.getStorableType())
- {
- replicaWithKeyOnly = (S) withKey;
- }
-
- if (replicaWithKeyOnly != null) {
- // Delete corrupt replica entry.
- try {
- replicationTrigger.deleteReplica(replicaWithKeyOnly);
- log.info("Deleted corrupt replica entry: " +
- replicaWithKeyOnly.toStringKeyOnly(), e);
- skip = false;
- } catch (PersistException e2) {
- log.warn("Unable to delete corrupt replica entry: " +
- replicaWithKeyOnly.toStringKeyOnly(), e2);
- }
- }
+ boolean deleted = deleteCorruptEntry
+ (replicationTrigger, replicaWithKeyOnly, e);
// Re-open (if we can)
if (lastReplicaEntry == null) {
@@ -542,7 +534,8 @@ class ReplicatedRepository }
replicaCursor = replicaQuery.fetchAfter(lastReplicaEntry);
- if (skip) {
+ if (deleted) {
+ // Skip if it cannot be deleted.
try {
skippedCount = replicaCursor.skipNext(++skippedCount);
log.info("Skipped corrupt replica entry", e);
@@ -644,6 +637,41 @@ class ReplicatedRepository }
}
+ private <S extends Storable> S recoverReplicaKey(Storage<S> replicaStorage,
+ CorruptEncodingException e)
+ {
+ Storable withKey = e.getStorableWithPrimaryKey();
+ if (withKey != null && withKey.storableType() == replicaStorage.getStorableType()) {
+ return (S) withKey;
+ }
+ return null;
+ }
+
+ private <S extends Storable> boolean deleteCorruptEntry
+ (ReplicationTrigger<S> replicationTrigger,
+ S replicaWithKeyOnly,
+ CorruptEncodingException e)
+ throws RepositoryException
+ {
+ if (replicaWithKeyOnly == null) {
+ return false;
+ }
+
+ final Log log = LogFactory.getLog(ReplicatedRepository.class);
+
+ // Delete corrupt replica entry.
+ try {
+ replicationTrigger.deleteReplica(replicaWithKeyOnly);
+ log.info("Deleted corrupt replica entry: " +
+ replicaWithKeyOnly.toStringKeyOnly(), e);
+ return true;
+ } catch (PersistException e2) {
+ log.warn("Unable to delete corrupt replica entry: " +
+ replicaWithKeyOnly.toStringKeyOnly(), e2);
+ return false;
+ }
+ }
+
private <S extends Storable> Runnable prepareResyncTask
(final ReplicationTrigger<S> replicationTrigger,
final ResyncCapability.Listener<? super S> listener,
|