summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java145
1 files changed, 110 insertions, 35 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
index a3cc790..a8f5788 100644
--- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
+++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java
@@ -66,6 +66,22 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
static final int BUILD_THROTTLE_WINDOW = BUILD_BATCH_SIZE * 10;
static final int BUILD_THROTTLE_SLEEP_PRECISION = 10;
+ private static String[] naturalOrdering(Class<? extends Storable> type) {
+ StorableKey<?> pk = StorableIntrospector.examine(type).getPrimaryKey();
+ String[] naturalOrdering = new String[pk.getProperties().size()];
+ int i=0;
+ for (OrderedProperty<?> prop : pk.getProperties()) {
+ String orderBy;
+ if (prop.getDirection() == Direction.DESCENDING) {
+ orderBy = prop.toString();
+ } else {
+ orderBy = prop.getChainedProperty().toString();
+ }
+ naturalOrdering[i++] = orderBy;
+ }
+ return naturalOrdering;
+ }
+
private final IndexedStorage<S> mIndexedStorage;
private final StorableIndex mIndex;
private final IndexEntryGenerator<S> mGenerator;
@@ -141,13 +157,6 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
// Required by IndexEntryAccessor interface.
public void repair(double desiredSpeed) throws RepositoryException {
- /* FIXME: Delete bogus entries. Index inserts and updates are committed
- in batches, which covers a contiguous range of entries. Before
- committing the batch, a range query is executed on the index and
- verifies the batch about to be committed. Any entries that aren't in
- the batch are to be deleted. The progress message that is logged
- should be changed to show indexes inserted, updated, and deleted.
- */
buildIndex(desiredSpeed);
}
@@ -227,34 +236,21 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
* @param repo used to enter transactions
*/
void buildIndex(double desiredSpeed) throws RepositoryException {
- Repository repo = mIndexedStorage.mRepository;
- Storage<S> masterStorage = mIndexedStorage.mMasterStorage;
+ final Repository repo = mIndexedStorage.mRepository;
+ final Storage<S> masterStorage = mIndexedStorage.mMasterStorage;
- MergeSortBuffer buffer;
- Comparator c;
+ final MergeSortBuffer buffer;
+ final Comparator c;
- Log log = LogFactory.getLog(IndexedStorage.class);
+ final Log log = LogFactory.getLog(IndexedStorage.class);
- Query<S> masterQuery;
+ final Query<S> masterQuery;
{
// Need to explicitly order master query by primary key in order
// for fetchAfter to work correctly in case corrupt records are
// encountered.
- StorableKey<S> pk = StorableIntrospector
- .examine(masterStorage.getStorableType()).getPrimaryKey();
- String[] naturalOrdering = new String[pk.getProperties().size()];
- int i=0;
- for (OrderedProperty<S> prop : pk.getProperties()) {
- String orderBy;
- if (prop.getDirection() == Direction.DESCENDING) {
- orderBy = prop.toString();
- } else {
- orderBy = prop.getChainedProperty().toString();
- }
- naturalOrdering[i++] = orderBy;
- }
-
- masterQuery = masterStorage.query().orderBy(naturalOrdering);
+ masterQuery = masterStorage.query()
+ .orderBy(naturalOrdering(masterStorage.getStorableType()));
}
// Quick check to see if any records exist in master.
@@ -293,7 +289,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
// Preload and sort all index entries for improved performance.
buffer = new MergeSortBuffer(mIndexEntryStorage, null, BUILD_SORT_BUFFER_SIZE);
- c = mGenerator.getComparator();
+ c = getComparator();
buffer.prepare(c);
long nextReportTime = System.currentTimeMillis() + BUILD_INFO_DELAY_MILLIS;
@@ -377,6 +373,10 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
log.info("Begin build of " + bufferSize + " index entries");
}
+ // Need this index entry query for deleting bogus entries.
+ final Query indexEntryQuery = mIndexEntryStorage.query()
+ .orderBy(naturalOrdering(mIndexEntryStorage.getStorableType()));
+
Throttle throttle = desiredSpeed < 1.0 ? new Throttle(BUILD_THROTTLE_WINDOW) : null;
long totalInserted = 0;
@@ -386,6 +386,17 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
try {
+ txn.setForUpdate(true);
+
+ Cursor<? extends Storable> indexEntryCursor = indexEntryQuery.fetch();
+ Storable existingIndexEntry = null;
+
+ if (!indexEntryCursor.hasNext()) {
+ indexEntryCursor.close();
+ // Don't try opening again.
+ indexEntryCursor = null;
+ }
+
for (Object obj : buffer) {
Storable indexEntry = (Storable) obj;
if (indexEntry.tryInsert()) {
@@ -408,6 +419,37 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
}
}
+ if (indexEntryCursor != null) {
+ while (true) {
+ if (existingIndexEntry == null) {
+ if (indexEntryCursor.hasNext()) {
+ existingIndexEntry = indexEntryCursor.next();
+ } else {
+ indexEntryCursor.close();
+ // Don't try opening again.
+ indexEntryCursor = null;
+ break;
+ }
+ }
+
+ int compare = c.compare(existingIndexEntry, indexEntry);
+
+ if (compare == 0) {
+ // Existing entry cursor matches so allow cursor to advance.
+ existingIndexEntry = null;
+ break;
+ } else if (compare > 0) {
+ // Existing index entry is ahead so check later.
+ break;
+ } else {
+ // Existing index entry is bogus.
+ existingIndexEntry.tryDelete();
+ totalDeleted++;
+ existingIndexEntry = null;
+ }
+ }
+ }
+
totalProgress++;
if (totalProgress % BUILD_BATCH_SIZE == 0) {
@@ -422,6 +464,18 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
}
txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
+
+ if (indexEntryCursor != null) {
+ indexEntryCursor.close();
+ indexEntryCursor = indexEntryQuery.fetchAfter(indexEntry);
+ existingIndexEntry = null;
+
+ if (!indexEntryCursor.hasNext()) {
+ indexEntryCursor.close();
+ // Don't try opening again.
+ indexEntryCursor = null;
+ }
+ }
}
if (throttle != null) {
@@ -446,14 +500,35 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {
}
private String progressSubMessgage(long totalInserted, long totalUpdated, long totalDeleted) {
+ StringBuilder b = new StringBuilder();
+ b.append('(');
+
+ if (totalInserted > 0) {
+ b.append(totalInserted);
+ b.append(" inserted");
+ }
+ if (totalUpdated > 0) {
+ if (b.length() > 1) {
+ b.append(", ");
+ }
+ b.append(totalUpdated);
+ b.append(" updated");
+ }
if (totalDeleted > 0) {
- return "(" + totalInserted + " inserted, " + totalUpdated + " updated, " +
- totalDeleted + " deleted)";
- } else if (totalUpdated > 0) {
- return "(" + totalInserted + " inserted, " + totalUpdated + " updated)";
- } else {
- return "(" + totalInserted + " inserted)";
+ if (b.length() > 1) {
+ b.append(", ");
+ }
+ b.append(totalDeleted);
+ b.append(" deleted");
}
+
+ if (b.length() == 1) {
+ b.append("no changes made");
+ }
+
+ b.append(')');
+
+ return b.toString();
}
private Storable makeIndexEntry(S userStorable) throws PersistException {