diff options
| author | Brian S. O'Neill <bronee@gmail.com> | 2011-11-10 18:07:42 +0000 | 
|---|---|---|
| committer | Brian S. O'Neill <bronee@gmail.com> | 2011-11-10 18:07:42 +0000 | 
| commit | 8fb93b47e3088b928af687a09e45b74b2d4833e4 (patch) | |
| tree | c50e955a0920d025a1e0b0a3bd464aa28d7cebe5 /src/main/java/com/amazon | |
| parent | c053f43052b189e0fb2998da398dce127d8f6a57 (diff) | |
Add retry logic to index repair.
Diffstat (limited to 'src/main/java/com/amazon')
| -rw-r--r-- | src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java | 102 | 
1 files changed, 60 insertions, 42 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index d9b585f..4200d1f 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -23,6 +23,8 @@ import java.lang.reflect.UndeclaredThrowableException;  import java.util.Comparator;
  import java.util.Iterator;
 +import java.util.concurrent.TimeUnit;
 +
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
 @@ -30,6 +32,7 @@ import com.amazon.carbonado.CorruptEncodingException;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
  import com.amazon.carbonado.IsolationLevel;
 +import com.amazon.carbonado.PersistTimeoutException;
  import com.amazon.carbonado.PersistException;
  import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Repository;
 @@ -430,6 +433,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {          txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
          try {
              txn.setForUpdate(true);
 +            txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
              Cursor<? extends Storable> indexEntryCursor = indexEntryQuery.fetch();
              Storable existingIndexEntry = null;
 @@ -440,43 +444,49 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                  indexEntryCursor = null;
              }
 +            boolean retry = false;
 +            Storable indexEntry = null;
 +            Storable lastIndexEntry = null;
 +
              Iterator it = buffer.iterator();
              bufferIterate: while (true) {
 -                Object obj;
 -                if (it.hasNext()) {
 -                    obj = it.next();
 -                } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
 -                    obj = null;
 -                } else {
 -                    break;
 -                }
 +                if (!retry) {
 +                    Object obj;
 +                    if (it.hasNext()) {
 +                        obj = it.next();
 +                    } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
 +                        obj = null;
 +                    } else {
 +                        break;
 +                    }
 -                Storable indexEntry = (Storable) obj;
 +                    indexEntry = (Storable) obj;
 +                }
 -                if (indexEntry != null) {
 -                    if (indexEntry.tryInsert()) {
 -                        totalInserted++;
 -                    } else {
 -                        // Couldn't insert because an index entry already exists.
 -                        Storable existing = indexEntry.copy();
 -                        boolean doUpdate = false;
 -                        if (!existing.tryLoad()) {
 -                            doUpdate = true;
 -                        } else if (!existing.equalProperties(indexEntry)) {
 -                            // If only the version differs, leave existing entry alone.
 -                            indexEntry.copyVersionProperty(existing);
 -                            doUpdate = !existing.equalProperties(indexEntry);
 -                        }
 -                        if (doUpdate) {
 -                            indexEntry.tryDelete();
 -                            indexEntry.tryInsert();
 -                            totalUpdated++;
 +                try {
 +                    if (indexEntry != null) {
 +                        if (indexEntry.tryInsert()) {
 +                            totalInserted++;
 +                        } else {
 +                            // Couldn't insert because an index entry already exists.
 +                            Storable existing = indexEntry.copy();
 +                            boolean doUpdate = false;
 +                            if (!existing.tryLoad()) {
 +                                doUpdate = true;
 +                            } else if (!existing.equalProperties(indexEntry)) {
 +                                // If only the version differs, leave existing entry alone.
 +                                indexEntry.copyVersionProperty(existing);
 +                                doUpdate = !existing.equalProperties(indexEntry);
 +                            }
 +                            if (doUpdate) {
 +                                indexEntry.tryDelete();
 +                                indexEntry.tryInsert();
 +                                totalUpdated++;
 +                            }
                          }
                      }
 -                }
 -                if (indexEntryCursor != null) {
 -                    while (true) {
 +                    if (indexEntryCursor != null) while (true) {
                          if (existingIndexEntry == null) {
                              if (indexEntryCursor.hasNext()) {
                                  existingIndexEntry = indexEntryCursor.next();
 @@ -513,6 +523,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                                  txn = mRepository
                                      .enterTopTransaction(IsolationLevel.READ_COMMITTED);
                                  txn.setForUpdate(true);
 +                                txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
                                  indexEntryCursor.close();
                                  indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry);
 @@ -530,13 +541,22 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                              throttle(throttle, desiredSpeed);
                          }
                      }
 -                }
 -                if (indexEntry != null) {
 -                    totalProgress++;
 +                    if (indexEntry != null) {
 +                        totalProgress++;
 +                    }
 +
 +                    lastIndexEntry = indexEntry;
 +                    retry = false;
 +                } catch (PersistTimeoutException e) {
 +                    log.warn("Lock conflict during index repair; will retry: " +
 +                             indexEntry + ", " + e);
 +                    // This re-uses the last index entry to repair and forces
 +                    // the current transaction to commit.
 +                    retry = true;
                  }
 -                if (totalProgress % BUILD_BATCH_SIZE == 0) {
 +                if (retry || (totalProgress % BUILD_BATCH_SIZE == 0)) {
                      txn.commit();
                      txn.exit();
 @@ -545,21 +565,19 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                      txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
                      txn.setForUpdate(true);
 +                    txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
                      if (indexEntryCursor != null) {
                          indexEntryCursor.close();
                          existingIndexEntry = null;
 -                        if (indexEntry == null) {
 +                        if (indexEntry == null || lastIndexEntry == null) {
                              indexEntryCursor = indexEntryQuery.fetch();
 -                        } else {
 +                        } else if (!retry) {
                              indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry);
 -                        }
 -
 -                        if (!indexEntryCursor.hasNext()) {
 -                            indexEntryCursor.close();
 -                            // Don't try opening again.
 -                            indexEntryCursor = null;
 +                        } else {
 +                            // Re-fetch starting at the same spot.
 +                            indexEntryCursor = indexEntryQuery.fetchAfter(lastIndexEntry);
                          }
                      }
                  }
 | 
