From 8fb93b47e3088b928af687a09e45b74b2d4833e4 Mon Sep 17 00:00:00 2001
From: "Brian S. O'Neill" <bronee@gmail.com>
Date: Thu, 10 Nov 2011 18:07:42 +0000
Subject: Add retry logic to index repair.

---
 .../carbonado/repo/indexed/ManagedIndex.java       | 102 ++++++++++++---------
 1 file changed, 60 insertions(+), 42 deletions(-)

(limited to 'src/main/java/com')

diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index d9b585f..4200d1f 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -23,6 +23,8 @@ import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Comparator;
 import java.util.Iterator;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -30,6 +32,7 @@ import com.amazon.carbonado.CorruptEncodingException;
 import com.amazon.carbonado.Cursor;
 import com.amazon.carbonado.FetchException;
 import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.PersistTimeoutException;
 import com.amazon.carbonado.PersistException;
 import com.amazon.carbonado.Query;
 import com.amazon.carbonado.Repository;
@@ -430,6 +433,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
         txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
         try {
             txn.setForUpdate(true);
+            txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
 
             Cursor<? extends Storable> indexEntryCursor = indexEntryQuery.fetch();
             Storable existingIndexEntry = null;
@@ -440,43 +444,49 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
                 indexEntryCursor = null;
             }
 
+            boolean retry = false;
+            Storable indexEntry = null;
+            Storable lastIndexEntry = null;
+
             Iterator it = buffer.iterator();
             bufferIterate: while (true) {
-                Object obj;
-                if (it.hasNext()) {
-                    obj = it.next();
-                } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
-                    obj = null;
-                } else {
-                    break;
-                }
+                if (!retry) {
+                    Object obj;
+                    if (it.hasNext()) {
+                        obj = it.next();
+                    } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
+                        obj = null;
+                    } else {
+                        break;
+                    }
 
-                Storable indexEntry = (Storable) obj;
+                    indexEntry = (Storable) obj;
+                }
 
-                if (indexEntry != null) {
-                    if (indexEntry.tryInsert()) {
-                        totalInserted++;
-                    } else {
-                        // Couldn't insert because an index entry already exists.
-                        Storable existing = indexEntry.copy();
-                        boolean doUpdate = false;
-                        if (!existing.tryLoad()) {
-                            doUpdate = true;
-                        } else if (!existing.equalProperties(indexEntry)) {
-                            // If only the version differs, leave existing entry alone.
-                            indexEntry.copyVersionProperty(existing);
-                            doUpdate = !existing.equalProperties(indexEntry);
-                        }
-                        if (doUpdate) {
-                            indexEntry.tryDelete();
-                            indexEntry.tryInsert();
-                            totalUpdated++;
+                try {
+                    if (indexEntry != null) {
+                        if (indexEntry.tryInsert()) {
+                            totalInserted++;
+                        } else {
+                            // Couldn't insert because an index entry already exists.
+                            Storable existing = indexEntry.copy();
+                            boolean doUpdate = false;
+                            if (!existing.tryLoad()) {
+                                doUpdate = true;
+                            } else if (!existing.equalProperties(indexEntry)) {
+                                // If only the version differs, leave existing entry alone.
+                                indexEntry.copyVersionProperty(existing);
+                                doUpdate = !existing.equalProperties(indexEntry);
+                            }
+                            if (doUpdate) {
+                                indexEntry.tryDelete();
+                                indexEntry.tryInsert();
+                                totalUpdated++;
+                            }
                         }
                     }
-                }
 
-                if (indexEntryCursor != null) {
-                    while (true) {
+                    if (indexEntryCursor != null) while (true) {
                         if (existingIndexEntry == null) {
                             if (indexEntryCursor.hasNext()) {
                                 existingIndexEntry = indexEntryCursor.next();
@@ -513,6 +523,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
                                 txn = mRepository
                                     .enterTopTransaction(IsolationLevel.READ_COMMITTED);
                                 txn.setForUpdate(true);
+                                txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
 
                                 indexEntryCursor.close();
                                 indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry);
@@ -530,13 +541,22 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
                             throttle(throttle, desiredSpeed);
                         }
                     }
-                }
 
-                if (indexEntry != null) {
-                    totalProgress++;
+                    if (indexEntry != null) {
+                        totalProgress++;
+                    }
+
+                    lastIndexEntry = indexEntry;
+                    retry = false;
+                } catch (PersistTimeoutException e) {
+                    log.warn("Lock conflict during index repair; will retry: " +
+                             indexEntry + ", " + e);
+                    // This re-uses the last index entry to repair and forces
+                    // the current transaction to commit.
+                    retry = true;
                 }
 
-                if (totalProgress % BUILD_BATCH_SIZE == 0) {
+                if (retry || (totalProgress % BUILD_BATCH_SIZE == 0)) {
                     txn.commit();
                     txn.exit();
 
@@ -545,21 +565,19 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
 
                     txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
                     txn.setForUpdate(true);
+                    txn.setDesiredLockTimeout(0, TimeUnit.SECONDS);
 
                     if (indexEntryCursor != null) {
                         indexEntryCursor.close();
                         existingIndexEntry = null;
 
-                        if (indexEntry == null) {
+                        if (indexEntry == null || lastIndexEntry == null) {
                             indexEntryCursor = indexEntryQuery.fetch();
-                        } else {
+                        } else if (!retry) {
                             indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry);
-                        }
-
-                        if (!indexEntryCursor.hasNext()) {
-                            indexEntryCursor.close();
-                            // Don't try opening again.
-                            indexEntryCursor = null;
+                        } else {
+                            // Re-fetch starting at the same spot.
+                            indexEntryCursor = indexEntryQuery.fetchAfter(lastIndexEntry);
                         }
                     }
                 }
-- 
cgit v1.2.3