summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java71
1 files changed, 46 insertions, 25 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index 701aa37..60a21d5 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -69,7 +69,7 @@ import com.amazon.carbonado.util.Throttle;
class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
private static final int BUILD_SORT_BUFFER_SIZE = 65536;
private static final int BUILD_INFO_DELAY_MILLIS = 5000;
- static final int BUILD_BATCH_SIZE = 128;
+ static final int BUILD_BATCH_SIZE = 1000;
static final int BUILD_THROTTLE_WINDOW = BUILD_BATCH_SIZE * 10;
static final int BUILD_THROTTLE_SLEEP_PRECISION = 10;
@@ -458,6 +458,8 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
Storable indexEntry = null;
Storable lastIndexEntry = null;
+ long nextReportTime = System.currentTimeMillis() + BUILD_INFO_DELAY_MILLIS;
+
Iterator it = buffer.iterator();
bufferIterate: while (true) {
if (!retry) {
@@ -518,28 +520,42 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
// Existing index entry is ahead so check later.
break;
} else {
- // Existing index entry is bogus.
- existingIndexEntry.tryDelete();
-
- totalDeleted++;
+ // Existing index entry might be bogus. Check again
+ // in case master record changed.
+ doDelete: {
+ S master = mMasterStorage.prepare();
+ copyToMasterPrimaryKey(existingIndexEntry, master);
+ if (master.tryLoad()) {
+ Storable temp = makeIndexEntry(master);
+ existingIndexEntry.copyVersionProperty(temp);
+ if (existingIndexEntry.equalProperties(temp)) {
+ break doDelete;
+ }
+ }
- if (totalDeleted % BUILD_BATCH_SIZE == 0) {
- txn.commit();
- txn.exit();
+ existingIndexEntry.tryDelete();
+ totalDeleted++;
- logProgress(log, totalProgress, bufferSize,
- totalInserted, totalUpdated, totalDeleted);
+ if (totalDeleted % BUILD_BATCH_SIZE == 0) {
+ txn.commit();
+ txn.exit();
- txn = enterBuildTxn();
+ nextReportTime = logProgress
+ (nextReportTime, log, totalProgress, bufferSize,
+ totalInserted, totalUpdated, totalDeleted);
- indexEntryCursor.close();
- indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry);
+ txn = enterBuildTxn();
- if (!indexEntryCursor.hasNext()) {
indexEntryCursor.close();
- // Don't try opening again.
- indexEntryCursor = null;
- break;
+ indexEntryCursor = indexEntryQuery
+ .fetchAfter(existingIndexEntry);
+
+ if (!indexEntryCursor.hasNext()) {
+ indexEntryCursor.close();
+ // Don't try opening again.
+ indexEntryCursor = null;
+ break;
+ }
}
}
@@ -573,8 +589,8 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
txn.commit();
txn.exit();
- logProgress(log, totalProgress, bufferSize,
- totalInserted, totalUpdated, totalDeleted);
+ nextReportTime = logProgress(nextReportTime, log, totalProgress, bufferSize,
+ totalInserted, totalUpdated, totalDeleted);
txn = enterBuildTxn();
@@ -627,16 +643,21 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
}
}
- private void logProgress(Log log,
+ private long logProgress(long nextReportTime, Log log,
long totalProgress, int bufferSize,
long totalInserted, long totalUpdated, long totalDeleted)
{
- if (log.isInfoEnabled()) {
- String format = "Index build progress: %.3f%% " +
- progressSubMessgage(totalInserted, totalUpdated, totalDeleted);
- double percent = 100.0 * totalProgress / bufferSize;
- log.info(String.format(format, percent));
+ long now = System.currentTimeMillis();
+ if (now >= nextReportTime) {
+ if (log.isInfoEnabled()) {
+ String format = "Index build progress: %.3f%% " +
+ progressSubMessgage(totalInserted, totalUpdated, totalDeleted);
+ double percent = 100.0 * totalProgress / bufferSize;
+ log.info(String.format(format, percent));
+ }
+ nextReportTime = now + BUILD_INFO_DELAY_MILLIS;
}
+ return nextReportTime;
}
private String progressSubMessgage(long totalInserted, long totalUpdated, long totalDeleted) {