summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java160
1 files changed, 73 insertions, 87 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index 4897d21..d9b585f 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -30,7 +30,6 @@ import com.amazon.carbonado.CorruptEncodingException;
import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.IsolationLevel;
-import com.amazon.carbonado.PersistDeadlockException;
import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Query;
import com.amazon.carbonado.Repository;
@@ -441,117 +440,103 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
indexEntryCursor = null;
}
- boolean retry = false;
- Storable indexEntry = null;
-
Iterator it = buffer.iterator();
bufferIterate: while (true) {
- if (!retry) {
- Object obj;
- if (it.hasNext()) {
- obj = it.next();
- } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
- obj = null;
+ Object obj;
+ if (it.hasNext()) {
+ obj = it.next();
+ } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
+ obj = null;
+ } else {
+ break;
+ }
+
+ Storable indexEntry = (Storable) obj;
+
+ if (indexEntry != null) {
+ if (indexEntry.tryInsert()) {
+ totalInserted++;
} else {
- break;
+ // Couldn't insert because an index entry already exists.
+ Storable existing = indexEntry.copy();
+ boolean doUpdate = false;
+ if (!existing.tryLoad()) {
+ doUpdate = true;
+ } else if (!existing.equalProperties(indexEntry)) {
+ // If only the version differs, leave existing entry alone.
+ indexEntry.copyVersionProperty(existing);
+ doUpdate = !existing.equalProperties(indexEntry);
+ }
+ if (doUpdate) {
+ indexEntry.tryDelete();
+ indexEntry.tryInsert();
+ totalUpdated++;
+ }
}
-
- indexEntry = (Storable) obj;
}
- try {
- if (indexEntry != null) {
- if (indexEntry.tryInsert()) {
- totalInserted++;
- } else {
- // Couldn't insert because an index entry already exists.
- Storable existing = indexEntry.copy();
- boolean doUpdate = false;
- if (!existing.tryLoad()) {
- doUpdate = true;
- } else if (!existing.equalProperties(indexEntry)) {
- // If only the version differs, leave existing entry alone.
- indexEntry.copyVersionProperty(existing);
- doUpdate = !existing.equalProperties(indexEntry);
- }
- if (doUpdate) {
- indexEntry.tryDelete();
- indexEntry.tryInsert();
- totalUpdated++;
+ if (indexEntryCursor != null) {
+ while (true) {
+ if (existingIndexEntry == null) {
+ if (indexEntryCursor.hasNext()) {
+ existingIndexEntry = indexEntryCursor.next();
+ } else {
+ indexEntryCursor.close();
+ // Don't try opening again.
+ indexEntryCursor = null;
+ break;
}
}
- }
- if (indexEntryCursor != null) {
- while (true) {
- if (existingIndexEntry == null) {
- if (indexEntryCursor.hasNext()) {
- existingIndexEntry = indexEntryCursor.next();
- } else {
- indexEntryCursor.close();
- // Don't try opening again.
- indexEntryCursor = null;
- break;
- }
- }
+ int compare = c.compare(existingIndexEntry, indexEntry);
- int compare = c.compare(existingIndexEntry, indexEntry);
+ if (compare == 0) {
+ // Existing entry cursor matches so allow cursor to advance.
+ existingIndexEntry = null;
+ break;
+ } else if (compare > 0) {
+ // Existing index entry is ahead so check later.
+ break;
+ } else {
+ // Existing index entry is bogus.
+ existingIndexEntry.tryDelete();
- if (compare == 0) {
- // Existing entry cursor matches so allow cursor to advance.
- existingIndexEntry = null;
- break;
- } else if (compare > 0) {
- // Existing index entry is ahead so check later.
- break;
- } else {
- // Existing index entry is bogus.
- existingIndexEntry.tryDelete();
+ totalDeleted++;
- totalDeleted++;
+ if (totalDeleted % BUILD_BATCH_SIZE == 0) {
+ txn.commit();
+ txn.exit();
- if (totalDeleted % BUILD_BATCH_SIZE == 0) {
- txn.commit();
- txn.exit();
+ logProgress(log, totalProgress, bufferSize,
+ totalInserted, totalUpdated, totalDeleted);
- logProgress(log, totalProgress, bufferSize,
- totalInserted, totalUpdated, totalDeleted);
+ txn = mRepository
+ .enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ txn.setForUpdate(true);
- txn = mRepository
- .enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ indexEntryCursor.close();
+ indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry);
+ if (!indexEntryCursor.hasNext()) {
indexEntryCursor.close();
- indexEntryCursor =
- indexEntryQuery.fetchAfter(existingIndexEntry);
-
- if (!indexEntryCursor.hasNext()) {
- indexEntryCursor.close();
- // Don't try opening again.
- indexEntryCursor = null;
- break;
- }
+ // Don't try opening again.
+ indexEntryCursor = null;
+ break;
}
+ }
- existingIndexEntry = null;
+ existingIndexEntry = null;
- throttle(throttle, desiredSpeed);
- }
+ throttle(throttle, desiredSpeed);
}
}
+ }
- if (indexEntry != null) {
- totalProgress++;
- }
-
- retry = false;
- } catch (PersistDeadlockException e) {
- log.error("Deadlock during index repair; will retry: " + indexEntry, e);
- // This re-uses the last index entry to repair and forces
- // the current transaction to commit.
- retry = true;
+ if (indexEntry != null) {
+ totalProgress++;
}
- if (retry || (totalProgress % BUILD_BATCH_SIZE == 0)) {
+ if (totalProgress % BUILD_BATCH_SIZE == 0) {
txn.commit();
txn.exit();
@@ -559,6 +544,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
totalInserted, totalUpdated, totalDeleted);
txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
+ txn.setForUpdate(true);
if (indexEntryCursor != null) {
indexEntryCursor.close();