diff options
| -rw-r--r-- | src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java | 71 | 
1 files changed, 46 insertions, 25 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 701aa37..60a21d5 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -69,7 +69,7 @@ import com.amazon.carbonado.util.Throttle;  class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
      private static final int BUILD_SORT_BUFFER_SIZE = 65536;
      private static final int BUILD_INFO_DELAY_MILLIS = 5000;
 -    static final int BUILD_BATCH_SIZE = 128;
 +    static final int BUILD_BATCH_SIZE = 1000;
      static final int BUILD_THROTTLE_WINDOW = BUILD_BATCH_SIZE * 10;
      static final int BUILD_THROTTLE_SLEEP_PRECISION = 10;
 @@ -458,6 +458,8 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {              Storable indexEntry = null;
              Storable lastIndexEntry = null;
 +            long nextReportTime = System.currentTimeMillis() + BUILD_INFO_DELAY_MILLIS;
 +
              Iterator it = buffer.iterator();
              bufferIterate: while (true) {
                  if (!retry) {
 @@ -518,28 +520,42 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                              // Existing index entry is ahead so check later.
                              break;
                          } else {
 -                            // Existing index entry is bogus.
 -                            existingIndexEntry.tryDelete();
 -
 -                            totalDeleted++;
 +                            // Existing index entry might be bogus. Check again
 +                            // in case master record changed.
 +                            doDelete: {
 +                                S master = mMasterStorage.prepare();
 +                                copyToMasterPrimaryKey(existingIndexEntry, master);
 +                                if (master.tryLoad()) {
 +                                    Storable temp = makeIndexEntry(master);
 +                                    existingIndexEntry.copyVersionProperty(temp);
 +                                    if (existingIndexEntry.equalProperties(temp)) {
 +                                        break doDelete;
 +                                    }
 +                                }
 -                            if (totalDeleted % BUILD_BATCH_SIZE == 0) {
 -                                txn.commit();
 -                                txn.exit();
 +                                existingIndexEntry.tryDelete();
 +                                totalDeleted++;
 -                                logProgress(log, totalProgress, bufferSize,
 -                                            totalInserted, totalUpdated, totalDeleted);
 +                                if (totalDeleted % BUILD_BATCH_SIZE == 0) {
 +                                    txn.commit();
 +                                    txn.exit();
 -                                txn = enterBuildTxn();
 +                                    nextReportTime = logProgress
 +                                        (nextReportTime, log, totalProgress, bufferSize,
 +                                         totalInserted, totalUpdated, totalDeleted);
 -                                indexEntryCursor.close();
 -                                indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry);
 +                                    txn = enterBuildTxn();
 -                                if (!indexEntryCursor.hasNext()) {
                                      indexEntryCursor.close();
 -                                    // Don't try opening again.
 -                                    indexEntryCursor = null;
 -                                    break;
 +                                    indexEntryCursor = indexEntryQuery
 +                                        .fetchAfter(existingIndexEntry);
 +
 +                                    if (!indexEntryCursor.hasNext()) {
 +                                        indexEntryCursor.close();
 +                                        // Don't try opening again.
 +                                        indexEntryCursor = null;
 +                                        break;
 +                                    }
                                  }
                              }
 @@ -573,8 +589,8 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                      txn.commit();
                      txn.exit();
 -                    logProgress(log, totalProgress, bufferSize,
 -                                totalInserted, totalUpdated, totalDeleted);
 +                    nextReportTime = logProgress(nextReportTime, log, totalProgress, bufferSize,
 +                                                 totalInserted, totalUpdated, totalDeleted);
                      txn = enterBuildTxn();
 @@ -627,16 +643,21 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {          }
      }
 -    private void logProgress(Log log,
 +    private long logProgress(long nextReportTime, Log log,
                               long totalProgress, int bufferSize,
                               long totalInserted, long totalUpdated, long totalDeleted)
      {
 -        if (log.isInfoEnabled()) {
 -            String format = "Index build progress: %.3f%% " +
 -                progressSubMessgage(totalInserted, totalUpdated, totalDeleted);
 -            double percent = 100.0 * totalProgress / bufferSize;
 -            log.info(String.format(format, percent));
 +        long now = System.currentTimeMillis();
 +        if (now >= nextReportTime) {
 +            if (log.isInfoEnabled()) {
 +                String format = "Index build progress: %.3f%% " +
 +                    progressSubMessgage(totalInserted, totalUpdated, totalDeleted);
 +                double percent = 100.0 * totalProgress / bufferSize;
 +                log.info(String.format(format, percent));
 +            }
 +            nextReportTime = now + BUILD_INFO_DELAY_MILLIS;
          }
 +        return nextReportTime;
      }
      private String progressSubMessgage(long totalInserted, long totalUpdated, long totalDeleted) {
 | 
