From 2ec36e83960af4f9b64b6fcbb27d7927cf5564b9 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Fri, 11 Nov 2011 16:07:07 +0000 Subject: Handle more timeout types, make timeout configurable. --- .../carbonado/repo/indexed/ManagedIndex.java | 52 +++++++++++++++------- 1 file changed, 35 insertions(+), 17 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 4200d1f..f3ede30 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory; import com.amazon.carbonado.CorruptEncodingException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.FetchTimeoutException; import com.amazon.carbonado.IsolationLevel; import com.amazon.carbonado.PersistTimeoutException; import com.amazon.carbonado.PersistException; @@ -72,6 +73,18 @@ class ManagedIndex implements IndexEntryAccessor { static final int BUILD_THROTTLE_WINDOW = BUILD_BATCH_SIZE * 10; static final int BUILD_THROTTLE_SLEEP_PRECISION = 10; + private static final int BUILD_TXN_TIMEOUT_MILLIS; + + static { + int timeout = 100; + String prop = System.getProperty + ("com.amazon.carbonado.repo.indexed.BUILD_TXN_TIMEOUT_MILLIS"); + if (prop != null) { + timeout = Integer.parseInt(prop); + } + BUILD_TXN_TIMEOUT_MILLIS = timeout; + } + private static String[] naturalOrdering(Class type) { StorableKey pk = StorableIntrospector.examine(type).getPrimaryKey(); String[] naturalOrdering = new String[pk.getProperties().size()]; @@ -430,11 +443,8 @@ class ManagedIndex implements IndexEntryAccessor { long totalDeleted = 0; long totalProgress = 0; - txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED); + txn = enterBuildTxn(); try { - txn.setForUpdate(true); - txn.setDesiredLockTimeout(0, TimeUnit.SECONDS); - Cursor indexEntryCursor = indexEntryQuery.fetch(); Storable existingIndexEntry = null; @@ -520,10 +530,7 @@ class ManagedIndex implements IndexEntryAccessor { logProgress(log, totalProgress, bufferSize, totalInserted, totalUpdated, totalDeleted); - txn = mRepository - .enterTopTransaction(IsolationLevel.READ_COMMITTED); - txn.setForUpdate(true); - txn.setDesiredLockTimeout(0, TimeUnit.SECONDS); + txn = enterBuildTxn(); indexEntryCursor.close(); indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry); @@ -548,12 +555,18 @@ class ManagedIndex implements IndexEntryAccessor { lastIndexEntry = indexEntry; retry = false; - } catch (PersistTimeoutException e) { - log.warn("Lock conflict during index repair; will retry: " + - indexEntry + ", " + e); - // This re-uses the last index entry to repair and forces - // the current transaction to commit. - retry = true; + } catch (RepositoryException e) { + if (e instanceof FetchTimeoutException || + e instanceof PersistTimeoutException) + { + log.warn("Lock conflict during index repair; will retry: " + + indexEntry + ", " + e); + // This re-uses the last index entry to repair and forces + // the current transaction to commit. + retry = true; + } else { + throw e; + } } if (retry || (totalProgress % BUILD_BATCH_SIZE == 0)) { @@ -563,9 +576,7 @@ class ManagedIndex implements IndexEntryAccessor { logProgress(log, totalProgress, bufferSize, totalInserted, totalUpdated, totalDeleted); - txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED); - txn.setForUpdate(true); - txn.setDesiredLockTimeout(0, TimeUnit.SECONDS); + txn = enterBuildTxn(); if (indexEntryCursor != null) { indexEntryCursor.close(); @@ -597,6 +608,13 @@ class ManagedIndex implements IndexEntryAccessor { } } + private Transaction enterBuildTxn() { + Transaction txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED); + txn.setForUpdate(true); + txn.setDesiredLockTimeout(BUILD_TXN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + return txn; + } + private static void throttle(Throttle throttle, double desiredSpeed) throws RepositoryException { -- cgit v1.2.3