summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java57
1 files changed, 55 insertions, 2 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index 2d7f983..507a09f 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -25,6 +25,7 @@ import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import com.amazon.carbonado.CorruptEncodingException;
import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.IsolationLevel;
@@ -42,7 +43,10 @@ import com.amazon.carbonado.filter.Filter;
import com.amazon.carbonado.filter.RelOp;
import com.amazon.carbonado.info.Direction;
+import com.amazon.carbonado.info.OrderedProperty;
+import com.amazon.carbonado.info.StorableKey;
import com.amazon.carbonado.info.StorableIndex;
+import com.amazon.carbonado.info.StorableIntrospector;
import com.amazon.carbonado.cursor.MergeSortBuffer;
@@ -216,11 +220,33 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
Log log = LogFactory.getLog(IndexedStorage.class);
+ Query<S> masterQuery;
+ {
+ // Need to explicitly order master query by primary key in order
+ // for fetchAfter to work correctly in case corrupt records are
+ // encountered.
+ StorableKey<S> pk = StorableIntrospector
+ .examine(masterStorage.getStorableType()).getPrimaryKey();
+ String[] naturalOrdering = new String[pk.getProperties().size()];
+ int i=0;
+ for (OrderedProperty<S> prop : pk.getProperties()) {
+ String orderBy;
+ if (prop.getDirection() == Direction.DESCENDING) {
+ orderBy = prop.toString();
+ } else {
+ orderBy = prop.getChainedProperty().toString();
+ }
+ naturalOrdering[i++] = orderBy;
+ }
+
+ masterQuery = masterStorage.query().orderBy(naturalOrdering);
+ }
+
// Enter top transaction with isolation level of none to make sure
// preload operation does not run in a long nested transaction.
Transaction txn = repo.enterTopTransaction(IsolationLevel.NONE);
try {
- Cursor<S> cursor = masterStorage.query().fetch();
+ Cursor<S> cursor = masterQuery.fetch();
try {
if (!cursor.hasNext()) {
// Nothing exists in master, so nothing to populate.
@@ -247,8 +273,33 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
buffer.prepare(c);
long nextReportTime = System.currentTimeMillis() + POPULATE_INFO_DELAY_MILLIS;
+
+ // These variables are used when corrupt records are encountered.
+ S lastUserStorable = null;
+ int skippedCount = 0;
+
while (cursor.hasNext()) {
- buffer.add(makeIndexEntry(cursor.next()));
+ S userStorable;
+ try {
+ userStorable = cursor.next();
+ skippedCount = 0;
+ } catch (CorruptEncodingException e) {
+ log.warn("Omitting corrupt record from index: " + e.toString());
+
+ // Exception forces cursor to close. Close again to be sure.
+ cursor.close();
+
+ if (lastUserStorable == null) {
+ cursor = masterQuery.fetch();
+ } else {
+ cursor = masterQuery.fetchAfter(lastUserStorable);
+ }
+
+ cursor.skipNext(++skippedCount);
+ continue;
+ }
+
+ buffer.add(makeIndexEntry(userStorable));
if (log.isInfoEnabled()) {
long now = System.currentTimeMillis();
@@ -257,6 +308,8 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
nextReportTime = now + POPULATE_INFO_DELAY_MILLIS;
}
}
+
+ lastUserStorable = userStorable;
}
// No need to commit transaction because no changes should have been made.