From 8fb89b4dcf778e8673ce5c031796328d0dd917d6 Mon Sep 17 00:00:00 2001
From: "Brian S. O'Neill" <bronee@gmail.com>
Date: Thu, 1 May 2008 20:21:22 +0000
Subject: Add index repair method.

---
 .../carbonado/repo/indexed/IndexEntryAccessor.java |  11 ++
 .../carbonado/repo/indexed/IndexedStorage.java     |   8 +-
 .../carbonado/repo/indexed/ManagedIndex.java       | 113 ++++++++++++++-------
 3 files changed, 90 insertions(+), 42 deletions(-)

(limited to 'src/main/java')

diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java
index 42ce214..1b7f910 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java
@@ -20,6 +20,7 @@ package com.amazon.carbonado.repo.indexed;
 
 import java.util.Comparator;
 
+import com.amazon.carbonado.RepositoryException;
 import com.amazon.carbonado.Storable;
 import com.amazon.carbonado.Storage;
 
@@ -30,6 +31,7 @@ import com.amazon.carbonado.capability.IndexInfo;
  * inspection and repair.
  *
  * @author Brian S O'Neill
+ * @see IndexEntryAccessCapability
  */
 public interface IndexEntryAccessor<S extends Storable> extends IndexInfo {
     /**
@@ -65,6 +67,15 @@ public interface IndexEntryAccessor<S extends Storable> extends IndexInfo {
      */
     boolean isConsistent(Storable indexEntry, S master);
 
+    /**
+     * Repairs the index by inserting missing entries and fixing
+     * inconsistencies.
+     *
+     * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half
+     * speed, 0.1 = one-tenth speed, etc
+     */
+    void repair(double desiredSpeed) throws RepositoryException;
+
     /**
      * Returns a comparator for ordering index entries.
      */
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
index cefb031..fb55433 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
@@ -287,7 +287,8 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
                 IndexEntryGenerator<S> builder = IndexEntryGenerator.getInstance(index);
                 Class<? extends Storable> indexEntryClass = builder.getIndexEntryClass();
                 Storage<?> indexEntryStorage = repository.getIndexEntryStorageFor(indexEntryClass);
-                ManagedIndex managedIndex = new ManagedIndex<S>(index, builder, indexEntryStorage);
+                ManagedIndex managedIndex =
+                    new ManagedIndex<S>(this, index, builder, indexEntryStorage);
 
                 mAllIndexInfoMap.put(index, managedIndex);
                 managedIndexes[i++] = managedIndex;
@@ -510,9 +511,8 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
             }
         }
 
