diff options
author | Brian S. O'Neill <bronee@gmail.com> | 2006-11-07 02:29:16 +0000 |
---|---|---|
committer | Brian S. O'Neill <bronee@gmail.com> | 2006-11-07 02:29:16 +0000 |
commit | 2f14a36ed762b2287bab1c97fdb4cbee118e2921 (patch) | |
tree | 6f79b5010d0b5100171cc975f4ad560b1ae4d18b /src/main/java/com/amazon/carbonado/repo/replicated | |
parent | 2d942b153e7f2f9ca16c35c67b452c89eb762b7d (diff) |
Allow resync to delete or skip over corrupt replica entries.
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo/replicated')
-rw-r--r-- | src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java | 53 |
1 files changed, 51 insertions, 2 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 11212c4..fa64533 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -21,14 +21,18 @@ import java.util.Comparator; import java.util.LinkedHashSet;
import java.util.Set;
+import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.cojen.util.BeanComparator;
+import com.amazon.carbonado.CorruptEncodingException;
import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.FetchInterruptedException;
import com.amazon.carbonado.IsolationLevel;
import com.amazon.carbonado.MalformedTypeException;
+import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Query;
import com.amazon.carbonado.Repository;
import com.amazon.carbonado.RepositoryException;
@@ -409,6 +413,8 @@ class ReplicatedRepository Comparator comparator, Transaction replicaTxn)
throws RepositoryException
{
+ final Log log = LogFactory.getLog(ReplicatedRepository.class);
+
Cursor<S> replicaCursor = null;
Cursor<S> masterCursor = null;
@@ -430,8 +436,51 @@ class ReplicatedRepository }
}
- if (replicaEntry == null && replicaCursor != null && replicaCursor.hasNext()) {
- replicaEntry = replicaCursor.next();
+ if (replicaEntry == null && replicaCursor != null) {
+ long skippedCount = 0;
+ while (replicaCursor.hasNext()) {
+ try {
+ replicaEntry = replicaCursor.next();
+ if (skippedCount > 0) {
+ if (skippedCount == 1) {
+ log.warn("Skipped corrupt replica entry before this one: " +
+ replicaEntry);
+ } else {
+ log.warn("Skipped " + skippedCount +
+ " corrupt replica entries before this one: " +
+ replicaEntry);
+ }
+ }
+ break;
+ } catch (CorruptEncodingException e) {
+ replicaEntry = null;
+ Storable withKey = e.getStorableWithPrimaryKey();
+
+ if (withKey != null &&
+ withKey.storableType() == replicaStorage.getStorableType())
+ {
+ // Delete corrupt replica entry.
+ try {
+ withKey.delete();
+ log.info("Deleted corrupt replica entry: " +
+ withKey.toStringKeyOnly(), e);
+ continue;
+ } catch (PersistException e2) {
+ log.warn("Unable to delete corrupt replica entry: " +
+ withKey.toStringKeyOnly(), e2);
+ }
+ }
+
+ // Just skip it.
+ try {
+ skippedCount += replicaCursor.skipNext(1);
+ log.info("Skipped corrupt replica entry", e);
+ } catch (FetchException e2) {
+ log.error("Unable to skip past corrupt replica entry", e2);
+ throw e;
+ }
+ }
+ }
}
if (count++ >= RESYNC_WATERMARK || txnCount >= RESYNC_BATCH_SIZE) {
|