From 3340efb580c07e8d9c6c53f40ac46838bec05775 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Sat, 5 Nov 2011 21:00:52 +0000 Subject: Index repair retries following deadlocks. --- .../carbonado/repo/indexed/ManagedIndex.java | 158 ++++++++++++--------- 1 file changed, 87 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 7a597b6..4897d21 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -30,6 +30,7 @@ import com.amazon.carbonado.CorruptEncodingException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.PersistDeadlockException; import com.amazon.carbonado.PersistException; import com.amazon.carbonado.Query; import com.amazon.carbonado.Repository; @@ -440,102 +441,117 @@ class ManagedIndex implements IndexEntryAccessor { indexEntryCursor = null; } + boolean retry = false; + Storable indexEntry = null; + Iterator it = buffer.iterator(); bufferIterate: while (true) { - Object obj; - if (it.hasNext()) { - obj = it.next(); - } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) { - obj = null; - } else { - break; - } - - Storable indexEntry = (Storable) obj; - - if (indexEntry != null) { - if (indexEntry.tryInsert()) { - totalInserted++; + if (!retry) { + Object obj; + if (it.hasNext()) { + obj = it.next(); + } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) { + obj = null; } else { - // Couldn't insert because an index entry already exists. - Storable existing = indexEntry.copy(); - boolean doUpdate = false; - if (!existing.tryLoad()) { - doUpdate = true; - } else if (!existing.equalProperties(indexEntry)) { - // If only the version differs, leave existing entry alone. - indexEntry.copyVersionProperty(existing); - doUpdate = !existing.equalProperties(indexEntry); - } - if (doUpdate) { - indexEntry.tryDelete(); - indexEntry.tryInsert(); - totalUpdated++; - } + break; } + + indexEntry = (Storable) obj; } - if (indexEntryCursor != null) { - while (true) { - if (existingIndexEntry == null) { - if (indexEntryCursor.hasNext()) { - existingIndexEntry = indexEntryCursor.next(); - } else { - indexEntryCursor.close(); - // Don't try opening again. - indexEntryCursor = null; - break; + try { + if (indexEntry != null) { + if (indexEntry.tryInsert()) { + totalInserted++; + } else { + // Couldn't insert because an index entry already exists. + Storable existing = indexEntry.copy(); + boolean doUpdate = false; + if (!existing.tryLoad()) { + doUpdate = true; + } else if (!existing.equalProperties(indexEntry)) { + // If only the version differs, leave existing entry alone. + indexEntry.copyVersionProperty(existing); + doUpdate = !existing.equalProperties(indexEntry); + } + if (doUpdate) { + indexEntry.tryDelete(); + indexEntry.tryInsert(); + totalUpdated++; } } + } - int compare = c.compare(existingIndexEntry, indexEntry); + if (indexEntryCursor != null) { + while (true) { + if (existingIndexEntry == null) { + if (indexEntryCursor.hasNext()) { + existingIndexEntry = indexEntryCursor.next(); + } else { + indexEntryCursor.close(); + // Don't try opening again. + indexEntryCursor = null; + break; + } + } - if (compare == 0) { - // Existing entry cursor matches so allow cursor to advance. - existingIndexEntry = null; - break; - } else if (compare > 0) { - // Existing index entry is ahead so check later. - break; - } else { - // Existing index entry is bogus. - existingIndexEntry.tryDelete(); + int compare = c.compare(existingIndexEntry, indexEntry); - totalDeleted++; + if (compare == 0) { + // Existing entry cursor matches so allow cursor to advance. + existingIndexEntry = null; + break; + } else if (compare > 0) { + // Existing index entry is ahead so check later. + break; + } else { + // Existing index entry is bogus. + existingIndexEntry.tryDelete(); - if (totalDeleted % BUILD_BATCH_SIZE == 0) { - txn.commit(); - txn.exit(); + totalDeleted++; - logProgress(log, totalProgress, bufferSize, - totalInserted, totalUpdated, totalDeleted); + if (totalDeleted % BUILD_BATCH_SIZE == 0) { + txn.commit(); + txn.exit(); - txn = mRepository - .enterTopTransaction(IsolationLevel.READ_COMMITTED); + logProgress(log, totalProgress, bufferSize, + totalInserted, totalUpdated, totalDeleted); - indexEntryCursor.close(); - indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry); + txn = mRepository + .enterTopTransaction(IsolationLevel.READ_COMMITTED); - if (!indexEntryCursor.hasNext()) { indexEntryCursor.close(); - // Don't try opening again. - indexEntryCursor = null; - break; + indexEntryCursor = + indexEntryQuery.fetchAfter(existingIndexEntry); + + if (!indexEntryCursor.hasNext()) { + indexEntryCursor.close(); + // Don't try opening again. + indexEntryCursor = null; + break; + } } - } - existingIndexEntry = null; + existingIndexEntry = null; - throttle(throttle, desiredSpeed); + throttle(throttle, desiredSpeed); + } } } - } - if (indexEntry != null) { - totalProgress++; + if (indexEntry != null) { + totalProgress++; + } + + retry = false; + } catch (PersistDeadlockException e) { + log.error("Deadlock during index repair; will retry: " + indexEntry, e); + // This re-uses the last index entry to repair and forces + // the current transaction to commit. + retry = true; } - if (totalProgress % BUILD_BATCH_SIZE == 0) { + if (retry || (totalProgress % BUILD_BATCH_SIZE == 0)) { txn.commit(); txn.exit(); -- cgit v1.2.3