diff options
Diffstat (limited to 'src/main')
| -rw-r--r-- | src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java | 140 | 
1 files changed, 93 insertions, 47 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 6c28619..dbc7c06 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -21,6 +21,7 @@ package com.amazon.carbonado.repo.indexed;  import java.lang.reflect.UndeclaredThrowableException;
  import java.util.Comparator;
 +import java.util.Iterator;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
 @@ -293,20 +294,6 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                  .orderBy(naturalOrdering(mMasterStorage.getStorableType()));
          }
 -        // Quick check to see if any records exist in master.
 -        {
 -            Transaction txn = mRepository.enterTransaction(IsolationLevel.READ_COMMITTED);
 -            try {
 -                Cursor<S> cursor = masterQuery.fetch();
 -                if (!cursor.hasNext()) {
 -                    // Nothing exists in master, so nothing to build.
 -                    return;
 -                }
 -            } finally {
 -                txn.exit();
 -            }
 -        }
 -
          // Enter top transaction with isolation level of none to make sure
          // preload operation does not run in a long nested transaction.
          Transaction txn = mRepository.enterTopTransaction(IsolationLevel.NONE);
 @@ -437,25 +424,38 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                  indexEntryCursor = null;
              }
 -            for (Object obj : buffer) {
 -                Storable indexEntry = (Storable) obj;
 -                if (indexEntry.tryInsert()) {
 -                    totalInserted++;
 +            Iterator it = buffer.iterator();
 +            bufferIterate: while (true) {
 +                Object obj;
 +                if (it.hasNext()) {
 +                    obj = it.next();
 +                } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
 +                    obj = null;
                  } else {
 -                    // Couldn't insert because an index entry already exists.
 -                    Storable existing = indexEntry.copy();
 -                    boolean doUpdate = false;
 -                    if (!existing.tryLoad()) {
 -                        doUpdate = true;
 -                    } else if (!existing.equalProperties(indexEntry)) {
 -                        // If only the version differs, leave existing entry alone.
 -                        indexEntry.copyVersionProperty(existing);
 -                        doUpdate = !existing.equalProperties(indexEntry);
 -                    }
 -                    if (doUpdate) {
 -                        indexEntry.tryDelete();
 -                        indexEntry.tryInsert();
 -                        totalUpdated++;
 +                    break;
 +                }
 +
 +                Storable indexEntry = (Storable) obj;
 +
 +                if (indexEntry != null) {
 +                    if (indexEntry.tryInsert()) {
 +                        totalInserted++;
 +                    } else {
 +                        // Couldn't insert because an index entry already exists.
 +                        Storable existing = indexEntry.copy();
 +                        boolean doUpdate = false;
 +                        if (!existing.tryLoad()) {
 +                            doUpdate = true;
 +                        } else if (!existing.equalProperties(indexEntry)) {
 +                            // If only the version differs, leave existing entry alone.
 +                            indexEntry.copyVersionProperty(existing);
 +                            doUpdate = !existing.equalProperties(indexEntry);
 +                        }
 +                        if (doUpdate) {
 +                            indexEntry.tryDelete();
 +                            indexEntry.tryInsert();
 +                            totalUpdated++;
 +                        }
                      }
                  }
 @@ -484,32 +484,60 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                          } else {
                              // Existing index entry is bogus.
                              existingIndexEntry.tryDelete();
 +
                              totalDeleted++;
 +
 +                            if (totalDeleted % BUILD_BATCH_SIZE == 0) {
 +                                txn.commit();
 +                                txn.exit();
 +
 +                                logProgress(log, totalProgress, bufferSize,
 +                                            totalInserted, totalUpdated, totalDeleted);
 +
 +                                txn = mRepository
 +                                    .enterTopTransaction(IsolationLevel.READ_COMMITTED);
 +
 +                                indexEntryCursor.close();
 +                                indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry);
 +
 +                                if (!indexEntryCursor.hasNext()) {
 +                                    indexEntryCursor.close();
 +                                    // Don't try opening again.
 +                                    indexEntryCursor = null;
 +                                    break;
 +                                }
 +                            }
 +
                              existingIndexEntry = null;
 +
 +                            throttle(throttle, desiredSpeed);
                          }
                      }
                  }
 -                totalProgress++;
 +                if (indexEntry != null) {
 +                    totalProgress++;
 +                }
                  if (totalProgress % BUILD_BATCH_SIZE == 0) {
                      txn.commit();
                      txn.exit();
 -                    if (log.isInfoEnabled()) {
 -                        String format = "Index build progress: %.3f%% " +
 -                            progressSubMessgage(totalInserted, totalUpdated, totalDeleted);
 -                        double percent = 100.0 * totalProgress / bufferSize;
 -                        log.info(String.format(format, percent));
 -                    }
 +                    logProgress(log, totalProgress, bufferSize,
 +                                totalInserted, totalUpdated, totalDeleted);
                      txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
                      if (indexEntryCursor != null) {
                          indexEntryCursor.close();
 -                        indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry);
                          existingIndexEntry = null;
 +                        if (indexEntry == null) {
 +                            indexEntryCursor = indexEntryQuery.fetch();
 +                        } else {
 +                            indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry);
 +                        }
 +
                          if (!indexEntryCursor.hasNext()) {
                              indexEntryCursor.close();
                              // Don't try opening again.
 @@ -518,13 +546,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                      }
                  }
 -                if (throttle != null) {
 -                    try {
 -                        throttle.throttle(desiredSpeed, BUILD_THROTTLE_SLEEP_PRECISION);
 -                    } catch (InterruptedException e) {
 -                        throw new RepositoryException("Index build interrupted");
 -                    }
 -                }
 +                throttle(throttle, desiredSpeed);
              }
              txn.commit();
 @@ -539,6 +561,30 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {          }
      }
 +    private static void throttle(Throttle throttle, double desiredSpeed)
 +        throws RepositoryException
 +    {
 +        if (throttle != null) {
 +            try {
 +                throttle.throttle(desiredSpeed, BUILD_THROTTLE_SLEEP_PRECISION);
 +            } catch (InterruptedException e) {
 +                throw new RepositoryException("Index build interrupted");
 +            }
 +        }
 +    }
 +
 +    private void logProgress(Log log,
 +                             long totalProgress, int bufferSize,
 +                             long totalInserted, long totalUpdated, long totalDeleted)
 +    {
 +        if (log.isInfoEnabled()) {
 +            String format = "Index build progress: %.3f%% " +
 +                progressSubMessgage(totalInserted, totalUpdated, totalDeleted);
 +            double percent = 100.0 * totalProgress / bufferSize;
 +            log.info(String.format(format, percent));
 +        }
 +    }
 +
      private String progressSubMessgage(long totalInserted, long totalUpdated, long totalDeleted) {
          StringBuilder b = new StringBuilder();
          b.append('(');
 | 
