From 2f14a36ed762b2287bab1c97fdb4cbee118e2921 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Tue, 7 Nov 2006 02:29:16 +0000 Subject: Allow resync to delete or skip over corrupt replica entries. --- .../repo/replicated/ReplicatedRepository.java | 53 +++++++++++++++++++++- 1 file changed, 51 insertions(+), 2 deletions(-) (limited to 'src/main/java/com/amazon/carbonado/repo') diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 11212c4..fa64533 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -21,14 +21,18 @@ import java.util.Comparator; import java.util.LinkedHashSet; import java.util.Set; +import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.cojen.util.BeanComparator; +import com.amazon.carbonado.CorruptEncodingException; import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; import com.amazon.carbonado.FetchInterruptedException; import com.amazon.carbonado.IsolationLevel; import com.amazon.carbonado.MalformedTypeException; +import com.amazon.carbonado.PersistException; import com.amazon.carbonado.Query; import com.amazon.carbonado.Repository; import com.amazon.carbonado.RepositoryException; @@ -409,6 +413,8 @@ class ReplicatedRepository Comparator comparator, Transaction replicaTxn) throws RepositoryException { + final Log log = LogFactory.getLog(ReplicatedRepository.class); + Cursor replicaCursor = null; Cursor masterCursor = null; @@ -430,8 +436,51 @@ class ReplicatedRepository } } - if (replicaEntry == null && replicaCursor != null && replicaCursor.hasNext()) { - replicaEntry = replicaCursor.next(); + if (replicaEntry == null && replicaCursor != null) { + long skippedCount = 0; + while (replicaCursor.hasNext()) { + try { + replicaEntry = replicaCursor.next(); + if (skippedCount > 0) { + if (skippedCount == 1) { + log.warn("Skipped corrupt replica entry before this one: " + + replicaEntry); + } else { + log.warn("Skipped " + skippedCount + + " corrupt replica entries before this one: " + + replicaEntry); + } + } + break; + } catch (CorruptEncodingException e) { + replicaEntry = null; + Storable withKey = e.getStorableWithPrimaryKey(); + + if (withKey != null && + withKey.storableType() == replicaStorage.getStorableType()) + { + // Delete corrupt replica entry. + try { + withKey.delete(); + log.info("Deleted corrupt replica entry: " + + withKey.toStringKeyOnly(), e); + continue; + } catch (PersistException e2) { + log.warn("Unable to delete corrupt replica entry: " + + withKey.toStringKeyOnly(), e2); + } + } + + // Just skip it. + try { + skippedCount += replicaCursor.skipNext(1); + log.info("Skipped corrupt replica entry", e); + } catch (FetchException e2) { + log.error("Unable to skip past corrupt replica entry", e2); + throw e; + } + } + } } if (count++ >= RESYNC_WATERMARK || txnCount >= RESYNC_BATCH_SIZE) { -- cgit v1.2.3