diff options
Diffstat (limited to 'src/main/java/com/amazon')
| -rw-r--r-- | src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java | 160 | 
1 files changed, 73 insertions, 87 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 4897d21..d9b585f 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -30,7 +30,6 @@ import com.amazon.carbonado.CorruptEncodingException;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
  import com.amazon.carbonado.IsolationLevel;
 -import com.amazon.carbonado.PersistDeadlockException;
  import com.amazon.carbonado.PersistException;
  import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Repository;
 @@ -441,117 +440,103 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                  indexEntryCursor = null;
              }
 -            boolean retry = false;
 -            Storable indexEntry = null;
 -
              Iterator it = buffer.iterator();
              bufferIterate: while (true) {
 -                if (!retry) {
 -                    Object obj;
 -                    if (it.hasNext()) {
 -                        obj = it.next();
 -                    } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
 -                        obj = null;
 +                Object obj;
 +                if (it.hasNext()) {
 +                    obj = it.next();
 +                } else if (indexEntryCursor != null && indexEntryCursor.hasNext()) {
 +                    obj = null;
 +                } else {
 +                    break;
 +                }
 +
 +                Storable indexEntry = (Storable) obj;
 +
 +                if (indexEntry != null) {
 +                    if (indexEntry.tryInsert()) {
 +                        totalInserted++;
                      } else {
 -                        break;
 +                        // Couldn't insert because an index entry already exists.
 +                        Storable existing = indexEntry.copy();
 +                        boolean doUpdate = false;
 +                        if (!existing.tryLoad()) {
 +                            doUpdate = true;
 +                        } else if (!existing.equalProperties(indexEntry)) {
 +                            // If only the version differs, leave existing entry alone.
 +                            indexEntry.copyVersionProperty(existing);
 +                            doUpdate = !existing.equalProperties(indexEntry);
 +                        }
 +                        if (doUpdate) {
 +                            indexEntry.tryDelete();
 +                            indexEntry.tryInsert();
 +                            totalUpdated++;
 +                        }
                      }
 -
 -                    indexEntry = (Storable) obj;
                  }
 -                try {
 -                    if (indexEntry != null) {
 -                        if (indexEntry.tryInsert()) {
 -                            totalInserted++;
 -                        } else {
 -                            // Couldn't insert because an index entry already exists.
 -                            Storable existing = indexEntry.copy();
 -                            boolean doUpdate = false;
 -                            if (!existing.tryLoad()) {
 -                                doUpdate = true;
 -                            } else if (!existing.equalProperties(indexEntry)) {
 -                                // If only the version differs, leave existing entry alone.
 -                                indexEntry.copyVersionProperty(existing);
 -                                doUpdate = !existing.equalProperties(indexEntry);
 -                            }
 -                            if (doUpdate) {
 -                                indexEntry.tryDelete();
 -                                indexEntry.tryInsert();
 -                                totalUpdated++;
 +                if (indexEntryCursor != null) {
 +                    while (true) {
 +                        if (existingIndexEntry == null) {
 +                            if (indexEntryCursor.hasNext()) {
 +                                existingIndexEntry = indexEntryCursor.next();
 +                            } else {
 +                                indexEntryCursor.close();
 +                                // Don't try opening again.
 +                                indexEntryCursor = null;
 +                                break;
                              }
                          }
 -                    }
 -                    if (indexEntryCursor != null) {
 -                        while (true) {
 -                            if (existingIndexEntry == null) {
 -                                if (indexEntryCursor.hasNext()) {
 -                                    existingIndexEntry = indexEntryCursor.next();
 -                                } else {
 -                                    indexEntryCursor.close();
 -                                    // Don't try opening again.
 -                                    indexEntryCursor = null;
 -                                    break;
 -                                }
 -                            }
 +                        int compare = c.compare(existingIndexEntry, indexEntry);
 -                            int compare = c.compare(existingIndexEntry, indexEntry);
 +                        if (compare == 0) {
 +                            // Existing entry cursor matches so allow cursor to advance.
 +                            existingIndexEntry = null;
 +                            break;
 +                        } else if (compare > 0) {
 +                            // Existing index entry is ahead so check later.
 +                            break;
 +                        } else {
 +                            // Existing index entry is bogus.
 +                            existingIndexEntry.tryDelete();
 -                            if (compare == 0) {
 -                                // Existing entry cursor matches so allow cursor to advance.
 -                                existingIndexEntry = null;
 -                                break;
 -                            } else if (compare > 0) {
 -                                // Existing index entry is ahead so check later.
 -                                break;
 -                            } else {
 -                                // Existing index entry is bogus.
 -                                existingIndexEntry.tryDelete();
 +                            totalDeleted++;
 -                                totalDeleted++;
 +                            if (totalDeleted % BUILD_BATCH_SIZE == 0) {
 +                                txn.commit();
 +                                txn.exit();
 -                                if (totalDeleted % BUILD_BATCH_SIZE == 0) {
 -                                    txn.commit();
 -                                    txn.exit();
 +                                logProgress(log, totalProgress, bufferSize,
 +                                            totalInserted, totalUpdated, totalDeleted);
 -                                    logProgress(log, totalProgress, bufferSize,
 -                                                totalInserted, totalUpdated, totalDeleted);
 +                                txn = mRepository
 +                                    .enterTopTransaction(IsolationLevel.READ_COMMITTED);
 +                                txn.setForUpdate(true);
 -                                    txn = mRepository
 -                                        .enterTopTransaction(IsolationLevel.READ_COMMITTED);
 +                                indexEntryCursor.close();
 +                                indexEntryCursor = indexEntryQuery.fetchAfter(existingIndexEntry);
 +                                if (!indexEntryCursor.hasNext()) {
                                      indexEntryCursor.close();
 -                                    indexEntryCursor =
 -                                        indexEntryQuery.fetchAfter(existingIndexEntry);
 -
 -                                    if (!indexEntryCursor.hasNext()) {
 -                                        indexEntryCursor.close();
 -                                        // Don't try opening again.
 -                                        indexEntryCursor = null;
 -                                        break;
 -                                    }
 +                                    // Don't try opening again.
 +                                    indexEntryCursor = null;
 +                                    break;
                                  }
 +                            }
 -                                existingIndexEntry = null;
 +                            existingIndexEntry = null;
 -                                throttle(throttle, desiredSpeed);
 -                            }
 +                            throttle(throttle, desiredSpeed);
                          }
                      }
 +                }
 -                    if (indexEntry != null) {
 -                        totalProgress++;
 -                    }
 -
 -                    retry = false;
 -                } catch (PersistDeadlockException e) {
 -                    log.error("Deadlock during index repair; will retry: " + indexEntry, e);
 -                    // This re-uses the last index entry to repair and forces
 -                    // the current transaction to commit.
 -                    retry = true;
 +                if (indexEntry != null) {
 +                    totalProgress++;
                  }
 -                if (retry || (totalProgress % BUILD_BATCH_SIZE == 0)) {
 +                if (totalProgress % BUILD_BATCH_SIZE == 0) {
                      txn.commit();
                      txn.exit();
 @@ -559,6 +544,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {                                  totalInserted, totalUpdated, totalDeleted);
                      txn = mRepository.enterTopTransaction(IsolationLevel.READ_COMMITTED);
 +                    txn.setForUpdate(true);
                      if (indexEntryCursor != null) {
                          indexEntryCursor.close();
 | 
