From fe4799047d37a61788aafd1dab8ff8f1f05a7d17 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Wed, 14 May 2008 07:38:48 +0000 Subject: Index repair now deletes bogus entries. --- .../carbonado/repo/indexed/ManagedIndex.java | 145 ++++++++++++++++----- 1 file changed, 110 insertions(+), 35 deletions(-) (limited to 'src/main/java/com/amazon/carbonado') diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index a3cc790..a8f5788 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -66,6 +66,22 @@ class ManagedIndex implements IndexEntryAccessor { static final int BUILD_THROTTLE_WINDOW = BUILD_BATCH_SIZE * 10; static final int BUILD_THROTTLE_SLEEP_PRECISION = 10; + private static String[] naturalOrdering(Class type) { + StorableKey pk = StorableIntrospector.examine(type).getPrimaryKey(); + String[] naturalOrdering = new String[pk.getProperties().size()]; + int i=0; + for (OrderedProperty prop : pk.getProperties()) { + String orderBy; + if (prop.getDirection() == Direction.DESCENDING) { + orderBy = prop.toString(); + } else { + orderBy = prop.getChainedProperty().toString(); + } + naturalOrdering[i++] = orderBy; + } + return naturalOrdering; + } + private final IndexedStorage mIndexedStorage; private final StorableIndex mIndex; private final IndexEntryGenerator mGenerator; @@ -141,13 +157,6 @@ class ManagedIndex implements IndexEntryAccessor { // Required by IndexEntryAccessor interface. public void repair(double desiredSpeed) throws RepositoryException { - /* FIXME: Delete bogus entries. Index inserts and updates are committed - in batches, which covers a contiguous range of entries. Before - committing the batch, a range query is executed on the index and - verifies the batch about to be committed. Any entries that aren't in - the batch are to be deleted. The progress message that is logged - should be changed to show indexes inserted, updated, and deleted. - */ buildIndex(desiredSpeed); } @@ -227,34 +236,21 @@ class ManagedIndex implements IndexEntryAccessor { * @param repo used to enter transactions */ void buildIndex(double desiredSpeed) throws RepositoryException { - Repository repo = mIndexedStorage.mRepository; - Storage masterStorage = mIndexedStorage.mMasterStorage; + final Repository repo = mIndexedStorage.mRepository; + final Storage masterStorage = mIndexedStorage.mMasterStorage; - MergeSortBuffer buffer; - Comparator c; + final MergeSortBuffer buffer; + final Comparator c; - Log log = LogFactory.getLog(IndexedStorage.class); + final Log log = LogFactory.getLog(IndexedStorage.class); - Query masterQuery; + final Query masterQuery; { // Need to explicitly order master query by primary key in order // for fetchAfter to work correctly in case corrupt records are // encountered. - StorableKey pk = StorableIntrospector - .examine(masterStorage.getStorableType()).getPrimaryKey(); - String[] naturalOrdering = new String[pk.getProperties().size()]; - int i=0; - for (OrderedProperty prop : pk.getProperties()) { - String orderBy; - if (prop.getDirection() == Direction.DESCENDING) { - orderBy = prop.toString(); - } else { - orderBy = prop.getChainedProperty().toString(); - } - naturalOrdering[i++] = orderBy; - } - - masterQuery = masterStorage.query().orderBy(naturalOrdering); + masterQuery = masterStorage.query() + .orderBy(naturalOrdering(masterStorage.getStorableType())); } // Quick check to see if any records exist in master. @@ -293,7 +289,7 @@ class ManagedIndex implements IndexEntryAccessor { // Preload and sort all index entries for improved performance. buffer = new MergeSortBuffer(mIndexEntryStorage, null, BUILD_SORT_BUFFER_SIZE); - c = mGenerator.getComparator(); + c = getComparator(); buffer.prepare(c); long nextReportTime = System.currentTimeMillis() + BUILD_INFO_DELAY_MILLIS; @@ -377,6 +373,10 @@ class ManagedIndex implements IndexEntryAccessor { log.info("Begin build of " + bufferSize + " index entries"); } + // Need this index entry query for deleting bogus entries. + final Query indexEntryQuery = mIndexEntryStorage.query() + .orderBy(naturalOrdering(mIndexEntryStorage.getStorableType())); + Throttle throttle = desiredSpeed < 1.0 ? new Throttle(BUILD_THROTTLE_WINDOW) : null; long totalInserted = 0; @@ -386,6 +386,17 @@ class ManagedIndex implements IndexEntryAccessor { txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED); try { + txn.setForUpdate(true); + + Cursor indexEntryCursor = indexEntryQuery.fetch(); + Storable existingIndexEntry = null; + + if (!indexEntryCursor.hasNext()) { + indexEntryCursor.close(); + // Don't try opening again. + indexEntryCursor = null; + } + for (Object obj : buffer) { Storable indexEntry = (Storable) obj; if (indexEntry.tryInsert()) { @@ -408,6 +419,37 @@ class ManagedIndex implements IndexEntryAccessor { } } + if (indexEntryCursor != null) { + while (true) { + if (existingIndexEntry == null) { + if (indexEntryCursor.hasNext()) { + existingIndexEntry = indexEntryCursor.next(); + } else { + indexEntryCursor.close(); + // Don't try opening again. + indexEntryCursor = null; + break; + } + } + + int compare = c.compare(existingIndexEntry, indexEntry); + + if (compare == 0) { + // Existing entry cursor matches so allow cursor to advance. + existingIndexEntry = null; + break; + } else if (compare > 0) { + // Existing index entry is ahead so check later. + break; + } else { + // Existing index entry is bogus. + existingIndexEntry.tryDelete(); + totalDeleted++; + existingIndexEntry = null; + } + } + } + totalProgress++; if (totalProgress % BUILD_BATCH_SIZE == 0) { @@ -422,6 +464,18 @@ class ManagedIndex implements IndexEntryAccessor { } txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED); + + if (indexEntryCursor != null) { + indexEntryCursor.close(); + indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry); + existingIndexEntry = null; + + if (!indexEntryCursor.hasNext()) { + indexEntryCursor.close(); + // Don't try opening again. + indexEntryCursor = null; + } + } } if (throttle != null) { @@ -446,14 +500,35 @@ class ManagedIndex implements IndexEntryAccessor { } private String progressSubMessgage(long totalInserted, long totalUpdated, long totalDeleted) { + StringBuilder b = new StringBuilder(); + b.append('('); + + if (totalInserted > 0) { + b.append(totalInserted); + b.append(" inserted"); + } + if (totalUpdated > 0) { + if (b.length() > 1) { + b.append(", "); + } + b.append(totalUpdated); + b.append(" updated"); + } if (totalDeleted > 0) { - return "(" + totalInserted + " inserted, " + totalUpdated + " updated, " + - totalDeleted + " deleted)"; - } else if (totalUpdated > 0) { - return "(" + totalInserted + " inserted, " + totalUpdated + " updated)"; - } else { - return "(" + totalInserted + " inserted)"; + if (b.length() > 1) { + b.append(", "); + } + b.append(totalDeleted); + b.append(" deleted"); } + + if (b.length() == 1) { + b.append("no changes made"); + } + + b.append(')'); + + return b.toString(); } private Storable makeIndexEntry(S userStorable) throws PersistException { -- cgit v1.2.3