From c053f43052b189e0fb2998da398dce127d8f6a57 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Mon, 7 Nov 2011 06:02:07 +0000 Subject: Ensure transaction is in "for update" mode during index repair. --- .../carbonado/repo/indexed/ManagedIndex.java | 160 ++++++++++----------- 1 file changed, 73 insertions(+), 87 deletions(-) (limited to 'src/main/java/com/amazon/carbonado/repo/indexed') diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 4897d21..d9b585f 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -30,7 +30,6 @@ import com.amazon.carbonado.CorruptEncodingException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; import com.amazon.carbonado.IsolationLevel; -import com.amazon.carbonado.PersistDeadlockException; import com.amazon.carbonado.PersistException; import com.amazon.carbonado.Query; import com.amazon.carbonado.Repository; @@ -441,117 +440,103 @@ class ManagedIndex implements IndexEntryAccessor { indexEntryCursor = null; } - boolean retry = false; - Storable indexEntry = null; - Iterator it = buffer.iterator(); bufferIterate: while (true) { - if (!retry) { - Object obj; - if (it.hasNext()) { - obj = it.next(); - } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) { - obj = null; + Object obj; + if (it.hasNext()) { + obj = it.next(); + } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) { + obj = null; + } else { + break; + } + + Storable indexEntry = (Storable) obj; + + if (indexEntry != null) { + if (indexEntry.tryInsert()) { + totalInserted++; } else { - break; + // Couldn't insert because an index entry already exists. + Storable existing = indexEntry.copy(); + boolean doUpdate = false; + if (!existing.tryLoad()) { + doUpdate = true; + } else if (!existing.equalProperties(indexEntry)) { + // If only the version differs, leave existing entry alone. + indexEntry.copyVersionProperty(existing); + doUpdate = !existing.equalProperties(indexEntry); + } + if (doUpdate) { + indexEntry.tryDelete(); + indexEntry.tryInsert(); + totalUpdated++; + } } - - indexEntry = (Storable) obj; } - try { - if (indexEntry != null) { - if (indexEntry.tryInsert()) { - totalInserted++; - } else { - // Couldn't insert because an index entry already exists. - Storable existing = indexEntry.copy(); - boolean doUpdate = false; - if (!existing.tryLoad()) { - doUpdate = true; - } else if (!existing.equalProperties(indexEntry)) { - // If only the version differs, leave existing entry alone. - indexEntry.copyVersionProperty(existing); - doUpdate = !existing.equalProperties(indexEntry); - } - if (doUpdate) { - indexEntry.tryDelete(); - indexEntry.tryInsert(); - totalUpdated++; + if (indexEntryCursor != null) { + while (true) { + if (existingIndexEntry == null) { + if (indexEntryCursor.hasNext()) { + existingIndexEntry = indexEntryCursor.next(); + } else { + indexEntryCursor.close(); + // Don't try opening again. + indexEntryCursor = null; + break; } } - } - if (indexEntryCursor != null) { - while (true) { - if (existingIndexEntry == null) { - if (indexEntryCursor.hasNext()) { - existingIndexEntry = indexEntryCursor.next(); - } else { - indexEntryCursor.close(); - // Don't try opening again. - indexEntryCursor = null; - break; - } - } + int compare = c.compare(existingIndexEntry, indexEntry); - int compare = c.compare(existingIndexEntry, indexEntry); + if (compare == 0) { + // Existing entry cursor matches so allow cursor to advance. + existingIndexEntry = null; + break; + } else if (compare > 0) { + // Existing index entry is ahead so check later. + break; + } else { + // Existing index entry is bogus. + existingIndexEntry.tryDelete(); - if (compare == 0) { - // Existing entry cursor matches so allow cursor to advance. - existingIndexEntry = null; - break; - } else if (compare > 0) { - // Existing index entry is ahead so check later. - break; - } else { - // Existing index entry is bogus. - existingIndexEntry.tryDelete(); + totalDeleted++; - totalDeleted++; + if (totalDeleted % BUILD_BATCH_SIZE == 0) { + txn.commit(); + txn.exit(); - if (totalDeleted % BUILD_BATCH_SIZE == 0) { - txn.commit(); - txn.exit(); + logProgress(log, totalProgress, bufferSize, + totalInserted, totalUpdated, totalDeleted); - logProgress(log, totalProgress, bufferSize, - totalInserted, totalUpdated, totalDeleted); + txn = mRepository + .enterTopTransaction(IsolationLevel.READ_COMMITTED); + txn.setForUpdate(true); - txn = mRepository - .enterTopTransaction(IsolationLevel.READ_COMMITTED); + indexEntryCursor.close(); + indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry); + if (!indexEntryCursor.hasNext()) { indexEntryCursor.close(); - indexEntryCursor = - indexEntryQuery.fetchAfter(existingIndexEntry); - - if (!indexEntryCursor.hasNext()) { - indexEntryCursor.close(); - // Don't try opening again. - indexEntryCursor = null; - break; - } + // Don't try opening again. + indexEntryCursor = null; + break; } + } - existingIndexEntry = null; + existingIndexEntry = null; - throttle(throttle, desiredSpeed); - } + throttle(throttle, desiredSpeed); } } + } - if (indexEntry != null) { - totalProgress++; - } - - retry = false; - } catch (PersistDeadlockException e) { - log.error("Deadlock during index repair; will retry: " + indexEntry, e); - // This re-uses the last index entry to repair and forces - // the current transaction to commit. - retry = true; + if (indexEntry != null) { + totalProgress++; } - if (retry || (totalProgress % BUILD_BATCH_SIZE == 0)) { + if (totalProgress % BUILD_BATCH_SIZE == 0) { txn.commit(); txn.exit(); @@ -559,6 +544,7 @@ class ManagedIndex implements IndexEntryAccessor { totalInserted, totalUpdated, totalDeleted); txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED); + txn.setForUpdate(true); if (indexEntryCursor != null) { indexEntryCursor.close(); -- cgit v1.2.3