diff options
| author | Brian S. O'Neill <bronee@gmail.com> | 2007-03-15 22:18:27 +0000 | 
|---|---|---|
| committer | Brian S. O'Neill <bronee@gmail.com> | 2007-03-15 22:18:27 +0000 | 
| commit | 9a9e104f22226a5d43fc51f48d51efa9f136afdd (patch) | |
| tree | 7ac424217e2f80e65e9ce48aaa0068bc17c0eb5d /src/main/java/com/amazon | |
| parent | a9c497ca3f29e7f48dfc3299e8237c753e4e54be (diff) | |
IndexedRepository supports optional and throttled index repair.
Diffstat (limited to 'src/main/java/com/amazon')
5 files changed, 391 insertions, 131 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java index 8516628..d558950 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java @@ -60,12 +60,20 @@ class IndexedRepository implements Repository,      private final AtomicReference<Repository> mRootRef;
      private final Repository mRepository;
      private final String mName;
 +    private final boolean mIndexRepairEnabled;
 +    private final double mIndexThrottle;
      private final StorageCollection mStorages;
 -    IndexedRepository(AtomicReference<Repository> rootRef, String name, Repository repository) {
 +    IndexedRepository(AtomicReference<Repository> rootRef, String name,
 +                      Repository repository,
 +                      boolean indexRepairEnabled,
 +                      double indexThrottle)
 +    {
          mRootRef = rootRef;
          mRepository = repository;
          mName = name;
 +        mIndexRepairEnabled = indexRepairEnabled;
 +        mIndexThrottle = indexThrottle;
          mStorages = new StorageCollection() {
              protected <S extends Storable> Storage<S> createStorage(Class<S> type)
 @@ -76,7 +84,7 @@ class IndexedRepository implements Repository,                  if (Unindexed.class.isAssignableFrom(type)) {
                      // Verify no indexes.
                      int indexCount = IndexedStorage
 -                        .gatherRequiredIndexes(StorableIntrospector.examine(type)).size();
 +                        .gatherDesiredIndexes(StorableIntrospector.examine(type)).size();
                      if (indexCount > 0) {
                          throw new MalformedTypeException
                              (type, "Storable cannot have any indexes: " + type +
 @@ -91,7 +99,8 @@ class IndexedRepository implements Repository,          if (repository.getCapability(IndexInfoCapability.class) == null) {
              throw new UnsupportedOperationException
 -                ("Wrapped repository doesn't support being indexed");
 +                ("Wrapped repository doesn't support being indexed -- " +
 +                 "it must support IndexInfoCapability.");
          }
      }
 @@ -204,4 +213,12 @@ class IndexedRepository implements Repository,      Repository getWrappedRepository() {
          return mRepository;
      }
 +
 +    /**
 +     * Reports whether this repository was constructed with index repair
 +     * enabled.
 +     *
 +     * @return the flag passed to the constructor as indexRepairEnabled
 +     */
 +    boolean isIndexRepairEnabled() {
 +        return this.mIndexRepairEnabled;
 +    }
 +
 +    /**
 +     * Returns the index repair throttle this repository was constructed with.
 +     *
 +     * @return the value passed to the constructor as indexThrottle
 +     */
 +    double getIndexRepairThrottle() {
 +        return this.mIndexThrottle;
 +    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java index 2010230..4cd4afb 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepositoryBuilder.java @@ -47,6 +47,8 @@ public class IndexedRepositoryBuilder extends AbstractRepositoryBuilder {      private String mName;
      private boolean mIsMaster = true;
      private RepositoryBuilder mRepoBuilder;
 +    private boolean mIndexRepairEnabled = true;
 +    private double mIndexThrottle = 1.0;
      public IndexedRepositoryBuilder() {
      }
 @@ -71,7 +73,9 @@ public class IndexedRepositoryBuilder extends AbstractRepositoryBuilder {              return wrapped;
          }
 -        Repository repo = new IndexedRepository(rootRef, getName(), wrapped);
 +        Repository repo = new IndexedRepository(rootRef, getName(), wrapped,
 +                                                isIndexRepairEnabled(),
 +                                                getIndexRepairThrottle());
          rootRef.set(repo);
          return repo;
      }
 @@ -111,6 +115,58 @@ public class IndexedRepositoryBuilder extends AbstractRepositoryBuilder {          mRepoBuilder = repoBuilder;
      }
 +    /**
 +     * Tells whether index repair is currently enabled on this builder.
 +     *
 +     * @return the configured setting, which is true unless it was changed
 +     * @see #setIndexRepairEnabled(boolean)
 +     */
 +    public boolean isIndexRepairEnabled() {
 +        return this.mIndexRepairEnabled;
 +    }
 +
 +    /**
 +     * Turns index repair on or off. It is on by default.
 +     *
 +     * <p>With repair enabled, the first use of a Storable type populates any
 +     * new indexes and removes any old ones. Access to the Storable is blocked
 +     * until that work has finished.
 +     *
 +     * <p>With repair disabled, the Storable is available immediately, at a
 +     * cost. Queries may only use the <i>intersection</i> of the old and new
 +     * index sets, while the <i>union</i> of the old and new index sets is
 +     * kept up-to-date.
 +     *
 +     * <p>While repair is disabled here, another process can safely repair the
 +     * indexes in the background. Once it has completed, index repair can be
 +     * enabled for this repository as well.
 +     *
 +     * @param enabled true to repair indexes on first use, false to skip repair
 +     */
 +    public void setIndexRepairEnabled(boolean enabled) {
 +        this.mIndexRepairEnabled = enabled;
 +    }
 +
 +    /**
 +     * Gets the throttle parameter applied when indexes are added, dropped or
 +     * bulk repaired.
 +     *
 +     * @return the desired speed; 1.0 (maximum speed) unless it was changed
 +     * @see #setIndexRepairThrottle(double)
 +     */
 +    public double getIndexRepairThrottle() {
 +        return this.mIndexThrottle;
 +    }
 +
 +    /**
 +     * Sets the throttle parameter used when indexes are added, dropped or bulk
 +     * repaired. By default this value is 1.0, or maximum speed.
 +     *
 +     * <p>Values below 0.0 are raised to 0.0 and values above 1.0 are lowered
 +     * to 1.0. NaN is stored as 1.0 rather than being kept as-is.
 +     *
 +     * @param desiredSpeed 1.0 = perform work at full speed,
 +     * 0.5 = perform work at half speed, 0.0 = fully suspend work
 +     */
 +    public void setIndexRepairThrottle(double desiredSpeed) {
 +        if (Double.isNaN(desiredSpeed) || desiredSpeed > 1.0) {
 +            // NaN fails both range comparisons and would otherwise be stored
 +            // unchanged. Index population and removal only throttle when the
 +            // speed is < 1.0, so NaN already ran at full speed; record it as
 +            // such so the getter reports a meaningful value.
 +            desiredSpeed = 1.0;
 +        } else if (desiredSpeed < 0.0) {
 +            desiredSpeed = 0.0;
 +        }
 +        mIndexThrottle = desiredSpeed;
 +    }
 +
      public void errorCheck(Collection<String> messages) throws ConfigurationException {
          super.errorCheck(messages);
          if (null == getWrappedRepository()) {
 diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java index 5fb94eb..7b66f50 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java @@ -58,13 +58,17 @@ import com.amazon.carbonado.qe.StorageAccess;  import com.amazon.carbonado.spi.StorableIndexSet;
 +import com.amazon.carbonado.util.Throttle;
 +
 +import static com.amazon.carbonado.repo.indexed.ManagedIndex.*;
 +
  /**
   *
   *
   * @author Brian S O'Neill
   */
  class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S> {
 -    static <S extends Storable> StorableIndexSet<S> gatherRequiredIndexes(StorableInfo<S> info) {
 +    static <S extends Storable> StorableIndexSet<S> gatherDesiredIndexes(StorableInfo<S> info) {
          StorableIndexSet<S> indexSet = new StorableIndexSet<S>();
          indexSet.addIndexes(info);
          indexSet.addAlternateKeys(info);
 @@ -74,8 +78,11 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>      final IndexedRepository mRepository;
      final Storage<S> mMasterStorage;
 -    private final Map<StorableIndex<S>, IndexInfo> mIndexInfoMap;
 -    private final StorableIndexSet<S> mIndexSet;
 +    // Maps managed and queryable indexes to IndexInfo objects.
 +    private final Map<StorableIndex<S>, IndexInfo> mAllIndexInfoMap;
 +
 +    // Set of indexes available for queries to use.
 +    private final StorableIndexSet<S> mQueryableIndexSet;
      private final QueryEngine<S> mQueryEngine;
 @@ -87,78 +94,69 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>      {
          mRepository = repository;
          mMasterStorage = masterStorage;
 -        mIndexInfoMap = new IdentityHashMap<StorableIndex<S>, IndexInfo>();
 +        mAllIndexInfoMap = new IdentityHashMap<StorableIndex<S>, IndexInfo>();
          StorableInfo<S> info = StorableIntrospector.examine(masterStorage.getStorableType());
 -        // Determine what the set of indexes should be.
 -        StorableIndexSet<S> newIndexSet = gatherRequiredIndexes(info);
 -
 -        // Mix in the indexes we get for free, but remove after reduce. A free
 -        // index is one that the underlying storage is providing for us. We
 -        // don't want to create redundant indexes.
 -        IndexInfo[] infos = repository.getWrappedRepository()
 -            .getCapability(IndexInfoCapability.class)
 -            .getIndexInfo(masterStorage.getStorableType());
 -
 -        StorableIndex<S>[] freeIndexes = new StorableIndex[infos.length];
 -        for (int i=0; i<infos.length; i++) {
 -            try {
 -                freeIndexes[i] = new StorableIndex<S>(masterStorage.getStorableType(), infos[i]);
 -                newIndexSet.add(freeIndexes[i]);
 -                mIndexInfoMap.put(freeIndexes[i], infos[i]);
 -            } catch (IllegalArgumentException e) {
 -                // Assume index is bogus, so ignore it.
 -            }
 -        }
 -
 -        newIndexSet.reduce(Direction.ASCENDING);
 -
 -        // Gather current indexes.
 -        StorableIndexSet<S> currentIndexSet = new StorableIndexSet<S>();
 -        // Gather indexes to remove.
 -        StorableIndexSet<S> indexesToRemove = new StorableIndexSet<S>();
 -
 -        Query<StoredIndexInfo> query = repository.getWrappedRepository()
 -            .storageFor(StoredIndexInfo.class)
 -            // Primary key of StoredIndexInfo is an index descriptor, which
 -            // starts with the storable type name. This emulates a "wildcard at
 -            // the end" search.
 -            .query("indexName >= ? & indexName < ?")
 -            .with(getStorableType().getName() + '~')
 -            .with(getStorableType().getName() + '~' + '\uffff');
 -
 -        List<StoredIndexInfo> storedInfos;
 -        Transaction txn = repository.getWrappedRepository()
 -            .enterTopTransaction(IsolationLevel.READ_COMMITTED);
 -        try {
 -            storedInfos = query.fetch().toList();
 -        } finally {
 -            txn.exit();
 +        // The set of indexes that the Storable defines, reduced.
 +        final StorableIndexSet<S> desiredIndexSet;
 +        {
 +            desiredIndexSet = gatherDesiredIndexes(info);
 +            desiredIndexSet.reduce(Direction.ASCENDING);
          }
 -        for (StoredIndexInfo indexInfo : storedInfos) {
 -            String name = indexInfo.getIndexName();
 -            StorableIndex index;
 +        // The set of indexes that are populated and available for use. This is
 +        // determined by examining index metadata. If the Storable has not
 +        // changed, it will be the same as desiredIndexSet. If any existing
 +        // indexes use a property whose type has changed, it is added to
 +        // bogusIndexSet. Bogus indexes are removed if repair is enabled.
 +        final StorableIndexSet<S> existingIndexSet;
 +        final StorableIndexSet<S> bogusIndexSet;
 +        {
 +            existingIndexSet = new StorableIndexSet<S>();
 +            bogusIndexSet = new StorableIndexSet<S>();
 +
 +            Query<StoredIndexInfo> query = repository.getWrappedRepository()
 +                .storageFor(StoredIndexInfo.class)
 +                // Primary key of StoredIndexInfo is an index descriptor, which
 +                // starts with the storable type name. This emulates a
 +                // "wildcard at the end" search.
 +                .query("indexName >= ? & indexName < ?")
 +                .with(getStorableType().getName() + '~')
 +                .with(getStorableType().getName() + '~' + '\uffff');
 +
 +            List<StoredIndexInfo> storedInfos;
 +            Transaction txn = repository.getWrappedRepository()
 +                .enterTopTransaction(IsolationLevel.READ_COMMITTED);
              try {
 -                index = StorableIndex.parseNameDescriptor(name, info);
 -            } catch (IllegalArgumentException e) {
 -                // Remove unrecognized descriptors.
 -                unregisterIndex(name);
 -                continue;
 +                storedInfos = query.fetch().toList();
 +            } finally {
 +                txn.exit();
              }
 -            if (index.getTypeDescriptor().equals(indexInfo.getIndexTypeDescriptor())) {
 -                currentIndexSet.add(index);
 -            } else {
 -                indexesToRemove.add(index);
 +
 +            for (StoredIndexInfo indexInfo : storedInfos) {
 +                String name = indexInfo.getIndexName();
 +                StorableIndex index;
 +                try {
 +                    index = StorableIndex.parseNameDescriptor(name, info);
 +                } catch (IllegalArgumentException e) {
 +                    // Remove unrecognized descriptors.
 +                    unregisterIndex(name);
 +                    continue;
 +                }
 +                if (index.getTypeDescriptor().equals(indexInfo.getIndexTypeDescriptor())) {
 +                    existingIndexSet.add(index);
 +                } else {
 +                    bogusIndexSet.add(index);
 +                }
              }
          }
          nonUniqueSearch: {
 -            // If any current indexes are non-unique, then indexes are for an
 +            // If any existing indexes are non-unique, then indexes are for an
              // older version. For compatibility, don't uniquify the
              // indexes. Otherwise, these indexes would need to be rebuilt.
 -            for (StorableIndex<S> index : currentIndexSet) {
 +            for (StorableIndex<S> index : existingIndexSet) {
                  if (!index.isUnique()) {
                      break nonUniqueSearch;
                  }
 @@ -169,58 +167,129 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>              // properties. As a side-effect of uniquify, all indexes are
              // unique, and thus have 'U' in the descriptor. Each time
              // nonUniqueSearch is run, it will not find any non-unique indexes.
 -            newIndexSet.uniquify(info);
 +            desiredIndexSet.uniquify(info);
          }
 -        // Remove any old indexes.
 +        // Gather free indexes, which are already provided by the underlying
 +        // storage. They can be used for querying, but we should not manage them.
 +        final StorableIndexSet<S> freeIndexSet;
          {
 -            indexesToRemove.addAll(currentIndexSet);
 +            freeIndexSet = new StorableIndexSet<S>();
 +
 +            IndexInfoCapability cap = repository.getWrappedRepository()
 +                .getCapability(IndexInfoCapability.class);
 +
 +            if (cap != null) {
 +                for (IndexInfo ii : cap.getIndexInfo(masterStorage.getStorableType())) {
 +                    StorableIndex<S> freeIndex;
 +                    try {
 +                        freeIndex = new StorableIndex<S>(masterStorage.getStorableType(), ii);
 +                    } catch (IllegalArgumentException e) {
 +                        // Assume index is malformed, so ignore it.
 +                        continue;
 +                    }
 +                    mAllIndexInfoMap.put(freeIndex, ii);
 +                    freeIndexSet.add(freeIndex);
 +                }
 +            }
 +        }
 -            // Remove "free" indexes, since they don't need to be built.
 -            for (int i=0; i<freeIndexes.length; i++) {
 -                newIndexSet.remove(freeIndexes[i]);
 +        // The set of indexes that can actually be used for querying. If index
 +        // repair is enabled, this set will be the same as
 +        // desiredIndexSet. Otherwise, it will be the intersection of
 +        // existingIndexSet and desiredIndexSet. In both cases, "free" indexes
 +        // are added to the set too.
 +        final StorableIndexSet<S> queryableIndexSet;
 +        {
 +            queryableIndexSet = new StorableIndexSet<S>(desiredIndexSet);
 +
 +            if (!mRepository.isIndexRepairEnabled()) {
 +                // Can only query the intersection.
 +                queryableIndexSet.retainAll(existingIndexSet);
              }
 -            indexesToRemove.removeAll(newIndexSet);
 +            // Add the indexes we get for free.
 +            queryableIndexSet.addAll(freeIndexSet);
 +        }
 +
 +        // The set of indexes that should be kept up-to-date. If index repair
 +        // is enabled, this set will be the same as desiredIndexSet. Otherwise,
 +        // it will be the union of existingIndexSet and desiredIndexSet. In
 +        // both cases, "free" indexes are removed from the set too. By doing a
 +        // union, no harm is caused by changing the index set and then reverting.
 +        final StorableIndexSet<S> managedIndexSet;
 +        {
 +            managedIndexSet = new StorableIndexSet<S>(desiredIndexSet);
 -            for (StorableIndex<S> index : indexesToRemove) {
 -                removeIndex(index);
 +            if (!mRepository.isIndexRepairEnabled()) {
 +                // Must manage the union.
 +                managedIndexSet.addAll(existingIndexSet);
              }
 +
 +            // Remove the indexes we get for free.
 +            managedIndexSet.removeAll(freeIndexSet);
          }
 -        currentIndexSet = newIndexSet;
 +        // The set of indexes that should be removed and no longer managed. If
 +        // index repair is enabled, this set will be the existingIndexSet minus
 +        // desiredIndexSet minus freeIndexSet plus bogusIndexSet. Otherwise, it
 +        // will be empty.
 +        final StorableIndexSet<S> removeIndexSet;
 +        {
 +            removeIndexSet = new StorableIndexSet<S>();
 -        // Open all the indexes.
 -        List<ManagedIndex<S>> managedIndexList = new ArrayList<ManagedIndex<S>>();
 -        for (StorableIndex<S> index : currentIndexSet) {
 -            IndexEntryGenerator<S> builder = IndexEntryGenerator.getInstance(index);
 -            Class<? extends Storable> indexEntryClass = builder.getIndexEntryClass();
 -            Storage<?> indexEntryStorage = repository.getIndexEntryStorageFor(indexEntryClass);
 -            ManagedIndex managedIndex = new ManagedIndex<S>(index, builder, indexEntryStorage);
 +            if (mRepository.isIndexRepairEnabled()) {
 +                removeIndexSet.addAll(existingIndexSet);
 +                removeIndexSet.removeAll(desiredIndexSet);
 +                removeIndexSet.removeAll(freeIndexSet);
 +                removeIndexSet.addAll(bogusIndexSet);
 +            }
 +        }
 -            registerIndex(managedIndex);
 +        // The set of indexes that should be freshly populated. If index repair
 +        // is enabled, this set will be the desiredIndexSet minus
 +        // existingIndexSet minus freeIndexSet. Otherwise, it will be empty.
 +        final StorableIndexSet<S> addIndexSet;
 +        {
 +            addIndexSet = new StorableIndexSet<S>();
 -            mIndexInfoMap.put(index, managedIndex);
 -            managedIndexList.add(managedIndex);
 +            if (mRepository.isIndexRepairEnabled()) {
 +                addIndexSet.addAll(desiredIndexSet);
 +                addIndexSet.removeAll(existingIndexSet);
 +                addIndexSet.removeAll(freeIndexSet);
 +            }
          }
 -        if (managedIndexList.size() > 0) {
 -            // Add trigger to keep indexes up-to-date.
 -            ManagedIndex<S>[] managedIndexes =
 -                managedIndexList.toArray(new ManagedIndex[managedIndexList.size()]);
 +        // Support for managed indexes...
 +        if (managedIndexSet.size() > 0) {
 +            ManagedIndex<S>[] managedIndexes = new ManagedIndex[managedIndexSet.size()];
 +            int i = 0;
 +            for (StorableIndex<S> index : managedIndexSet) {
 +                IndexEntryGenerator<S> builder = IndexEntryGenerator.getInstance(index);
 +                Class<? extends Storable> indexEntryClass = builder.getIndexEntryClass();
 +                Storage<?> indexEntryStorage = repository.getIndexEntryStorageFor(indexEntryClass);
 +                ManagedIndex managedIndex = new ManagedIndex<S>(index, builder, indexEntryStorage);
 +
 +                mAllIndexInfoMap.put(index, managedIndex);
 +                managedIndexes[i++] = managedIndex;
 +            }
              if (!addTrigger(new IndexesTrigger<S>(managedIndexes))) {
                  throw new RepositoryException("Unable to add trigger for managing indexes");
              }
          }
 -        // Add "free" indexes back, in order for query engine to consider them.
 -        for (int i=0; i<freeIndexes.length; i++) {
 -            currentIndexSet.add(freeIndexes[i]);
 +        // Okay, now start doing some damage. First, remove unnecessary indexes.
 +        for (StorableIndex<S> index : removeIndexSet) {
 +            removeIndex(index);
          }
 -        mIndexSet = currentIndexSet;
 +        // Now add new indexes.
 +        for (StorableIndex<S> index : addIndexSet) {
 +            registerIndex((ManagedIndex) mAllIndexInfoMap.get(index));
 +        }
 +        mQueryableIndexSet = queryableIndexSet;
          mQueryEngine = new QueryEngine<S>(masterStorage.getStorableType(), repository);
      }
 @@ -252,16 +321,18 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>          return mMasterStorage.removeTrigger(trigger);
      }
 +    // Required by IndexInfoCapability.
      public IndexInfo[] getIndexInfo() {
 -        IndexInfo[] infos = new IndexInfo[mIndexInfoMap.size()];
 -        return mIndexInfoMap.values().toArray(infos);
 +        IndexInfo[] infos = new IndexInfo[mAllIndexInfoMap.size()];
 +        return mAllIndexInfoMap.values().toArray(infos);
      }
 +    // Required by IndexEntryAccessCapability.
      @SuppressWarnings("unchecked")
      public IndexEntryAccessor<S>[] getIndexEntryAccessors() {
          List<IndexEntryAccessor<S>> accessors =
 -            new ArrayList<IndexEntryAccessor<S>>(mIndexInfoMap.size());
 -        for (IndexInfo info : mIndexInfoMap.values()) {
 +            new ArrayList<IndexEntryAccessor<S>>(mAllIndexInfoMap.size());
 +        for (IndexInfo info : mAllIndexInfoMap.values()) {
              if (info instanceof IndexEntryAccessor) {
                  accessors.add((IndexEntryAccessor<S>) info);
              }
 @@ -269,16 +340,19 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>          return accessors.toArray(new IndexEntryAccessor[accessors.size()]);
      }
 +    // Required by StorageAccess.
      public QueryExecutorFactory<S> getQueryExecutorFactory() {
          return mQueryEngine;
      }
 +    // Required by StorageAccess.
      public Collection<StorableIndex<S>> getAllIndexes() {
 -        return mIndexSet;
 +        return mQueryableIndexSet;
      }
 +    // Required by StorageAccess.
      public Storage<S> storageDelegate(StorableIndex<S> index) {
 -        if (mIndexInfoMap.get(index) instanceof ManagedIndex) {
 +        if (mAllIndexInfoMap.get(index) instanceof ManagedIndex) {
              // Index is managed by this storage, which is typical.
              return null;
          }
 @@ -336,7 +410,7 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>          // reversal. Only the lowest storage layer should examine this
          // parameter.
 -        ManagedIndex<S> indexInfo = (ManagedIndex<S>) mIndexInfoMap.get(index);
 +        ManagedIndex<S> indexInfo = (ManagedIndex<S>) mAllIndexInfoMap.get(index);
          Query<?> query = indexInfo.getIndexEntryQueryFor
              (identityValues == null ? 0 : identityValues.length,
 @@ -383,7 +457,8 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>          }
          // New index, so populate it.
 -        managedIndex.populateIndex(mRepository, mMasterStorage);
 +        managedIndex.populateIndex(mRepository, mMasterStorage,
 +                                   mRepository.getIndexRepairThrottle());
          txn = mRepository.getWrappedRepository()
              .enterTopTransaction(IsolationLevel.READ_COMMITTED);
 @@ -444,39 +519,60 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>              return;
          }
 -        // Doesn't completely remove the index, but it should free up space.
 -        // TODO: when truncate method exists, call that instead
 -        // TODO: set batchsize based on repository locktable size
 -        int batchSize = 10;
 -        while (true) {
 -            Transaction txn = mRepository.getWrappedRepository()
 -                .enterTopTransaction(IsolationLevel.READ_COMMITTED);
 -            txn.setForUpdate(true);
 -
 -            try {
 -                Cursor<? extends Storable> cursor = indexEntryStorage.query().fetch();
 -                if (!cursor.hasNext()) {
 -                    break;
 -                }
 -                int count = 0;
 +        {
 +            // Doesn't completely remove the index, but it should free up space.
 +            double desiredSpeed = mRepository.getIndexRepairThrottle();
 +            Throttle throttle = desiredSpeed < 1.0 ? new Throttle(POPULATE_THROTTLE_WINDOW) : null;
 +
 +            long totalDropped = 0;
 +            while (true) {
 +                Transaction txn = mRepository.getWrappedRepository()
 +                    .enterTopTransaction(IsolationLevel.READ_COMMITTED);
 +                txn.setForUpdate(true);
                  try {
 -                    while (count++ < batchSize && cursor.hasNext()) {
 -                        cursor.next().tryDelete();
 +                    Cursor<? extends Storable> cursor = indexEntryStorage.query().fetch();
 +                    if (!cursor.hasNext()) {
 +                        break;
 +                    }
 +                    int count = 0;
 +                        final long savedTotal = totalDropped;
 +                        boolean anyFailure = false;
 +                        try {
 +                            while (count++ < POPULATE_BATCH_SIZE && cursor.hasNext()) {
 +                                if (cursor.next().tryDelete()) {
 +                                    totalDropped++;
 +                                } else {
 +                                    anyFailure = true;
 +                                }
 +                        }
 +                    } finally {
 +                        cursor.close();
                      }
 -                } finally {
 -                    cursor.close();
 -                }
 -                if (txn != null) {
                      txn.commit();
 -                }
 -            } catch (FetchException e) {
 -                throw e.toPersistException();
 -            } finally {
 -                if (txn != null) {
 +                    if (log.isInfoEnabled()) {
 +                        log.info("Removed " + totalDropped + " index entries");
 +                    }
 +                        if (anyFailure && totalDropped <= savedTotal) {
 +                            log.warn("No indexes removed in last batch. " +
 +                                     "Aborting index removal cleanup");
 +                            break;
 +                        }
 +                } catch (FetchException e) {
 +                    throw e.toPersistException();
 +                } finally {
                      txn.exit();
                  }
 +
 +                if (throttle != null) {
 +                    try {
 +                        throttle.throttle(desiredSpeed, POPULATE_THROTTLE_SLEEP_PRECISION);
 +                    } catch (InterruptedException e) {
 +                        throw new RepositoryException("Index removal interrupted");
 +                    }
 +                }
              }
          }
 +
          unregisterIndex(index);
      }
  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 958bd7b..2e4cea3 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -45,13 +45,19 @@ import com.amazon.carbonado.spi.RepairExecutor;  import com.amazon.carbonado.qe.BoundaryType;
 +import com.amazon.carbonado.util.Throttle;
 +
  /**
   * Encapsulates info and operations for a single index.
   *
   * @author Brian S O'Neill
   */
  class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
 -    private static final int POPULATE_BATCH_SIZE = 256;
 +    private static final int POPULATE_SORT_BUFFER_SIZE = 65536;
 +    private static final int POPULATE_INFO_DELAY_MILLIS = 5000;
 +    static final int POPULATE_BATCH_SIZE = 128;
 +    static final int POPULATE_THROTTLE_WINDOW = POPULATE_BATCH_SIZE * 10;
 +    static final int POPULATE_THROTTLE_SLEEP_PRECISION = 10;
      private final StorableIndex mIndex;
      private final IndexEntryGenerator<S> mGenerator;
 @@ -293,7 +299,9 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {       *
       * @param repo used to enter transactions
       */
 -    void populateIndex(Repository repo, Storage<S> masterStorage) throws RepositoryException {
 +    void populateIndex(Repository repo, Storage<S> masterStorage, double desiredSpeed)
 +        throws RepositoryException
 +    {
          MergeSortBuffer buffer;
          Comparator c;
 @@ -325,12 +333,21 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                  // Preload and sort all index entries for improved performance.
 -                buffer = new MergeSortBuffer(mIndexEntryStorage);
 +                buffer = new MergeSortBuffer(mIndexEntryStorage, null, POPULATE_SORT_BUFFER_SIZE);
                  c = mGenerator.getComparator();
                  buffer.prepare(c);
 +                long nextReportTime = System.currentTimeMillis() + POPULATE_INFO_DELAY_MILLIS;
                  while (cursor.hasNext()) {
                      buffer.add(makeIndexEntry(cursor.next()));
 +
 +                    if (log.isInfoEnabled()) {
 +                        long now = System.currentTimeMillis();
 +                        if (now >= nextReportTime) {
 +                            log.info("Prepared " + buffer.size() + " new index entries");
 +                            nextReportTime = now + POPULATE_INFO_DELAY_MILLIS;
 +                        }
 +                    }
                  }
                  // No need to commit transaction because no changes should have been made.
 @@ -341,6 +358,8 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {              txn.exit();
          }
 +        // This is not expected to take long, since MergeSortBuffer sorts as
 +        // needed. This just finishes off what was not written to a file.
          buffer.sort();
          if (isUnique()) {
 @@ -348,6 +367,10 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {              // _before_ inserting index entries. If there are duplicates,
              // fail, since unique index cannot be built.
 +            if (log.isInfoEnabled()) {
 +                log.info("Verifying unique index");
 +            }
 +
              Object last = null;
              for (Object obj : buffer) {
                  if (last != null) {
 @@ -363,6 +386,13 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {          }
          final int bufferSize = buffer.size();
 +
 +        if (log.isInfoEnabled()) {
 +            log.info("Begin insert of " + bufferSize + " new index entries");
 +        }
 +
 +        Throttle throttle = desiredSpeed < 1.0 ? new Throttle(POPULATE_THROTTLE_WINDOW) : null;
 +
          int totalInserted = 0;
          txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
 @@ -386,6 +416,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                          indexEntry.tryInsert();
                      }
                  }
 +
                  totalInserted++;
                  if (totalInserted % POPULATE_BATCH_SIZE == 0) {
                      txn.commit();
 @@ -399,7 +430,16 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                      txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
                  }
 +
 +                if (throttle != null) {
 +                    try {
 +                        throttle.throttle(desiredSpeed, POPULATE_THROTTLE_SLEEP_PRECISION);
 +                    } catch (InterruptedException e) {
 +                        throw new RepositoryException("Index populate interrupted");
 +                    }
 +                }
              }
 +
              txn.commit();
          } finally {
              txn.exit();
 diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java index e332157..1f60bfd 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java @@ -86,6 +86,8 @@ public class BDBRepositoryBuilder extends AbstractRepositoryBuilder {      private File mDataHome;
      private String mSingleFileName;
      private boolean mIndexSupport = true;
 +    private boolean mIndexRepairEnabled = true;
 +    private double mIndexThrottle = 1.0;
      private boolean mReadOnly;
      private Long mCacheSize;
      private double mLockTimeout = 0.5;
 @@ -120,6 +122,8 @@ public class BDBRepositoryBuilder extends AbstractRepositoryBuilder {                  IndexedRepositoryBuilder ixBuilder = new IndexedRepositoryBuilder();
                  ixBuilder.setWrappedRepository(this);
                  ixBuilder.setMaster(isMaster());
 +                ixBuilder.setIndexRepairEnabled(mIndexRepairEnabled);
 +                ixBuilder.setIndexRepairThrottle(mIndexThrottle);
                  return ixBuilder.build(rootRef);
              } finally {
                  mIndexSupport = true;
 @@ -339,6 +343,53 @@ public class BDBRepositoryBuilder extends AbstractRepositoryBuilder {      }
     /**
 +     * Returns whether index repair is enabled for this builder.
 +     *
 +     * @see #setIndexRepairEnabled(boolean)
 +     *
 +     * @return true by default
 +     */
 +    public boolean isIndexRepairEnabled() {
 +        return mIndexRepairEnabled;
 +    }
 +
 +    /**
 +     * By default, index repair is enabled. In this mode, the first time a
 +     * Storable type is used, new indexes are populated and old indexes are
 +     * removed. Until finished, access to the Storable is blocked.
 +     *
 +     * <p>When index repair is disabled, the Storable is immediately
 +     * available. This does have consequences, however. The set of indexes
 +     * available for queries is defined by the <i>intersection</i> of the old
 +     * and new index sets. The set of indexes that are kept up-to-date is
 +     * defined by the <i>union</i> of the old and new index sets.
 +     *
 +     * <p>While index repair is disabled, another process can safely repair the
 +     * indexes in the background. When it is complete, index repair can be
 +     * enabled for this repository too.
 +     *
 +     * @param enabled true to repair indexes the first time a Storable type is
 +     * used, false to make the Storable immediately available without repair
 +     */
 +    public void setIndexRepairEnabled(boolean enabled) {
 +        mIndexRepairEnabled = enabled;
 +    }
 +
 +    /**
 +     * Returns the throttle parameter used when indexes are added, dropped or
 +     * bulk repaired. By default this value is 1.0, or maximum speed.
 +     *
 +     * @see #setIndexRepairThrottle(double)
 +     *
 +     * @return the desired speed most recently set, or 1.0 if never set
 +     */
 +    public double getIndexRepairThrottle() {
 +        return mIndexThrottle;
 +    }
 +
 +    /**
 +     * Sets the throttle parameter used when indexes are added, dropped or bulk
 +     * repaired. By default this value is 1.0, or maximum speed.
 +     *
 +     * <p>The value is stored as given; no range check is performed by this
 +     * method.
 +     *
 +     * @param desiredSpeed 1.0 = perform work at full speed,
 +     * 0.5 = perform work at half speed, 0.0 = fully suspend work
 +     */
 +    public void setIndexRepairThrottle(double desiredSpeed) {
 +        mIndexThrottle = desiredSpeed;
 +    }
 +
 +    /**
       * Sets the repository to read-only mode. By default, repository is opened
       * for reads and writes.
       */
 | 
