diff options
| -rw-r--r-- | src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java | 52 | 
1 files changed, 35 insertions, 17 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 4200d1f..f3ede30 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;  import com.amazon.carbonado.CorruptEncodingException;
  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.FetchTimeoutException;
  import com.amazon.carbonado.IsolationLevel;
  import com.amazon.carbonado.PersistTimeoutException;
  import com.amazon.carbonado.PersistException;
 @@ -72,6 +73,18 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {      static final int BUILD_THROTTLE_WINDOW = BUILD_BATCH_SIZE * 10;
      static final int BUILD_THROTTLE_SLEEP_PRECISION = 10;
 +    private static final int BUILD_TXN_TIMEOUT_MILLIS;
 +
 +    static {
 +        int timeout = 100;
 +        String prop = System.getProperty
 +            ("com.amazon.carbonado.repo.indexed.BUILD_TXN_TIMEOUT_MILLIS");
 +        if (prop != null) {
 +            timeout = Integer.parseInt(prop);
 +        }
 +        BUILD_TXN_TIMEOUT_MILLIS = timeout;
 +    }
 +
      private static String[] naturalOrdering(Class<? extends Storable> type) {
          StorableKey<?> pk = StorableIntrospector.examine(type).getPrimaryKey();
          String[] naturalOrdering = new String[pk.getProperties().size()];
 @@ -430,11 +443,8 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {          long totalDeleted = 0;
          long totalProgress = 0;
 -        txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
 +        txn = enterBuildTxn();
          try {
 -            txn.setForUpdate(true);
 -            txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
 -
              Cursor<? extends Storable> indexEntryCursor = indexEntryQuery.fetch();
              Storable existingIndexEntry = null;
 @@ -520,10 +530,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                                  logProgress(log, totalProgress, bufferSize,
                                              totalInserted, totalUpdated, totalDeleted);
 -                                txn = mRepository
 -                                    .enterTopTransaction(IsolationLevel.READ_COMMITTED);
 -                                txn.setForUpdate(true);
 -                                txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
 +                                txn = enterBuildTxn();
                                  indexEntryCursor.close();
                                  indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry);
 @@ -548,12 +555,18 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                      lastIndexEntry = indexEntry;
                      retry = false;
 -                } catch (PersistTimeoutException e) {
 -                    log.warn("Lock conflict during index repair; will retry: " +
 -                             indexEntry + ", " + e);
 -                    // This re-uses the last index entry to repair and forces
 -                    // the current transaction to commit.
 -                    retry = true;
 +                } catch (RepositoryException e) {
 +                    if (e instanceof FetchTimeoutException ||
 +                        e instanceof PersistTimeoutException)
 +                    {
 +                        log.warn("Lock conflict during index repair; will retry: " +
 +                                 indexEntry + ", " + e);
 +                        // This re-uses the last index entry to repair and forces
 +                        // the current transaction to commit.
 +                        retry = true;
 +                    } else {
 +                        throw e;
 +                    }
                  }
                  if (retry || (totalProgress % BUILD_BATCH_SIZE == 0)) {
 @@ -563,9 +576,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                      logProgress(log, totalProgress, bufferSize,
                                  totalInserted, totalUpdated, totalDeleted);
 -                    txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
 -                    txn.setForUpdate(true);
 -                    txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
 +                    txn = enterBuildTxn();
                      if (indexEntryCursor != null) {
                          indexEntryCursor.close();
 @@ -597,6 +608,13 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {          }
      }
 +    private Transaction enterBuildTxn() {
 +        Transaction txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
 +        txn.setForUpdate(true);
 +        txn.setDesiredLockTimeout(BUILD_TXN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 +        return txn;
 +    }
 +
      private static void throttle(Throttle throttle, double desiredSpeed)
          throws RepositoryException
      {
 | 
