diff options
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo')
3 files changed, 65 insertions, 30 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java index 99407a8..9b7f408 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedStorage.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.IsolationLevel;
  import com.amazon.carbonado.Query;
  import com.amazon.carbonado.RepositoryException;
  import com.amazon.carbonado.Storable;
 @@ -127,7 +128,16 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>              .with(getStorableType().getName() + '~')
              .with(getStorableType().getName() + '~' + '\uffff');
 -        for (StoredIndexInfo indexInfo : query.fetch().toList()) {
 +        List<StoredIndexInfo> storedInfos;
 +        Transaction txn = repository.getWrappedRepository()
 +            .enterTopTransaction(IsolationLevel.READ_COMMITTED);
 +        try {
 +            storedInfos = query.fetch().toList();
 +        } finally {
 +            txn.exit();
 +        }
 +
 +        for (StoredIndexInfo indexInfo : storedInfos) {
              String name = indexInfo.getIndexName();
              StorableIndex index;
              try {
 @@ -361,15 +371,23 @@ class IndexedStorage<S extends Storable> implements Storage<S>, StorageAccess<S>              .storageFor(StoredIndexInfo.class).prepare();
          info.setIndexName(index.getNameDescriptor());
 -        if (info.tryLoad()) {
 -            // Index already exists and is registered.
 -            return;
 +        Transaction txn = mRepository.getWrappedRepository()
 +            .enterTopTransaction(IsolationLevel.READ_COMMITTED);
 +        try {
 +            if (info.tryLoad()) {
 +                // Index already exists and is registered.
 +                return;
 +            }
 +        } finally {
 +            txn.exit();
          }
          // New index, so populate it.
          managedIndex.populateIndex(mRepository, mMasterStorage);
 -        Transaction txn = mRepository.getWrappedRepository().enterTransaction();
 +        txn = mRepository.getWrappedRepository()
 +            .enterTopTransaction(IsolationLevel.READ_COMMITTED);
 +        txn.setForUpdate(true);
          try {
              if (!info.tryLoad()) {
                  info.setIndexTypeDescriptor(index.getTypeDescriptor());
 diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java index 107067c..cfc399b 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/ManagedIndex.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;  import com.amazon.carbonado.Cursor;
  import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.IsolationLevel;
  import com.amazon.carbonado.PersistException;
  import com.amazon.carbonado.Query;
  import com.amazon.carbonado.Repository;
 @@ -293,35 +294,51 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {       * @param repo used to enter transactions
       */
      void populateIndex(Repository repo, Storage<S> masterStorage) throws RepositoryException {
 -        Cursor<S> cursor = masterStorage.query().fetch();
 -        if (!cursor.hasNext()) {
 -            // Nothing exists in master, so nothing to populate.
 -            cursor.close();
 -            return;
 +        MergeSortBuffer buffer;
 +        Comparator c;
 +        Transaction txn;
 +
 +        if (repo.getTransactionIsolationLevel() == null) {
 +            txn = null;
 +        } else {
 +            txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
          }
 -        Log log = LogFactory.getLog(IndexedStorage.class);
 -        if (log.isInfoEnabled()) {
 -            StringBuilder b = new StringBuilder();
 -            b.append("Populating index on ");
 -            b.append(masterStorage.getStorableType().getName());
 -            b.append(": ");
 -            try {
 -                mIndex.appendTo(b);
 -            } catch (java.io.IOException e) {
 -                // Not gonna happen.
 +        try {
 +            Cursor<S> cursor = masterStorage.query().fetch();
 +            if (!cursor.hasNext()) {
 +                // Nothing exists in master, so nothing to populate.
 +                cursor.close();
 +                return;
              }
 -            log.info(b.toString());
 -        }
 -        // Preload and sort all index entries for improved performance.
 +            Log log = LogFactory.getLog(IndexedStorage.class);
 +            if (log.isInfoEnabled()) {
 +                StringBuilder b = new StringBuilder();
 +                b.append("Populating index on ");
 +                b.append(masterStorage.getStorableType().getName());
 +                b.append(": ");
 +                try {
 +                    mIndex.appendTo(b);
 +                } catch (java.io.IOException e) {
 +                    // Not gonna happen.
 +                }
 +                log.info(b.toString());
 +            }
 -        MergeSortBuffer buffer = new MergeSortBuffer(mIndexEntryStorage);
 -        Comparator c = mGenerator.getComparator();
 -        buffer.prepare(c);
 +            // Preload and sort all index entries for improved performance.
 -        while (cursor.hasNext()) {
 -            buffer.add(makeIndexEntry(cursor.next()));
 +            buffer = new MergeSortBuffer(mIndexEntryStorage);
 +            c = mGenerator.getComparator();
 +            buffer.prepare(c);
 +
 +            while (cursor.hasNext()) {
 +                buffer.add(makeIndexEntry(cursor.next()));
 +            }
 +        } finally {
 +            if (txn != null) {
 +                txn.exit();
 +            }
          }
          buffer.sort();
 @@ -345,7 +362,7 @@ class ManagedIndex<S extends Storable> implements IndexEntryAccessor<S> {              }
          }
 -        Transaction txn = repo.enterTransaction();
 +        txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
          try {
              int totalInserted = 0;
              for (Object obj : buffer) {
 diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java index 20ab89f..331c7b0 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java @@ -721,8 +721,8 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag          }
          info.setDatabaseName(getStorableType().getName());
 -
          Transaction txn = repo.enterTopTransaction(IsolationLevel.READ_COMMITTED);
 +        txn.setForUpdate(true);
          try {
              if (!info.tryLoad()) {
                  if (layout == null) {
 | 
