diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java | 76 | 
1 files changed, 52 insertions, 24 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 8f0e45d..d3414af 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -470,7 +470,18 @@ class ReplicatedRepository          Cursor<S> masterCursor = null;
          try {
 -            replicaCursor = replicaQuery.fetch();
 +            while (replicaCursor == null) {
 +                try {
 +                    replicaCursor = replicaQuery.fetch();
 +                } catch (CorruptEncodingException e) {
 +                    S replicaWithKeyOnly = recoverReplicaKey(replicaStorage, e);
 +                    if (!deleteCorruptEntry(replicationTrigger, replicaWithKeyOnly, e)) {
 +                        // Cannot delete it, so just give up.
 +                        throw e;
 +                    }
 +                }
 +            }
 +
              masterCursor = masterQuery.fetch();
              S lastReplicaEntry = null;
 @@ -512,29 +523,10 @@ class ReplicatedRepository                              replicaCursor.close();
                              replicaCursor = null;
 -                            boolean skip = true;
 -
 -                            Storable withKey = e.getStorableWithPrimaryKey();
 -                            S replicaWithKeyOnly = null;
 +                            S replicaWithKeyOnly = recoverReplicaKey(replicaStorage, e);
 -                            if (withKey != null &&
 -                                withKey.storableType() == replicaStorage.getStorableType())
 -                            {
 -                                replicaWithKeyOnly = (S) withKey;
 -                            }
 -
 -                            if (replicaWithKeyOnly != null) {
 -                                // Delete corrupt replica entry.
 -                                try {
 -                                    replicationTrigger.deleteReplica(replicaWithKeyOnly);
 -                                    log.info("Deleted corrupt replica entry: " +
 -                                             replicaWithKeyOnly.toStringKeyOnly(), e);
 -                                    skip = false;
 -                                } catch (PersistException e2) {
 -                                    log.warn("Unable to delete corrupt replica entry: " +
 -                                             replicaWithKeyOnly.toStringKeyOnly(), e2);
 -                                }
 -                            }
 +                            boolean deleted = deleteCorruptEntry
 +                                (replicationTrigger, replicaWithKeyOnly, e);
                              // Re-open (if we can)
                              if (lastReplicaEntry == null) {
 @@ -542,7 +534,8 @@ class ReplicatedRepository                              }
                              replicaCursor = replicaQuery.fetchAfter(lastReplicaEntry);
 -                            if (skip) {
 +                            if (deleted) {
 +                                // Skip if it cannot be deleted.
                                  try {
                                      skippedCount = replicaCursor.skipNext(++skippedCount);
                                      log.info("Skipped corrupt replica entry", e);
 @@ -644,6 +637,41 @@ class ReplicatedRepository          }
      }
 +    private <S extends Storable> S recoverReplicaKey(Storage<S> replicaStorage,
 +                                                     CorruptEncodingException e)
 +    {
 +        Storable withKey = e.getStorableWithPrimaryKey();
 +        if (withKey != null && withKey.storableType() == replicaStorage.getStorableType()) {
 +            return (S) withKey;
 +        }
 +        return null;
 +    }
 +
 +    private <S extends Storable> boolean deleteCorruptEntry
 +                       (ReplicationTrigger<S> replicationTrigger,
 +                        S replicaWithKeyOnly,
 +                        CorruptEncodingException e)
 +        throws RepositoryException
 +    {
 +        if (replicaWithKeyOnly == null) {
 +            return false;
 +        }
 +
 +        final Log log = LogFactory.getLog(ReplicatedRepository.class);
 +
 +        // Delete corrupt replica entry.
 +        try {
 +            replicationTrigger.deleteReplica(replicaWithKeyOnly);
 +            log.info("Deleted corrupt replica entry: " +
 +                     replicaWithKeyOnly.toStringKeyOnly(), e);
 +            return true;
 +        } catch (PersistException e2) {
 +            log.warn("Unable to delete corrupt replica entry: " +
 +                     replicaWithKeyOnly.toStringKeyOnly(), e2);
 +            return false;
 +        }
 +    }
 +
      private <S extends Storable> Runnable prepareResyncTask
                         (final ReplicationTrigger<S> replicationTrigger,
                          final ResyncCapability.Listener<? super S> listener,
 | 