-        // New index, so populate it.
-        managedIndex.populateIndex(mRepository, mMasterStorage,
-                                   mRepository.getIndexRepairThrottle());
+        // New index, so build it.
+        managedIndex.buildIndex(mRepository.getIndexRepairThrottle());
 
         boolean top = true;
         while (true) {
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index ea3bbb3..5806529 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -29,6 +29,7 @@ import com.amazon.carbonado.CorruptEncodingException;
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
 import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.OptimisticLockException;
 import com.amazon.carbonado.PersistException;
 import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Repository;
@@ -62,23 +63,26 @@ import com.amazon.carbonado.util.Throttle;
  * @author Brian S O'Neill
  */
 class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
-    private static final int POPULATE_SORT_BUFFER_SIZE = 65536;
-    private static final int POPULATE_INFO_DELAY_MILLIS = 5000;
-    static final int POPULATE_BATCH_SIZE = 128;
-    static final int POPULATE_THROTTLE_WINDOW = POPULATE_BATCH_SIZE * 10;
-    static final int POPULATE_THROTTLE_SLEEP_PRECISION = 10;
+    private static final int BUILD_SORT_BUFFER_SIZE = 65536;
+    private static final int BUILD_INFO_DELAY_MILLIS = 5000;
+    static final int BUILD_BATCH_SIZE = 128;
+    static final int BUILD_THROTTLE_WINDOW = BUILD_BATCH_SIZE * 10;
+    static final int BUILD_THROTTLE_SLEEP_PRECISION = 10;
 
+    private final IndexedStorage<S> mIndexedStorage;
     private final StorableIndex mIndex;
     private final IndexEntryGenerator<S> mGenerator;
     private final Storage<?> mIndexEntryStorage;
 
     private Query<?> mSingleMatchQuery;
 
-    ManagedIndex(StorableIndex<S> index,
+    ManagedIndex(IndexedStorage<S> indexedStorage,
+                 StorableIndex<S> index,
                  IndexEntryGenerator<S> generator,
                  Storage<?> indexEntryStorage)
         throws SupportException
     {
+        mIndexedStorage = indexedStorage;
         mIndex = index;
         mGenerator = generator;
         mIndexEntryStorage = indexEntryStorage;
@@ -138,6 +142,18 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
         return mGenerator.isConsistent(indexEntry, master);
     }
 
+    // Required by IndexEntryAccessor interface.
+    public void repair(double desiredSpeed) throws RepositoryException {
+        /* FIXME: Delete bogus entries. Index inserts and updates are committed
+           in batches, each covering a contiguous range of entries. Before
+           committing a batch, a range query should be executed on the index
+           to verify the batch about to be committed. Any entries found that
+           aren't in the batch are to be deleted. The logged progress message
+           should be changed to show entries inserted, updated, and deleted.
+        */
+        buildIndex(desiredSpeed);
+    }
+
     // Required by IndexEntryAccessor interface.
     public Comparator<? extends Storable> getComparator() {
         return mGenerator.getComparator();
@@ -208,13 +224,14 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
     }
 
     /**
-     * Populates the entire index, repairing as it goes.
+     * Builds the entire index, repairing as it goes.
      *
      * @param repo used to enter transactions
      */
-    void populateIndex(Repository repo, Storage<S> masterStorage, double desiredSpeed)
-        throws RepositoryException
-    {
+    void buildIndex(double desiredSpeed) throws RepositoryException {
+        Repository repo = mIndexedStorage.mRepository;
+        Storage<S> masterStorage = mIndexedStorage.mMasterStorage;
+
         MergeSortBuffer buffer;
         Comparator c;
 
@@ -248,7 +265,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
             try {
                 Cursor<S> cursor = masterQuery.fetch();
                 if (!cursor.hasNext()) {
-                    // Nothing exists in master, so nothing to populate.
+                    // Nothing exists in master, so nothing to build.
                     return;
                 }
             } finally {
@@ -264,7 +281,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
             try {
                 if (log.isInfoEnabled()) {
                     StringBuilder b = new StringBuilder();
-                    b.append("Populating index on ");
+                    b.append("Building index on ");
                     b.append(masterStorage.getStorableType().getName());
                     b.append(": ");
                     try {
@@ -277,11 +294,11 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
 
                 // Preload and sort all index entries for improved performance.
 
-                buffer = new MergeSortBuffer(mIndexEntryStorage, null, POPULATE_SORT_BUFFER_SIZE);
+                buffer = new MergeSortBuffer(mIndexEntryStorage, null, BUILD_SORT_BUFFER_SIZE);
                 c = mGenerator.getComparator();
                 buffer.prepare(c);
 
-                long nextReportTime = System.currentTimeMillis() + POPULATE_INFO_DELAY_MILLIS;
+                long nextReportTime = System.currentTimeMillis() + BUILD_INFO_DELAY_MILLIS;
 
                 // These variables are used when corrupt records are encountered.
                 S lastUserStorable = null;
@@ -313,8 +330,8 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
                     if (log.isInfoEnabled()) {
                         long now = System.currentTimeMillis();
                         if (now >= nextReportTime) {
-                            log.info("Prepared " + buffer.size() + " new index entries");
-                            nextReportTime = now + POPULATE_INFO_DELAY_MILLIS;
+                            log.info("Prepared " + buffer.size() + " index entries");
+                            nextReportTime = now + BUILD_INFO_DELAY_MILLIS;
                         }
                     }
 
@@ -359,44 +376,52 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
         final int bufferSize = buffer.size();
 
         if (log.isInfoEnabled()) {
-            log.info("Begin insert of " + bufferSize + " new index entries");
+            log.info("Begin build of " + bufferSize + " index entries");
         }
 
-        Throttle throttle = desiredSpeed < 1.0 ? new Throttle(POPULATE_THROTTLE_WINDOW) : null;
+        Throttle throttle = desiredSpeed < 1.0 ? new Throttle(BUILD_THROTTLE_WINDOW) : null;
 
-        int totalInserted = 0;
+        long totalInserted = 0;
+        long totalUpdated = 0;
+        long totalDeleted = 0;
+        long totalProgress = 0;
 
         txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
         try {
+
             for (Object obj : buffer) {
                 Storable indexEntry = (Storable) obj;
-                if (!indexEntry.tryInsert()) {
+                if (indexEntry.tryInsert()) {
+                    totalInserted++;
+                } else {
                     // Couldn't insert because an index entry already exists.
                     Storable existing = indexEntry.copy();
-                    if (existing.tryLoad()) {
-                        if (!existing.equalProperties(indexEntry)) {
-                            // Existing entry differs, so update it.
-                            indexEntry.copyUnequalProperties(existing);
-                            existing.tryUpdate();
-                            indexEntry = existing;
-                        }
-                    } else {
-                        // Couldn't find existing entry for some reason, so
-                        // repair by brute force.
+                    boolean doUpdate = false;
+                    if (!existing.tryLoad()) {
+                        doUpdate = true;
+                    } else if (!existing.equalProperties(indexEntry)) {
+                        // If only the version differs, leave existing entry alone.
+                        indexEntry.copyVersionProperty(existing);
+                        doUpdate = !existing.equalProperties(indexEntry);
+                    }
+                    if (doUpdate) {
                         indexEntry.tryDelete();
                         indexEntry.tryInsert();
+                        totalUpdated++;
                     }
                 }
 
-                totalInserted++;
-                if (totalInserted % POPULATE_BATCH_SIZE == 0) {
+                totalProgress++;
+
+                if (totalProgress % BUILD_BATCH_SIZE == 0) {
                     txn.commit();
                     txn.exit();
 
                     if (log.isInfoEnabled()) {
-                        String format = "Committed %d new index entries (%.3f%%)";
-                        double percent = 100.0 * totalInserted / bufferSize;
-                        log.info(String.format(format, totalInserted, percent));
+                        String format = "Index build progress: %.3f%% " +
+                            progressSubMessgage(totalInserted, totalUpdated, totalDeleted);
+                        double percent = 100.0 * totalProgress / bufferSize;
+                        log.info(String.format(format, percent));
                     }
 
                     txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
@@ -404,9 +429,9 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
 
                 if (throttle != null) {
                     try {
-                        throttle.throttle(desiredSpeed, POPULATE_THROTTLE_SLEEP_PRECISION);
+                        throttle.throttle(desiredSpeed, BUILD_THROTTLE_SLEEP_PRECISION);
                     } catch (InterruptedException e) {
-                        throw new RepositoryException("Index populate interrupted");
+                        throw new RepositoryException("Index build interrupted");
                     }
                 }
             }
@@ -418,7 +443,19 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
         }
 
         if (log.isInfoEnabled()) {
-            log.info("Finished inserting " + totalInserted + " new index entries");
+            log.info("Finished building " + totalProgress + " index entries " +
+                     progressSubMessgage(totalInserted, totalUpdated, totalDeleted));
+        }
+    }
+
+    private String progressSubMessgage(long totalInserted, long totalUpdated, long totalDeleted) {
+        if (totalDeleted > 0) {
+            return "(" + totalInserted + " inserted, " + totalUpdated + " updated, " +
+                totalDeleted + " deleted)";
+        } else if (totalUpdated > 0) {
+            return "(" + totalInserted + " inserted, " + totalUpdated + " updated)";
+        } else {
+            return "(" + totalInserted + " inserted)";
         }
     }
 
-- 
cgit v1.2.3