From 94b490414a163c88282d2378edc6699bb6dd4682 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Sat, 14 May 2011 00:06:43 +0000 Subject: Commit batches of deletes during index repair. --- .../carbonado/repo/indexed/ManagedIndex.java | 140 ++++++++++++++------- 1 file changed, 93 insertions(+), 47 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 6c28619..dbc7c06 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -21,6 +21,7 @@ package com.amazon.carbonado.repo.indexed; import java.lang.reflect.UndeclaredThrowableException; import java.util.Comparator; +import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -293,20 +294,6 @@ class ManagedIndex implements IndexEntryAccessor { .orderBy(naturalOrdering(mMasterStorage.getStorableType())); } - // Quick check to see if any records exist in master. - { - Transaction txn = mRepository.enterTransaction(IsolationLevel.READ_COMMITTED); - try { - Cursor cursor = masterQuery.fetch(); - if (!cursor.hasNext()) { - // Nothing exists in master, so nothing to build. - return; - } - } finally { - txn.exit(); - } - } - // Enter top transaction with isolation level of none to make sure // preload operation does not run in a long nested transaction. Transaction txn = mRepository.enterTopTransaction(IsolationLevel.NONE); @@ -437,25 +424,38 @@ class ManagedIndex implements IndexEntryAccessor { indexEntryCursor = null; } - for (Object obj : buffer) { - Storable indexEntry = (Storable) obj; - if (indexEntry.tryInsert()) { - totalInserted++; + Iterator it = buffer.iterator(); + bufferIterate: while (true) { + Object obj; + if (it.hasNext()) { + obj = it.next(); + } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) { + obj = null; } else { - // Couldn't insert because an index entry already exists. - Storable existing = indexEntry.copy(); - boolean doUpdate = false; - if (!existing.tryLoad()) { - doUpdate = true; - } else if (!existing.equalProperties(indexEntry)) { - // If only the version differs, leave existing entry alone. - indexEntry.copyVersionProperty(existing); - doUpdate = !existing.equalProperties(indexEntry); - } - if (doUpdate) { - indexEntry.tryDelete(); - indexEntry.tryInsert(); - totalUpdated++; + break; + } + + Storable indexEntry = (Storable) obj; + + if (indexEntry != null) { + if (indexEntry.tryInsert()) { + totalInserted++; + } else { + // Couldn't insert because an index entry already exists. + Storable existing = indexEntry.copy(); + boolean doUpdate = false; + if (!existing.tryLoad()) { + doUpdate = true; + } else if (!existing.equalProperties(indexEntry)) { + // If only the version differs, leave existing entry alone. + indexEntry.copyVersionProperty(existing); + doUpdate = !existing.equalProperties(indexEntry); + } + if (doUpdate) { + indexEntry.tryDelete(); + indexEntry.tryInsert(); + totalUpdated++; + } } } @@ -484,32 +484,60 @@ class ManagedIndex implements IndexEntryAccessor { } else { // Existing index entry is bogus. existingIndexEntry.tryDelete(); + totalDeleted++; + + if (totalDeleted % BUILD_BATCH_SIZE == 0) { + txn.commit(); + txn.exit(); + + logProgress(log, totalProgress, bufferSize, + totalInserted, totalUpdated, totalDeleted); + + txn = mRepository + .enterTopTransaction(IsolationLevel.READ_COMMITTED); + + indexEntryCursor.close(); + indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry); + + if (!indexEntryCursor.hasNext()) { + indexEntryCursor.close(); + // Don't try opening again. + indexEntryCursor = null; + break; + } + } + existingIndexEntry = null; + + throttle(throttle, desiredSpeed); } } } - totalProgress++; + if (indexEntry != null) { + totalProgress++; + } if (totalProgress % BUILD_BATCH_SIZE == 0) { txn.commit(); txn.exit(); - if (log.isInfoEnabled()) { - String format = "Index build progress: %.3f%% " + - progressSubMessgage(totalInserted, totalUpdated, totalDeleted); - double percent = 100.0 * totalProgress / bufferSize; - log.info(String.format(format, percent)); - } + logProgress(log, totalProgress, bufferSize, + totalInserted, totalUpdated, totalDeleted); txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED); if (indexEntryCursor != null) { indexEntryCursor.close(); - indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry); existingIndexEntry = null; + if (indexEntry == null) { + indexEntryCursor = indexEntryQuery.fetch(); + } else { + indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry); + } + if (!indexEntryCursor.hasNext()) { indexEntryCursor.close(); // Don't try opening again. @@ -518,13 +546,7 @@ class ManagedIndex implements IndexEntryAccessor { } } - if (throttle != null) { - try { - throttle.throttle(desiredSpeed, BUILD_THROTTLE_SLEEP_PRECISION); - } catch (InterruptedException e) { - throw new RepositoryException("Index build interrupted"); - } - } + throttle(throttle, desiredSpeed); } txn.commit(); @@ -539,6 +561,30 @@ class ManagedIndex implements IndexEntryAccessor { } } + private static void throttle(Throttle throttle, double desiredSpeed) + throws RepositoryException + { + if (throttle != null) { + try { + throttle.throttle(desiredSpeed, BUILD_THROTTLE_SLEEP_PRECISION); + } catch (InterruptedException e) { + throw new RepositoryException("Index build interrupted"); + } + } + } + + private void logProgress(Log log, + long totalProgress, int bufferSize, + long totalInserted, long totalUpdated, long totalDeleted) + { + if (log.isInfoEnabled()) { + String format = "Index build progress: %.3f%% " + + progressSubMessgage(totalInserted, totalUpdated, totalDeleted); + double percent = 100.0 * totalProgress / bufferSize; + log.info(String.format(format, percent)); + } + } + private String progressSubMessgage(long totalInserted, long totalUpdated, long totalDeleted) { StringBuilder b = new StringBuilder(); b.append('('); -- cgit v1.2.3