diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java | 145 | 
1 files changed, 110 insertions, 35 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index a3cc790..a8f5788 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -66,6 +66,22 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {      static final int BUILD_THROTTLE_WINDOW = BUILD_BATCH_SIZE * 10;
      static final int BUILD_THROTTLE_SLEEP_PRECISION = 10;
 +    private static String[] naturalOrdering(Class<? extends Storable> type) {
 +        StorableKey<?> pk = StorableIntrospector.examine(type).getPrimaryKey();
 +        String[] naturalOrdering = new String[pk.getProperties().size()];
 +        int i=0;
 +        for (OrderedProperty<?> prop : pk.getProperties()) {
 +            String orderBy;
 +            if (prop.getDirection() == Direction.DESCENDING) {
 +                orderBy = prop.toString();
 +            } else {
 +                orderBy = prop.getChainedProperty().toString();
 +            }
 +            naturalOrdering[i++] = orderBy;
 +        }
 +        return naturalOrdering;
 +    }
 +
      private final IndexedStorage<S> mIndexedStorage;
      private final StorableIndex mIndex;
      private final IndexEntryGenerator<S> mGenerator;
 @@ -141,13 +157,6 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {      // Required by IndexEntryAccessor interface.
      public void repair(double desiredSpeed) throws RepositoryException {
 -        /* FIXME: Delete bogus entries. Index inserts and updates are committed
 -           in batches, which covers a contiguous range of entries. Before
 -           committing the batch, a range query is executed on the index and
 -           verifies the batch about to be committed. Any entries that aren't in
 -           the batch are to be deleted. The progress message that is logged
 -           should be changed to show indexes inserted, updated, and deleted.
 -        */
          buildIndex(desiredSpeed);
      }
 @@ -227,34 +236,21 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {       * @param repo used to enter transactions
       */
      void buildIndex(double desiredSpeed) throws RepositoryException {
 -        Repository repo = mIndexedStorage.mRepository;
 -        Storage<S> masterStorage = mIndexedStorage.mMasterStorage;
 +        final Repository repo = mIndexedStorage.mRepository;
 +        final Storage<S> masterStorage = mIndexedStorage.mMasterStorage;
 -        MergeSortBuffer buffer;
 -        Comparator c;
 +        final MergeSortBuffer buffer;
 +        final Comparator c;
 -        Log log = LogFactory.getLog(IndexedStorage.class);
 +        final Log log = LogFactory.getLog(IndexedStorage.class);
 -        Query<S> masterQuery;
 +        final Query<S> masterQuery;
          {
              // Need to explicitly order master query by primary key in order
              // for fetchAfter to work correctly in case corrupt records are
              // encountered.
 -            StorableKey<S> pk = StorableIntrospector
 -                .examine(masterStorage.getStorableType()).getPrimaryKey();
 -            String[] naturalOrdering = new String[pk.getProperties().size()];
 -            int i=0;
 -            for (OrderedProperty<S> prop : pk.getProperties()) {
 -                String orderBy;
 -                if (prop.getDirection() == Direction.DESCENDING) {
 -                    orderBy = prop.toString();
 -                } else {
 -                    orderBy = prop.getChainedProperty().toString();
 -                }
 -                naturalOrdering[i++] = orderBy;
 -            }
 -
 -            masterQuery = masterStorage.query().orderBy(naturalOrdering);
 +            masterQuery = masterStorage.query()
 +                .orderBy(naturalOrdering(masterStorage.getStorableType()));
          }
          // Quick check to see if any records exist in master.
 @@ -293,7 +289,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                  // Preload and sort all index entries for improved performance.
                  buffer = new MergeSortBuffer(mIndexEntryStorage, null, BUILD_SORT_BUFFER_SIZE);
 -                c = mGenerator.getComparator();
 +                c = getComparator();
                  buffer.prepare(c);
                  long nextReportTime = System.currentTimeMillis() + BUILD_INFO_DELAY_MILLIS;
 @@ -377,6 +373,10 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {              log.info("Begin build of " + bufferSize + " index entries");
          }
 +        // Need this index entry query for deleting bogus entries.
 +        final Query indexEntryQuery = mIndexEntryStorage.query()
 +            .orderBy(naturalOrdering(mIndexEntryStorage.getStorableType()));
 +
          Throttle throttle = desiredSpeed < 1.0 ? new Throttle(BUILD_THROTTLE_WINDOW) : null;
          long totalInserted = 0;
 @@ -386,6 +386,17 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {          txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
          try {
 +            txn.setForUpdate(true);
 +
 +            Cursor<? extends Storable> indexEntryCursor = indexEntryQuery.fetch();
 +            Storable existingIndexEntry = null;
 +
 +            if (!indexEntryCursor.hasNext()) {
 +                indexEntryCursor.close();
 +                // Don't try opening again.
 +                indexEntryCursor = null;
 +            }
 +
              for (Object obj : buffer) {
                  Storable indexEntry = (Storable) obj;
                  if (indexEntry.tryInsert()) {
 @@ -408,6 +419,37 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                      }
                  }
 +                if (indexEntryCursor != null) {
 +                    while (true) {
 +                        if (existingIndexEntry == null) {
 +                            if (indexEntryCursor.hasNext()) {
 +                                existingIndexEntry = indexEntryCursor.next();
 +                            } else {
 +                                indexEntryCursor.close();
 +                                // Don't try opening again.
 +                                indexEntryCursor = null;
 +                                break;
 +                            }
 +                        }
 +
 +                        int compare = c.compare(existingIndexEntry, indexEntry);
 +
 +                        if (compare == 0) {
 +                            // Existing entry cursor matches so allow cursor to advance.
 +                            existingIndexEntry = null;
 +                            break;
 +                        } else if (compare > 0) {
 +                            // Existing index entry is ahead so check later.
 +                            break;
 +                        } else {
 +                            // Existing index entry is bogus.
 +                            existingIndexEntry.tryDelete();
 +                            totalDeleted++;
 +                            existingIndexEntry = null;
 +                        }
 +                    }
 +                }
 +
                  totalProgress++;
                  if (totalProgress % BUILD_BATCH_SIZE == 0) {
 @@ -422,6 +464,18 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                      }
                      txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
 +
 +                    if (indexEntryCursor != null) {
 +                        indexEntryCursor.close();
 +                        indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry);
 +                        existingIndexEntry = null;
 +
 +                        if (!indexEntryCursor.hasNext()) {
 +                            indexEntryCursor.close();
 +                            // Don't try opening again.
 +                            indexEntryCursor = null;
 +                        }
 +                    }
                  }
                  if (throttle != null) {
 @@ -446,14 +500,35 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {      }
      private String progressSubMessgage(long totalInserted, long totalUpdated, long totalDeleted) {
 +        StringBuilder b = new StringBuilder();
 +        b.append('(');
 +
 +        if (totalInserted > 0) {
 +            b.append(totalInserted);
 +            b.append(" inserted");
 +        }
 +        if (totalUpdated > 0) {
 +            if (b.length() > 1) {
 +                b.append(", ");
 +            }
 +            b.append(totalUpdated);
 +            b.append(" updated");
 +        }
          if (totalDeleted > 0) {
 -            return "(" + totalInserted + " inserted, " + totalUpdated + " updated, " +
 -                totalDeleted + " deleted)";
 -        } else if (totalUpdated > 0) {
 -            return "(" + totalInserted + " inserted, " + totalUpdated + " updated)";
 -        } else {
 -            return "(" + totalInserted + " inserted)";
 +            if (b.length() > 1) {
 +                b.append(", ");
 +            }
 +            b.append(totalDeleted);
 +            b.append(" deleted");
          }
 +
 +        if (b.length() == 1) {
 +            b.append("no changes made");
 +        }
 +
 +        b.append(')');
 +
 +        return b.toString();
      }
      private Storable makeIndexEntry(S userStorable) throws PersistException {
 | 
