summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java11
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java8
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java113
3 files changed, 90 insertions, 42 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java
index 42ce214..1b7f910 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexEntryAccessor.java
@@ -20,6 +20,7 @@ package com.amazon.carbonado.repo.indexed;
import java.util.Comparator;
+import com.amazon.carbonado.RepositoryException;
import com.amazon.carbonado.Storable;
import com.amazon.carbonado.Storage;
@@ -30,6 +31,7 @@ import com.amazon.carbonado.capability.IndexInfo;
* inspection and repair.
*
* @author Brian S O'Neill
+ * @see IndexEntryAccessCapability
*/
public interface IndexEntryAccessor<S extends Storable> extends IndexInfo {
/**
@@ -66,6 +68,15 @@ public interface IndexEntryAccessor<S extends Storable> extends IndexInfo {
boolean isConsistent(Storable indexEntry, S master);
/**
+ * Repairs the index by inserting missing entries and fixing
+ * inconsistencies.
+ *
+ * @param desiredSpeed throttling parameter - 1.0 = full speed, 0.5 = half
+ * speed, 0.1 = one-tenth speed, etc
+ */
+ void repair(double desiredSpeed) throws RepositoryException;
+
+ /**
* Returns a comparator for ordering index entries.
*/
Comparator<? extends Storable> getComparator();
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
index cefb031..fb55433 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java
@@ -287,7 +287,8 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
IndexEntryGenerator<S> builder = IndexEntryGenerator.getInstance(index);
Class<? extends Storable> indexEntryClass = builder.getIndexEntryClass();
Storage<?> indexEntryStorage = repository.getIndexEntryStorageFor(indexEntryClass);
- ManagedIndex managedIndex = new ManagedIndex<S>(index, builder, indexEntryStorage);
+ ManagedIndex managedIndex =
+ new ManagedIndex<S>(this, index, builder, indexEntryStorage);
mAllIndexInfoMap.put(index, managedIndex);
managedIndexes[i++] = managedIndex;
@@ -510,9 +511,8 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>
}
}
- // New index, so populate it.
- managedIndex.populateIndex(mRepository, mMasterStorage,
- mRepository.getIndexRepairThrottle());
+ // New index, so build it.
+ managedIndex.buildIndex(mRepository.getIndexRepairThrottle());
boolean top = true;
while (true) {
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index ea3bbb3..5806529 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -29,6 +29,7 @@ import com.amazon.carbonado.CorruptEncodingException;
import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.OptimisticLockException;
import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Query;
import com.amazon.carbonado.Repository;
@@ -62,23 +63,26 @@ import com.amazon.carbonado.util.Throttle;
* @author Brian S O'Neill
*/
class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
- private static final int POPULATE_SORT_BUFFER_SIZE = 65536;
- private static final int POPULATE_INFO_DELAY_MILLIS = 5000;
- static final int POPULATE_BATCH_SIZE = 128;
- static final int POPULATE_THROTTLE_WINDOW = POPULATE_BATCH_SIZE * 10;
- static final int POPULATE_THROTTLE_SLEEP_PRECISION = 10;
+ private static final int BUILD_SORT_BUFFER_SIZE = 65536;
+ private static final int BUILD_INFO_DELAY_MILLIS = 5000;
+ static final int BUILD_BATCH_SIZE = 128;
+ static final int BUILD_THROTTLE_WINDOW = BUILD_BATCH_SIZE * 10;
+ static final int BUILD_THROTTLE_SLEEP_PRECISION = 10;
+ private final IndexedStorage<S> mIndexedStorage;
private final StorableIndex mIndex;
private final IndexEntryGenerator<S> mGenerator;
private final Storage<?> mIndexEntryStorage;
private Query<?> mSingleMatchQuery;
- ManagedIndex(StorableIndex<S> index,
+ ManagedIndex(IndexedStorage<S> indexedStorage,
+ StorableIndex<S> index,
IndexEntryGenerator<S> generator,
Storage<?> indexEntryStorage)
throws SupportException
{
+ mIndexedStorage = indexedStorage;
mIndex = index;
mGenerator = generator;
mIndexEntryStorage = indexEntryStorage;
@@ -139,6 +143,18 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
}
// Required by IndexEntryAccessor interface.
+ public void repair(double desiredSpeed) throws RepositoryException {
+ /* FIXME: Delete bogus entries. Index inserts and updates are committed
+ in batches, which covers a contiguous range of entries. Before
+ committing the batch, a range query is executed on the index and
+ verifies the batch about to be committed. Any entries that aren't in
+ the batch are to be deleted. The progress message that is logged
+ should be changed to show indexes inserted, updated, and deleted.
+ */
+ buildIndex(desiredSpeed);
+ }
+
+ // Required by IndexEntryAccessor interface.
public Comparator<? extends Storable> getComparator() {
return mGenerator.getComparator();
}
@@ -208,13 +224,14 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
}
/**
- * Populates the entire index, repairing as it goes.
+ * Build the entire index, repairing as it goes.
*
* @param repo used to enter transactions
*/
- void populateIndex(Repository repo, Storage<S> masterStorage, double desiredSpeed)
- throws RepositoryException
- {
+ void buildIndex(double desiredSpeed) throws RepositoryException {
+ Repository repo = mIndexedStorage.mRepository;
+ Storage<S> masterStorage = mIndexedStorage.mMasterStorage;
+
MergeSortBuffer buffer;
Comparator c;
@@ -248,7 +265,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
try {
Cursor<S> cursor = masterQuery.fetch();
if (!cursor.hasNext()) {
- // Nothing exists in master, so nothing to populate.
+ // Nothing exists in master, so nothing to build.
return;
}
} finally {
@@ -264,7 +281,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
try {
if (log.isInfoEnabled()) {
StringBuilder b = new StringBuilder();
- b.append("Populating index on ");
+ b.append("Building index on ");
b.append(masterStorage.getStorableType().getName());
b.append(": ");
try {
@@ -277,11 +294,11 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
// Preload and sort all index entries for improved performance.
- buffer = new MergeSortBuffer(mIndexEntryStorage, null, POPULATE_SORT_BUFFER_SIZE);
+ buffer = new MergeSortBuffer(mIndexEntryStorage, null, BUILD_SORT_BUFFER_SIZE);
c = mGenerator.getComparator();
buffer.prepare(c);
- long nextReportTime = System.currentTimeMillis() + POPULATE_INFO_DELAY_MILLIS;
+ long nextReportTime = System.currentTimeMillis() + BUILD_INFO_DELAY_MILLIS;
// These variables are used when corrupt records are encountered.
S lastUserStorable = null;
@@ -313,8 +330,8 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
if (log.isInfoEnabled()) {
long now = System.currentTimeMillis();
if (now >= nextReportTime) {
- log.info("Prepared " + buffer.size() + " new index entries");
- nextReportTime = now + POPULATE_INFO_DELAY_MILLIS;
+ log.info("Prepared " + buffer.size() + " index entries");
+ nextReportTime = now + BUILD_INFO_DELAY_MILLIS;
}
}
@@ -359,44 +376,52 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
final int bufferSize = buffer.size();
if (log.isInfoEnabled()) {
- log.info("Begin insert of " + bufferSize + " new index entries");
+ log.info("Begin build of " + bufferSize + " index entries");
}
- Throttle throttle = desiredSpeed < 1.0 ? new Throttle(POPULATE_THROTTLE_WINDOW) : null;
+ Throttle throttle = desiredSpeed < 1.0 ? new Throttle(BUILD_THROTTLE_WINDOW) : null;
- int totalInserted = 0;
+ long totalInserted = 0;
+ long totalUpdated = 0;
+ long totalDeleted = 0;
+ long totalProgress = 0;
txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
try {
+
for (Object obj : buffer) {
Storable indexEntry = (Storable) obj;
- if (!indexEntry.tryInsert()) {
+ if (indexEntry.tryInsert()) {
+ totalInserted++;
+ } else {
// Couldn't insert because an index entry already exists.
Storable existing = indexEntry.copy();
- if (existing.tryLoad()) {
- if (!existing.equalProperties(indexEntry)) {
- // Existing entry differs, so update it.
- indexEntry.copyUnequalProperties(existing);
- existing.tryUpdate();
- indexEntry = existing;
- }
- } else {
- // Couldn't find existing entry for some reason, so
- // repair by brute force.
+ boolean doUpdate = false;
+ if (!existing.tryLoad()) {
+ doUpdate = true;
+ } else if (!existing.equalProperties(indexEntry)) {
+ // If only the version differs, leave existing entry alone.
+ indexEntry.copyVersionProperty(existing);
+ doUpdate = !existing.equalProperties(indexEntry);
+ }
+ if (doUpdate) {
indexEntry.tryDelete();
indexEntry.tryInsert();
+ totalUpdated++;
}
}
- totalInserted++;
- if (totalInserted % POPULATE_BATCH_SIZE == 0) {
+ totalProgress++;
+
+ if (totalProgress % BUILD_BATCH_SIZE == 0) {
txn.commit();
txn.exit();
if (log.isInfoEnabled()) {
- String format = "Committed %d new index entries (%.3f%%)";
- double percent = 100.0 * totalInserted / bufferSize;
- log.info(String.format(format, totalInserted, percent));
+ String format = "Index build progress: %.3f%% " +
+ progressSubMessgage(totalInserted, totalUpdated, totalDeleted);
+ double percent = 100.0 * totalProgress / bufferSize;
+ log.info(String.format(format, percent));
}
txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
@@ -404,9 +429,9 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
if (throttle != null) {
try {
- throttle.throttle(desiredSpeed, POPULATE_THROTTLE_SLEEP_PRECISION);
+ throttle.throttle(desiredSpeed, BUILD_THROTTLE_SLEEP_PRECISION);
} catch (InterruptedException e) {
- throw new RepositoryException("Index populate interrupted");
+ throw new RepositoryException("Index build interrupted");
}
}
}
@@ -418,7 +443,19 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
}
if (log.isInfoEnabled()) {
- log.info("Finished inserting " + totalInserted + " new index entries");
+ log.info("Finished building " + totalProgress + " index entries " +
+ progressSubMessgage(totalInserted, totalUpdated, totalDeleted));
+ }
+ }
+
+ private String progressSubMessgage(long totalInserted, long totalUpdated, long totalDeleted) {
+ if (totalDeleted > 0) {
+ return "(" + totalInserted + " inserted, " + totalUpdated + " updated, " +
+ totalDeleted + " deleted)";
+ } else if (totalUpdated > 0) {
+ return "(" + totalInserted + " inserted, " + totalUpdated + " updated)";
+ } else {
+ return "(" + totalInserted + " inserted)";
}
}