From 8fb93b47e3088b928af687a09e45b74b2d4833e4 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Thu, 10 Nov 2011 18:07:42 +0000 Subject: Add retry logic to index repair. --- .../carbonado/repo/indexed/ManagedIndex.java | 102 ++++++++++++--------- 1 file changed, 60 insertions(+), 42 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index d9b585f..4200d1f 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -23,6 +23,8 @@ import java.lang.reflect.UndeclaredThrowableException; import java.util.Comparator; import java.util.Iterator; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +32,7 @@ import com.amazon.carbonado.CorruptEncodingException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.PersistTimeoutException; import com.amazon.carbonado.PersistException; import com.amazon.carbonado.Query; import com.amazon.carbonado.Repository; @@ -430,6 +433,7 @@ class ManagedIndex implements IndexEntryAccessor { txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED); try { txn.setForUpdate(true); + txn.setDesiredLockTimeout(0, TimeUnit.SECONDS); Cursor indexEntryCursor = indexEntryQuery.fetch(); Storable existingIndexEntry = null; @@ -440,43 +444,49 @@ class ManagedIndex implements IndexEntryAccessor { indexEntryCursor = null; } + boolean retry = false; + Storable indexEntry = null; + Storable lastIndexEntry = null; + Iterator it = buffer.iterator(); bufferIterate: while (true) { - Object obj; - if (it.hasNext()) { - obj = it.next(); - } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) { - obj = null; - } else { - break; - } + if (!retry) { + Object obj; + if (it.hasNext()) { + obj = it.next(); + } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) { + obj = null; + } else { + break; + } - Storable indexEntry = (Storable) obj; + indexEntry = (Storable) obj; + } - if (indexEntry != null) { - if (indexEntry.tryInsert()) { - totalInserted++; - } else { - // Couldn't insert because an index entry already exists. - Storable existing = indexEntry.copy(); - boolean doUpdate = false; - if (!existing.tryLoad()) { - doUpdate = true; - } else if (!existing.equalProperties(indexEntry)) { - // If only the version differs, leave existing entry alone. - indexEntry.copyVersionProperty(existing); - doUpdate = !existing.equalProperties(indexEntry); - } - if (doUpdate) { - indexEntry.tryDelete(); - indexEntry.tryInsert(); - totalUpdated++; + try { + if (indexEntry != null) { + if (indexEntry.tryInsert()) { + totalInserted++; + } else { + // Couldn't insert because an index entry already exists. + Storable existing = indexEntry.copy(); + boolean doUpdate = false; + if (!existing.tryLoad()) { + doUpdate = true; + } else if (!existing.equalProperties(indexEntry)) { + // If only the version differs, leave existing entry alone. + indexEntry.copyVersionProperty(existing); + doUpdate = !existing.equalProperties(indexEntry); + } + if (doUpdate) { + indexEntry.tryDelete(); + indexEntry.tryInsert(); + totalUpdated++; + } } } - } - if (indexEntryCursor != null) { - while (true) { + if (indexEntryCursor != null) while (true) { if (existingIndexEntry == null) { if (indexEntryCursor.hasNext()) { existingIndexEntry = indexEntryCursor.next(); @@ -513,6 +523,7 @@ class ManagedIndex implements IndexEntryAccessor { txn = mRepository .enterTopTransaction(IsolationLevel.READ_COMMITTED); txn.setForUpdate(true); + txn.setDesiredLockTimeout(0, TimeUnit.SECONDS); indexEntryCursor.close(); indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry); @@ -530,13 +541,22 @@ class ManagedIndex implements IndexEntryAccessor { throttle(throttle, desiredSpeed); } } - } - if (indexEntry != null) { - totalProgress++; + if (indexEntry != null) { + totalProgress++; + } + + lastIndexEntry = indexEntry; + retry = false; + } catch (PersistTimeoutException e) { + log.warn("Lock conflict during index repair; will retry: " + + indexEntry + ", " + e); + // This re-uses the last index entry to repair and forces + // the current transaction to commit. + retry = true; } - if (totalProgress % BUILD_BATCH_SIZE == 0) { + if (retry || (totalProgress % BUILD_BATCH_SIZE == 0)) { txn.commit(); txn.exit(); @@ -545,21 +565,19 @@ class ManagedIndex implements IndexEntryAccessor { txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED); txn.setForUpdate(true); + txn.setDesiredLockTimeout(0, TimeUnit.SECONDS); if (indexEntryCursor != null) { indexEntryCursor.close(); existingIndexEntry = null; - if (indexEntry == null) { + if (indexEntry == null || lastIndexEntry == null) { indexEntryCursor = indexEntryQuery.fetch(); - } else { + } else if (!retry) { indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry); - } - - if (!indexEntryCursor.hasNext()) { - indexEntryCursor.close(); - // Don't try opening again. - indexEntryCursor = null; + } else { + // Re-fetch starting at the same spot. + indexEntryCursor = indexEntryQuery.fetchAfter(lastIndexEntry); } } } -- cgit v1.2.3