diff options
Diffstat (limited to 'src')
3 files changed, 90 insertions, 42 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java index 42ce214..1b7f910 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java @@ -20,6 +20,7 @@ package com.amazon.carbonado.repo.indexed;  import java.util.Comparator;
 +import com.amazon.carbonado.RepositoryException;
  import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.Storage;
 @@ -30,6 +31,7 @@ import com.amazon.carbonado.capability.IndexInfo;   * inspection and repair.
   *
   * @author Brian S O'Neill
 + * @see IndexEntryAccessCapability
   */
  public interface IndexEntryAccessor<S extends Storable> extends IndexInfo {
      /**
 @@ -66,6 +68,15 @@ public interface IndexEntryAccessor<S extends Storable> extends IndexInfo {      boolean isConsistent(Storable indexEntry, S master);
      /**
 +     * Repairs the index by inserting missing entries and fixing
 +     * inconsistencies.
 +     *
 +     * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half
 +     * speed, 0.1 = one-tenth speed, etc
 +     */
 +    void repair(double desiredSpeed) throws RepositoryException;
 +
 +    /**
       * Returns a comparator for ordering index entries.
       */
      Comparator<? extends Storable> getComparator();
 diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java index cefb031..fb55433 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java @@ -287,7 +287,8 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>                  IndexEntryGenerator<S> builder = IndexEntryGenerator.getInstance(index);
                  Class<? extends Storable> indexEntryClass = builder.getIndexEntryClass();
                  Storage<?> indexEntryStorage = repository.getIndexEntryStorageFor(indexEntryClass);
 -                ManagedIndex managedIndex = new ManagedIndex<S>(index, builder, indexEntryStorage);
 +                ManagedIndex managedIndex =
 +                    new ManagedIndex<S>(this, index, builder, indexEntryStorage);
                  mAllIndexInfoMap.put(index, managedIndex);
                  managedIndexes[i++] = managedIndex;
 @@ -510,9 +511,8 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>              }
          }
 -        // New index, so populate it.
 -        managedIndex.populateIndex(mRepository, mMasterStorage,
 -                                   mRepository.getIndexRepairThrottle());
 +        // New index, so build it.
 +        managedIndex.buildIndex(mRepository.getIndexRepairThrottle());
          boolean top = true;
          while (true) {
 diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index ea3bbb3..5806529 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -29,6 +29,7 @@ import com.amazon.carbonado.CorruptEncodingException;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
  import com.amazon.carbonado.IsolationLevel;
 +import com.amazon.carbonado.OptimisticLockException;
  import com.amazon.carbonado.PersistException;
  import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Repository;
 @@ -62,23 +63,26 @@ import com.amazon.carbonado.util.Throttle;   * @author Brian S O'Neill
   */
  class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
 -    private static final int POPULATE_SORT_BUFFER_SIZE = 65536;
 -    private static final int POPULATE_INFO_DELAY_MILLIS = 5000;
 -    static final int POPULATE_BATCH_SIZE = 128;
 -    static final int POPULATE_THROTTLE_WINDOW = POPULATE_BATCH_SIZE * 10;
 -    static final int POPULATE_THROTTLE_SLEEP_PRECISION = 10;
 +    private static final int BUILD_SORT_BUFFER_SIZE = 65536;
 +    private static final int BUILD_INFO_DELAY_MILLIS = 5000;
 +    static final int BUILD_BATCH_SIZE = 128;
 +    static final int BUILD_THROTTLE_WINDOW = BUILD_BATCH_SIZE * 10;
 +    static final int BUILD_THROTTLE_SLEEP_PRECISION = 10;
 +    private final IndexedStorage<S> mIndexedStorage;
      private final StorableIndex mIndex;
      private final IndexEntryGenerator<S> mGenerator;
      private final Storage<?> mIndexEntryStorage;
      private Query<?> mSingleMatchQuery;
 -    ManagedIndex(StorableIndex<S> index,
 +    ManagedIndex(IndexedStorage<S> indexedStorage,
 +                 StorableIndex<S> index,
                   IndexEntryGenerator<S> generator,
                   Storage<?> indexEntryStorage)
          throws SupportException
      {
 +        mIndexedStorage = indexedStorage;
          mIndex = index;
          mGenerator = generator;
          mIndexEntryStorage = indexEntryStorage;
 @@ -139,6 +143,18 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {      }
      // Required by IndexEntryAccessor interface.
 +    public void repair(double desiredSpeed) throws RepositoryException {
 +        /* FIXME: Delete bogus entries. Index inserts and updates are committed
 +           in batches, which covers a contiguous range of entries. Before
 +           committing the batch, a range query is executed on the index and
 +           verifies the batch about to be committed. Any entries that aren't in
 +           the batch are to be deleted. The progress message that is logged
 +           should be changed to show indexes inserted, updated, and deleted.
 +        */
 +        buildIndex(desiredSpeed);
 +    }
 +
 +    // Required by IndexEntryAccessor interface.
      public Comparator<? extends Storable> getComparator() {
          return mGenerator.getComparator();
      }
 @@ -208,13 +224,14 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {      }
      /**
 -     * Populates the entire index, repairing as it goes.
 +     * Build the entire index, repairing as it goes.
       *
       * @param repo used to enter transactions
       */
 -    void populateIndex(Repository repo, Storage<S> masterStorage, double desiredSpeed)
 -        throws RepositoryException
 -    {
 +    void buildIndex(double desiredSpeed) throws RepositoryException {
 +        Repository repo = mIndexedStorage.mRepository;
 +        Storage<S> masterStorage = mIndexedStorage.mMasterStorage;
 +
          MergeSortBuffer buffer;
          Comparator c;
 @@ -248,7 +265,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {              try {
                  Cursor<S> cursor = masterQuery.fetch();
                  if (!cursor.hasNext()) {
 -                    // Nothing exists in master, so nothing to populate.
 +                    // Nothing exists in master, so nothing to build.
                      return;
                  }
              } finally {
 @@ -264,7 +281,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {              try {
                  if (log.isInfoEnabled()) {
                      StringBuilder b = new StringBuilder();
 -                    b.append("Populating index on ");
 +                    b.append("Building index on ");
                      b.append(masterStorage.getStorableType().getName());
                      b.append(": ");
                      try {
 @@ -277,11 +294,11 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                  // Preload and sort all index entries for improved performance.
 -                buffer = new MergeSortBuffer(mIndexEntryStorage, null, POPULATE_SORT_BUFFER_SIZE);
 +                buffer = new MergeSortBuffer(mIndexEntryStorage, null, BUILD_SORT_BUFFER_SIZE);
                  c = mGenerator.getComparator();
                  buffer.prepare(c);
 -                long nextReportTime = System.currentTimeMillis() + POPULATE_INFO_DELAY_MILLIS;
 +                long nextReportTime = System.currentTimeMillis() + BUILD_INFO_DELAY_MILLIS;
                  // These variables are used when corrupt records are encountered.
                  S lastUserStorable = null;
 @@ -313,8 +330,8 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                      if (log.isInfoEnabled()) {
                          long now = System.currentTimeMillis();
                          if (now >= nextReportTime) {
 -                            log.info("Prepared " + buffer.size() + " new index entries");
 -                            nextReportTime = now + POPULATE_INFO_DELAY_MILLIS;
 +                            log.info("Prepared " + buffer.size() + " index entries");
 +                            nextReportTime = now + BUILD_INFO_DELAY_MILLIS;
                          }
                      }
 @@ -359,44 +376,52 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {          final int bufferSize = buffer.size();
          if (log.isInfoEnabled()) {
 -            log.info("Begin insert of " + bufferSize + " new index entries");
 +            log.info("Begin build of " + bufferSize + " index entries");
          }
 -        Throttle throttle = desiredSpeed < 1.0 ? new Throttle(POPULATE_THROTTLE_WINDOW) : null;
 +        Throttle throttle = desiredSpeed < 1.0 ? new Throttle(BUILD_THROTTLE_WINDOW) : null;
 -        int totalInserted = 0;
 +        long totalInserted = 0;
 +        long totalUpdated = 0;
 +        long totalDeleted = 0;
 +        long totalProgress = 0;
          txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
          try {
 +
              for (Object obj : buffer) {
                  Storable indexEntry = (Storable) obj;
 -                if (!indexEntry.tryInsert()) {
 +                if (indexEntry.tryInsert()) {
 +                    totalInserted++;
 +                } else {
                      // Couldn't insert because an index entry already exists.
                      Storable existing = indexEntry.copy();
 -                    if (existing.tryLoad()) {
 -                        if (!existing.equalProperties(indexEntry)) {
 -                            // Existing entry differs, so update it.
 -                            indexEntry.copyUnequalProperties(existing);
 -                            existing.tryUpdate();
 -                            indexEntry = existing;
 -                        }
 -                    } else {
 -                        // Couldn't find existing entry for some reason, so
 -                        // repair by brute force.
 +                    boolean doUpdate = false;
 +                    if (!existing.tryLoad()) {
 +                        doUpdate = true;
 +                    } else if (!existing.equalProperties(indexEntry)) {
 +                        // If only the version differs, leave existing entry alone.
 +                        indexEntry.copyVersionProperty(existing);
 +                        doUpdate = !existing.equalProperties(indexEntry);
 +                    }
 +                    if (doUpdate) {
                          indexEntry.tryDelete();
                          indexEntry.tryInsert();
 +                        totalUpdated++;
                      }
                  }
 -                totalInserted++;
 -                if (totalInserted % POPULATE_BATCH_SIZE == 0) {
 +                totalProgress++;
 +
 +                if (totalProgress % BUILD_BATCH_SIZE == 0) {
                      txn.commit();
                      txn.exit();
                      if (log.isInfoEnabled()) {
 -                        String format = "Committed %d new index entries (%.3f%%)";
 -                        double percent = 100.0 * totalInserted / bufferSize;
 -                        log.info(String.format(format, totalInserted, percent));
 +                        String format = "Index build progress: %.3f%% " +
 +                            progressSubMessgage(totalInserted, totalUpdated, totalDeleted);
 +                        double percent = 100.0 * totalProgress / bufferSize;
 +                        log.info(String.format(format, percent));
                      }
                      txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
 @@ -404,9 +429,9 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                  if (throttle != null) {
                      try {
 -                        throttle.throttle(desiredSpeed, POPULATE_THROTTLE_SLEEP_PRECISION);
 +                        throttle.throttle(desiredSpeed, BUILD_THROTTLE_SLEEP_PRECISION);
                      } catch (InterruptedException e) {
 -                        throw new RepositoryException("Index populate interrupted");
 +                        throw new RepositoryException("Index build interrupted");
                      }
                  }
              }
 @@ -418,7 +443,19 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {          }
          if (log.isInfoEnabled()) {
 -            log.info("Finished inserting " + totalInserted + " new index entries");
 +            log.info("Finished building " + totalProgress + " index entries " +
 +                     progressSubMessgage(totalInserted, totalUpdated, totalDeleted));
 +        }
 +    }
 +
 +    private String progressSubMessgage(long totalInserted, long totalUpdated, long totalDeleted) {
 +        if (totalDeleted > 0) {
 +            return "(" + totalInserted + " inserted, " + totalUpdated + " updated, " +
 +                totalDeleted + " deleted)";
 +        } else if (totalUpdated > 0) {
 +            return "(" + totalInserted + " inserted, " + totalUpdated + " updated)";
 +        } else {
 +            return "(" + totalInserted + " inserted)";
          }
      }
 | 
