From 8fb89b4dcf778e8673ce5c031796328d0dd917d6 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Thu, 1 May 2008 20:21:22 +0000 Subject: Add index repair method. --- .../carbonado/repo/indexed/IndexEntryAccessor.java | 11 ++ .../carbonado/repo/indexed/IndexedStorage.java | 8 +- .../carbonado/repo/indexed/ManagedIndex.java | 113 ++++++++++++++------- 3 files changed, 90 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java index 42ce214..1b7f910 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java @@ -20,6 +20,7 @@ package com.amazon.carbonado.repo.indexed; import java.util.Comparator; +import com.amazon.carbonado.RepositoryException; import com.amazon.carbonado.Storable; import com.amazon.carbonado.Storage; @@ -30,6 +31,7 @@ import com.amazon.carbonado.capability.IndexInfo; * inspection and repair. * * @author Brian S O'Neill + * @see IndexEntryAccessCapability */ public interface IndexEntryAccessor extends IndexInfo { /** @@ -65,6 +67,15 @@ public interface IndexEntryAccessor extends IndexInfo { */ boolean isConsistent(Storable indexEntry, S master); + /** + * Repairs the index by inserting missing entries and fixing + * inconsistencies. + * + * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half + * speed, 0.1 = one-tenth speed, etc + */ + void repair(double desiredSpeed) throws RepositoryException; + /** * Returns a comparator for ordering index entries. */ diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java index cefb031..fb55433 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java @@ -287,7 +287,8 @@ class IndexedStorage implements Storage, StorageAccess IndexEntryGenerator builder = IndexEntryGenerator.getInstance(index); Class indexEntryClass = builder.getIndexEntryClass(); Storage indexEntryStorage = repository.getIndexEntryStorageFor(indexEntryClass); - ManagedIndex managedIndex = new ManagedIndex(index, builder, indexEntryStorage); + ManagedIndex managedIndex = + new ManagedIndex(this, index, builder, indexEntryStorage); mAllIndexInfoMap.put(index, managedIndex); managedIndexes[i++] = managedIndex; @@ -510,9 +511,8 @@ class IndexedStorage implements Storage, StorageAccess } } - // New index, so populate it. - managedIndex.populateIndex(mRepository, mMasterStorage, - mRepository.getIndexRepairThrottle()); + // New index, so build it. + managedIndex.buildIndex(mRepository.getIndexRepairThrottle()); boolean top = true; while (true) { diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index ea3bbb3..5806529 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -29,6 +29,7 @@ import com.amazon.carbonado.CorruptEncodingException; import com.amazon.carbonado.Cursor; import com.amazon.carbonado.FetchException; import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.OptimisticLockException; import com.amazon.carbonado.PersistException; import com.amazon.carbonado.Query; import com.amazon.carbonado.Repository; @@ -62,23 +63,26 @@ import com.amazon.carbonado.util.Throttle; * @author Brian S O'Neill */ class ManagedIndex implements IndexEntryAccessor { - private static final int POPULATE_SORT_BUFFER_SIZE = 65536; - private static final int POPULATE_INFO_DELAY_MILLIS = 5000; - static final int POPULATE_BATCH_SIZE = 128; - static final int POPULATE_THROTTLE_WINDOW = POPULATE_BATCH_SIZE * 10; - static final int POPULATE_THROTTLE_SLEEP_PRECISION = 10; + private static final int BUILD_SORT_BUFFER_SIZE = 65536; + private static final int BUILD_INFO_DELAY_MILLIS = 5000; + static final int BUILD_BATCH_SIZE = 128; + static final int BUILD_THROTTLE_WINDOW = BUILD_BATCH_SIZE * 10; + static final int BUILD_THROTTLE_SLEEP_PRECISION = 10; + private final IndexedStorage mIndexedStorage; private final StorableIndex mIndex; private final IndexEntryGenerator mGenerator; private final Storage mIndexEntryStorage; private Query mSingleMatchQuery; - ManagedIndex(StorableIndex index, + ManagedIndex(IndexedStorage indexedStorage, + StorableIndex index, IndexEntryGenerator generator, Storage indexEntryStorage) throws SupportException { + mIndexedStorage = indexedStorage; mIndex = index; mGenerator = generator; mIndexEntryStorage = indexEntryStorage; @@ -138,6 +142,18 @@ class ManagedIndex implements IndexEntryAccessor { return mGenerator.isConsistent(indexEntry, master); } + // Required by IndexEntryAccessor interface. + public void repair(double desiredSpeed) throws RepositoryException { + /* FIXME: Delete bogus entries. Index inserts and updates are committed + in batches, which covers a contiguous range of entries. Before + committing the batch, a range query is executed on the index and + verifies the batch about to be committed. Any entries that aren't in + the batch are to be deleted. The progress message that is logged + should be changed to show indexes inserted, updated, and deleted. + */ + buildIndex(desiredSpeed); + } + // Required by IndexEntryAccessor interface. public Comparator getComparator() { return mGenerator.getComparator(); @@ -208,13 +224,14 @@ class ManagedIndex implements IndexEntryAccessor { } /** - * Populates the entire index, repairing as it goes. + * Build the entire index, repairing as it goes. * * @param repo used to enter transactions */ - void populateIndex(Repository repo, Storage masterStorage, double desiredSpeed) - throws RepositoryException - { + void buildIndex(double desiredSpeed) throws RepositoryException { + Repository repo = mIndexedStorage.mRepository; + Storage masterStorage = mIndexedStorage.mMasterStorage; + MergeSortBuffer buffer; Comparator c; @@ -248,7 +265,7 @@ class ManagedIndex implements IndexEntryAccessor { try { Cursor cursor = masterQuery.fetch(); if (!cursor.hasNext()) { - // Nothing exists in master, so nothing to populate. + // Nothing exists in master, so nothing to build. return; } } finally { @@ -264,7 +281,7 @@ class ManagedIndex implements IndexEntryAccessor { try { if (log.isInfoEnabled()) { StringBuilder b = new StringBuilder(); - b.append("Populating index on "); + b.append("Building index on "); b.append(masterStorage.getStorableType().getName()); b.append(": "); try { @@ -277,11 +294,11 @@ class ManagedIndex implements IndexEntryAccessor { // Preload and sort all index entries for improved performance. - buffer = new MergeSortBuffer(mIndexEntryStorage, null, POPULATE_SORT_BUFFER_SIZE); + buffer = new MergeSortBuffer(mIndexEntryStorage, null, BUILD_SORT_BUFFER_SIZE); c = mGenerator.getComparator(); buffer.prepare(c); - long nextReportTime = System.currentTimeMillis() + POPULATE_INFO_DELAY_MILLIS; + long nextReportTime = System.currentTimeMillis() + BUILD_INFO_DELAY_MILLIS; // These variables are used when corrupt records are encountered. S lastUserStorable = null; @@ -313,8 +330,8 @@ class ManagedIndex implements IndexEntryAccessor { if (log.isInfoEnabled()) { long now = System.currentTimeMillis(); if (now >= nextReportTime) { - log.info("Prepared " + buffer.size() + " new index entries"); - nextReportTime = now + POPULATE_INFO_DELAY_MILLIS; + log.info("Prepared " + buffer.size() + " index entries"); + nextReportTime = now + BUILD_INFO_DELAY_MILLIS; } } @@ -359,44 +376,52 @@ class ManagedIndex implements IndexEntryAccessor { final int bufferSize = buffer.size(); if (log.isInfoEnabled()) { - log.info("Begin insert of " + bufferSize + " new index entries"); + log.info("Begin build of " + bufferSize + " index entries"); } - Throttle throttle = desiredSpeed < 1.0 ? new Throttle(POPULATE_THROTTLE_WINDOW) : null; + Throttle throttle = desiredSpeed < 1.0 ? new Throttle(BUILD_THROTTLE_WINDOW) : null; - int totalInserted = 0; + long totalInserted = 0; + long totalUpdated = 0; + long totalDeleted = 0; + long totalProgress = 0; txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED); try { + for (Object obj : buffer) { Storable indexEntry = (Storable) obj; - if (!indexEntry.tryInsert()) { + if (indexEntry.tryInsert()) { + totalInserted++; + } else { // Couldn't insert because an index entry already exists. Storable existing = indexEntry.copy(); - if (existing.tryLoad()) { - if (!existing.equalProperties(indexEntry)) { - // Existing entry differs, so update it. - indexEntry.copyUnequalProperties(existing); - existing.tryUpdate(); - indexEntry = existing; - } - } else { - // Couldn't find existing entry for some reason, so - // repair by brute force. + boolean doUpdate = false; + if (!existing.tryLoad()) { + doUpdate = true; + } else if (!existing.equalProperties(indexEntry)) { + // If only the version differs, leave existing entry alone. + indexEntry.copyVersionProperty(existing); + doUpdate = !existing.equalProperties(indexEntry); + } + if (doUpdate) { indexEntry.tryDelete(); indexEntry.tryInsert(); + totalUpdated++; } } - totalInserted++; - if (totalInserted % POPULATE_BATCH_SIZE == 0) { + totalProgress++; + + if (totalProgress % BUILD_BATCH_SIZE == 0) { txn.commit(); txn.exit(); if (log.isInfoEnabled()) { - String format = "Committed %d new index entries (%.3f%%)"; - double percent = 100.0 * totalInserted / bufferSize; - log.info(String.format(format, totalInserted, percent)); + String format = "Index build progress: %.3f%% " + + progressSubMessgage(totalInserted, totalUpdated, totalDeleted); + double percent = 100.0 * totalProgress / bufferSize; + log.info(String.format(format, percent)); } txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED); @@ -404,9 +429,9 @@ class ManagedIndex implements IndexEntryAccessor { if (throttle != null) { try { - throttle.throttle(desiredSpeed, POPULATE_THROTTLE_SLEEP_PRECISION); + throttle.throttle(desiredSpeed, BUILD_THROTTLE_SLEEP_PRECISION); } catch (InterruptedException e) { - throw new RepositoryException("Index populate interrupted"); + throw new RepositoryException("Index build interrupted"); } } } @@ -418,7 +443,19 @@ class ManagedIndex implements IndexEntryAccessor { } if (log.isInfoEnabled()) { - log.info("Finished inserting " + totalInserted + " new index entries"); + log.info("Finished building " + totalProgress + " index entries " + + progressSubMessgage(totalInserted, totalUpdated, totalDeleted)); + } + } + + private String progressSubMessgage(long totalInserted, long totalUpdated, long totalDeleted) { + if (totalDeleted > 0) { + return "(" + totalInserted + " inserted, " + totalUpdated + " updated, " + + totalDeleted + " deleted)"; + } else if (totalUpdated > 0) { + return "(" + totalInserted + " inserted, " + totalUpdated + " updated)"; + } else { + return "(" + totalInserted + " inserted)"; } } -- cgit v1.2.3