summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java53
1 files changed, 51 insertions, 2 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
index 11212c4..fa64533 100644
--- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java
@@ -21,14 +21,18 @@ import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.Set;
+import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.cojen.util.BeanComparator;
+import com.amazon.carbonado.CorruptEncodingException;
import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.FetchInterruptedException;
import com.amazon.carbonado.IsolationLevel;
import com.amazon.carbonado.MalformedTypeException;
+import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Query;
import com.amazon.carbonado.Repository;
import com.amazon.carbonado.RepositoryException;
@@ -409,6 +413,8 @@ class ReplicatedRepository
Comparator comparator, Transaction replicaTxn)
throws RepositoryException
{
+ final Log log = LogFactory.getLog(ReplicatedRepository.class);
+
Cursor<S> replicaCursor = null;
Cursor<S> masterCursor = null;
@@ -430,8 +436,51 @@ class ReplicatedRepository
}
}
- if (replicaEntry == null && replicaCursor != null && replicaCursor.hasNext()) {
- replicaEntry = replicaCursor.next();
+ if (replicaEntry == null && replicaCursor != null) {
+ long skippedCount = 0;
+ while (replicaCursor.hasNext()) {
+ try {
+ replicaEntry = replicaCursor.next();
+ if (skippedCount > 0) {
+ if (skippedCount == 1) {
+ log.warn("Skipped corrupt replica entry before this one: " +
+ replicaEntry);
+ } else {
+ log.warn("Skipped " + skippedCount +
+ " corrupt replica entries before this one: " +
+ replicaEntry);
+ }
+ }
+ break;
+ } catch (CorruptEncodingException e) {
+ replicaEntry = null;
+ Storable withKey = e.getStorableWithPrimaryKey();
+
+ if (withKey != null &&
+ withKey.storableType() == replicaStorage.getStorableType())
+ {
+ // Delete corrupt replica entry.
+ try {
+ withKey.delete();
+ log.info("Deleted corrupt replica entry: " +
+ withKey.toStringKeyOnly(), e);
+ continue;
+ } catch (PersistException e2) {
+ log.warn("Unable to delete corrupt replica entry: " +
+ withKey.toStringKeyOnly(), e2);
+ }
+ }
+
+ // Just skip it.
+ try {
+ skippedCount += replicaCursor.skipNext(1);
+ log.info("Skipped corrupt replica entry", e);
+ } catch (FetchException e2) {
+ log.error("Unable to skip past corrupt replica entry", e2);
+ throw e;
+ }
+ }
+ }
}
if (count++ >= RESYNC_WATERMARK || txnCount >= RESYNC_BATCH_SIZE) {