summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java158
1 files changed, 87 insertions, 71 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index 7a597b6..4897d21 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -30,6 +30,7 @@ import com.amazon.carbonado.CorruptEncodingException;
import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.PersistDeadlockException;
import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Query;
import com.amazon.carbonado.Repository;
@@ -440,102 +441,117 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
indexEntryCursor = null;
}
+ boolean retry = false;
+ Storable indexEntry = null;
+
Iterator it = buffer.iterator();
bufferIterate: while (true) {
- Object obj;
- if (it.hasNext()) {
- obj = it.next();
- } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
- obj = null;
- } else {
- break;
- }
-
- Storable indexEntry = (Storable) obj;
-
- if (indexEntry != null) {
- if (indexEntry.tryInsert()) {
- totalInserted++;
+ if (!retry) {
+ Object obj;
+ if (it.hasNext()) {
+ obj = it.next();
+ } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
+ obj = null;
} else {
- // Couldn't insert because an index entry already exists.
- Storable existing = indexEntry.copy();
- boolean doUpdate = false;
- if (!existing.tryLoad()) {
- doUpdate = true;
- } else if (!existing.equalProperties(indexEntry)) {
- // If only the version differs, leave existing entry alone.
- indexEntry.copyVersionProperty(existing);
- doUpdate = !existing.equalProperties(indexEntry);
- }
- if (doUpdate) {
- indexEntry.tryDelete();
- indexEntry.tryInsert();
- totalUpdated++;
- }
+ break;
}
+
+ indexEntry = (Storable) obj;
}
- if (indexEntryCursor != null) {
- while (true) {
- if (existingIndexEntry == null) {
- if (indexEntryCursor.hasNext()) {
- existingIndexEntry = indexEntryCursor.next();
- } else {
- indexEntryCursor.close();
- // Don't try opening again.
- indexEntryCursor = null;
- break;
+ try {
+ if (indexEntry != null) {
+ if (indexEntry.tryInsert()) {
+ totalInserted++;
+ } else {
+ // Couldn't insert because an index entry already exists.
+ Storable existing = indexEntry.copy();
+ boolean doUpdate = false;
+ if (!existing.tryLoad()) {
+ doUpdate = true;
+ } else if (!existing.equalProperties(indexEntry)) {
+ // If only the version differs, leave existing entry alone.
+ indexEntry.copyVersionProperty(existing);
+ doUpdate = !existing.equalProperties(indexEntry);
+ }
+ if (doUpdate) {
+ indexEntry.tryDelete();
+ indexEntry.tryInsert();
+ totalUpdated++;
}
}
+ }
- int compare = c.compare(existingIndexEntry, indexEntry);
+ if (indexEntryCursor != null) {
+ while (true) {
+ if (existingIndexEntry == null) {
+ if (indexEntryCursor.hasNext()) {
+ existingIndexEntry = indexEntryCursor.next();
+ } else {
+ indexEntryCursor.close();
+ // Don't try opening again.
+ indexEntryCursor = null;
+ break;
+ }
+ }
- if (compare == 0) {
- // Existing entry cursor matches so allow cursor to advance.
- existingIndexEntry = null;
- break;
- } else if (compare > 0) {
- // Existing index entry is ahead so check later.
- break;
- } else {
- // Existing index entry is bogus.
- existingIndexEntry.tryDelete();
+ int compare = c.compare(existingIndexEntry, indexEntry);
- totalDeleted++;
+ if (compare == 0) {
+ // Existing entry cursor matches so allow cursor to advance.
+ existingIndexEntry = null;
+ break;
+ } else if (compare > 0) {
+ // Existing index entry is ahead so check later.
+ break;
+ } else {
+ // Existing index entry is bogus.
+ existingIndexEntry.tryDelete();
- if (totalDeleted % BUILD_BATCH_SIZE == 0) {
- txn.commit();
- txn.exit();
+ totalDeleted++;
- logProgress(log, totalProgress, bufferSize,
- totalInserted, totalUpdated, totalDeleted);
+ if (totalDeleted % BUILD_BATCH_SIZE == 0) {
+ txn.commit();
+ txn.exit();
- txn = mRepository
- .enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ logProgress(log, totalProgress, bufferSize,
+ totalInserted, totalUpdated, totalDeleted);
- indexEntryCursor.close();
- indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry);
+ txn = mRepository
+ .enterTopTransaction(IsolationLevel.READ_COMMITTED);
- if (!indexEntryCursor.hasNext()) {
indexEntryCursor.close();
- // Don't try opening again.
- indexEntryCursor = null;
- break;
+ indexEntryCursor =
+ indexEntryQuery.fetchAfter(existingIndexEntry);
+
+ if (!indexEntryCursor.hasNext()) {
+ indexEntryCursor.close();
+ // Don't try opening again.
+ indexEntryCursor = null;
+ break;
+ }
}
- }
- existingIndexEntry = null;
+ existingIndexEntry = null;
- throttle(throttle, desiredSpeed);
+ throttle(throttle, desiredSpeed);
+ }
}
}
- }
- if (indexEntry != null) {
- totalProgress++;
+ if (indexEntry != null) {
+ totalProgress++;
+ }
+
+ retry = false;
+ } catch (PersistDeadlockException e) {
+ log.error("Deadlock during index repair; will retry: " + indexEntry, e);
+ // This re-uses the last index entry to repair and forces
+ // the current transaction to commit.
+ retry = true;
}
- if (totalProgress % BUILD_BATCH_SIZE == 0) {
+ if (retry || (totalProgress % BUILD_BATCH_SIZE == 0)) {
txn.commit();
txn.exit();