From 50fbad96036fe7b214c2a33ff3f9fef6abaf2593 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Tue, 26 Feb 2008 01:36:27 +0000 Subject: Index build skips records which are corrupt instead of giving up entirely. --- .../carbonado/repo/indexed/ManagedIndex.java | 57 +++++++++++++++++++++- 1 file changed, 55 insertions(+), 2 deletions(-) (limited to 'src/main/java/com') diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 2d7f983..507a09f 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -25,6 +25,7 @@ import java.util.Comparator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.amazon.carbonado.CorruptEncodingException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; import com.amazon.carbonado.IsolationLevel; @@ -42,7 +43,10 @@ import com.amazon.carbonado.filter.Filter; import com.amazon.carbonado.filter.RelOp; import com.amazon.carbonado.info.Direction; +import com.amazon.carbonado.info.OrderedProperty; +import com.amazon.carbonado.info.StorableKey; import com.amazon.carbonado.info.StorableIndex; +import com.amazon.carbonado.info.StorableIntrospector; import com.amazon.carbonado.cursor.MergeSortBuffer; @@ -216,11 +220,33 @@ class ManagedIndex implements IndexEntryAccessor { Log log = LogFactory.getLog(IndexedStorage.class); + Query masterQuery; + { + // Need to explicitly order master query by primary key in order + // for fetchAfter to work correctly in case corrupt records are + // encountered. + StorableKey pk = StorableIntrospector + .examine(masterStorage.getStorableType()).getPrimaryKey(); + String[] naturalOrdering = new String[pk.getProperties().size()]; + int i=0; + for (OrderedProperty prop : pk.getProperties()) { + String orderBy; + if (prop.getDirection() == Direction.DESCENDING) { + orderBy = prop.toString(); + } else { + orderBy = prop.getChainedProperty().toString(); + } + naturalOrdering[i++] = orderBy; + } + + masterQuery = masterStorage.query().orderBy(naturalOrdering); + } + // Enter top transaction with isolation level of none to make sure // preload operation does not run in a long nested transaction. Transaction txn = repo.enterTopTransaction(IsolationLevel.NONE); try { - Cursor cursor = masterStorage.query().fetch(); + Cursor cursor = masterQuery.fetch(); try { if (!cursor.hasNext()) { // Nothing exists in master, so nothing to populate. @@ -247,8 +273,33 @@ class ManagedIndex implements IndexEntryAccessor { buffer.prepare(c); long nextReportTime = System.currentTimeMillis() + POPULATE_INFO_DELAY_MILLIS; + + // These variables are used when corrupt records are encountered. + S lastUserStorable = null; + int skippedCount = 0; + while (cursor.hasNext()) { - buffer.add(makeIndexEntry(cursor.next())); + S userStorable; + try { + userStorable = cursor.next(); + skippedCount = 0; + } catch (CorruptEncodingException e) { + log.warn("Omitting corrupt record from index: " + e.toString()); + + // Exception forces cursor to close. Close again to be sure. + cursor.close(); + + if (lastUserStorable == null) { + cursor = masterQuery.fetch(); + } else { + cursor = masterQuery.fetchAfter(lastUserStorable); + } + + cursor.skipNext(++skippedCount); + continue; + } + + buffer.add(makeIndexEntry(userStorable)); if (log.isInfoEnabled()) { long now = System.currentTimeMillis(); @@ -257,6 +308,8 @@ class ManagedIndex implements IndexEntryAccessor { nextReportTime = now + POPULATE_INFO_DELAY_MILLIS; } } + + lastUserStorable = userStorable; } // No need to commit transaction because no changes should have been made. -- cgit v1.2.3