summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java52
1 files changed, 35 insertions, 17 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index 4200d1f..f3ede30 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
import com.amazon.carbonado.CorruptEncodingException;
import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchTimeoutException;
import com.amazon.carbonado.IsolationLevel;
import com.amazon.carbonado.PersistTimeoutException;
import com.amazon.carbonado.PersistException;
@@ -72,6 +73,18 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
static final int BUILD_THROTTLE_WINDOW = BUILD_BATCH_SIZE * 10;
static final int BUILD_THROTTLE_SLEEP_PRECISION = 10;
+ private static final int BUILD_TXN_TIMEOUT_MILLIS;
+
+ static {
+ int timeout = 100;
+ String prop = System.getProperty
+ ("com.amazon.carbonado.repo.indexed.BUILD_TXN_TIMEOUT_MILLIS");
+ if (prop != null) {
+ timeout = Integer.parseInt(prop);
+ }
+ BUILD_TXN_TIMEOUT_MILLIS = timeout;
+ }
+
private static String[] naturalOrdering(Class<? extends Storable> type) {
StorableKey<?> pk = StorableIntrospector.examine(type).getPrimaryKey();
String[] naturalOrdering = new String[pk.getProperties().size()];
@@ -430,11 +443,8 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
long totalDeleted = 0;
long totalProgress = 0;
- txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ txn = enterBuildTxn();
try {
- txn.setForUpdate(true);
- txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
-
Cursor<? extends Storable> indexEntryCursor = indexEntryQuery.fetch();
Storable existingIndexEntry = null;
@@ -520,10 +530,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
logProgress(log, totalProgress, bufferSize,
totalInserted, totalUpdated, totalDeleted);
- txn = mRepository
- .enterTopTransaction(IsolationLevel.READ_COMMITTED);
- txn.setForUpdate(true);
- txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
+ txn = enterBuildTxn();
indexEntryCursor.close();
indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry);
@@ -548,12 +555,18 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
lastIndexEntry = indexEntry;
retry = false;
- } catch (PersistTimeoutException e) {
- log.warn("Lock conflict during index repair; will retry: " +
- indexEntry + ", " + e);
- // This re-uses the last index entry to repair and forces
- // the current transaction to commit.
- retry = true;
+ } catch (RepositoryException e) {
+ if (e instanceof FetchTimeoutException ||
+ e instanceof PersistTimeoutException)
+ {
+ log.warn("Lock conflict during index repair; will retry: " +
+ indexEntry + ", " + e);
+ // This re-uses the last index entry to repair and forces
+ // the current transaction to commit.
+ retry = true;
+ } else {
+ throw e;
+ }
}
if (retry || (totalProgress % BUILD_BATCH_SIZE == 0)) {
@@ -563,9 +576,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
logProgress(log, totalProgress, bufferSize,
totalInserted, totalUpdated, totalDeleted);
- txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
- txn.setForUpdate(true);
- txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
+ txn = enterBuildTxn();
if (indexEntryCursor != null) {
indexEntryCursor.close();
@@ -597,6 +608,13 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
}
}
+ private Transaction enterBuildTxn() {
+ Transaction txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ txn.setForUpdate(true);
+ txn.setDesiredLockTimeout(BUILD_TXN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ return txn;
+ }
+
private static void throttle(Throttle throttle, double desiredSpeed)
throws RepositoryException
{