diff options
Diffstat (limited to 'src/main/java')
| -rw-r--r-- | src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java | 59 | ||||
| -rw-r--r-- | src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java | 49 | 
2 files changed, 68 insertions, 40 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 0c30ce0..e76ee9a 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -439,7 +439,7 @@ class ReplicatedRepository                  }
                  if (replicaEntry == null && replicaCursor != null) {
 -                    long skippedCount = 0;
 +                    int skippedCount = 0;
                      while (replicaCursor.hasNext()) {
                          try {
                              replicaEntry = replicaCursor.next();
 @@ -459,31 +459,27 @@ class ReplicatedRepository                              replicaCursor.close();
                              replicaCursor = null;
 -                            skipCorruption: {
 -                                Storable withKey = e.getStorableWithPrimaryKey();
 -
 -                                if (withKey != null &&
 -                                    withKey.storableType() == replicaStorage.getStorableType())
 -                                {
 -                                    // Delete corrupt replica entry.
 -                                    try {
 -                                        trigger.deleteReplica(withKey);
 -                                        log.info("Deleted corrupt replica entry: " +
 -                                                 withKey.toStringKeyOnly(), e);
 -                                        break skipCorruption;
 -                                    } catch (PersistException e2) {
 -                                        log.warn("Unable to delete corrupt replica entry: " +
 -                                                 withKey.toStringKeyOnly(), e2);
 -                                    }
 -                                }
 +                            boolean skip = true;
 +
 +                            Storable withKey = e.getStorableWithPrimaryKey();
 +                            S replicaWithKeyOnly = null;
 +
 +                            if (withKey != null &&
 +                                withKey.storableType() == replicaStorage.getStorableType())
 +                            {
 +                                replicaWithKeyOnly = (S) withKey;
 +                            }
 -                                // Just skip it.
 +                            if (replicaWithKeyOnly != null) {
 +                                // Delete corrupt replica entry.
                                  try {
 -                                    skippedCount += replicaCursor.skipNext(1);
 -                                    log.info("Skipped corrupt replica entry", e);
 -                                } catch (FetchException e2) {
 -                                    log.error("Unable to skip past corrupt replica entry", e2);
 -                                    throw e;
 +                                    trigger.deleteReplica(replicaWithKeyOnly);
 +                                    log.info("Deleted corrupt replica entry: " +
 +                                             replicaWithKeyOnly.toStringKeyOnly(), e);
 +                                    skip = false;
 +                                } catch (PersistException e2) {
 +                                    log.warn("Unable to delete corrupt replica entry: " +
 +                                             replicaWithKeyOnly.toStringKeyOnly(), e2);
                                  }
                              }
 @@ -492,6 +488,21 @@ class ReplicatedRepository                                  break;
                              }
                              replicaCursor = replicaQuery.fetchAfter(lastReplicaEntry);
 +
 +                            if (skip) {
 +                                try {
 +                                    skippedCount = replicaCursor.skipNext(++skippedCount);
 +                                    log.info("Skipped corrupt replica entry", e);
 +                                    if (replicaWithKeyOnly != null) {
 +                                        // Try to update entry which could not be deleted.
 +                                        replicaEntry = replicaWithKeyOnly;
 +                                        break;
 +                                    }
 +                                } catch (FetchException e2) {
 +                                    log.error("Unable to skip past corrupt replica entry", e2);
 +                                    throw e;
 +                                }
 +                            }
                          }
                      }
                  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java index 479f870..ebce796 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicationTrigger.java @@ -244,31 +244,37 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {              try {
                  if (replicaEntry != null) {
                      if (masterEntry == null) {
 -                        log.info("Deleting bogus entry: " + replicaEntry);
 +                        log.info("Deleting bogus replica entry: " + replicaEntry);
 +                    }
 +                    try {
 +                        replicaEntry.tryDelete();
 +                    } catch (PersistException e) {
 +                        log.error("Unable to delete replica entry: " + replicaEntry, e);
 +                        if (masterEntry != null) {
 +                            // Try to update instead.
 +                            S newReplicaEntry = mReplicaStorage.prepare();
 +                            transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
 +                            log.info("Replacing corrupt replica entry with: " + newReplicaEntry);
 +                            try {
 +                                newReplicaEntry.update();
 +                            } catch (PersistException e2) {
 +                                log.error("Unable to update replica entry: " + replicaEntry, e2);
 +                                return;
 +                            }
 +                        }
                      }
 -                    replicaEntry.tryDelete();
                  }
                  if (masterEntry != null) {
 -                    Storable newReplicaEntry = mReplicaStorage.prepare();
 +                    S newReplicaEntry = mReplicaStorage.prepare();
                      if (replicaEntry == null) {
                          masterEntry.copyAllProperties(newReplicaEntry);
 -                        log.info("Adding missing entry: " + newReplicaEntry);
 +                        log.info("Adding missing replica entry: " + newReplicaEntry);
                      } else {
                          if (replicaEntry.equalProperties(masterEntry)) {
                              return;
                          }
 -                        // First copy from old replica to preserve values of
 -                        // any independent properties. Be sure not to copy
 -                        // nulls from old replica to new replica, in case new
 -                        // non-nullable properties have been added. This is why
 -                        // copyUnequalProperties is called instead of
 -                        // copyAllProperties.
 -                        replicaEntry.copyUnequalProperties(newReplicaEntry);
 -                        // Calling copyAllProperties will skip unsupported
 -                        // independent properties in master, thus preserving
 -                        // old independent property values.
 -                        masterEntry.copyAllProperties(newReplicaEntry);
 -                        log.info("Replacing stale entry with: " + newReplicaEntry);
 +                        transferToReplicaEntry(replicaEntry, masterEntry, newReplicaEntry);
 +                        log.info("Replacing stale replica entry with: " + newReplicaEntry);
                      }
                      if (!newReplicaEntry.tryInsert()) {
                          // Try to correct bizarre corruption.
 @@ -285,6 +291,17 @@ class ReplicationTrigger<S extends Storable> extends Trigger<S> {          }
      }
 +    private void transferToReplicaEntry(S replicaEntry, S masterEntry, S newReplicaEntry) {
 +        // First copy from old replica to preserve values of any independent
 +        // properties. Be sure not to copy nulls from old replica to new
 +        // replica, in case new non-nullable properties have been added. This
 +        // is why copyUnequalProperties is called instead of copyAllProperties.
 +        replicaEntry.copyUnequalProperties(newReplicaEntry);
 +        // Calling copyAllProperties will skip unsupported independent
 +        // properties in master, thus preserving old independent property values.
 +        masterEntry.copyAllProperties(newReplicaEntry);
 +    }
 +
      /**
       * Runs a repair in a background thread. This is done for two reasons: It
       * allows repair to not be hindered by locks acquired by transactions and
 | 
