diff options
author | Brian S. O'Neill <bronee@gmail.com> | 2007-03-15 22:18:27 +0000 |
---|---|---|
committer | Brian S. O'Neill <bronee@gmail.com> | 2007-03-15 22:18:27 +0000 |
commit | 9a9e104f22226a5d43fc51f48d51efa9f136afdd (patch) | |
tree | 7ac424217e2f80e65e9ce48aaa0068bc17c0eb5d /src/main/java/com/amazon/carbonado/repo | |
parent | a9c497ca3f29e7f48dfc3299e8237c753e4e54be (diff) |
IndexedRepository supports optional and throttled index repair.
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo')
5 files changed, 391 insertions, 131 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java index 8516628..d558950 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java @@ -60,12 +60,20 @@ class IndexedRepository implements Repository, private final AtomicReference<Repository> mRootRef;
private final Repository mRepository;
private final String mName;
+ private final boolean mIndexRepairEnabled;
+ private final double mIndexThrottle;
private final StorageCollection mStorages;
- IndexedRepository(AtomicReference<Repository> rootRef, String name, Repository repository) {
+ IndexedRepository(AtomicReference<Repository> rootRef, String name,
+ Repository repository,
+ boolean indexRepairEnabled,
+ double indexThrottle)
+ {
mRootRef = rootRef;
mRepository = repository;
mName = name;
+ mIndexRepairEnabled = indexRepairEnabled;
+ mIndexThrottle = indexThrottle;
mStorages = new StorageCollection() {
protected <S extends Storable> Storage<S> createStorage(Class<S> type)
@@ -76,7 +84,7 @@ class IndexedRepository implements Repository, if (Unindexed.class.isAssignableFrom(type)) {
// Verify no indexes.
int indexCount = IndexedStorage
- .gatherRequiredIndexes(StorableIntrospector.examine(type)).size();
+ .gatherDesiredIndexes(StorableIntrospector.examine(type)).size();
if (indexCount > 0) {
throw new MalformedTypeException
(type, "Storable cannot have any indexes: " + type +
@@ -91,7 +99,8 @@ class IndexedRepository implements Repository, if (repository.getCapability(IndexInfoCapability.class) == null) {
throw new UnsupportedOperationException
- ("Wrapped repository doesn't support being indexed");
+ ("Wrapped repository doesn't support being indexed -- " +
+ "it must support IndexInfoCapability.");
}
}
@@ -204,4 +213,12 @@ class IndexedRepository implements Repository, Repository getWrappedRepository() {
return mRepository;
}
+
+ boolean isIndexRepairEnabled() {
+ return mIndexRepairEnabled;
+ }
+
+ double getIndexRepairThrottle() {
+ return mIndexThrottle;
+ }
}
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java index 2010230..4cd4afb 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java @@ -47,6 +47,8 @@ public class IndexedRepositoryBuilder extends AbstractRepositoryBuilder { private String mName;
private boolean mIsMaster = true;
private RepositoryBuilder mRepoBuilder;
+ private boolean mIndexRepairEnabled = true;
+ private double mIndexThrottle = 1.0;
public IndexedRepositoryBuilder() {
}
@@ -71,7 +73,9 @@ public class IndexedRepositoryBuilder extends AbstractRepositoryBuilder { return wrapped;
}
- Repository repo = new IndexedRepository(rootRef, getName(), wrapped);
+ Repository repo = new IndexedRepository(rootRef, getName(), wrapped,
+ isIndexRepairEnabled(),
+ getIndexRepairThrottle());
rootRef.set(repo);
return repo;
}
@@ -111,6 +115,58 @@ public class IndexedRepositoryBuilder extends AbstractRepositoryBuilder { mRepoBuilder = repoBuilder;
}
+ /**
+ * @see #setIndexRepairEnabled(boolean)
+ *
+ * @return true by default
+ */
+ public boolean isIndexRepairEnabled() {
+ return mIndexRepairEnabled;
+ }
+
+ /**
+ * By default, index repair is enabled. In this mode, the first time a
+ * Storable type is used, new indexes are populated and old indexes are
+ * removed. Until finished, access to the Storable is blocked.
+ *
+ * <p>When index repair is disabled, the Storable is immediately
+ * available. This does have consequences, however. The set of indexes
+ * available for queries is defined by the <i>intersection</i> of the old
+ * and new index sets. The set of indexes that are kept up-to-date is
+ * defined by the <i>union</i> of the old and new index sets.
+ *
+ * <p>While index repair is disabled, another process can safely repair the
+ * indexes in the background. When it is complete, index repair can be
+ * enabled for this repository too.
+ */
+ public void setIndexRepairEnabled(boolean enabled) {
+ mIndexRepairEnabled = enabled;
+ }
+
+ /**
+ * Returns the throttle parameter used when indexes are added, dropped or
+ * bulk repaired. By default this value is 1.0, or maximum speed.
+ */
+ public double getIndexRepairThrottle() {
+ return mIndexThrottle;
+ }
+
+ /**
+ * Sets the throttle parameter used when indexes are added, dropped or bulk
+ * repaired. By default this value is 1.0, or maximum speed.
+ *
+ * @param desiredSpeed 1.0 = perform work at full speed,
+ * 0.5 = perform work at half speed, 0.0 = fully suspend work
+ */
+ public void setIndexRepairThrottle(double desiredSpeed) {
+ if (desiredSpeed < 0.0) {
+ desiredSpeed = 0.0;
+ } else if (desiredSpeed > 1.0) {
+ desiredSpeed = 1.0;
+ }
+ mIndexThrottle = desiredSpeed;
+ }
+
public void errorCheck(Collection<String> messages) throws ConfigurationException {
super.errorCheck(messages);
if (null == getWrappedRepository()) {
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java index 5fb94eb..7b66f50 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java @@ -58,13 +58,17 @@ import com.amazon.carbonado.qe.StorageAccess; import com.amazon.carbonado.spi.StorableIndexSet;
+import com.amazon.carbonado.util.Throttle;
+
+import static com.amazon.carbonado.repo.indexed.ManagedIndex.*;
+
/**
*
*
* @author Brian S O'Neill
*/
class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> {
- static <S extends Storable> StorableIndexSet<S> gatherRequiredIndexes(StorableInfo<S> info) {
+ static <S extends Storable> StorableIndexSet<S> gatherDesiredIndexes(StorableInfo<S> info) {
StorableIndexSet<S> indexSet = new StorableIndexSet<S>();
indexSet.addIndexes(info);
indexSet.addAlternateKeys(info);
@@ -74,8 +78,11 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> final IndexedRepository mRepository;
final Storage<S> mMasterStorage;
- private final Map<StorableIndex<S>, IndexInfo> mIndexInfoMap;
- private final StorableIndexSet<S> mIndexSet;
+ // Maps managed and queryable indexes to IndexInfo objects.
+ private final Map<StorableIndex<S>, IndexInfo> mAllIndexInfoMap;
+
+ // Set of indexes available for queries to use.
+ private final StorableIndexSet<S> mQueryableIndexSet;
private final QueryEngine<S> mQueryEngine;
@@ -87,78 +94,69 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> {
mRepository = repository;
mMasterStorage = masterStorage;
- mIndexInfoMap = new IdentityHashMap<StorableIndex<S>, IndexInfo>();
+ mAllIndexInfoMap = new IdentityHashMap<StorableIndex<S>, IndexInfo>();
StorableInfo<S> info = StorableIntrospector.examine(masterStorage.getStorableType());
- // Determine what the set of indexes should be.
- StorableIndexSet<S> newIndexSet = gatherRequiredIndexes(info);
-
- // Mix in the indexes we get for free, but remove after reduce. A free
- // index is one that the underlying storage is providing for us. We
- // don't want to create redundant indexes.
- IndexInfo[] infos = repository.getWrappedRepository()
- .getCapability(IndexInfoCapability.class)
- .getIndexInfo(masterStorage.getStorableType());
-
- StorableIndex<S>[] freeIndexes = new StorableIndex[infos.length];
- for (int i=0; i<infos.length; i++) {
- try {
- freeIndexes[i] = new StorableIndex<S>(masterStorage.getStorableType(), infos[i]);
- newIndexSet.add(freeIndexes[i]);
- mIndexInfoMap.put(freeIndexes[i], infos[i]);
- } catch (IllegalArgumentException e) {
- // Assume index is bogus, so ignore it.
- }
- }
-
- newIndexSet.reduce(Direction.ASCENDING);
-
- // Gather current indexes.
- StorableIndexSet<S> currentIndexSet = new StorableIndexSet<S>();
- // Gather indexes to remove.
- StorableIndexSet<S> indexesToRemove = new StorableIndexSet<S>();
-
- Query<StoredIndexInfo> query = repository.getWrappedRepository()
- .storageFor(StoredIndexInfo.class)
- // Primary key of StoredIndexInfo is an index descriptor, which
- // starts with the storable type name. This emulates a "wildcard at
- // the end" search.
- .query("indexName >= ? & indexName < ?")
- .with(getStorableType().getName() + '~')
- .with(getStorableType().getName() + '~' + '\uffff');
-
- List<StoredIndexInfo> storedInfos;
- Transaction txn = repository.getWrappedRepository()
- .enterTopTransaction(IsolationLevel.READ_COMMITTED);
- try {
- storedInfos = query.fetch().toList();
- } finally {
- txn.exit();
+ // The set of indexes that the Storable defines, reduced.
+ final StorableIndexSet<S> desiredIndexSet;
+ {
+ desiredIndexSet = gatherDesiredIndexes(info);
+ desiredIndexSet.reduce(Direction.ASCENDING);
}
- for (StoredIndexInfo indexInfo : storedInfos) {
- String name = indexInfo.getIndexName();
- StorableIndex index;
+ // The set of indexes that are populated and available for use. This is
+ // determined by examining index metadata. If the Storable has not
+ // changed, it will be the same as desiredIndexSet. If any existing
+ // indexes use a property whose type has changed, it is added to
+ // bogusIndexSet. Bogus indexes are removed if repair is enabled.
+ final StorableIndexSet<S> existingIndexSet;
+ final StorableIndexSet<S> bogusIndexSet;
+ {
+ existingIndexSet = new StorableIndexSet<S>();
+ bogusIndexSet = new StorableIndexSet<S>();
+
+ Query<StoredIndexInfo> query = repository.getWrappedRepository()
+ .storageFor(StoredIndexInfo.class)
+ // Primary key of StoredIndexInfo is an index descriptor, which
+ // starts with the storable type name. This emulates a
+ // "wildcard at the end" search.
+ .query("indexName >= ? & indexName < ?")
+ .with(getStorableType().getName() + '~')
+ .with(getStorableType().getName() + '~' + '\uffff');
+
+ List<StoredIndexInfo> storedInfos;
+ Transaction txn = repository.getWrappedRepository()
+ .enterTopTransaction(IsolationLevel.READ_COMMITTED);
try {
- index = StorableIndex.parseNameDescriptor(name, info);
- } catch (IllegalArgumentException e) {
- // Remove unrecognized descriptors.
- unregisterIndex(name);
- continue;
+ storedInfos = query.fetch().toList();
+ } finally {
+ txn.exit();
}
- if (index.getTypeDescriptor().equals(indexInfo.getIndexTypeDescriptor())) {
- currentIndexSet.add(index);
- } else {
- indexesToRemove.add(index);
+
+ for (StoredIndexInfo indexInfo : storedInfos) {
+ String name = indexInfo.getIndexName();
+ StorableIndex index;
+ try {
+ index = StorableIndex.parseNameDescriptor(name, info);
+ } catch (IllegalArgumentException e) {
+ // Remove unrecognized descriptors.
+ unregisterIndex(name);
+ continue;
+ }
+ if (index.getTypeDescriptor().equals(indexInfo.getIndexTypeDescriptor())) {
+ existingIndexSet.add(index);
+ } else {
+ bogusIndexSet.add(index);
+ }
}
}
nonUniqueSearch: {
- // If any current indexes are non-unique, then indexes are for an
+ // If any existing indexes are non-unique, then indexes are for an
// older version. For compatibility, don't uniquify the
// indexes. Otherwise, these indexes would need to be rebuilt.
- for (StorableIndex<S> index : currentIndexSet) {
+ for (StorableIndex<S> index : existingIndexSet) {
if (!index.isUnique()) {
break nonUniqueSearch;
}
@@ -169,58 +167,129 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> // properties. As a side-effect of uniquify, all indexes are
// unique, and thus have 'U' in the descriptor. Each time
// nonUniqueSearch is run, it will not find any non-unique indexes.
- newIndexSet.uniquify(info);
+ desiredIndexSet.uniquify(info);
}
- // Remove any old indexes.
+ // Gather free indexes, which are already provided by the underlying
+ // storage. They can be used for querying, but we should not manage them.
+ final StorableIndexSet<S> freeIndexSet;
{
- indexesToRemove.addAll(currentIndexSet);
+ freeIndexSet = new StorableIndexSet<S>();
+
+ IndexInfoCapability cap = repository.getWrappedRepository()
+ .getCapability(IndexInfoCapability.class);
+
+ if (cap != null) {
+ for (IndexInfo ii : cap.getIndexInfo(masterStorage.getStorableType())) {
+ StorableIndex<S> freeIndex;
+ try {
+ freeIndex = new StorableIndex<S>(masterStorage.getStorableType(), ii);
+ } catch (IllegalArgumentException e) {
+ // Assume index is malformed, so ignore it.
+ continue;
+ }
+ mAllIndexInfoMap.put(freeIndex, ii);
+ freeIndexSet.add(freeIndex);
+ }
+ }
+ }
- // Remove "free" indexes, since they don't need to be built.
- for (int i=0; i<freeIndexes.length; i++) {
- newIndexSet.remove(freeIndexes[i]);
+ // The set of indexes that can actually be used for querying. If index
+ // repair is enabled, this set will be the same as
+ // desiredIndexSet. Otherwise, it will be the intersection of
+ // existingIndexSet and desiredIndexSet. In both cases, "free" indexes
+ // are added to the set too.
+ final StorableIndexSet<S> queryableIndexSet;
+ {
+ queryableIndexSet = new StorableIndexSet<S>(desiredIndexSet);
+
+ if (!mRepository.isIndexRepairEnabled()) {
+ // Can only query the intersection.
+ queryableIndexSet.retainAll(existingIndexSet);
}
- indexesToRemove.removeAll(newIndexSet);
+ // Add the indexes we get for free.
+ queryableIndexSet.addAll(freeIndexSet);
+ }
+
+ // The set of indexes that should be kept up-to-date. If index repair
+ // is enabled, this set will be the same as desiredIndexSet. Otherwise,
+ // it will be the union of existingIndexSet and desiredIndexSet. In
+ // both cases, "free" indexes are removed from the set too. By doing a
+ // union, no harm is caused by changing the index set and then reverting.
+ final StorableIndexSet<S> managedIndexSet;
+ {
+ managedIndexSet = new StorableIndexSet<S>(desiredIndexSet);
- for (StorableIndex<S> index : indexesToRemove) {
- removeIndex(index);
+ if (!mRepository.isIndexRepairEnabled()) {
+ // Must manage the union.
+ managedIndexSet.addAll(existingIndexSet);
}
+
+ // Remove the indexes we get for free.
+ managedIndexSet.removeAll(freeIndexSet);
}
- currentIndexSet = newIndexSet;
+ // The set of indexes that should be removed and no longer managed. If
+ // index repair is enabled, this set will be the existingIndexSet minus
+ // desiredIndexSet minus freeIndexSet plus bogusIndexSet. Otherwise, it
+ // will be empty.
+ final StorableIndexSet<S> removeIndexSet;
+ {
+ removeIndexSet = new StorableIndexSet<S>();
- // Open all the indexes.
- List<ManagedIndex<S>> managedIndexList = new ArrayList<ManagedIndex<S>>();
- for (StorableIndex<S> index : currentIndexSet) {
- IndexEntryGenerator<S> builder = IndexEntryGenerator.getInstance(index);
- Class<? extends Storable> indexEntryClass = builder.getIndexEntryClass();
- Storage<?> indexEntryStorage = repository.getIndexEntryStorageFor(indexEntryClass);
- ManagedIndex managedIndex = new ManagedIndex<S>(index, builder, indexEntryStorage);
+ if (mRepository.isIndexRepairEnabled()) {
+ removeIndexSet.addAll(existingIndexSet);
+ removeIndexSet.removeAll(desiredIndexSet);
+ removeIndexSet.removeAll(freeIndexSet);
+ removeIndexSet.addAll(bogusIndexSet);
+ }
+ }
- registerIndex(managedIndex);
+ // The set of indexes that should be freshly populated. If index repair
+ // is enabled, this set will be the desiredIndexSet minus
+ // existingIndexSet minus freeIndexSet. Otherwise, it will be empty.
+ final StorableIndexSet<S> addIndexSet;
+ {
+ addIndexSet = new StorableIndexSet<S>();
- mIndexInfoMap.put(index, managedIndex);
- managedIndexList.add(managedIndex);
+ if (mRepository.isIndexRepairEnabled()) {
+ addIndexSet.addAll(desiredIndexSet);
+ addIndexSet.removeAll(existingIndexSet);
+ addIndexSet.removeAll(freeIndexSet);
+ }
}
- if (managedIndexList.size() > 0) {
- // Add trigger to keep indexes up-to-date.
- ManagedIndex<S>[] managedIndexes =
- managedIndexList.toArray(new ManagedIndex[managedIndexList.size()]);
+ // Support for managed indexes...
+ if (managedIndexSet.size() > 0) {
+ ManagedIndex<S>[] managedIndexes = new ManagedIndex[managedIndexSet.size()];
+ int i = 0;
+ for (StorableIndex<S> index : managedIndexSet) {
+ IndexEntryGenerator<S> builder = IndexEntryGenerator.getInstance(index);
+ Class<? extends Storable> indexEntryClass = builder.getIndexEntryClass();
+ Storage<?> indexEntryStorage = repository.getIndexEntryStorageFor(indexEntryClass);
+ ManagedIndex managedIndex = new ManagedIndex<S>(index, builder, indexEntryStorage);
+
+ mAllIndexInfoMap.put(index, managedIndex);
+ managedIndexes[i++] = managedIndex;
+ }
if (!addTrigger(new IndexesTrigger<S>(managedIndexes))) {
throw new RepositoryException("Unable to add trigger for managing indexes");
}
}
- // Add "free" indexes back, in order for query engine to consider them.
- for (int i=0; i<freeIndexes.length; i++) {
- currentIndexSet.add(freeIndexes[i]);
+ // Okay, now start doing some damage. First, remove unnecessary indexes.
+ for (StorableIndex<S> index : removeIndexSet) {
+ removeIndex(index);
}
- mIndexSet = currentIndexSet;
+ // Now add new indexes.
+ for (StorableIndex<S> index : addIndexSet) {
+ registerIndex((ManagedIndex) mAllIndexInfoMap.get(index));
+ }
+ mQueryableIndexSet = queryableIndexSet;
mQueryEngine = new QueryEngine<S>(masterStorage.getStorableType(), repository);
}
@@ -252,16 +321,18 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> return mMasterStorage.removeTrigger(trigger);
}
+ // Required by IndexInfoCapability.
public IndexInfo[] getIndexInfo() {
- IndexInfo[] infos = new IndexInfo[mIndexInfoMap.size()];
- return mIndexInfoMap.values().toArray(infos);
+ IndexInfo[] infos = new IndexInfo[mAllIndexInfoMap.size()];
+ return mAllIndexInfoMap.values().toArray(infos);
}
+ // Required by IndexEntryAccessCapability.
@SuppressWarnings("unchecked")
public IndexEntryAccessor<S>[] getIndexEntryAccessors() {
List<IndexEntryAccessor<S>> accessors =
- new ArrayList<IndexEntryAccessor<S>>(mIndexInfoMap.size());
- for (IndexInfo info : mIndexInfoMap.values()) {
+ new ArrayList<IndexEntryAccessor<S>>(mAllIndexInfoMap.size());
+ for (IndexInfo info : mAllIndexInfoMap.values()) {
if (info instanceof IndexEntryAccessor) {
accessors.add((IndexEntryAccessor<S>) info);
}
@@ -269,16 +340,19 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> return accessors.toArray(new IndexEntryAccessor[accessors.size()]);
}
+ // Required by StorageAccess.
public QueryExecutorFactory<S> getQueryExecutorFactory() {
return mQueryEngine;
}
+ // Required by StorageAccess.
public Collection<StorableIndex<S>> getAllIndexes() {
- return mIndexSet;
+ return mQueryableIndexSet;
}
+ // Required by StorageAccess.
public Storage<S> storageDelegate(StorableIndex<S> index) {
- if (mIndexInfoMap.get(index) instanceof ManagedIndex) {
+ if (mAllIndexInfoMap.get(index) instanceof ManagedIndex) {
// Index is managed by this storage, which is typical.
return null;
}
@@ -336,7 +410,7 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> // reversal. Only the lowest storage layer should examine this
// parameter.
- ManagedIndex<S> indexInfo = (ManagedIndex<S>) mIndexInfoMap.get(index);
+ ManagedIndex<S> indexInfo = (ManagedIndex<S>) mAllIndexInfoMap.get(index);
Query<?> query = indexInfo.getIndexEntryQueryFor
(identityValues == null ? 0 : identityValues.length,
@@ -383,7 +457,8 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> }
// New index, so populate it.
- managedIndex.populateIndex(mRepository, mMasterStorage);
+ managedIndex.populateIndex(mRepository, mMasterStorage,
+ mRepository.getIndexRepairThrottle());
txn = mRepository.getWrappedRepository()
.enterTopTransaction(IsolationLevel.READ_COMMITTED);
@@ -444,39 +519,60 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> return;
}
- // Doesn't completely remove the index, but it should free up space.
- // TODO: when truncate method exists, call that instead
- // TODO: set batchsize based on repository locktable size
- int batchSize = 10;
- while (true) {
- Transaction txn = mRepository.getWrappedRepository()
- .enterTopTransaction(IsolationLevel.READ_COMMITTED);
- txn.setForUpdate(true);
-
- try {
- Cursor<? extends Storable> cursor = indexEntryStorage.query().fetch();
- if (!cursor.hasNext()) {
- break;
- }
- int count = 0;
+ {
+ // Doesn't completely remove the index, but it should free up space.
+ double desiredSpeed = mRepository.getIndexRepairThrottle();
+ Throttle throttle = desiredSpeed < 1.0 ? new Throttle(POPULATE_THROTTLE_WINDOW) : null;
+
+ long totalDropped = 0;
+ while (true) {
+ Transaction txn = mRepository.getWrappedRepository()
+ .enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ txn.setForUpdate(true);
try {
- while (count++ < batchSize && cursor.hasNext()) {
- cursor.next().tryDelete();
+ Cursor<? extends Storable> cursor = indexEntryStorage.query().fetch();
+ if (!cursor.hasNext()) {
+ break;
+ }
+ int count = 0;
+ final long savedTotal = totalDropped;
+ boolean anyFailure = false;
+ try {
+ while (count++ < POPULATE_BATCH_SIZE && cursor.hasNext()) {
+ if (cursor.next().tryDelete()) {
+ totalDropped++;
+ } else {
+ anyFailure = true;
+ }
+ }
+ } finally {
+ cursor.close();
}
- } finally {
- cursor.close();
- }
- if (txn != null) {
txn.commit();
- }
- } catch (FetchException e) {
- throw e.toPersistException();
- } finally {
- if (txn != null) {
+ if (log.isInfoEnabled()) {
+ log.info("Removed " + totalDropped + " index entries");
+ }
+ if (anyFailure && totalDropped <= savedTotal) {
+ log.warn("No indexes removed in last batch. " +
+ "Aborting index removal cleanup");
+ break;
+ }
+ } catch (FetchException e) {
+ throw e.toPersistException();
+ } finally {
txn.exit();
}
+
+ if (throttle != null) {
+ try {
+ throttle.throttle(desiredSpeed, POPULATE_THROTTLE_SLEEP_PRECISION);
+ } catch (InterruptedException e) {
+ throw new RepositoryException("Index removal interrupted");
+ }
+ }
}
}
+
unregisterIndex(index);
}
}
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 958bd7b..2e4cea3 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -45,13 +45,19 @@ import com.amazon.carbonado.spi.RepairExecutor; import com.amazon.carbonado.qe.BoundaryType;
+import com.amazon.carbonado.util.Throttle;
+
/**
* Encapsulates info and operations for a single index.
*
* @author Brian S O'Neill
*/
class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
- private static final int POPULATE_BATCH_SIZE = 256;
+ private static final int POPULATE_SORT_BUFFER_SIZE = 65536;
+ private static final int POPULATE_INFO_DELAY_MILLIS = 5000;
+ static final int POPULATE_BATCH_SIZE = 128;
+ static final int POPULATE_THROTTLE_WINDOW = POPULATE_BATCH_SIZE * 10;
+ static final int POPULATE_THROTTLE_SLEEP_PRECISION = 10;
private final StorableIndex mIndex;
private final IndexEntryGenerator<S> mGenerator;
@@ -293,7 +299,9 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> { *
* @param repo used to enter transactions
*/
- void populateIndex(Repository repo, Storage<S> masterStorage) throws RepositoryException {
+ void populateIndex(Repository repo, Storage<S> masterStorage, double desiredSpeed)
+ throws RepositoryException
+ {
MergeSortBuffer buffer;
Comparator c;
@@ -325,12 +333,21 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> { // Preload and sort all index entries for improved performance.
- buffer = new MergeSortBuffer(mIndexEntryStorage);
+ buffer = new MergeSortBuffer(mIndexEntryStorage, null, POPULATE_SORT_BUFFER_SIZE);
c = mGenerator.getComparator();
buffer.prepare(c);
+ long nextReportTime = System.currentTimeMillis() + POPULATE_INFO_DELAY_MILLIS;
while (cursor.hasNext()) {
buffer.add(makeIndexEntry(cursor.next()));
+
+ if (log.isInfoEnabled()) {
+ long now = System.currentTimeMillis();
+ if (now >= nextReportTime) {
+ log.info("Prepared " + buffer.size() + " new index entries");
+ nextReportTime = now + POPULATE_INFO_DELAY_MILLIS;
+ }
+ }
}
// No need to commit transaction because no changes should have been made.
@@ -341,6 +358,8 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> { txn.exit();
}
+ // This is not expected to take long, since MergeSortBuffer sorts as
+ // needed. This just finishes off what was not written to a file.
buffer.sort();
if (isUnique()) {
@@ -348,6 +367,10 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> { // _before_ inserting index entries. If there are duplicates,
// fail, since unique index cannot be built.
+ if (log.isInfoEnabled()) {
+ log.info("Verifying unique index");
+ }
+
Object last = null;
for (Object obj : buffer) {
if (last != null) {
@@ -363,6 +386,13 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> { }
final int bufferSize = buffer.size();
+
+ if (log.isInfoEnabled()) {
+ log.info("Begin insert of " + bufferSize + " new index entries");
+ }
+
+ Throttle throttle = desiredSpeed < 1.0 ? new Throttle(POPULATE_THROTTLE_WINDOW) : null;
+
int totalInserted = 0;
txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
@@ -386,6 +416,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> { indexEntry.tryInsert();
}
}
+
totalInserted++;
if (totalInserted % POPULATE_BATCH_SIZE == 0) {
txn.commit();
@@ -399,7 +430,16 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> { txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
}
+
+ if (throttle != null) {
+ try {
+ throttle.throttle(desiredSpeed, POPULATE_THROTTLE_SLEEP_PRECISION);
+ } catch (InterruptedException e) {
+ throw new RepositoryException("Index populate interrupted");
+ }
+ }
}
+
txn.commit();
} finally {
txn.exit();
diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java index e332157..1f60bfd 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java @@ -86,6 +86,8 @@ public class BDBRepositoryBuilder extends AbstractRepositoryBuilder { private File mDataHome;
private String mSingleFileName;
private boolean mIndexSupport = true;
+ private boolean mIndexRepairEnabled = true;
+ private double mIndexThrottle = 1.0;
private boolean mReadOnly;
private Long mCacheSize;
private double mLockTimeout = 0.5;
@@ -120,6 +122,8 @@ public class BDBRepositoryBuilder extends AbstractRepositoryBuilder { IndexedRepositoryBuilder ixBuilder = new IndexedRepositoryBuilder();
ixBuilder.setWrappedRepository(this);
ixBuilder.setMaster(isMaster());
+ ixBuilder.setIndexRepairEnabled(mIndexRepairEnabled);
+ ixBuilder.setIndexRepairThrottle(mIndexThrottle);
return ixBuilder.build(rootRef);
} finally {
mIndexSupport = true;
@@ -339,6 +343,53 @@ public class BDBRepositoryBuilder extends AbstractRepositoryBuilder { }
/**
+ * @see #setIndexRepairEnabled(boolean)
+ *
+ * @return true by default
+ */
+ public boolean isIndexRepairEnabled() {
+ return mIndexRepairEnabled;
+ }
+
+ /**
+ * By default, index repair is enabled. In this mode, the first time a
+ * Storable type is used, new indexes are populated and old indexes are
+ * removed. Until finished, access to the Storable is blocked.
+ *
+ * <p>When index repair is disabled, the Storable is immediately
+ * available. This does have consequences, however. The set of indexes
+ * available for queries is defined by the <i>intersection</i> of the old
+ * and new index sets. The set of indexes that are kept up-to-date is
+ * defined by the <i>union</i> of the old and new index sets.
+ *
+ * <p>While index repair is disabled, another process can safely repair the
+ * indexes in the background. When it is complete, index repair can be
+ * enabled for this repository too.
+ */
+ public void setIndexRepairEnabled(boolean enabled) {
+ mIndexRepairEnabled = enabled;
+ }
+
+ /**
+ * Returns the throttle parameter used when indexes are added, dropped or
+ * bulk repaired. By default this value is 1.0, or maximum speed.
+ */
+ public double getIndexRepairThrottle() {
+ return mIndexThrottle;
+ }
+
+ /**
+ * Sets the throttle parameter used when indexes are added, dropped or bulk
+ * repaired. By default this value is 1.0, or maximum speed.
+ *
+ * @param desiredSpeed 1.0 = perform work at full speed,
+ * 0.5 = perform work at half speed, 0.0 = fully suspend work
+ */
+ public void setIndexRepairThrottle(double desiredSpeed) {
+ mIndexThrottle = desiredSpeed;
+ }
+
+ /**
* Sets the repository to read-only mode. By default, repository is opened
* for reads and writes.
*/
|