From 9a9e104f22226a5d43fc51f48d51efa9f136afdd Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Thu, 15 Mar 2007 22:18:27 +0000 Subject: IndexedRepository supports optional and throttled index repair. --- RELEASE-NOTES.txt | 1 + .../carbonado/repo/indexed/IndexedRepository.java | 23 +- .../repo/indexed/IndexedRepositoryBuilder.java | 58 +++- .../carbonado/repo/indexed/IndexedStorage.java | 344 +++++++++++++-------- .../carbonado/repo/indexed/ManagedIndex.java | 46 ++- .../repo/sleepycat/BDBRepositoryBuilder.java | 51 +++ 6 files changed, 392 insertions(+), 131 deletions(-) diff --git a/RELEASE-NOTES.txt b/RELEASE-NOTES.txt index 1a501f9..66a927d 100644 --- a/RELEASE-NOTES.txt +++ b/RELEASE-NOTES.txt @@ -14,6 +14,7 @@ Carbonado change history unnecessary delete/insert pairs. - JDBCRepository is more lenient with primary key validation. This allows Storables to be more easily mapped to views. +- IndexedRepository supports optional and throttled index repair. 1.1-BETA9 to 1.1-BETA10 ------------------------------- diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java index 8516628..d558950 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java @@ -60,12 +60,20 @@ class IndexedRepository implements Repository, private final AtomicReference mRootRef; private final Repository mRepository; private final String mName; + private final boolean mIndexRepairEnabled; + private final double mIndexThrottle; private final StorageCollection mStorages; - IndexedRepository(AtomicReference rootRef, String name, Repository repository) { + IndexedRepository(AtomicReference rootRef, String name, + Repository repository, + boolean indexRepairEnabled, + double indexThrottle) + { mRootRef = rootRef; mRepository = repository; mName = name; + mIndexRepairEnabled = 
indexRepairEnabled; + mIndexThrottle = indexThrottle; mStorages = new StorageCollection() { protected Storage createStorage(Class type) @@ -76,7 +84,7 @@ class IndexedRepository implements Repository, if (Unindexed.class.isAssignableFrom(type)) { // Verify no indexes. int indexCount = IndexedStorage - .gatherRequiredIndexes(StorableIntrospector.examine(type)).size(); + .gatherDesiredIndexes(StorableIntrospector.examine(type)).size(); if (indexCount > 0) { throw new MalformedTypeException (type, "Storable cannot have any indexes: " + type + @@ -91,7 +99,8 @@ class IndexedRepository implements Repository, if (repository.getCapability(IndexInfoCapability.class) == null) { throw new UnsupportedOperationException - ("Wrapped repository doesn't support being indexed"); + ("Wrapped repository doesn't support being indexed -- " + + "it must support IndexInfoCapability."); } } @@ -204,4 +213,12 @@ class IndexedRepository implements Repository, Repository getWrappedRepository() { return mRepository; } + + boolean isIndexRepairEnabled() { + return mIndexRepairEnabled; + } + + double getIndexRepairThrottle() { + return mIndexThrottle; + } } diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java index 2010230..4cd4afb 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java @@ -47,6 +47,8 @@ public class IndexedRepositoryBuilder extends AbstractRepositoryBuilder { private String mName; private boolean mIsMaster = true; private RepositoryBuilder mRepoBuilder; + private boolean mIndexRepairEnabled = true; + private double mIndexThrottle = 1.0; public IndexedRepositoryBuilder() { } @@ -71,7 +73,9 @@ public class IndexedRepositoryBuilder extends AbstractRepositoryBuilder { return wrapped; } - Repository repo = new IndexedRepository(rootRef, getName(), 
wrapped); + Repository repo = new IndexedRepository(rootRef, getName(), wrapped, + isIndexRepairEnabled(), + getIndexRepairThrottle()); rootRef.set(repo); return repo; } @@ -111,6 +115,58 @@ public class IndexedRepositoryBuilder extends AbstractRepositoryBuilder { mRepoBuilder = repoBuilder; } + /** + * @see #setIndexRepairEnabled(boolean) + * + * @return true by default + */ + public boolean isIndexRepairEnabled() { + return mIndexRepairEnabled; + } + + /** + * By default, index repair is enabled. In this mode, the first time a + * Storable type is used, new indexes are populated and old indexes are + * removed. Until finished, access to the Storable is blocked. + * + *

<p>When index repair is disabled, the Storable is immediately + * available. This does have consequences, however. The set of indexes + * available for queries is defined by the intersection of the old + * and new index sets. The set of indexes that are kept up-to-date is + * defined by the union of the old and new index sets. + * + * <p>

While index repair is disabled, another process can safely repair the + * indexes in the background. When it is complete, index repair can be + * enabled for this repository too. + */ + public void setIndexRepairEnabled(boolean enabled) { + mIndexRepairEnabled = enabled; + } + + /** + * Returns the throttle parameter used when indexes are added, dropped or + * bulk repaired. By default this value is 1.0, or maximum speed. + */ + public double getIndexRepairThrottle() { + return mIndexThrottle; + } + + /** + * Sets the throttle parameter used when indexes are added, dropped or bulk + * repaired. By default this value is 1.0, or maximum speed. + * + * @param desiredSpeed 1.0 = perform work at full speed, + * 0.5 = perform work at half speed, 0.0 = fully suspend work + */ + public void setIndexRepairThrottle(double desiredSpeed) { + if (desiredSpeed < 0.0) { + desiredSpeed = 0.0; + } else if (desiredSpeed > 1.0) { + desiredSpeed = 1.0; + } + mIndexThrottle = desiredSpeed; + } + public void errorCheck(Collection messages) throws ConfigurationException { super.errorCheck(messages); if (null == getWrappedRepository()) { diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java index 5fb94eb..7b66f50 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java @@ -58,13 +58,17 @@ import com.amazon.carbonado.qe.StorageAccess; import com.amazon.carbonado.spi.StorableIndexSet; +import com.amazon.carbonado.util.Throttle; + +import static com.amazon.carbonado.repo.indexed.ManagedIndex.*; + /** * * * @author Brian S O'Neill */ class IndexedStorage implements Storage, StorageAccess { - static StorableIndexSet gatherRequiredIndexes(StorableInfo info) { + static StorableIndexSet gatherDesiredIndexes(StorableInfo info) { StorableIndexSet indexSet = new StorableIndexSet(); indexSet.addIndexes(info); 
indexSet.addAlternateKeys(info); @@ -74,8 +78,11 @@ class IndexedStorage implements Storage, StorageAccess final IndexedRepository mRepository; final Storage mMasterStorage; - private final Map, IndexInfo> mIndexInfoMap; - private final StorableIndexSet mIndexSet; + // Maps managed and queryable indexes to IndexInfo objects. + private final Map, IndexInfo> mAllIndexInfoMap; + + // Set of indexes available for queries to use. + private final StorableIndexSet mQueryableIndexSet; private final QueryEngine mQueryEngine; @@ -87,78 +94,69 @@ class IndexedStorage implements Storage, StorageAccess { mRepository = repository; mMasterStorage = masterStorage; - mIndexInfoMap = new IdentityHashMap, IndexInfo>(); + mAllIndexInfoMap = new IdentityHashMap, IndexInfo>(); StorableInfo info = StorableIntrospector.examine(masterStorage.getStorableType()); - // Determine what the set of indexes should be. - StorableIndexSet newIndexSet = gatherRequiredIndexes(info); - - // Mix in the indexes we get for free, but remove after reduce. A free - // index is one that the underlying storage is providing for us. We - // don't want to create redundant indexes. - IndexInfo[] infos = repository.getWrappedRepository() - .getCapability(IndexInfoCapability.class) - .getIndexInfo(masterStorage.getStorableType()); - - StorableIndex[] freeIndexes = new StorableIndex[infos.length]; - for (int i=0; i(masterStorage.getStorableType(), infos[i]); - newIndexSet.add(freeIndexes[i]); - mIndexInfoMap.put(freeIndexes[i], infos[i]); - } catch (IllegalArgumentException e) { - // Assume index is bogus, so ignore it. - } - } - - newIndexSet.reduce(Direction.ASCENDING); - - // Gather current indexes. - StorableIndexSet currentIndexSet = new StorableIndexSet(); - // Gather indexes to remove. 
- StorableIndexSet indexesToRemove = new StorableIndexSet(); - - Query query = repository.getWrappedRepository() - .storageFor(StoredIndexInfo.class) - // Primary key of StoredIndexInfo is an index descriptor, which - // starts with the storable type name. This emulates a "wildcard at - // the end" search. - .query("indexName >= ? & indexName < ?") - .with(getStorableType().getName() + '~') - .with(getStorableType().getName() + '~' + '\uffff'); - - List storedInfos; - Transaction txn = repository.getWrappedRepository() - .enterTopTransaction(IsolationLevel.READ_COMMITTED); - try { - storedInfos = query.fetch().toList(); - } finally { - txn.exit(); + // The set of indexes that the Storable defines, reduced. + final StorableIndexSet desiredIndexSet; + { + desiredIndexSet = gatherDesiredIndexes(info); + desiredIndexSet.reduce(Direction.ASCENDING); } - for (StoredIndexInfo indexInfo : storedInfos) { - String name = indexInfo.getIndexName(); - StorableIndex index; + // The set of indexes that are populated and available for use. This is + // determined by examining index metadata. If the Storable has not + // changed, it will be the same as desiredIndexSet. If any existing + // indexes use a property whose type has changed, it is added to + // bogusIndexSet. Bogus indexes are removed if repair is enabled. + final StorableIndexSet existingIndexSet; + final StorableIndexSet bogusIndexSet; + { + existingIndexSet = new StorableIndexSet(); + bogusIndexSet = new StorableIndexSet(); + + Query query = repository.getWrappedRepository() + .storageFor(StoredIndexInfo.class) + // Primary key of StoredIndexInfo is an index descriptor, which + // starts with the storable type name. This emulates a + // "wildcard at the end" search. + .query("indexName >= ? 
& indexName < ?") + .with(getStorableType().getName() + '~') + .with(getStorableType().getName() + '~' + '\uffff'); + + List storedInfos; + Transaction txn = repository.getWrappedRepository() + .enterTopTransaction(IsolationLevel.READ_COMMITTED); try { - index = StorableIndex.parseNameDescriptor(name, info); - } catch (IllegalArgumentException e) { - // Remove unrecognized descriptors. - unregisterIndex(name); - continue; + storedInfos = query.fetch().toList(); + } finally { + txn.exit(); } - if (index.getTypeDescriptor().equals(indexInfo.getIndexTypeDescriptor())) { - currentIndexSet.add(index); - } else { - indexesToRemove.add(index); + + for (StoredIndexInfo indexInfo : storedInfos) { + String name = indexInfo.getIndexName(); + StorableIndex index; + try { + index = StorableIndex.parseNameDescriptor(name, info); + } catch (IllegalArgumentException e) { + // Remove unrecognized descriptors. + unregisterIndex(name); + continue; + } + if (index.getTypeDescriptor().equals(indexInfo.getIndexTypeDescriptor())) { + existingIndexSet.add(index); + } else { + bogusIndexSet.add(index); + } } } nonUniqueSearch: { - // If any current indexes are non-unique, then indexes are for an + // If any existing indexes are non-unique, then indexes are for an // older version. For compatibility, don't uniquify the // indexes. Otherwise, these indexes would need to be rebuilt. - for (StorableIndex index : currentIndexSet) { + for (StorableIndex index : existingIndexSet) { if (!index.isUnique()) { break nonUniqueSearch; } @@ -169,58 +167,129 @@ class IndexedStorage implements Storage, StorageAccess // properties. As a side-effect of uniquify, all indexes are // unique, and thus have 'U' in the descriptor. Each time // nonUniqueSearch is run, it will not find any non-unique indexes. - newIndexSet.uniquify(info); + desiredIndexSet.uniquify(info); } - // Remove any old indexes. + // Gather free indexes, which are already provided by the underlying + // storage. 
They can be used for querying, but we should not manage them. + final StorableIndexSet freeIndexSet; { - indexesToRemove.addAll(currentIndexSet); + freeIndexSet = new StorableIndexSet(); + + IndexInfoCapability cap = repository.getWrappedRepository() + .getCapability(IndexInfoCapability.class); + + if (cap != null) { + for (IndexInfo ii : cap.getIndexInfo(masterStorage.getStorableType())) { + StorableIndex freeIndex; + try { + freeIndex = new StorableIndex(masterStorage.getStorableType(), ii); + } catch (IllegalArgumentException e) { + // Assume index is malformed, so ignore it. + continue; + } + mAllIndexInfoMap.put(freeIndex, ii); + freeIndexSet.add(freeIndex); + } + } + } - // Remove "free" indexes, since they don't need to be built. - for (int i=0; i queryableIndexSet; + { + queryableIndexSet = new StorableIndexSet(desiredIndexSet); + + if (!mRepository.isIndexRepairEnabled()) { + // Can only query the intersection. + queryableIndexSet.retainAll(existingIndexSet); } - indexesToRemove.removeAll(newIndexSet); + // Add the indexes we get for free. + queryableIndexSet.addAll(freeIndexSet); + } + + // The set of indexes that should be kept up-to-date. If index repair + // is enabled, this set will be the same as desiredIndexSet. Otherwise, + // it will be the union of existingIndexSet and desiredIndexSet. In + // both cases, "free" indexes are removed from the set too. By doing a + // union, no harm is caused by changing the index set and then reverting. + final StorableIndexSet managedIndexSet; + { + managedIndexSet = new StorableIndexSet(desiredIndexSet); - for (StorableIndex index : indexesToRemove) { - removeIndex(index); + if (!mRepository.isIndexRepairEnabled()) { + // Must manage the union. + managedIndexSet.addAll(existingIndexSet); } + + // Remove the indexes we get for free. + managedIndexSet.removeAll(freeIndexSet); } - currentIndexSet = newIndexSet; + // The set of indexes that should be removed and no longer managed. 
If + // index repair is enabled, this set will be the existingIndexSet minus + // desiredIndexSet minus freeIndexSet plus bogusIndexSet. Otherwise, it + // will be empty. + final StorableIndexSet removeIndexSet; + { + removeIndexSet = new StorableIndexSet(); - // Open all the indexes. - List> managedIndexList = new ArrayList>(); - for (StorableIndex index : currentIndexSet) { - IndexEntryGenerator builder = IndexEntryGenerator.getInstance(index); - Class indexEntryClass = builder.getIndexEntryClass(); - Storage indexEntryStorage = repository.getIndexEntryStorageFor(indexEntryClass); - ManagedIndex managedIndex = new ManagedIndex(index, builder, indexEntryStorage); + if (mRepository.isIndexRepairEnabled()) { + removeIndexSet.addAll(existingIndexSet); + removeIndexSet.removeAll(desiredIndexSet); + removeIndexSet.removeAll(freeIndexSet); + removeIndexSet.addAll(bogusIndexSet); + } + } - registerIndex(managedIndex); + // The set of indexes that should be freshly populated. If index repair + // is enabled, this set will be the desiredIndexSet minus + // existingIndexSet minus freeIndexSet. Otherwise, it will be empty. + final StorableIndexSet addIndexSet; + { + addIndexSet = new StorableIndexSet(); - mIndexInfoMap.put(index, managedIndex); - managedIndexList.add(managedIndex); + if (mRepository.isIndexRepairEnabled()) { + addIndexSet.addAll(desiredIndexSet); + addIndexSet.removeAll(existingIndexSet); + addIndexSet.removeAll(freeIndexSet); + } } - if (managedIndexList.size() > 0) { - // Add trigger to keep indexes up-to-date. - ManagedIndex[] managedIndexes = - managedIndexList.toArray(new ManagedIndex[managedIndexList.size()]); + // Support for managed indexes... 
+ if (managedIndexSet.size() > 0) { + ManagedIndex[] managedIndexes = new ManagedIndex[managedIndexSet.size()]; + int i = 0; + for (StorableIndex index : managedIndexSet) { + IndexEntryGenerator builder = IndexEntryGenerator.getInstance(index); + Class indexEntryClass = builder.getIndexEntryClass(); + Storage indexEntryStorage = repository.getIndexEntryStorageFor(indexEntryClass); + ManagedIndex managedIndex = new ManagedIndex(index, builder, indexEntryStorage); + + mAllIndexInfoMap.put(index, managedIndex); + managedIndexes[i++] = managedIndex; + } if (!addTrigger(new IndexesTrigger(managedIndexes))) { throw new RepositoryException("Unable to add trigger for managing indexes"); } } - // Add "free" indexes back, in order for query engine to consider them. - for (int i=0; i index : removeIndexSet) { + removeIndex(index); } - mIndexSet = currentIndexSet; + // Now add new indexes. + for (StorableIndex index : addIndexSet) { + registerIndex((ManagedIndex) mAllIndexInfoMap.get(index)); + } + mQueryableIndexSet = queryableIndexSet; mQueryEngine = new QueryEngine(masterStorage.getStorableType(), repository); } @@ -252,16 +321,18 @@ class IndexedStorage implements Storage, StorageAccess return mMasterStorage.removeTrigger(trigger); } + // Required by IndexInfoCapability. public IndexInfo[] getIndexInfo() { - IndexInfo[] infos = new IndexInfo[mIndexInfoMap.size()]; - return mIndexInfoMap.values().toArray(infos); + IndexInfo[] infos = new IndexInfo[mAllIndexInfoMap.size()]; + return mAllIndexInfoMap.values().toArray(infos); } + // Required by IndexEntryAccessCapability. 
@SuppressWarnings("unchecked") public IndexEntryAccessor[] getIndexEntryAccessors() { List> accessors = - new ArrayList>(mIndexInfoMap.size()); - for (IndexInfo info : mIndexInfoMap.values()) { + new ArrayList>(mAllIndexInfoMap.size()); + for (IndexInfo info : mAllIndexInfoMap.values()) { if (info instanceof IndexEntryAccessor) { accessors.add((IndexEntryAccessor) info); } @@ -269,16 +340,19 @@ class IndexedStorage implements Storage, StorageAccess return accessors.toArray(new IndexEntryAccessor[accessors.size()]); } + // Required by StorageAccess. public QueryExecutorFactory getQueryExecutorFactory() { return mQueryEngine; } + // Required by StorageAccess. public Collection> getAllIndexes() { - return mIndexSet; + return mQueryableIndexSet; } + // Required by StorageAccess. public Storage storageDelegate(StorableIndex index) { - if (mIndexInfoMap.get(index) instanceof ManagedIndex) { + if (mAllIndexInfoMap.get(index) instanceof ManagedIndex) { // Index is managed by this storage, which is typical. return null; } @@ -336,7 +410,7 @@ class IndexedStorage implements Storage, StorageAccess // reversal. Only the lowest storage layer should examine this // parameter. - ManagedIndex indexInfo = (ManagedIndex) mIndexInfoMap.get(index); + ManagedIndex indexInfo = (ManagedIndex) mAllIndexInfoMap.get(index); Query query = indexInfo.getIndexEntryQueryFor (identityValues == null ? 0 : identityValues.length, @@ -383,7 +457,8 @@ class IndexedStorage implements Storage, StorageAccess } // New index, so populate it. - managedIndex.populateIndex(mRepository, mMasterStorage); + managedIndex.populateIndex(mRepository, mMasterStorage, + mRepository.getIndexRepairThrottle()); txn = mRepository.getWrappedRepository() .enterTopTransaction(IsolationLevel.READ_COMMITTED); @@ -444,39 +519,60 @@ class IndexedStorage implements Storage, StorageAccess return; } - // Doesn't completely remove the index, but it should free up space. 
- // TODO: when truncate method exists, call that instead - // TODO: set batchsize based on repository locktable size - int batchSize = 10; - while (true) { - Transaction txn = mRepository.getWrappedRepository() - .enterTopTransaction(IsolationLevel.READ_COMMITTED); - txn.setForUpdate(true); - - try { - Cursor cursor = indexEntryStorage.query().fetch(); - if (!cursor.hasNext()) { - break; - } - int count = 0; + { + // Doesn't completely remove the index, but it should free up space. + double desiredSpeed = mRepository.getIndexRepairThrottle(); + Throttle throttle = desiredSpeed < 1.0 ? new Throttle(POPULATE_THROTTLE_WINDOW) : null; + + long totalDropped = 0; + while (true) { + Transaction txn = mRepository.getWrappedRepository() + .enterTopTransaction(IsolationLevel.READ_COMMITTED); + txn.setForUpdate(true); try { - while (count++ < batchSize && cursor.hasNext()) { - cursor.next().tryDelete(); + Cursor cursor = indexEntryStorage.query().fetch(); + if (!cursor.hasNext()) { + break; + } + int count = 0; + final long savedTotal = totalDropped; + boolean anyFailure = false; + try { + while (count++ < POPULATE_BATCH_SIZE && cursor.hasNext()) { + if (cursor.next().tryDelete()) { + totalDropped++; + } else { + anyFailure = true; + } + } + } finally { + cursor.close(); } - } finally { - cursor.close(); - } - if (txn != null) { txn.commit(); - } - } catch (FetchException e) { - throw e.toPersistException(); - } finally { - if (txn != null) { + if (log.isInfoEnabled()) { + log.info("Removed " + totalDropped + " index entries"); + } + if (anyFailure && totalDropped <= savedTotal) { + log.warn("No indexes removed in last batch. 
" + + "Aborting index removal cleanup"); + break; + } + } catch (FetchException e) { + throw e.toPersistException(); + } finally { txn.exit(); } + + if (throttle != null) { + try { + throttle.throttle(desiredSpeed, POPULATE_THROTTLE_SLEEP_PRECISION); + } catch (InterruptedException e) { + throw new RepositoryException("Index removal interrupted"); + } + } } } + unregisterIndex(index); } } diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 958bd7b..2e4cea3 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -45,13 +45,19 @@ import com.amazon.carbonado.spi.RepairExecutor; import com.amazon.carbonado.qe.BoundaryType; +import com.amazon.carbonado.util.Throttle; + /** * Encapsulates info and operations for a single index. * * @author Brian S O'Neill */ class ManagedIndex implements IndexEntryAccessor { - private static final int POPULATE_BATCH_SIZE = 256; + private static final int POPULATE_SORT_BUFFER_SIZE = 65536; + private static final int POPULATE_INFO_DELAY_MILLIS = 5000; + static final int POPULATE_BATCH_SIZE = 128; + static final int POPULATE_THROTTLE_WINDOW = POPULATE_BATCH_SIZE * 10; + static final int POPULATE_THROTTLE_SLEEP_PRECISION = 10; private final StorableIndex mIndex; private final IndexEntryGenerator mGenerator; @@ -293,7 +299,9 @@ class ManagedIndex implements IndexEntryAccessor { * * @param repo used to enter transactions */ - void populateIndex(Repository repo, Storage masterStorage) throws RepositoryException { + void populateIndex(Repository repo, Storage masterStorage, double desiredSpeed) + throws RepositoryException + { MergeSortBuffer buffer; Comparator c; @@ -325,12 +333,21 @@ class ManagedIndex implements IndexEntryAccessor { // Preload and sort all index entries for improved performance. 
- buffer = new MergeSortBuffer(mIndexEntryStorage); + buffer = new MergeSortBuffer(mIndexEntryStorage, null, POPULATE_SORT_BUFFER_SIZE); c = mGenerator.getComparator(); buffer.prepare(c); + long nextReportTime = System.currentTimeMillis() + POPULATE_INFO_DELAY_MILLIS; while (cursor.hasNext()) { buffer.add(makeIndexEntry(cursor.next())); + + if (log.isInfoEnabled()) { + long now = System.currentTimeMillis(); + if (now >= nextReportTime) { + log.info("Prepared " + buffer.size() + " new index entries"); + nextReportTime = now + POPULATE_INFO_DELAY_MILLIS; + } + } } // No need to commit transaction because no changes should have been made. @@ -341,6 +358,8 @@ class ManagedIndex implements IndexEntryAccessor { txn.exit(); } + // This is not expected to take long, since MergeSortBuffer sorts as + // needed. This just finishes off what was not written to a file. buffer.sort(); if (isUnique()) { @@ -348,6 +367,10 @@ class ManagedIndex implements IndexEntryAccessor { // _before_ inserting index entries. If there are duplicates, // fail, since unique index cannot be built. + if (log.isInfoEnabled()) { + log.info("Verifying unique index"); + } + Object last = null; for (Object obj : buffer) { if (last != null) { @@ -363,6 +386,13 @@ class ManagedIndex implements IndexEntryAccessor { } final int bufferSize = buffer.size(); + + if (log.isInfoEnabled()) { + log.info("Begin insert of " + bufferSize + " new index entries"); + } + + Throttle throttle = desiredSpeed < 1.0 ? 
new Throttle(POPULATE_THROTTLE_WINDOW) : null; + int totalInserted = 0; txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED); @@ -386,6 +416,7 @@ class ManagedIndex implements IndexEntryAccessor { indexEntry.tryInsert(); } } + totalInserted++; if (totalInserted % POPULATE_BATCH_SIZE == 0) { txn.commit(); @@ -399,7 +430,16 @@ class ManagedIndex implements IndexEntryAccessor { txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED); } + + if (throttle != null) { + try { + throttle.throttle(desiredSpeed, POPULATE_THROTTLE_SLEEP_PRECISION); + } catch (InterruptedException e) { + throw new RepositoryException("Index populate interrupted"); + } + } } + txn.commit(); } finally { txn.exit(); diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java index e332157..1f60bfd 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java @@ -86,6 +86,8 @@ public class BDBRepositoryBuilder extends AbstractRepositoryBuilder { private File mDataHome; private String mSingleFileName; private boolean mIndexSupport = true; + private boolean mIndexRepairEnabled = true; + private double mIndexThrottle = 1.0; private boolean mReadOnly; private Long mCacheSize; private double mLockTimeout = 0.5; @@ -120,6 +122,8 @@ public class BDBRepositoryBuilder extends AbstractRepositoryBuilder { IndexedRepositoryBuilder ixBuilder = new IndexedRepositoryBuilder(); ixBuilder.setWrappedRepository(this); ixBuilder.setMaster(isMaster()); + ixBuilder.setIndexRepairEnabled(mIndexRepairEnabled); + ixBuilder.setIndexRepairThrottle(mIndexThrottle); return ixBuilder.build(rootRef); } finally { mIndexSupport = true; @@ -338,6 +342,53 @@ public class BDBRepositoryBuilder extends AbstractRepositoryBuilder { return mIndexSupport; } + /** + * @see #setIndexRepairEnabled(boolean) + * + * 
@return true by default + */ + public boolean isIndexRepairEnabled() { + return mIndexRepairEnabled; + } + + /** + * By default, index repair is enabled. In this mode, the first time a + * Storable type is used, new indexes are populated and old indexes are + * removed. Until finished, access to the Storable is blocked. + * + *

<p>When index repair is disabled, the Storable is immediately + * available. This does have consequences, however. The set of indexes + * available for queries is defined by the intersection of the old + * and new index sets. The set of indexes that are kept up-to-date is + * defined by the union of the old and new index sets. + * + * <p>

While index repair is disabled, another process can safely repair the + * indexes in the background. When it is complete, index repair can be + * enabled for this repository too. + */ + public void setIndexRepairEnabled(boolean enabled) { + mIndexRepairEnabled = enabled; + } + + /** + * Returns the throttle parameter used when indexes are added, dropped or + * bulk repaired. By default this value is 1.0, or maximum speed. + */ + public double getIndexRepairThrottle() { + return mIndexThrottle; + } + + /** + * Sets the throttle parameter used when indexes are added, dropped or bulk + * repaired. By default this value is 1.0, or maximum speed. + * + * @param desiredSpeed 1.0 = perform work at full speed, + * 0.5 = perform work at half speed, 0.0 = fully suspend work + */ + public void setIndexRepairThrottle(double desiredSpeed) { + mIndexThrottle = desiredSpeed; + } + /** * Sets the repository to read-only mode. By default, repository is opened * for reads and writes. -- cgit v1.2.3