summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java140
1 files changed, 93 insertions, 47 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index 6c28619..dbc7c06 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -21,6 +21,7 @@ package com.amazon.carbonado.repo.indexed;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Comparator;
+import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -293,20 +294,6 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
.orderBy(naturalOrdering(mMasterStorage.getStorableType()));
}
- // Quick check to see if any records exist in master.
- {
- Transaction txn = mRepository.enterTransaction(IsolationLevel.READ_COMMITTED);
- try {
- Cursor<S> cursor = masterQuery.fetch();
- if (!cursor.hasNext()) {
- // Nothing exists in master, so nothing to build.
- return;
- }
- } finally {
- txn.exit();
- }
- }
-
// Enter top transaction with isolation level of none to make sure
// preload operation does not run in a long nested transaction.
Transaction txn = mRepository.enterTopTransaction(IsolationLevel.NONE);
@@ -437,25 +424,38 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
indexEntryCursor = null;
}
- for (Object obj : buffer) {
- Storable indexEntry = (Storable) obj;
- if (indexEntry.tryInsert()) {
- totalInserted++;
+ Iterator it = buffer.iterator();
+ bufferIterate: while (true) {
+ Object obj;
+ if (it.hasNext()) {
+ obj = it.next();
+ } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
+ obj = null;
} else {
- // Couldn't insert because an index entry already exists.
- Storable existing = indexEntry.copy();
- boolean doUpdate = false;
- if (!existing.tryLoad()) {
- doUpdate = true;
- } else if (!existing.equalProperties(indexEntry)) {
- // If only the version differs, leave existing entry alone.
- indexEntry.copyVersionProperty(existing);
- doUpdate = !existing.equalProperties(indexEntry);
- }
- if (doUpdate) {
- indexEntry.tryDelete();
- indexEntry.tryInsert();
- totalUpdated++;
+ break;
+ }
+
+ Storable indexEntry = (Storable) obj;
+
+ if (indexEntry != null) {
+ if (indexEntry.tryInsert()) {
+ totalInserted++;
+ } else {
+ // Couldn't insert because an index entry already exists.
+ Storable existing = indexEntry.copy();
+ boolean doUpdate = false;
+ if (!existing.tryLoad()) {
+ doUpdate = true;
+ } else if (!existing.equalProperties(indexEntry)) {
+ // If only the version differs, leave existing entry alone.
+ indexEntry.copyVersionProperty(existing);
+ doUpdate = !existing.equalProperties(indexEntry);
+ }
+ if (doUpdate) {
+ indexEntry.tryDelete();
+ indexEntry.tryInsert();
+ totalUpdated++;
+ }
}
}
@@ -484,32 +484,60 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
} else {
// Existing index entry is bogus.
existingIndexEntry.tryDelete();
+
totalDeleted++;
+
+ if (totalDeleted % BUILD_BATCH_SIZE == 0) {
+ txn.commit();
+ txn.exit();
+
+ logProgress(log, totalProgress, bufferSize,
+ totalInserted, totalUpdated, totalDeleted);
+
+ txn = mRepository
+ .enterTopTransaction(IsolationLevel.READ_COMMITTED);
+
+ indexEntryCursor.close();
+ indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry);
+
+ if (!indexEntryCursor.hasNext()) {
+ indexEntryCursor.close();
+ // Don't try opening again.
+ indexEntryCursor = null;
+ break;
+ }
+ }
+
existingIndexEntry = null;
+
+ throttle(throttle, desiredSpeed);
}
}
}
- totalProgress++;
+ if (indexEntry != null) {
+ totalProgress++;
+ }
if (totalProgress % BUILD_BATCH_SIZE == 0) {
txn.commit();
txn.exit();
- if (log.isInfoEnabled()) {
- String format = "Index build progress: %.3f%% " +
- progressSubMessgage(totalInserted, totalUpdated, totalDeleted);
- double percent = 100.0 * totalProgress / bufferSize;
- log.info(String.format(format, percent));
- }
+ logProgress(log, totalProgress, bufferSize,
+ totalInserted, totalUpdated, totalDeleted);
txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
if (indexEntryCursor != null) {
indexEntryCursor.close();
- indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry);
existingIndexEntry = null;
+ if (indexEntry == null) {
+ indexEntryCursor = indexEntryQuery.fetch();
+ } else {
+ indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry);
+ }
+
if (!indexEntryCursor.hasNext()) {
indexEntryCursor.close();
// Don't try opening again.
@@ -518,13 +546,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
}
}
- if (throttle != null) {
- try {
- throttle.throttle(desiredSpeed, BUILD_THROTTLE_SLEEP_PRECISION);
- } catch (InterruptedException e) {
- throw new RepositoryException("Index build interrupted");
- }
- }
+ throttle(throttle, desiredSpeed);
}
txn.commit();
@@ -539,6 +561,30 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
}
}
+ private static void throttle(Throttle throttle, double desiredSpeed)
+ throws RepositoryException
+ {
+ if (throttle != null) {
+ try {
+ throttle.throttle(desiredSpeed, BUILD_THROTTLE_SLEEP_PRECISION);
+ } catch (InterruptedException e) {
+ throw new RepositoryException("Index build interrupted");
+ }
+ }
+ }
+
+ private void logProgress(Log log,
+ long totalProgress, int bufferSize,
+ long totalInserted, long totalUpdated, long totalDeleted)
+ {
+ if (log.isInfoEnabled()) {
+ String format = "Index build progress: %.3f%% " +
+ progressSubMessgage(totalInserted, totalUpdated, totalDeleted);
+ double percent = 100.0 * totalProgress / bufferSize;
+ log.info(String.format(format, percent));
+ }
+ }
+
private String progressSubMessgage(long totalInserted, long totalUpdated, long totalDeleted) {
StringBuilder b = new StringBuilder();
b.append('(');