diff options
Diffstat (limited to 'src')
12 files changed, 726 insertions, 704 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java index ba5cdcf..69a8f3d 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCBlob.java @@ -126,7 +126,7 @@ class JDBCBlob extends AbstractBlob implements JDBCLob {                  throw new FetchException("Blob value is null");
              }
              try {
 -                JDBCTransaction txn = mRepo.localTxnManager().getTxn();
 +                JDBCTransaction txn = mRepo.localTxnScope().getTxn();
                  if (txn != null) {
                      txn.register(this);
                  }
 @@ -143,7 +143,7 @@ class JDBCBlob extends AbstractBlob implements JDBCLob {                  if ((mBlob = mLoader.load(mRepo)) == null) {
                      throw new PersistException("Blob value is null");
                  }
 -                JDBCTransaction txn = mRepo.localTxnManager().getTxn();
 +                JDBCTransaction txn = mRepo.localTxnScope().getTxn();
                  if (txn != null) {
                      txn.register(this);
                  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java index c4e74f1..eedaf80 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCClob.java @@ -126,7 +126,7 @@ class JDBCClob extends AbstractClob implements JDBCLob {                  throw new FetchException("Clob value is null");
              }
              try {
 -                JDBCTransaction txn = mRepo.localTxnManager().getTxn();
 +                JDBCTransaction txn = mRepo.localTxnScope().getTxn();
                  if (txn != null) {
                      txn.register(this);
                  }
 @@ -143,7 +143,7 @@ class JDBCClob extends AbstractClob implements JDBCLob {                  if ((mClob = mLoader.load(mRepo)) == null) {
                      throw new PersistException("Clob value is null");
                  }
 -                JDBCTransaction txn = mRepo.localTxnManager().getTxn();
 +                JDBCTransaction txn = mRepo.localTxnScope().getTxn();
                  if (txn != null) {
                      txn.register(this);
                  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java index 0d93570..dd4e1ad 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java @@ -55,7 +55,7 @@ import com.amazon.carbonado.info.StorableProperty;  import com.amazon.carbonado.sequence.SequenceCapability;
  import com.amazon.carbonado.sequence.SequenceValueProducer;
  import com.amazon.carbonado.spi.AbstractRepository;
 -import com.amazon.carbonado.spi.TransactionManager;
 +import com.amazon.carbonado.spi.TransactionScope;
  import com.amazon.carbonado.util.ThrowUnchecked;
  /**
 @@ -324,7 +324,7 @@ public class JDBCRepository extends AbstractRepository<JDBCTransaction>       */
      // Is called by auto-generated code and must be public.
      public boolean isTransactionForUpdate() {
 -        return localTransactionManager().isForUpdate();
 +        return localTransactionScope().isForUpdate();
      }
      /**
 @@ -419,7 +419,7 @@ public class JDBCRepository extends AbstractRepository<JDBCTransaction>                  throw new FetchException("Repository is closed");
              }
 -            JDBCTransaction txn = localTransactionManager().getTxn();
 +            JDBCTransaction txn = localTransactionScope().getTxn();
              if (txn != null) {
                  // Return the connection used by the current transaction.
                  return txn.getConnection();
 @@ -656,10 +656,6 @@ public class JDBCRepository extends AbstractRepository<JDBCTransaction>          return mLog;
      }
 -    protected TransactionManager<JDBCTransaction> createTransactionManager() {
 -        return new JDBCTransactionManager(this);
 -    }
 -
      protected <S extends Storable> Storage<S> createStorage(Class<S> type)
          throws RepositoryException
      {
 @@ -701,11 +697,12 @@ public class JDBCRepository extends AbstractRepository<JDBCTransaction>          return mSupportStrategy.createSequenceValueProducer(name);
      }
 -    /**
 -     * Returns the thread-local JDBCTransactionManager, creating it if needed.
 -     */
 -    // Provides access to transaction manager from other classes.
 -    TransactionManager<JDBCTransaction> localTxnManager() {
 -        return localTransactionManager();
 +    protected JDBCTransactionManager createTransactionManager() {
 +        return new JDBCTransactionManager(this);
 +    }
 +
 +    // Provides access to transaction scope from other classes.
 +    final TransactionScope<JDBCTransaction> localTxnScope() {
 +        return localTransactionScope();
      }
  }
 diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java index 3fbd302..e6fe277 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCStorage.java @@ -213,7 +213,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>          if (jblob != null) {
              try {
 -                JDBCTransaction txn = mRepository.localTxnManager().getTxn();
 +                JDBCTransaction txn = mRepository.localTxnScope().getTxn();
                  if (txn != null) {
                      txn.register(jblob);
                  }
 @@ -235,7 +235,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>          if (jclob != null) {
              try {
 -                JDBCTransaction txn = mRepository.localTxnManager().getTxn();
 +                JDBCTransaction txn = mRepository.localTxnScope().getTxn();
                  if (txn != null) {
                      txn.register(jclob);
                  }
 @@ -606,7 +606,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>          }
          public Cursor<S> fetch(FilterValues<S> values) throws FetchException {
 -            boolean forUpdate = mRepository.localTxnManager().isForUpdate();
 +            boolean forUpdate = mRepository.localTxnScope().isForUpdate();
              Connection con = mRepository.getConnection();
              try {
                  PreparedStatement ps = con.prepareStatement(prepareSelect(values, forUpdate));
 @@ -674,7 +674,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>              throws IOException
          {
              indent(app, indentLevel);
 -            boolean forUpdate = mRepository.localTxnManager().isForUpdate();
 +            boolean forUpdate = mRepository.localTxnScope().isForUpdate();
              app.append(prepareSelect(values, forUpdate));
              app.append('\n');
              return true;
 @@ -684,7 +684,7 @@ class JDBCStorage<S extends Storable> extends StandardQueryFactory<S>              throws IOException
          {
              try {
 -                boolean forUpdate = mRepository.localTxnManager().isForUpdate();
 +                boolean forUpdate = mRepository.localTxnScope().isForUpdate();
                  String statement = prepareSelect(values, forUpdate);
                  return mRepository.getSupportStrategy().printPlan(app, indentLevel, statement);
              } catch (FetchException e) {
 diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCTransactionManager.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCTransactionManager.java index a70e4c3..ca63331 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCTransactionManager.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCTransactionManager.java @@ -29,8 +29,7 @@ import com.amazon.carbonado.Transaction;  import com.amazon.carbonado.spi.TransactionManager;
  /**
 - * Manages transactions for JDBCRepository. Only one instance is allocated per
 - * thread.
 + * Manages transactions for JDBCRepository.
   *
   * @author Brian S O'Neill
   */
 @@ -46,11 +45,6 @@ class JDBCTransactionManager extends TransactionManager<JDBCTransaction> {          mRepositoryRef = new WeakReference<JDBCRepository>(repository);
      }
 -    @Override
 -    public boolean isForUpdate() {
 -        return super.isForUpdate() && mRepositoryRef.get().supportsSelectForUpdate();
 -    }
 -
      protected IsolationLevel selectIsolationLevel(Transaction parent, IsolationLevel level) {
          JDBCRepository repo = mRepositoryRef.get();
          if (repo == null) {
 @@ -59,6 +53,11 @@ class JDBCTransactionManager extends TransactionManager<JDBCTransaction> {          return repo.selectIsolationLevel(parent, level);
      }
 +    protected boolean supportsForUpdate() {
 +        JDBCRepository repo = mRepositoryRef.get();
 +        return repo != null && repo.supportsSelectForUpdate();
 +    }
 +
      protected JDBCTransaction createTxn(JDBCTransaction parent, IsolationLevel level)
          throws SQLException, FetchException
      {
 diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java index f16d2f7..b2a1f32 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java @@ -25,7 +25,7 @@ import com.amazon.carbonado.Storable;  import com.amazon.carbonado.raw.RawCursor;
  import com.amazon.carbonado.raw.RawUtil;
 -import com.amazon.carbonado.spi.TransactionManager;
 +import com.amazon.carbonado.spi.TransactionScope;
  /**
   *
 @@ -35,10 +35,10 @@ import com.amazon.carbonado.spi.TransactionManager;  abstract class BDBCursor<Txn, S extends Storable> extends RawCursor<S> {
      private static final byte[] NO_DATA = new byte[0];
 -    private final TransactionManager<Txn> mTxnMgr;
 +    private final TransactionScope<Txn> mScope;
      private final BDBStorage<Txn, S> mStorage;
      /**
 -     * @param txnMgr
 +     * @param scope
       * @param startBound specify the starting key for the cursor, or null if first
       * @param inclusiveStart true if start bound is inclusive
       * @param endBound specify the ending key for the cursor, or null if last
 @@ -50,7 +50,7 @@ abstract class BDBCursor<Txn, S extends Storable> extends RawCursor<S> {       * @throws ClassCastException if lock is not an object passed by
       * {@link BDBStorage#openCursor BDBStorage.openCursor}
       */
 -    protected BDBCursor(TransactionManager<Txn> txnMgr,
 +    protected BDBCursor(TransactionScope<Txn> scope,
                          byte[] startBound, boolean inclusiveStart,
                          byte[] endBound, boolean inclusiveEnd,
                          int maxPrefix,
 @@ -58,19 +58,19 @@ abstract class BDBCursor<Txn, S extends Storable> extends RawCursor<S> {                          BDBStorage<Txn, S> storage)
          throws FetchException
      {
 -        super(txnMgr.getLock(),
 +        super(scope.getLock(),
                startBound, inclusiveStart,
                endBound, inclusiveEnd,
                maxPrefix, reverse);
 -        mTxnMgr = txnMgr;
 +        mScope = scope;
          mStorage = storage;
 -        txnMgr.register(storage.getStorableType(), this);
 +        scope.register(storage.getStorableType(), this);
      }
      void open() throws FetchException {
          try {
 -            cursor_open(mTxnMgr.getTxn(), mTxnMgr.getIsolationLevel());
 +            cursor_open(mScope.getTxn(), mScope.getIsolationLevel());
          } catch (Exception e) {
              throw mStorage.toFetchException(e);
          }
 @@ -80,7 +80,7 @@ abstract class BDBCursor<Txn, S extends Storable> extends RawCursor<S> {          try {
              super.close();
          } finally {
 -            mTxnMgr.unregister(mStorage.getStorableType(), this);
 +            mScope.unregister(mStorage.getStorableType(), this);
          }
      }
 diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java index 45fe85c..a0c549b 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java @@ -63,7 +63,7 @@ import com.amazon.carbonado.sequence.SequenceValueProducer;  import com.amazon.carbonado.spi.AbstractRepository;
  import com.amazon.carbonado.spi.ExceptionTransformer;
  import com.amazon.carbonado.spi.LobEngine;
 -import com.amazon.carbonado.spi.TransactionManager;
 +import com.amazon.carbonado.spi.TransactionScope;
  /**
   * Repository implementation backed by a Berkeley DB. Data is encoded in the
 @@ -326,10 +326,6 @@ abstract class BDBRepository<Txn> extends AbstractRepository<Txn>          return mLog;
      }
 -    protected TransactionManager createTransactionManager() {
 -        return new BDBTransactionManager(mExTransformer, this);
 -    }
 -
      protected <S extends Storable> Storage createStorage(Class<S> type)
          throws RepositoryException
      {
 @@ -346,6 +342,10 @@ abstract class BDBRepository<Txn> extends AbstractRepository<Txn>          return new SequenceValueGenerator(BDBRepository.this, name);
      }
 +    protected BDBTransactionManager<Txn> createTransactionManager() {
 +        return new BDBTransactionManager<Txn>(mExTransformer, this);
 +    }
 +
      /**
       * @see com.amazon.carbonado.spi.RepositoryBuilder#isMaster
       */
 @@ -489,12 +489,9 @@ abstract class BDBRepository<Txn> extends AbstractRepository<Txn>          return mExTransformer.toRepositoryException(e);
      }
 -    /**
 -     * Returns the thread-local BDBTransactionManager, creating it if needed.
 -     */
 -    // Provides access to transaction manager from other classes.
 -    TransactionManager<Txn> localTxnManager() {
 -        return localTransactionManager();
 +    // Provides access to transaction scope from other classes.
 +    final TransactionScope<Txn> localTxnScope() {
 +        return localTransactionScope();
      }
      /**
 diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java index 0b9f53c..77ea7d7 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java @@ -78,7 +78,7 @@ import com.amazon.carbonado.sequence.SequenceValueProducer;  import com.amazon.carbonado.spi.IndexInfoImpl;
  import com.amazon.carbonado.spi.LobEngine;
  import com.amazon.carbonado.spi.StorableIndexSet;
 -import com.amazon.carbonado.spi.TransactionManager;
 +import com.amazon.carbonado.spi.TransactionScope;
  import com.amazon.carbonado.spi.TriggerManager;
  /**
 @@ -203,18 +203,18 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag              }
          }
 -        TransactionManager<Txn> txnMgr = localTxnManager();
 +        TransactionScope<Txn> scope = localTxnScope();
          // Lock out shutdown task.
 -        txnMgr.getLock().lock();
 +        scope.getLock().lock();
          try {
              try {
 -                db_truncate(txnMgr.getTxn());
 +                db_truncate(scope.getTxn());
              } catch (Exception e) {
                  throw toPersistException(e);
              }
          } finally {
 -            txnMgr.getLock().unlock();
 +            scope.getLock().unlock();
          }
      }
 @@ -317,7 +317,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag                                   boolean reverseOrder)
          throws FetchException
      {
 -        TransactionManager<Txn> txnMgr = localTxnManager();
 +        TransactionScope<Txn> scope = localTxnScope();
          if (reverseRange) {
              {
 @@ -334,7 +334,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag          }
          // Lock out shutdown task.
 -        txnMgr.getLock().lock();
 +        scope.getLock().lock();
          try {
              StorableCodec<S> codec = mStorableCodec;
 @@ -381,7 +381,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag              try {
                  BDBCursor<Txn, S> cursor = openCursor
 -                    (txnMgr,
 +                    (scope,
                       startBound, inclusiveStart,
                       endBound, inclusiveEnd,
                       mStorableCodec.getPrimaryKeyPrefixLength(),
 @@ -394,7 +394,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag                  throw toFetchException(e);
              }
          } finally {
 -            txnMgr.getLock().unlock();
 +            scope.getLock().unlock();
          }
      }
 @@ -447,15 +447,15 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag          boolean isPrimaryEmpty;
          try {
 -            TransactionManager<Txn> txnMgr = mRepository.localTxnManager();
 +            TransactionScope<Txn> scope = mRepository.localTxnScope();
              // Lock out shutdown task.
 -            txnMgr.getLock().lock();
 +            scope.getLock().lock();
              try {
                  primaryDatabase = env_openPrimaryDatabase(openTxn, databaseName);
                  primaryInfo = registerPrimaryDatabase(readOnly, layout);
 -                isPrimaryEmpty = db_isEmpty(null, primaryDatabase, txnMgr.isForUpdate());
 +                isPrimaryEmpty = db_isEmpty(null, primaryDatabase, scope.isForUpdate());
              } finally {
 -                txnMgr.getLock().unlock();
 +                scope.getLock().unlock();
              }
          } catch (Exception e) {
              throw toRepositoryException(e);
 @@ -572,7 +572,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag          }
          try {
 -            Txn txn = mRepository.localTxnManager().getTxn();
 +            Txn txn = mRepository.localTxnScope().getTxn();
              return db_compact(txn, mPrimaryDatabase, start, end);
          } catch (Exception e) {
              throw mRepository.toRepositoryException(e);
 @@ -637,7 +637,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag      /**
       * @param txn optional transaction to commit when cursor is closed
 -     * @param txnMgr
 +     * @param scope
       * @param startBound specify the starting key for the cursor, or null if first
       * @param inclusiveStart true if start bound is inclusive
       * @param endBound specify the ending key for the cursor, or null if last
 @@ -647,7 +647,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag       * @param database database to use
       */
      protected abstract BDBCursor<Txn, S> openCursor
 -        (TransactionManager<Txn> txnMgr,
 +        (TransactionScope<Txn> scope,
           byte[] startBound, boolean inclusiveStart,
           byte[] endBound, boolean inclusiveEnd,
           int maxPrefix,
 @@ -667,8 +667,8 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag          return mRepository.toRepositoryException(e);
      }
 -    TransactionManager<Txn> localTxnManager() {
 -        return mRepository.localTxnManager();
 +    TransactionScope<Txn> localTxnScope() {
 +        return mRepository.localTxnScope();
      }
      /**
 @@ -726,15 +726,15 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag       * prevent threads from starting work that will likely fail along the way.
       */
      void checkClosed() throws FetchException {
 -        TransactionManager<Txn> txnMgr = localTxnManager();
 +        TransactionScope<Txn> scope = localTxnScope();
          // Lock out shutdown task.
 -        txnMgr.getLock().lock();
 +        scope.getLock().lock();
          try {
              if (mPrimaryDatabase == null) {
                  // If shuting down, this will force us to block forever.
                  try {
 -                    txnMgr.getTxn();
 +                    scope.getTxn();
                  } catch (Exception e) {
                      // Don't care.
                  }
 @@ -742,20 +742,20 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag                  throw new FetchException("Repository closed");
              }
          } finally {
 -            txnMgr.getLock().unlock();
 +            scope.getLock().unlock();
          }
      }
      void close() throws Exception {
 -        TransactionManager<Txn> txnMgr = mRepository.localTxnManager();
 -        txnMgr.getLock().lock();
 +        TransactionScope<Txn> scope = mRepository.localTxnScope();
 +        scope.getLock().lock();
          try {
              if (mPrimaryDatabase != null) {
                  db_close(mPrimaryDatabase);
                  mPrimaryDatabase = null;
              }
          } finally {
 -            txnMgr.getLock().unlock();
 +            scope.getLock().unlock();
          }
      }
 @@ -998,18 +998,18 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag          }
          public byte[] tryLoad(byte[] key) throws FetchException {
 -            TransactionManager<Txn> txnMgr = mStorage.localTxnManager();
 +            TransactionScope<Txn> scope = mStorage.localTxnScope();
              byte[] result;
              // Lock out shutdown task.
 -            txnMgr.getLock().lock();
 +            scope.getLock().lock();
              try {
                  try {
 -                    result = mStorage.db_get(txnMgr.getTxn(), key, txnMgr.isForUpdate());
 +                    result = mStorage.db_get(scope.getTxn(), key, scope.isForUpdate());
                  } catch (Throwable e) {
                      throw mStorage.toFetchException(e);
                  }
              } finally {
 -                txnMgr.getLock().unlock();
 +                scope.getLock().unlock();
              }
              if (result == NOT_FOUND) {
                  return null;
 @@ -1021,18 +1021,18 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag          }
          public boolean tryInsert(S storable, byte[] key, byte[] value) throws PersistException {
 -            TransactionManager<Txn> txnMgr = mStorage.localTxnManager();
 +            TransactionScope<Txn> scope = mStorage.localTxnScope();
              Object result;
              // Lock out shutdown task.
 -            txnMgr.getLock().lock();
 +            scope.getLock().lock();
              try {
                  try {
 -                    result = mStorage.db_putNoOverwrite(txnMgr.getTxn(), key, value);
 +                    result = mStorage.db_putNoOverwrite(scope.getTxn(), key, value);
                  } catch (Throwable e) {
                      throw mStorage.toPersistException(e);
                  }
              } finally {
 -                txnMgr.getLock().unlock();
 +                scope.getLock().unlock();
              }
              if (result == KEY_EXIST) {
                  return false;
 @@ -1044,34 +1044,34 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag          }
          public void store(S storable, byte[] key, byte[] value) throws PersistException {
 -            TransactionManager<Txn> txnMgr = mStorage.localTxnManager();
 +            TransactionScope<Txn> scope = mStorage.localTxnScope();
              // Lock out shutdown task.
 -            txnMgr.getLock().lock();
 +            scope.getLock().lock();
              try {
                  try {
 -                    if (!mStorage.db_put(txnMgr.getTxn(), key, value)) {
 +                    if (!mStorage.db_put(scope.getTxn(), key, value)) {
                          throw new PersistException("Failed");
                      }
                  } catch (Throwable e) {
                      throw mStorage.toPersistException(e);
                  }
              } finally {
 -                txnMgr.getLock().unlock();
 +                scope.getLock().unlock();
              }
          }
          public boolean tryDelete(byte[] key) throws PersistException {
 -            TransactionManager<Txn> txnMgr = mStorage.localTxnManager();
 +            TransactionScope<Txn> scope = mStorage.localTxnScope();
              // Lock out shutdown task.
 -            txnMgr.getLock().lock();
 +            scope.getLock().lock();
              try {
                  try {
 -                    return mStorage.db_delete(txnMgr.getTxn(), key);
 +                    return mStorage.db_delete(scope.getTxn(), key);
                  } catch (Throwable e) {
                      throw mStorage.toPersistException(e);
                  }
              } finally {
 -                txnMgr.getLock().unlock();
 +                scope.getLock().unlock();
              }
          }
 diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBTransactionManager.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBTransactionManager.java index f9b0c6c..5e0af3b 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBTransactionManager.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBTransactionManager.java @@ -29,9 +29,7 @@ import com.amazon.carbonado.spi.ExceptionTransformer;  import com.amazon.carbonado.spi.TransactionManager;
  /**
 - * This class is used for tracking transactions and open cursors. Each
 - * thread that uses the BDBRepository instance is assigned at most one
 - * BDBTransactionManager instance.
 + * This class is used for tracking transactions and open cursors.
   *
   * @author Brian S O'Neill
   */
 @@ -51,6 +49,10 @@ class BDBTransactionManager<Txn> extends TransactionManager<Txn> {          return repository().selectIsolationLevel(parent, level);
      }
 +    protected boolean supportsForUpdate() {
 +        return true;
 +    }
 +
      protected Txn createTxn(Txn parent, IsolationLevel level) throws Exception {
          if (level == IsolationLevel.NONE) {
              return null;
 diff --git a/src/main/java/com/amazon/carbonado/spi/AbstractRepository.java b/src/main/java/com/amazon/carbonado/spi/AbstractRepository.java index 0dc1e74..7150f56 100644 --- a/src/main/java/com/amazon/carbonado/spi/AbstractRepository.java +++ b/src/main/java/com/amazon/carbonado/spi/AbstractRepository.java @@ -28,8 +28,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;  import org.apache.commons.logging.Log;
 -import org.cojen.util.WeakIdentityMap;
 -
  import com.amazon.carbonado.IsolationLevel;
  import com.amazon.carbonado.Repository;
  import com.amazon.carbonado.RepositoryException;
 @@ -56,13 +54,9 @@ public abstract class AbstractRepository<Txn>      implements Repository, ShutdownCapability, SequenceCapability
  {
      private final String mName;
 +    private final TransactionManager<Txn> mTxnMgr;
      private final ReadWriteLock mShutdownLock;
 -    private final ThreadLocal<TransactionManager<Txn>> mCurrentTxnMgr;
 -
 -    // Weakly tracks all TransactionManager instances for shutdown hook.
 -    private final Map<TransactionManager<Txn>, ?> mAllTxnMgrs;
 -
      private final StoragePool mStoragePool;
      private final SequenceValueProducerPool mSequencePool;
 @@ -74,10 +68,9 @@ public abstract class AbstractRepository<Txn>          if (name == null) {
              throw new IllegalArgumentException("Repository name cannot be null");
          }
 +
          mName = name;
          mShutdownLock = new ReentrantReadWriteLock();
 -        mCurrentTxnMgr = new ThreadLocal<TransactionManager<Txn>>();
 -        mAllTxnMgrs = new WeakIdentityMap();
          mStoragePool = new StoragePool() {
              protected <S extends Storable> Storage<S> createStorage(Class<S> type)
 @@ -104,6 +97,8 @@ public abstract class AbstractRepository<Txn>                  }
              }
          };
 +
 +        mTxnMgr = createTransactionManager();
      }
      public String getName() {
 @@ -117,19 +112,19 @@ public abstract class AbstractRepository<Txn>      }
      public Transaction enterTransaction() {
 -        return localTransactionManager().enter(null);
 +        return mTxnMgr.localTransactionScope().enter(null);
      }
      public Transaction enterTransaction(IsolationLevel level) {
 -        return localTransactionManager().enter(level);
 +        return mTxnMgr.localTransactionScope().enter(level);
      }
      public Transaction enterTopTransaction(IsolationLevel level) {
 -        return localTransactionManager().enterTop(level);
 +        return mTxnMgr.localTransactionScope().enterTop(level);
      }
      public IsolationLevel getTransactionIsolationLevel() {
 -        return localTransactionManager().getIsolationLevel();
 +        return mTxnMgr.localTransactionScope().getIsolationLevel();
      }
      /**
 @@ -212,21 +207,17 @@ public abstract class AbstractRepository<Txn>      }
      /**
 -     * Returns the thread-local TransactionManager, creating it if needed.
 +     * Returns the TransactionManager which was passed into the constructor.
       */
 -    protected TransactionManager<Txn> localTransactionManager() {
 -        TransactionManager<Txn> txnMgr = mCurrentTxnMgr.get();
 -        if (txnMgr == null) {
 -            lockoutShutdown();
 -            try {
 -                txnMgr = createTransactionManager();
 -                mCurrentTxnMgr.set(txnMgr);
 -                mAllTxnMgrs.put(txnMgr, null);
 -            } finally {
 -                unlockoutShutdown();
 -            }
 -        }
 -        return txnMgr;
 +    protected TransactionManager<Txn> transactionManager() {
 +        return mTxnMgr;
 +    }
 +
 +    /**
 +     * Returns the thread-local TransactionScope, creating it if needed.
 +     */
 +    protected TransactionScope<Txn> localTransactionScope() {
 +        return mTxnMgr.localTransactionScope();
      }
      /**
 @@ -279,11 +270,6 @@ public abstract class AbstractRepository<Txn>      protected abstract Log getLog();
      /**
 -     * Called upon to create a new thread-local TransactionManager instance.
 -     */
 -    protected abstract TransactionManager<Txn> createTransactionManager();
 -
 -    /**
       * Called upon to create a new Storage instance.
       */
      protected abstract <S extends Storable> Storage<S> createStorage(Class<S> type)
 @@ -295,6 +281,11 @@ public abstract class AbstractRepository<Txn>      protected abstract SequenceValueProducer createSequenceValueProducer(String name)
          throws RepositoryException;
 +    /**
 +     * Called upon to create a new TransactionManager instance.
 +     */
 +    protected abstract TransactionManager<Txn> createTransactionManager();
 +
      void info(String message) {
          Log log = getLog();
          if (log != null) {
 @@ -345,19 +336,10 @@ public abstract class AbstractRepository<Txn>                  // Return unused sequence values.
                  repository.mSequencePool.returnReservedValues(null);
 -                // Close transactions and cursors.
 -                for (TransactionManager<?> txnMgr : repository.mAllTxnMgrs.keySet()) {
 -                    if (suspendThreads) {
 -                        // Lock transaction manager but don't release it. This
 -                        // prevents other threads from beginning work during
 -                        // shutdown, which will likely fail along the way.
 -                        txnMgr.getLock().lock();
 -                    }
 -                    try {
 -                        txnMgr.close();
 -                    } catch (Throwable e) {
 -                        repository.error("Failed to close TransactionManager", e);
 -                    }
 +                try {
 +                    repository.mTxnMgr.close(suspendThreads);
 +                } catch (Throwable e) {
 +                    repository.error("Failed to close TransactionManager", e);
                  }
                  repository.shutdownHook();
 diff --git a/src/main/java/com/amazon/carbonado/spi/TransactionManager.java b/src/main/java/com/amazon/carbonado/spi/TransactionManager.java index fe08a01..8820974 100644 --- a/src/main/java/com/amazon/carbonado/spi/TransactionManager.java +++ b/src/main/java/com/amazon/carbonado/spi/TransactionManager.java @@ -18,280 +18,82 @@  package com.amazon.carbonado.spi;
 -import java.util.IdentityHashMap;
  import java.util.Map;
  import java.util.concurrent.TimeUnit;
 -import java.util.concurrent.locks.Lock;
 -import java.util.concurrent.locks.ReentrantLock;
 -import com.amazon.carbonado.Cursor;
 -import com.amazon.carbonado.FetchException;
 +import org.cojen.util.WeakIdentityMap;
 +
  import com.amazon.carbonado.IsolationLevel;
  import com.amazon.carbonado.PersistException;
  import com.amazon.carbonado.RepositoryException;
 -import com.amazon.carbonado.Storable;
  import com.amazon.carbonado.Transaction;
  /**
 - * Generic transaction manager for repositories. Repositories should only have
 - * thread local instances.
 + * Generic transaction manager for repositories.
   *
 + * @param <Txn> Transaction type
   * @author Brian S O'Neill
   */
  public abstract class TransactionManager<Txn> {
 +    private static final int NOT_CLOSED = 0, CLOSED = 1, SUSPENDED = 2;
 -    final Lock mLock;
 -    final ExceptionTransformer mExTransformer;
 -
 -    TransactionImpl<Txn> mCurrent;
 -
 -    // Tracks all registered cursors by storage type.
 -    private Map<Class<?>, CursorList<TransactionImpl<Txn>>> mCursors;
 +    private final ThreadLocal<TransactionScope<Txn>> mCurrentScope;
 +    private final Map<TransactionScope<Txn>, ?> mAllScopes;
 -    private boolean mClosed;
 -
 -    /**
 -     * @deprecated
 -     */
 -    public TransactionManager(ExceptionTransformer exTransformer) {
 -        // The use of a fair lock is essential for shutdown hooks that attempt
 -        // to acquire the locks of all TransactionManagers. Otherwise, the
 -        // shutdown can take a long time.
 -        mLock = new ReentrantLock(true);
 -        mExTransformer = exTransformer;
 -    }
 +    private int mClosedState;
      public TransactionManager() {
 -        this(null);
 -    }
 -
 -    /**
 -     * Returns the exception transformer in use.
 -     *
 -     * @deprecated
 -     */
 -    public ExceptionTransformer getExceptionTransformer() {
 -        return mExTransformer;
 -    }
 -
 -    /**
 -     * Enters a new transaction scope.
 -     *
 -     * @param level desired isolation level (may be null)
 -     * @throws UnsupportedOperationException if isolation level higher than
 -     * supported by repository
 -     */
 -    public Transaction enter(IsolationLevel level) {
 -        mLock.lock();
 -        try {
 -            TransactionImpl<Txn> parent = mCurrent;
 -            IsolationLevel actualLevel = selectIsolationLevel(parent, level);
 -            if (actualLevel == null) {
 -                if (parent == null) {
 -                    throw new UnsupportedOperationException
 -                        ("Desired isolation level not supported: " + level);
 -                } else {
 -                    throw new UnsupportedOperationException
 -                        ("Desired isolation level not supported: " + level
 -                         + "; parent isolation level: " + parent.getIsolationLevel());
 -                }
 -            }
 -
 -            return mCurrent = new TransactionImpl<Txn>(this, parent, false, actualLevel);
 -        } finally {
 -            mLock.unlock();
 -        }
 +        mCurrentScope = new ThreadLocal<TransactionScope<Txn>>();
 +        mAllScopes = new WeakIdentityMap();
      }
      /**
 -     * Enters a new top-level transaction scope.
 -     *
 -     * @param level desired isolation level (may be null)
 -     * @throws UnsupportedOperationException if isolation level higher than
 -     * supported by repository
 +     * Returns the thread-local TransactionScope, creating it if needed.
       */
 -    public Transaction enterTop(IsolationLevel level) {
 -        mLock.lock();
 -        try {
 -            IsolationLevel actualLevel = selectIsolationLevel(null, level);
 -            if (actualLevel == null) {
 -                throw new UnsupportedOperationException
 -                    ("Desired isolation level not supported: " + level);
 +    public TransactionScope<Txn> localTransactionScope() {
 +        TransactionScope<Txn> scope = mCurrentScope.get();
 +        if (scope == null) {
 +            int closedState;
 +            synchronized (this) {
 +                closedState = mClosedState;
 +                scope = new TransactionScope<Txn>(this, closedState != NOT_CLOSED);
 +                mAllScopes.put(scope, null);
              }
 -
 -            return mCurrent = new TransactionImpl<Txn>(this, mCurrent, true, actualLevel);
 -        } finally {
 -            mLock.unlock();
 -        }
 -    }
 -
 -    /**
 -     * Registers the given cursor against the current transaction, allowing
 -     * it to be closed on transaction exit or transaction manager close. If
 -     * there is no current transaction scope, the cursor is registered as not
 -     * part of a transaction. Cursors should register when created.
 -     */
 -    public <S extends Storable> void register(Class<S> type, Cursor<S> cursor) {
 -        mLock.lock();
 -        try {
 -            checkState();
 -            if (mCursors == null) {
 -                mCursors = new IdentityHashMap<Class<?>, CursorList<TransactionImpl<Txn>>>();
 +            mCurrentScope.set(scope);
 +            if (closedState == SUSPENDED) {
 +                // Immediately suspend new scope.
 +                scope.getLock().lock();
              }
 -
 -            CursorList<TransactionImpl<Txn>> cursorList = mCursors.get(type);
 -            if (cursorList == null) {
 -                cursorList = new CursorList<TransactionImpl<Txn>>();
 -                mCursors.put(type, cursorList);
 -            }
 -
 -            cursorList.register(cursor, mCurrent);
 -
 -            if (mCurrent != null) {
 -                mCurrent.register(cursor);
 -            }
 -        } finally {
 -            mLock.unlock();
          }
 +        return scope;
      }
      /**
 -     * Unregisters a previously registered cursor. Cursors should unregister
 -     * when closed.
 -     */
 -    public <S extends Storable> void unregister(Class<S> type, Cursor<S> cursor) {
 -        mLock.lock();
 -        try {
 -            if (mCursors != null) {
 -                CursorList<TransactionImpl<Txn>> cursorList = mCursors.get(type);
 -                if (cursorList != null) {
 -                    TransactionImpl<Txn> txnImpl = cursorList.unregister(cursor);
 -                    if (txnImpl != null) {
 -                        txnImpl.unregister(cursor);
 -                    }
 -                }
 -            }
 -        } finally {
 -            mLock.unlock();
 -        }
 -    }
 -
 -    /**
 -     * Returns the count of registered cursors of a specific type.
 -     */
 -    public <S extends Storable> int getRegisteredCount(Class<S> type) {
 -        mLock.lock();
 -        try {
 -            if (mCursors != null) {
 -                CursorList<TransactionImpl<Txn>> cursorList = mCursors.get(type);
 -                if (cursorList != null) {
 -                    return cursorList.size();
 -                }
 -            }
 -        } finally {
 -            mLock.unlock();
 -        }
 -        return 0;
 -    }
 -
 -    /**
 -     * Returns a registered cursor of the given type, or null if none at given index.
 -     */
 -    @SuppressWarnings("unchecked")
 -    public <S extends Storable> Cursor<S> getRegisteredCursor(Class<S> type, int index) {
 -        mLock.lock();
 -        try {
 -            if (mCursors != null) {
 -                CursorList<TransactionImpl<Txn>> cursorList = mCursors.get(type);
 -                if (cursorList != null) {
 -                    if (index < cursorList.size()) {
 -                        return (Cursor<S>) cursorList.getCursor(index);
 -                    }
 -                }
 -            }
 -        } finally {
 -            mLock.unlock();
 -        }
 -        return null;
 -    }
 -
 -    /**
 -     * Returns lock used by TransactionManager. While holding lock, operations
 -     * are suspended.
 -     */
 -    public Lock getLock() {
 -        return mLock;
 -    }
 -
 -    /**
 -     * Exits all transactions and closes all cursors. Should be called only
 -     * when repository is closed.
 -     */
 -    public void close() throws RepositoryException {
 -        mLock.lock();
 -        try {
 -            if (!mClosed) {
 -                while (mCurrent != null) {
 -                    mCurrent.exit();
 -                }
 -                if (mCursors != null) {
 -                    for (CursorList<TransactionImpl<Txn>> cursorList : mCursors.values()) {
 -                        cursorList.closeCursors();
 -                    }
 -                }
 -            }
 -        } finally {
 -            mClosed = true;
 -            mLock.unlock();
 -        }
 -    }
 -
 -    /**
 -     * Returns null if no transaction is in progress.
 +     * Closes all transaction scopes. Should be called only when repository is
 +     * closed.
       *
 -     * @throws Exception thrown by createTxn or reuseTxn
 +     * @param suspend when true, indefinitely suspend all threads interacting
 +     * with transactions
       */
 -    public Txn getTxn() throws Exception {
 -        mLock.lock();
 -        try {
 -            checkState();
 -            return mCurrent == null ? null : mCurrent.getTxn();
 -        } finally {
 -            mLock.unlock();
 +    public synchronized void close(boolean suspend) throws RepositoryException {
 +        if (mClosedState == SUSPENDED) {
 +            // If suspended, attempting to close again will likely deadlock.
 +            return;
          }
 -    }
 -    /**
 -     * Returns true if a transaction is in progress and it is for update.
 -     */
 -    public boolean isForUpdate() {
 -        mLock.lock();
 -        try {
 -            return (mClosed || mCurrent == null) ? false : mCurrent.isForUpdate();
 -        } finally {
 -            mLock.unlock();
 +        if (suspend) {
 +            for (TransactionScope<?> scope : mAllScopes.keySet()) {
 +                // Lock scope but don't release it. This prevents other threads
 +                // from beginning work during shutdown, which will likely fail
 +                // along the way.
 +                scope.getLock().lock();
 +            }
          }
 -    }
 -    /**
 -     * Returns the isolation level of the current transaction, or null if there
 -     * is no transaction in the current thread.
 -     */
 -    public IsolationLevel getIsolationLevel() {
 -        mLock.lock();
 -        try {
 -            return (mClosed || mCurrent == null) ? null : mCurrent.getIsolationLevel();
 -        } finally {
 -            mLock.unlock();
 -        }
 -    }
 +        mClosedState = suspend ? SUSPENDED : CLOSED;
 -    /**
 -     * Caller must hold mLock.
 -     */
 -    private void checkState() {
 -        if (mClosed) {
 -            throw new IllegalStateException("Repository is closed");
 +        for (TransactionScope<?> scope : mAllScopes.keySet()) {
 +            scope.close();
          }
      }
 @@ -306,6 +108,13 @@ public abstract class TransactionManager<Txn> {                                                             IsolationLevel level);
      /**
 +     * Return true if transactions support "for update" mode.
 +     *
 +     * @since 1.2
 +     */
 +    protected abstract boolean supportsForUpdate();
 +
 +    /**
       * Creates an internal transaction representation, with the optional parent
       * transaction. If parent is not null and real nested transactions are not
       * supported, simply return parent transaction for supporting fake nested
 @@ -363,330 +172,4 @@ public abstract class TransactionManager<Txn> {       * Aborts and closes the given internal transaction.
       */
      protected abstract void abortTxn(Txn txn) throws PersistException;
 -
 -    private static class TransactionImpl<Txn> implements Transaction {
 -        private final TransactionManager<Txn> mTxnMgr;
 -        private final TransactionImpl<Txn> mParent;
 -        private final boolean mTop;
 -        private final IsolationLevel mLevel;
 -
 -        private boolean mForUpdate;
 -        private int mDesiredLockTimeout;
 -        private TimeUnit mTimeoutUnit;
 -
 -        private TransactionImpl<Txn> mChild;
 -        private boolean mExited;
 -        private Txn mTxn;
 -
 -        // Tracks all registered cursors.
 -        private CursorList<?> mCursorList;
 -
 -        TransactionImpl(TransactionManager<Txn> txnMgr,
 -                        TransactionImpl<Txn> parent,
 -                        boolean top,
 -                        IsolationLevel level) {
 -            mTxnMgr = txnMgr;
 -            mParent = parent;
 -            mTop = top;
 -            mLevel = level;
 -            if (!top && parent != null) {
 -                parent.mChild = this;
 -                mDesiredLockTimeout = parent.mDesiredLockTimeout;
 -                mTimeoutUnit = parent.mTimeoutUnit;
 -            }
 -        }
 -
 -        public void commit() throws PersistException {
 -            TransactionManager<Txn> txnMgr = mTxnMgr;
 -            txnMgr.mLock.lock();
 -            try {
 -                if (!mExited) {
 -                    if (mChild != null) {
 -                        mChild.commit();
 -                    }
 -
 -                    closeCursors();
 -
 -                    if (mTxn != null) {
 -                        if (mParent == null || mParent.mTxn != mTxn) {
 -                            try {
 -                                if (!txnMgr.commitTxn(mTxn)) {
 -                                    mTxn = null;
 -                                }
 -                            } catch (Throwable e) {
 -                                mTxn = null;
 -                                if (txnMgr.mExTransformer != null) {
 -                                    throw txnMgr.mExTransformer.toPersistException(e);
 -                                }
 -                                throw ExceptionTransformer.getInstance().toPersistException(e);
 -                            }
 -                        } else {
 -                            // Indicate fake nested transaction committed.
 -                            mTxn = null;
 -                        }
 -                    }
 -                }
 -            } finally {
 -                txnMgr.mLock.unlock();
 -            }
 -        }
 -
 -        public void exit() throws PersistException {
 -            TransactionManager<Txn> txnMgr = mTxnMgr;
 -            txnMgr.mLock.lock();
 -            try {
 -                if (!mExited) {
 -                    if (mChild != null) {
 -                        mChild.exit();
 -                    }
 -
 -                    closeCursors();
 -
 -                    if (mTxn != null) {
 -                        try {
 -                            if (mParent == null || mParent.mTxn != mTxn) {
 -                                try {
 -                                    txnMgr.abortTxn(mTxn);
 -                                } catch (Throwable e) {
 -                                    if (txnMgr.mExTransformer != null) {
 -                                        throw txnMgr.mExTransformer.toPersistException(e);
 -                                    }
 -                                    throw ExceptionTransformer.getInstance().toPersistException(e);
 -                                }
 -                            }
 -                        } finally {
 -                            mTxn = null;
 -                        }
 -                    }
 -
 -                    txnMgr.mCurrent = mParent;
 -
 -                    mExited = true;
 -                }
 -            } finally {
 -                txnMgr.mLock.unlock();
 -            }
 -        }
 -
 -        public void setForUpdate(boolean forUpdate) {
 -            mForUpdate = forUpdate;
 -        }
 -
 -        public boolean isForUpdate() {
 -            return mForUpdate;
 -        }
 -
 -        public void setDesiredLockTimeout(int timeout, TimeUnit unit) {
 -            TransactionManager<Txn> txnMgr = mTxnMgr;
 -            txnMgr.mLock.lock();
 -            try {
 -                if (timeout < 0) {
 -                    mDesiredLockTimeout = 0;
 -                    mTimeoutUnit = null;
 -                } else {
 -                    mDesiredLockTimeout = timeout;
 -                    mTimeoutUnit = unit;
 -                }
 -            } finally {
 -                txnMgr.mLock.unlock();
 -            }
 -        }
 -
 -        public IsolationLevel getIsolationLevel() {
 -            return mLevel;
 -        }
 -
 -        // Caller must hold mLock.
 -        <S extends Storable> void register(Cursor<S> cursor) {
 -            if (mCursorList == null) {
 -                mCursorList = new CursorList<Object>();
 -            }
 -            mCursorList.register(cursor, null);
 -        }
 -
 -        // Caller must hold mLock.
 -        <S extends Storable> void unregister(Cursor<S> cursor) {
 -            if (mCursorList != null) {
 -                mCursorList.unregister(cursor);
 -            }
 -        }
 -
 -        // Caller must hold mLock.
 -        Txn getTxn() throws Exception {
 -            TransactionManager<Txn> txnMgr = mTxnMgr;
 -            if (mTxn != null) {
 -                txnMgr.reuseTxn(mTxn);
 -            } else {
 -                Txn parentTxn;
 -                if (mParent == null || mTop) {
 -                    parentTxn = null;
 -                } else if ((parentTxn = mParent.mTxn) == null) {
 -                    // No point in creating nested transaction if parent
 -                    // has never been used. Create parent transaction
 -                    // and use it in child transaction, just like a fake
 -                    // nested transaction.
 -                    if ((parentTxn = mParent.getTxn()) != null) {
 -                        return mTxn = parentTxn;
 -                    }
 -                    // Isolation level of parent is none, so proceed to create
 -                    // a real transaction.
 -                }
 -                if (mTimeoutUnit == null) {
 -                    mTxn = txnMgr.createTxn(parentTxn, mLevel);
 -                } else {
 -                    mTxn = txnMgr.createTxn(parentTxn, mLevel, mDesiredLockTimeout, mTimeoutUnit);
 -                }
 -            }
 -            return mTxn;
 -        }
 -
 -        // Caller must hold mLock.
 -        private void closeCursors() throws PersistException {
 -            if (mCursorList != null) {
 -                mCursorList.closeCursors();
 -            }
 -        }
 -    }
 -
 -    /**
 -     * Simple fast list/map for holding a small amount of cursors.
 -     */
 -    static class CursorList<V> {
 -        private int mSize;
 -        private Cursor<?>[] mCursors;
 -        private V[] mValues;
 -
 -        CursorList() {
 -            mCursors = new Cursor[8];
 -        }
 -
 -        /**
 -         * @param value optional value to associate
 -         */
 -        @SuppressWarnings("unchecked")
 -        void register(Cursor<?> cursor, V value) {
 -            int size = mSize;
 -            Cursor<?>[] cursors = mCursors;
 -
 -            if (size == cursors.length) {
 -                int newLength = size << 1;
 -
 -                Cursor<?>[] newCursors = new Cursor[newLength];
 -                System.arraycopy(cursors, 0, newCursors, 0, size);
 -                mCursors = cursors = newCursors;
 -
 -                if (mValues != null) {
 -                    V[] newValues = (V[]) new Object[newLength];
 -                    System.arraycopy(mValues, 0, newValues, 0, size);
 -                    mValues = newValues;
 -                }
 -            }
 -
 -            cursors[size] = cursor;
 -
 -            if (value != null) {
 -                V[] values = mValues;
 -                if (values == null) {
 -                    mValues = values = (V[]) new Object[cursors.length];
 -                }
 -                values[size] = value;
 -            }
 -
 -            mSize = size + 1;
 -        }
 -
 -        V unregister(Cursor<?> cursor) {
 -            // Assuming that cursors are opened and closed in LIFO order
 -            // (stack order), search backwards to optimize.
 -            Cursor<?>[] cursors = mCursors;
 -            int size = mSize;
 -            int i = size;
 -            search: {
 -                while (--i >= 0) {
 -                    if (cursors[i] == cursor) {
 -                        break search;
 -                    }
 -                }
 -                // Not found.
 -                return null;
 -            }
 -
 -            V[] values = mValues;
 -            V value;
 -
 -            if (values == null) {
 -                value = null;
 -                if (i == size - 1) {
 -                    // Clear reference so that it can be garbage collected.
 -                    cursors[i] = null;
 -                } else {
 -                    // Shift array elements down.
 -                    System.arraycopy(cursors, i + 1, cursors, i, size - i - 1);
 -                }
 -            } else {
 -                value = values[i];
 -                if (i == size - 1) {
 -                    // Clear references so that they can be garbage collected.
 -                    cursors[i] = null;
 -                    values[i] = null;
 -                } else {
 -                    // Shift array elements down.
 -                    System.arraycopy(cursors, i + 1, cursors, i, size - i - 1);
 -                    System.arraycopy(values, i + 1, values, i, size - i - 1);
 -                }
 -            }
 -
 -            mSize = size - 1;
 -            return value;
 -        }
 -
 -        int size() {
 -            return mSize;
 -        }
 -
 -        Cursor<?> getCursor(int index) {
 -            return mCursors[index];
 -        }
 -
 -        V getValue(int index) {
 -            V[] values = mValues;
 -            return values == null ? null : values[index];
 -        }
 -
 -        /**
 -         * Closes all cursors and resets the size of this list to 0.
 -         */
 -        void closeCursors() throws PersistException {
 -            // Note: Iteration must be in reverse order. Calling close on the
 -            // cursor should cause it to unregister from this list. This will
 -            // cause only a modification to the end of the list, which is no
 -            // longer needed by this method.
 -            try {
 -                Cursor<?>[] cursors = mCursors;
 -                V[] values = mValues;
 -                int i = mSize;
 -                if (values == null) {
 -                    while (--i >= 0) {
 -                        Cursor<?> cursor = cursors[i];
 -                        if (cursor != null) {
 -                            cursor.close();
 -                            cursors[i] = null;
 -                        }
 -                    }
 -                } else {
 -                    while (--i >= 0) {
 -                        Cursor<?> cursor = cursors[i];
 -                        if (cursor != null) {
 -                            cursor.close();
 -                            cursors[i] = null;
 -                            values[i] = null;
 -                        }
 -                    }
 -                }
 -            } catch (FetchException e) {
 -                throw e.toPersistException();
 -            }
 -            mSize = 0;
 -        }
 -    }
  }
 diff --git a/src/main/java/com/amazon/carbonado/spi/TransactionScope.java b/src/main/java/com/amazon/carbonado/spi/TransactionScope.java new file mode 100644 index 0000000..2031d68 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/spi/TransactionScope.java @@ -0,0 +1,562 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.spi;
 +
 +import java.util.IdentityHashMap;
 +import java.util.Map;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.locks.Lock;
 +import java.util.concurrent.locks.ReentrantLock;
 +
 +import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.IsolationLevel;
 +import com.amazon.carbonado.PersistException;
 +import com.amazon.carbonado.RepositoryException;
 +import com.amazon.carbonado.Storable;
 +import com.amazon.carbonado.Transaction;
 +
 +/**
 + * Container of thread local, scoped transactions.
 + *
 + * @param <Txn> Transaction type
 + * @author Brian S O'Neill
 + * @since 1.2
 + * @see TransactionManager
 + */
 +public class TransactionScope<Txn> {
 +    final TransactionManager<Txn> mTxnMgr;
 +    final Lock mLock;
 +
 +    TransactionImpl<Txn> mCurrent;
 +
 +    // Tracks all registered cursors by storage type.
 +    private Map<Class<?>, CursorList<TransactionImpl<Txn>>> mCursors;
 +
 +    private boolean mClosed;
 +
 +    TransactionScope(TransactionManager<Txn> txnMgr, boolean closed) {
 +	mTxnMgr = txnMgr;
 +        mLock = new ReentrantLock(true);
 +	mClosed = closed;
 +    }
 +
 +    /**
 +     * Enters a new transaction scope.
 +     *
 +     * @param level desired isolation level (may be null)
 +     * @throws UnsupportedOperationException if isolation level higher than
 +     * supported by repository
 +     */
 +    public Transaction enter(IsolationLevel level) {
 +        mLock.lock();
 +        try {
 +            TransactionImpl<Txn> parent = mCurrent;
 +            IsolationLevel actualLevel = mTxnMgr.selectIsolationLevel(parent, level);
 +            if (actualLevel == null) {
 +                if (parent == null) {
 +                    throw new UnsupportedOperationException
 +                        ("Desired isolation level not supported: " + level);
 +                } else {
 +                    throw new UnsupportedOperationException
 +                        ("Desired isolation level not supported: " + level
 +                         + "; parent isolation level: " + parent.getIsolationLevel());
 +                }
 +            }
 +
 +            return mCurrent = new TransactionImpl<Txn>(this, parent, false, actualLevel);
 +        } finally {
 +            mLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Enters a new top-level transaction scope.
 +     *
 +     * @param level desired isolation level (may be null)
 +     * @throws UnsupportedOperationException if isolation level higher than
 +     * supported by repository
 +     */
 +    public Transaction enterTop(IsolationLevel level) {
 +        mLock.lock();
 +        try {
 +            IsolationLevel actualLevel = mTxnMgr.selectIsolationLevel(null, level);
 +            if (actualLevel == null) {
 +                throw new UnsupportedOperationException
 +                    ("Desired isolation level not supported: " + level);
 +            }
 +
 +            return mCurrent = new TransactionImpl<Txn>(this, mCurrent, true, actualLevel);
 +        } finally {
 +            mLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Registers the given cursor against the current transaction, allowing
 +     * it to be closed on transaction exit or transaction manager close. If
 +     * there is no current transaction scope, the cursor is registered as not
 +     * part of a transaction. Cursors should register when created.
 +     */
 +    public <S extends Storable> void register(Class<S> type, Cursor<S> cursor) {
 +        mLock.lock();
 +        try {
 +            checkState();
 +            if (mCursors == null) {
 +                mCursors = new IdentityHashMap<Class<?>, CursorList<TransactionImpl<Txn>>>();
 +            }
 +
 +            CursorList<TransactionImpl<Txn>> cursorList = mCursors.get(type);
 +            if (cursorList == null) {
 +                cursorList = new CursorList<TransactionImpl<Txn>>();
 +                mCursors.put(type, cursorList);
 +            }
 +
 +            cursorList.register(cursor, mCurrent);
 +
 +            if (mCurrent != null) {
 +                mCurrent.register(cursor);
 +            }
 +        } finally {
 +            mLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Unregisters a previously registered cursor. Cursors should unregister
 +     * when closed.
 +     */
 +    public <S extends Storable> void unregister(Class<S> type, Cursor<S> cursor) {
 +        mLock.lock();
 +        try {
 +            if (mCursors != null) {
 +                CursorList<TransactionImpl<Txn>> cursorList = mCursors.get(type);
 +                if (cursorList != null) {
 +                    TransactionImpl<Txn> txnImpl = cursorList.unregister(cursor);
 +                    if (txnImpl != null) {
 +                        txnImpl.unregister(cursor);
 +                    }
 +                }
 +            }
 +        } finally {
 +            mLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Returns lock used by TransactionScope. While holding lock, operations
 +     * are suspended.
 +     */
 +    public Lock getLock() {
 +        return mLock;
 +    }
 +
 +    /**
 +     * Exits all transactions and closes all cursors. Should be called only
 +     * when repository is closed.
 +     */
 +    public void close() throws RepositoryException {
 +        mLock.lock();
 +        try {
 +            if (!mClosed) {
 +                while (mCurrent != null) {
 +                    mCurrent.exit();
 +                }
 +                if (mCursors != null) {
 +                    for (CursorList<TransactionImpl<Txn>> cursorList : mCursors.values()) {
 +                        cursorList.closeCursors();
 +                    }
 +                }
 +            }
 +        } finally {
 +            mClosed = true;
 +            mLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Returns null if no transaction is in progress.
 +     *
 +     * @throws Exception thrown by createTxn or reuseTxn
 +     */
 +    public Txn getTxn() throws Exception {
 +        mLock.lock();
 +        try {
 +            checkState();
 +            return mCurrent == null ? null : mCurrent.getTxn();
 +        } finally {
 +            mLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Returns true if a transaction is in progress and it is for update.
 +     */
 +    public boolean isForUpdate() {
 +        mLock.lock();
 +        try {
 +            return (mClosed || mCurrent == null) ? false : mCurrent.isForUpdate();
 +        } finally {
 +            mLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Returns the isolation level of the current transaction, or null if there
 +     * is no transaction in the current thread.
 +     */
 +    public IsolationLevel getIsolationLevel() {
 +        mLock.lock();
 +        try {
 +            return (mClosed || mCurrent == null) ? null : mCurrent.getIsolationLevel();
 +        } finally {
 +            mLock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Caller must hold mLock.
 +     */
 +    private void checkState() {
 +        if (mClosed) {
 +            throw new IllegalStateException("Repository is closed");
 +        }
 +    }
 +
 +    private static class TransactionImpl<Txn> implements Transaction {
 +        private final TransactionScope<Txn> mScope;
 +        private final TransactionImpl<Txn> mParent;
 +        private final boolean mTop;
 +        private final IsolationLevel mLevel;
 +
 +        private boolean mForUpdate;
 +        private int mDesiredLockTimeout;
 +        private TimeUnit mTimeoutUnit;
 +
 +        private TransactionImpl<Txn> mChild;
 +        private boolean mExited;
 +        private Txn mTxn;
 +
 +        // Tracks all registered cursors.
 +        private CursorList<?> mCursorList;
 +
 +        TransactionImpl(TransactionScope<Txn> scope,
 +                        TransactionImpl<Txn> parent,
 +                        boolean top,
 +                        IsolationLevel level) {
 +            mScope = scope;
 +            mParent = parent;
 +            mTop = top;
 +            mLevel = level;
 +            if (!top && parent != null) {
 +                parent.mChild = this;
 +                mDesiredLockTimeout = parent.mDesiredLockTimeout;
 +                mTimeoutUnit = parent.mTimeoutUnit;
 +            }
 +        }
 +
 +        public void commit() throws PersistException {
 +            TransactionScope<Txn> scope = mScope;
 +            scope.mLock.lock();
 +            try {
 +                if (!mExited) {
 +                    if (mChild != null) {
 +                        mChild.commit();
 +                    }
 +
 +                    closeCursors();
 +
 +                    if (mTxn != null) {
 +                        if (mParent == null || mParent.mTxn != mTxn) {
 +                            try {
 +                                if (!scope.mTxnMgr.commitTxn(mTxn)) {
 +                                    mTxn = null;
 +                                }
 +                            } catch (Throwable e) {
 +                                mTxn = null;
 +                                throw ExceptionTransformer.getInstance().toPersistException(e);
 +                            }
 +                        } else {
 +                            // Indicate fake nested transaction committed.
 +                            mTxn = null;
 +                        }
 +                    }
 +                }
 +            } finally {
 +                scope.mLock.unlock();
 +            }
 +        }
 +
 +        public void exit() throws PersistException {
 +            TransactionScope<Txn> scope = mScope;
 +            scope.mLock.lock();
 +            try {
 +                if (!mExited) {
 +                    if (mChild != null) {
 +                        mChild.exit();
 +                    }
 +
 +                    closeCursors();
 +
 +                    if (mTxn != null) {
 +                        try {
 +                            if (mParent == null || mParent.mTxn != mTxn) {
 +                                try {
 +                                    scope.mTxnMgr.abortTxn(mTxn);
 +                                } catch (Throwable e) {
 +                                    throw ExceptionTransformer.getInstance().toPersistException(e);
 +                                }
 +                            }
 +                        } finally {
 +                            mTxn = null;
 +                        }
 +                    }
 +
 +                    scope.mCurrent = mParent;
 +
 +                    mExited = true;
 +                }
 +            } finally {
 +                scope.mLock.unlock();
 +            }
 +        }
 +
 +        public void setForUpdate(boolean forUpdate) {
 +            mForUpdate = forUpdate && mScope.mTxnMgr.supportsForUpdate();
 +        }
 +
 +        public boolean isForUpdate() {
 +            return mForUpdate;
 +        }
 +
 +        public void setDesiredLockTimeout(int timeout, TimeUnit unit) {
 +            TransactionScope<Txn> scope = mScope;
 +            scope.mLock.lock();
 +            try {
 +                if (timeout < 0) {
 +                    mDesiredLockTimeout = 0;
 +                    mTimeoutUnit = null;
 +                } else {
 +                    mDesiredLockTimeout = timeout;
 +                    mTimeoutUnit = unit;
 +                }
 +            } finally {
 +                scope.mLock.unlock();
 +            }
 +        }
 +
 +        public IsolationLevel getIsolationLevel() {
 +            return mLevel;
 +        }
 +
 +        // Caller must hold mLock.
 +        <S extends Storable> void register(Cursor<S> cursor) {
 +            if (mCursorList == null) {
 +                mCursorList = new CursorList<Object>();
 +            }
 +            mCursorList.register(cursor, null);
 +        }
 +
 +        // Caller must hold mLock.
 +        <S extends Storable> void unregister(Cursor<S> cursor) {
 +            if (mCursorList != null) {
 +                mCursorList.unregister(cursor);
 +            }
 +        }
 +
 +        // Caller must hold mLock.
 +        Txn getTxn() throws Exception {
 +            TransactionScope<Txn> scope = mScope;
 +            if (mTxn != null) {
 +                scope.mTxnMgr.reuseTxn(mTxn);
 +            } else {
 +                Txn parentTxn;
 +                if (mParent == null || mTop) {
 +                    parentTxn = null;
 +                } else if ((parentTxn = mParent.mTxn) == null) {
 +                    // No point in creating nested transaction if parent
 +                    // has never been used. Create parent transaction
 +                    // and use it in child transaction, just like a fake
 +                    // nested transaction.
 +                    if ((parentTxn = mParent.getTxn()) != null) {
 +                        return mTxn = parentTxn;
 +                    }
 +                    // Isolation level of parent is none, so proceed to create
 +                    // a real transaction.
 +                }
 +                if (mTimeoutUnit == null) {
 +                    mTxn = scope.mTxnMgr.createTxn(parentTxn, mLevel);
 +                } else {
 +                    mTxn = scope.mTxnMgr.createTxn(parentTxn, mLevel,
 +						   mDesiredLockTimeout, mTimeoutUnit);
 +                }
 +            }
 +            return mTxn;
 +        }
 +
 +        // Caller must hold mLock.
 +        private void closeCursors() throws PersistException {
 +            if (mCursorList != null) {
 +                mCursorList.closeCursors();
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Simple fast list/map for holding a small amount of cursors.
 +     */
 +    static class CursorList<V> {
 +        private int mSize;
 +        private Cursor<?>[] mCursors;
 +        private V[] mValues;
 +
 +        CursorList() {
 +            mCursors = new Cursor[8];
 +        }
 +
 +        /**
 +         * @param value optional value to associate
 +         */
 +        @SuppressWarnings("unchecked")
 +        void register(Cursor<?> cursor, V value) {
 +            int size = mSize;
 +            Cursor<?>[] cursors = mCursors;
 +
 +            if (size == cursors.length) {
 +                int newLength = size << 1;
 +
 +                Cursor<?>[] newCursors = new Cursor[newLength];
 +                System.arraycopy(cursors, 0, newCursors, 0, size);
 +                mCursors = cursors = newCursors;
 +
 +                if (mValues != null) {
 +                    V[] newValues = (V[]) new Object[newLength];
 +                    System.arraycopy(mValues, 0, newValues, 0, size);
 +                    mValues = newValues;
 +                }
 +            }
 +
 +            cursors[size] = cursor;
 +
 +            if (value != null) {
 +                V[] values = mValues;
 +                if (values == null) {
 +                    mValues = values = (V[]) new Object[cursors.length];
 +                }
 +                values[size] = value;
 +            }
 +
 +            mSize = size + 1;
 +        }
 +
 +        V unregister(Cursor<?> cursor) {
 +            // Assuming that cursors are opened and closed in LIFO order
 +            // (stack order), search backwards to optimize.
 +            Cursor<?>[] cursors = mCursors;
 +            int size = mSize;
 +            int i = size;
 +            search: {
 +                while (--i >= 0) {
 +                    if (cursors[i] == cursor) {
 +                        break search;
 +                    }
 +                }
 +                // Not found.
 +                return null;
 +            }
 +
 +            V[] values = mValues;
 +            V value;
 +
 +            if (values == null) {
 +                value = null;
 +                if (i == size - 1) {
 +                    // Clear reference so that it can be garbage collected.
 +                    cursors[i] = null;
 +                } else {
 +                    // Shift array elements down.
 +                    System.arraycopy(cursors, i + 1, cursors, i, size - i - 1);
 +                }
 +            } else {
 +                value = values[i];
 +                if (i == size - 1) {
 +                    // Clear references so that they can be garbage collected.
 +                    cursors[i] = null;
 +                    values[i] = null;
 +                } else {
 +                    // Shift array elements down.
 +                    System.arraycopy(cursors, i + 1, cursors, i, size - i - 1);
 +                    System.arraycopy(values, i + 1, values, i, size - i - 1);
 +                }
 +            }
 +
 +            mSize = size - 1;
 +            return value;
 +        }
 +
 +        int size() {
 +            return mSize;
 +        }
 +
 +        Cursor<?> getCursor(int index) {
 +            return mCursors[index];
 +        }
 +
 +        V getValue(int index) {
 +            V[] values = mValues;
 +            return values == null ? null : values[index];
 +        }
 +
 +        /**
 +         * Closes all cursors and resets the size of this list to 0.
 +         */
 +        void closeCursors() throws PersistException {
 +            // Note: Iteration must be in reverse order. Calling close on the
 +            // cursor should cause it to unregister from this list. This will
 +            // cause only a modification to the end of the list, which is no
 +            // longer needed by this method.
 +            try {
 +                Cursor<?>[] cursors = mCursors;
 +                V[] values = mValues;
 +                int i = mSize;
 +                if (values == null) {
 +                    while (--i >= 0) {
 +                        Cursor<?> cursor = cursors[i];
 +                        if (cursor != null) {
 +                            cursor.close();
 +                            cursors[i] = null;
 +                        }
 +                    }
 +                } else {
 +                    while (--i >= 0) {
 +                        Cursor<?> cursor = cursors[i];
 +                        if (cursor != null) {
 +                            cursor.close();
 +                            cursors[i] = null;
 +                            values[i] = null;
 +                        }
 +                    }
 +                }
 +            } catch (FetchException e) {
 +                throw e.toPersistException();
 +            }
 +            mSize = 0;
 +        }
 +    }
 +}
 | 
