summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo/indexed
diff options
context:
space:
mode:
authorBrian S. O'Neill <bronee@gmail.com>2011-11-10 18:07:42 +0000
committerBrian S. O'Neill <bronee@gmail.com>2011-11-10 18:07:42 +0000
commit8fb93b47e3088b928af687a09e45b74b2d4833e4 (patch)
treec50e955a0920d025a1e0b0a3bd464aa28d7cebe5 /src/main/java/com/amazon/carbonado/repo/indexed
parentc053f43052b189e0fb2998da398dce127d8f6a57 (diff)
Add retry logic to index repair.
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo/indexed')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java102
1 files changed, 60 insertions, 42 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index d9b585f..4200d1f 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -23,6 +23,8 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,6 +32,7 @@ import com.amazon.carbonado.CorruptEncodingException;
import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.PersistTimeoutException;
import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Query;
import com.amazon.carbonado.Repository;
@@ -430,6 +433,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
try {
txn.setForUpdate(true);
+ txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
Cursor<? extends Storable> indexEntryCursor = indexEntryQuery.fetch();
Storable existingIndexEntry = null;
@@ -440,43 +444,49 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
indexEntryCursor = null;
}
+ boolean retry = false;
+ Storable indexEntry = null;
+ Storable lastIndexEntry = null;
+
Iterator it = buffer.iterator();
bufferIterate: while (true) {
- Object obj;
- if (it.hasNext()) {
- obj = it.next();
- } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
- obj = null;
- } else {
- break;
- }
+ if (!retry) {
+ Object obj;
+ if (it.hasNext()) {
+ obj = it.next();
+ } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
+ obj = null;
+ } else {
+ break;
+ }
- Storable indexEntry = (Storable) obj;
+ indexEntry = (Storable) obj;
+ }
- if (indexEntry != null) {
- if (indexEntry.tryInsert()) {
- totalInserted++;
- } else {
- // Couldn't insert because an index entry already exists.
- Storable existing = indexEntry.copy();
- boolean doUpdate = false;
- if (!existing.tryLoad()) {
- doUpdate = true;
- } else if (!existing.equalProperties(indexEntry)) {
- // If only the version differs, leave existing entry alone.
- indexEntry.copyVersionProperty(existing);
- doUpdate = !existing.equalProperties(indexEntry);
- }
- if (doUpdate) {
- indexEntry.tryDelete();
- indexEntry.tryInsert();
- totalUpdated++;
+ try {
+ if (indexEntry != null) {
+ if (indexEntry.tryInsert()) {
+ totalInserted++;
+ } else {
+ // Couldn't insert because an index entry already exists.
+ Storable existing = indexEntry.copy();
+ boolean doUpdate = false;
+ if (!existing.tryLoad()) {
+ doUpdate = true;
+ } else if (!existing.equalProperties(indexEntry)) {
+ // If only the version differs, leave existing entry alone.
+ indexEntry.copyVersionProperty(existing);
+ doUpdate = !existing.equalProperties(indexEntry);
+ }
+ if (doUpdate) {
+ indexEntry.tryDelete();
+ indexEntry.tryInsert();
+ totalUpdated++;
+ }
}
}
- }
- if (indexEntryCursor != null) {
- while (true) {
+ if (indexEntryCursor != null) while (true) {
if (existingIndexEntry == null) {
if (indexEntryCursor.hasNext()) {
existingIndexEntry = indexEntryCursor.next();
@@ -513,6 +523,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
txn = mRepository
.enterTopTransaction(IsolationLevel.READ_COMMITTED);
txn.setForUpdate(true);
+ txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
indexEntryCursor.close();
indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry);
@@ -530,13 +541,22 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
throttle(throttle, desiredSpeed);
}
}
- }
- if (indexEntry != null) {
- totalProgress++;
+ if (indexEntry != null) {
+ totalProgress++;
+ }
+
+ lastIndexEntry = indexEntry;
+ retry = false;
+ } catch (PersistTimeoutException e) {
+ log.warn("Lock conflict during index repair; will retry: " +
+ indexEntry + ", " + e);
+ // This re-uses the last index entry to repair and forces
+ // the current transaction to commit.
+ retry = true;
}
- if (totalProgress % BUILD_BATCH_SIZE == 0) {
+ if (retry || (totalProgress % BUILD_BATCH_SIZE == 0)) {
txn.commit();
txn.exit();
@@ -545,21 +565,19 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
txn.setForUpdate(true);
+ txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
if (indexEntryCursor != null) {
indexEntryCursor.close();
existingIndexEntry = null;
- if (indexEntry == null) {
+ if (indexEntry == null || lastIndexEntry == null) {
indexEntryCursor = indexEntryQuery.fetch();
- } else {
+ } else if (!retry) {
indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry);
- }
-
- if (!indexEntryCursor.hasNext()) {
- indexEntryCursor.close();
- // Don't try opening again.
- indexEntryCursor = null;
+ } else {
+ // Re-fetch starting at the same spot.
+ indexEntryCursor = indexEntryQuery.fetchAfter(lastIndexEntry);
}
}
}