From d90258bfa37ec69cd4795feeb2d2ff94623b0199 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Fri, 18 Jul 2008 01:56:53 +0000 Subject: Moved transaction support classes to txn package. --- .../amazon/carbonado/repo/jdbc/JDBCRepository.java | 4 +- .../repo/jdbc/JDBCTransactionManager.java | 2 +- .../com/amazon/carbonado/repo/map/MapCursor.java | 2 +- .../amazon/carbonado/repo/map/MapRepository.java | 5 +- .../com/amazon/carbonado/repo/map/MapStorage.java | 3 +- .../carbonado/repo/map/MapTransactionManager.java | 2 +- .../repo/replicated/ReplicatedRepository.java | 3 +- .../amazon/carbonado/repo/sleepycat/BDBCursor.java | 2 +- .../carbonado/repo/sleepycat/BDBRepository.java | 5 +- .../carbonado/repo/sleepycat/BDBStorage.java | 3 +- .../repo/sleepycat/BDBTransactionManager.java | 2 +- .../amazon/carbonado/spi/AbstractRepository.java | 3 + .../amazon/carbonado/spi/TransactionManager.java | 212 ------- .../com/amazon/carbonado/spi/TransactionPair.java | 109 ---- .../com/amazon/carbonado/spi/TransactionScope.java | 619 -------------------- .../amazon/carbonado/txn/TransactionManager.java | 212 +++++++ .../com/amazon/carbonado/txn/TransactionPair.java | 109 ++++ .../com/amazon/carbonado/txn/TransactionScope.java | 621 +++++++++++++++++++++ .../com/amazon/carbonado/txn/package-info.java | 22 + 19 files changed, 986 insertions(+), 954 deletions(-) delete mode 100644 src/main/java/com/amazon/carbonado/spi/TransactionManager.java delete mode 100644 src/main/java/com/amazon/carbonado/spi/TransactionPair.java delete mode 100644 src/main/java/com/amazon/carbonado/spi/TransactionScope.java create mode 100644 src/main/java/com/amazon/carbonado/txn/TransactionManager.java create mode 100644 src/main/java/com/amazon/carbonado/txn/TransactionPair.java create mode 100644 src/main/java/com/amazon/carbonado/txn/TransactionScope.java create mode 100644 src/main/java/com/amazon/carbonado/txn/package-info.java (limited to 'src/main/java/com/amazon/carbonado') diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java index 643cd85..03383c6 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java @@ -56,8 +56,8 @@ import com.amazon.carbonado.info.StorableProperty; import com.amazon.carbonado.sequence.SequenceCapability; import com.amazon.carbonado.sequence.SequenceValueProducer; import com.amazon.carbonado.spi.AbstractRepository; -import com.amazon.carbonado.spi.TransactionManager; -import com.amazon.carbonado.spi.TransactionScope; +import com.amazon.carbonado.txn.TransactionManager; +import com.amazon.carbonado.txn.TransactionScope; import com.amazon.carbonado.util.ThrowUnchecked; /** diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCTransactionManager.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCTransactionManager.java index c499a6b..55e6150 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCTransactionManager.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCTransactionManager.java @@ -26,7 +26,7 @@ import com.amazon.carbonado.FetchException; import com.amazon.carbonado.IsolationLevel; import com.amazon.carbonado.PersistException; import com.amazon.carbonado.Transaction; -import com.amazon.carbonado.spi.TransactionManager; +import com.amazon.carbonado.txn.TransactionManager; /** * Manages transactions for JDBCRepository. diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java b/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java index 006fd46..b9b0ab5 100644 --- a/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java +++ b/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java @@ -29,7 +29,7 @@ import com.amazon.carbonado.Storable; import com.amazon.carbonado.cursor.AbstractCursor; -import com.amazon.carbonado.spi.TransactionScope; +import com.amazon.carbonado.txn.TransactionScope; /** * Returns copies of Storables that it iterates over. diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java b/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java index 99b0e3b..f7ea4d7 100644 --- a/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java @@ -40,8 +40,9 @@ import com.amazon.carbonado.qe.StorageAccess; import com.amazon.carbonado.spi.AbstractRepository; import com.amazon.carbonado.spi.LobEngine; -import com.amazon.carbonado.spi.TransactionManager; -import com.amazon.carbonado.spi.TransactionScope; + +import com.amazon.carbonado.txn.TransactionManager; +import com.amazon.carbonado.txn.TransactionScope; /** * diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java index a15796c..7c2fa0a 100644 --- a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java @@ -78,9 +78,10 @@ import com.amazon.carbonado.qe.StorageAccess; import com.amazon.carbonado.spi.IndexInfoImpl; import com.amazon.carbonado.spi.LobEngine; -import com.amazon.carbonado.spi.TransactionScope; import com.amazon.carbonado.spi.TriggerManager; +import com.amazon.carbonado.txn.TransactionScope; + /** * * diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java b/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java index 929ab17..2c7b881 100644 --- a/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java +++ b/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java @@ -24,7 +24,7 @@ import com.amazon.carbonado.IsolationLevel; import com.amazon.carbonado.PersistException; import com.amazon.carbonado.Transaction; -import com.amazon.carbonado.spi.TransactionManager; +import com.amazon.carbonado.txn.TransactionManager; /** * diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 5aa4243..51dbba8 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -56,7 +56,8 @@ import com.amazon.carbonado.info.StorableIntrospector; import com.amazon.carbonado.repo.indexed.IndexEntryAccessCapability; import com.amazon.carbonado.spi.StoragePool; -import com.amazon.carbonado.spi.TransactionPair; + +import com.amazon.carbonado.txn.TransactionPair; import com.amazon.carbonado.util.Throttle; diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java index a0dfe38..41a0a91 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java @@ -25,7 +25,7 @@ import com.amazon.carbonado.Storable; import com.amazon.carbonado.raw.RawCursor; import com.amazon.carbonado.raw.RawUtil; -import com.amazon.carbonado.spi.TransactionScope; +import com.amazon.carbonado.txn.TransactionScope; /** * diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java index 9abdd5d..9f3d9ac 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java @@ -64,8 +64,9 @@ import com.amazon.carbonado.sequence.SequenceValueProducer; import com.amazon.carbonado.spi.AbstractRepository; import com.amazon.carbonado.spi.ExceptionTransformer; import com.amazon.carbonado.spi.LobEngine; -import com.amazon.carbonado.spi.TransactionManager; -import com.amazon.carbonado.spi.TransactionScope; + +import com.amazon.carbonado.txn.TransactionManager; +import com.amazon.carbonado.txn.TransactionScope; /** * Repository implementation backed by a Berkeley DB. Data is encoded in the diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java index 45205c7..f4a010b 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java @@ -79,9 +79,10 @@ import com.amazon.carbonado.sequence.SequenceValueProducer; import com.amazon.carbonado.spi.IndexInfoImpl; import com.amazon.carbonado.spi.LobEngine; -import com.amazon.carbonado.spi.TransactionScope; import com.amazon.carbonado.spi.TriggerManager; +import com.amazon.carbonado.txn.TransactionScope; + /** * * @author Brian S O'Neill diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBTransactionManager.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBTransactionManager.java index 1e613ca..8ac7fdd 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBTransactionManager.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBTransactionManager.java @@ -26,7 +26,7 @@ import com.amazon.carbonado.PersistException; import com.amazon.carbonado.Transaction; import com.amazon.carbonado.spi.ExceptionTransformer; -import com.amazon.carbonado.spi.TransactionManager; +import com.amazon.carbonado.txn.TransactionManager; /** * This class is used for tracking transactions and open cursors. diff --git a/src/main/java/com/amazon/carbonado/spi/AbstractRepository.java b/src/main/java/com/amazon/carbonado/spi/AbstractRepository.java index e73a8c9..688134b 100644 --- a/src/main/java/com/amazon/carbonado/spi/AbstractRepository.java +++ b/src/main/java/com/amazon/carbonado/spi/AbstractRepository.java @@ -41,6 +41,9 @@ import com.amazon.carbonado.sequence.SequenceCapability; import com.amazon.carbonado.sequence.SequenceValueProducer; import com.amazon.carbonado.sequence.SequenceValueProducerPool; +import com.amazon.carbonado.txn.TransactionManager; +import com.amazon.carbonado.txn.TransactionScope; + /** * Implements basic functionality required by a core Repository. * diff --git a/src/main/java/com/amazon/carbonado/spi/TransactionManager.java b/src/main/java/com/amazon/carbonado/spi/TransactionManager.java deleted file mode 100644 index 48f52af..0000000 --- a/src/main/java/com/amazon/carbonado/spi/TransactionManager.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Copyright 2006 Amazon Technologies, Inc. or its affiliates. - * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks - * of Amazon Technologies, Inc. or its affiliates. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.amazon.carbonado.spi; - -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.cojen.util.WeakIdentityMap; - -import com.amazon.carbonado.IsolationLevel; -import com.amazon.carbonado.PersistException; -import com.amazon.carbonado.RepositoryException; -import com.amazon.carbonado.Transaction; - -/** - * Generic transaction manager for repositories. - * - * @param Transaction implementation - * @author Brian S O'Neill - */ -public abstract class TransactionManager { - private static final int OPEN = 0, CLOSED = 1, SUSPENDED = 2; - - private final ThreadLocal> mLocalScope; - private final Map, ?> mAllScopes; - - private int mState; - - public TransactionManager() { - mLocalScope = new ThreadLocal>(); - mAllScopes = new WeakIdentityMap(); - } - - /** - * Returns the thread-local TransactionScope, creating it if needed. - */ - public TransactionScope localScope() { - TransactionScope scope = mLocalScope.get(); - if (scope == null) { - int state; - synchronized (this) { - state = mState; - scope = new TransactionScope(this, state != OPEN); - mAllScopes.put(scope, null); - } - mLocalScope.set(scope); - if (state == SUSPENDED) { - // Immediately suspend new scope. - scope.getLock().lock(); - } - } - return scope; - } - - /** - * Detaches the thread-local TransactionScope from the current thread. It - * can be {@link TransactionScope#attach attached} later, and to any thread - * which does not currently have a TransactionScope. - * - * @return detached thread-local TransactionScope or null if none - * @since 1.2 - */ - public TransactionScope detachLocalScope() { - TransactionScope scope = mLocalScope.get(); - if (scope != null) { - scope.markDetached(); - mLocalScope.remove(); - } - return scope; - } - - // Called by TransactionScope. - boolean removeLocalScope(TransactionScope scope) { - TransactionScope existing = mLocalScope.get(); - if (existing == scope) { - mLocalScope.remove(); - return true; - } - return false; - } - - // Called by TransactionScope. - boolean setLocalScope(TransactionScope scope, boolean detached) { - TransactionScope existing = mLocalScope.get(); - if ((existing == null && detached) || existing == scope) { - mLocalScope.set(scope); - return true; - } - return false; - } - - /** - * Closes all transaction scopes. Should be called only when repository is - * closed. - * - * @param suspend when true, indefinitely suspend all threads interacting - * with transactions - */ - public synchronized void close(boolean suspend) throws RepositoryException { - if (mState == SUSPENDED) { - // If suspended, attempting to close again will likely deadlock. - return; - } - - if (suspend) { - for (TransactionScope scope : mAllScopes.keySet()) { - // Lock scope but don't release it. This prevents other threads - // from beginning work during shutdown, which will likely fail - // along the way. - scope.getLock().lock(); - } - } - - mState = suspend ? SUSPENDED : CLOSED; - - for (TransactionScope scope : mAllScopes.keySet()) { - scope.close(); - } - } - - /** - * Returns supported isolation level, which may be higher. If isolation - * level cannot go higher (or lower than parent) then return null. - * - * @param parent optional parent transaction - * @param level desired isolation level (may be null) - */ - protected abstract IsolationLevel selectIsolationLevel(Transaction parent, - IsolationLevel level); - - /** - * Return true if transactions support "for update" mode. - * - * @since 1.2 - */ - protected abstract boolean supportsForUpdate(); - - /** - * Creates an internal transaction representation, with the optional parent - * transaction. If parent is not null and real nested transactions are not - * supported, simply return parent transaction for supporting fake nested - * transactions. - * - * @param parent optional parent transaction - * @param level required isolation level - * @return new transaction, parent transaction, or possibly null if required - * isolation level is none - */ - protected abstract Txn createTxn(Txn parent, IsolationLevel level) throws Exception; - - /** - * Creates an internal transaction representation, with the optional parent - * transaction. If parent is not null and real nested transactions are not - * supported, simply return parent transaction for supporting fake nested - * transactions. - * - *

The default implementation of this method just calls the regular - * createTxn method, ignoring the timeout parameter. - * - * @param parent optional parent transaction - * @param level required isolation level - * @param timeout desired timeout for lock acquisition, never negative - * @param unit timeout unit, never null - * @return new transaction, parent transaction, or possibly null if required - * isolation level is none - */ - protected Txn createTxn(Txn parent, IsolationLevel level, - int timeout, TimeUnit unit) - throws Exception - { - return createTxn(parent, level); - } - - /** - * Called when a transaction is about to be reused. The default - * implementation of this method does nothing. Override if any preparation - * is required to ready a transaction for reuse. - * - * @param txn transaction to reuse, never null - * @since 1.1.3 - */ - protected void reuseTxn(Txn txn) throws Exception { - } - - /** - * Commits and closes the given internal transaction. - * - * @return true if transaction object is still valid - */ - protected abstract boolean commitTxn(Txn txn) throws PersistException; - - /** - * Aborts and closes the given internal transaction. - */ - protected abstract void abortTxn(Txn txn) throws PersistException; -} diff --git a/src/main/java/com/amazon/carbonado/spi/TransactionPair.java b/src/main/java/com/amazon/carbonado/spi/TransactionPair.java deleted file mode 100644 index ff07bc2..0000000 --- a/src/main/java/com/amazon/carbonado/spi/TransactionPair.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright 2006 Amazon Technologies, Inc. or its affiliates. - * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks - * of Amazon Technologies, Inc. or its affiliates. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.amazon.carbonado.spi; - -import java.util.concurrent.TimeUnit; - -import com.amazon.carbonado.IsolationLevel; -import com.amazon.carbonado.PersistException; -import com.amazon.carbonado.Transaction; - -/** - * Pairs two transaction together into one. The transaction cannot be atomic, - * however. Inconsistencies can result if the primary transaction succeeds in - * committing, but the secondary fails. Therefore, the designated primary - * transaction should be the one that is more likely to fail. For example, the - * primary transaction might rely on the network, but the secondary operates - * locally. - * - * @author Don Schneider - * @author Brian S O'Neill - */ -public class TransactionPair implements Transaction { - private final Transaction mPrimaryTransaction; - private final Transaction mSecondaryTransaction; - - /** - * @param primaryTransaction is committed first, exited last - * @param secondaryTransaction is exited first, commited last - */ - public TransactionPair(Transaction primaryTransaction, Transaction secondaryTransaction) { - mPrimaryTransaction = primaryTransaction; - mSecondaryTransaction = secondaryTransaction; - } - - public void commit() throws PersistException { - mPrimaryTransaction.commit(); - try { - mSecondaryTransaction.commit(); - } catch (Exception e) { - throw new PersistException - ("Failure to commit secondary transaction has likely caused an inconsistency", e); - } - } - - public void exit() throws PersistException { - try { - mSecondaryTransaction.exit(); - } finally { - // Do this second so if there is an exception, the user sees the - // primary exception, which is presumably more important. - mPrimaryTransaction.exit(); - } - } - - public void setForUpdate(boolean forUpdate) { - mPrimaryTransaction.setForUpdate(forUpdate); - mSecondaryTransaction.setForUpdate(forUpdate); - } - - public boolean isForUpdate() { - return mPrimaryTransaction.isForUpdate() && mSecondaryTransaction.isForUpdate(); - } - - public void setDesiredLockTimeout(int timeout, TimeUnit unit) { - mPrimaryTransaction.setDesiredLockTimeout(timeout, unit); - mSecondaryTransaction.setDesiredLockTimeout(timeout, unit); - } - - public IsolationLevel getIsolationLevel() { - return mPrimaryTransaction.getIsolationLevel() - .lowestCommon(mSecondaryTransaction.getIsolationLevel()); - } - - public void detach() { - mPrimaryTransaction.detach(); - try { - mSecondaryTransaction.detach(); - } catch (IllegalStateException e) { - mPrimaryTransaction.attach(); - throw e; - } - } - - public void attach() { - mPrimaryTransaction.attach(); - try { - mSecondaryTransaction.attach(); - } catch (IllegalStateException e) { - mPrimaryTransaction.detach(); - throw e; - } - } -} diff --git a/src/main/java/com/amazon/carbonado/spi/TransactionScope.java b/src/main/java/com/amazon/carbonado/spi/TransactionScope.java deleted file mode 100644 index bce7804..0000000 --- a/src/main/java/com/amazon/carbonado/spi/TransactionScope.java +++ /dev/null @@ -1,619 +0,0 @@ -/* - * Copyright 2008 Amazon Technologies, Inc. or its affiliates. - * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks - * of Amazon Technologies, Inc. or its affiliates. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.amazon.carbonado.spi; - -import java.util.IdentityHashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import com.amazon.carbonado.Cursor; -import com.amazon.carbonado.FetchException; -import com.amazon.carbonado.IsolationLevel; -import com.amazon.carbonado.PersistException; -import com.amazon.carbonado.RepositoryException; -import com.amazon.carbonado.Storable; -import com.amazon.carbonado.Transaction; - -/** - * Container of thread local, scoped transactions. - * - * @param Transaction implementation - * @author Brian S O'Neill - * @since 1.2 - * @see TransactionManager - */ -public class TransactionScope { - final TransactionManager mTxnMgr; - final Lock mLock; - - TransactionImpl mActive; - - // Tracks all registered cursors by storage type. - private Map, CursorList>> mCursors; - - private boolean mClosed; - private boolean mDetached; - - TransactionScope(TransactionManager txnMgr, boolean closed) { - mTxnMgr = txnMgr; - mLock = new ReentrantLock(true); - } - - /** - * Enters a new transaction scope which becomes the active transaction. - * - * @param level desired isolation level (may be null) - * @throws UnsupportedOperationException if isolation level higher than - * supported by repository - */ - public Transaction enter(IsolationLevel level) { - mLock.lock(); - try { - TransactionImpl parent = mActive; - IsolationLevel actualLevel = mTxnMgr.selectIsolationLevel(parent, level); - if (actualLevel == null) { - if (parent == null) { - throw new UnsupportedOperationException - ("Desired isolation level not supported: " + level); - } else { - throw new UnsupportedOperationException - ("Desired isolation level not supported: " + level - + "; parent isolation level: " + parent.getIsolationLevel()); - } - } - - return mActive = new TransactionImpl(this, parent, false, actualLevel); - } finally { - mLock.unlock(); - } - } - - /** - * Enters a new top-level transaction scope which becomes the active - * transaction. - * - * @param level desired isolation level (may be null) - * @throws UnsupportedOperationException if isolation level higher than - * supported by repository - */ - public Transaction enterTop(IsolationLevel level) { - mLock.lock(); - try { - IsolationLevel actualLevel = mTxnMgr.selectIsolationLevel(null, level); - if (actualLevel == null) { - throw new UnsupportedOperationException - ("Desired isolation level not supported: " + level); - } - - return mActive = new TransactionImpl(this, mActive, true, actualLevel); - } finally { - mLock.unlock(); - } - } - - /** - * Registers the given cursor against the active transaction, allowing it - * to be closed on transaction exit or transaction manager close. If there - * is no active transaction in scope, the cursor is registered as not part - * of a transaction. Cursors should register when created. - */ - public void register(Class type, Cursor cursor) { - mLock.lock(); - try { - checkClosed(); - if (mCursors == null) { - mCursors = new IdentityHashMap, CursorList>>(); - } - - CursorList> cursorList = mCursors.get(type); - if (cursorList == null) { - cursorList = new CursorList>(); - mCursors.put(type, cursorList); - } - - cursorList.register(cursor, mActive); - - if (mActive != null) { - mActive.register(cursor); - } - } finally { - mLock.unlock(); - } - } - - /** - * Unregisters a previously registered cursor. Cursors should unregister - * when closed. - */ - public void unregister(Class type, Cursor cursor) { - mLock.lock(); - try { - if (mCursors != null) { - CursorList> cursorList = mCursors.get(type); - if (cursorList != null) { - TransactionImpl txnImpl = cursorList.unregister(cursor); - if (txnImpl != null) { - txnImpl.unregister(cursor); - } - } - } - } finally { - mLock.unlock(); - } - } - - /** - * Returns lock used by TransactionScope. While holding lock, operations - * are suspended. - */ - public Lock getLock() { - return mLock; - } - - /** - * Returns the implementation for the active transaction, or null if there - * is no active transaction. - * - * @throws Exception thrown by createTxn or reuseTxn - */ - public Txn getTxn() throws Exception { - mLock.lock(); - try { - checkClosed(); - return mActive == null ? null : mActive.getTxn(); - } finally { - mLock.unlock(); - } - } - - /** - * Returns true if an active transaction exists and it is for update. - */ - public boolean isForUpdate() { - mLock.lock(); - try { - return (mClosed || mActive == null) ? false : mActive.isForUpdate(); - } finally { - mLock.unlock(); - } - } - - /** - * Returns the isolation level of the active transaction, or null if there - * is no active transaction. - */ - public IsolationLevel getIsolationLevel() { - mLock.lock(); - try { - return (mClosed || mActive == null) ? null : mActive.getIsolationLevel(); - } finally { - mLock.unlock(); - } - } - - /** - * Attach this scope to the current thread, if it has been {@link - * TransactionManager#detachLocalScope detached}. - * - * @throws IllegalStateException if current thread has a different - * transaction already attached - */ - public void attach() { - mLock.lock(); - try { - if (mTxnMgr.setLocalScope(this, mDetached)) { - mDetached = false; - } else if (!mDetached) { - throw new IllegalStateException("Transaction scope is not detached"); - } else { - throw new IllegalStateException - ("Current thread has a different transaction already attached"); - } - } finally { - mLock.unlock(); - } - } - - // Called by TransactionImpl. - void detach() { - mLock.lock(); - try { - if (mDetached || mTxnMgr.removeLocalScope(this)) { - mDetached = true; - } else { - throw new IllegalStateException("Transaction is attached to a different thread"); - } - } finally { - mLock.unlock(); - } - } - - // Called by TransactionManager. - void markDetached() { - mLock.lock(); - try { - mDetached = true; - } finally { - mLock.unlock(); - } - } - - /** - * Exits all transactions and closes all cursors. Should be called only - * when repository is closed. - */ - void close() throws RepositoryException { - mLock.lock(); - try { - if (!mClosed) { - while (mActive != null) { - mActive.exit(); - } - if (mCursors != null) { - for (CursorList> cursorList : mCursors.values()) { - cursorList.closeCursors(); - } - } - } - } finally { - mClosed = true; - mLock.unlock(); - } - } - - /** - * Caller must hold mLock. - */ - private void checkClosed() { - if (mClosed) { - throw new IllegalStateException("Repository is closed"); - } - } - - private static class TransactionImpl implements Transaction { - private final TransactionScope mScope; - private final TransactionImpl mParent; - private final boolean mTop; - private final IsolationLevel mLevel; - - private boolean mForUpdate; - private int mDesiredLockTimeout; - private TimeUnit mTimeoutUnit; - - private TransactionImpl mChild; - private boolean mExited; - private Txn mTxn; - - // Tracks all registered cursors. - private CursorList mCursorList; - - TransactionImpl(TransactionScope scope, - TransactionImpl parent, - boolean top, - IsolationLevel level) { - mScope = scope; - mParent = parent; - mTop = top; - mLevel = level; - if (!top && parent != null) { - parent.mChild = this; - mDesiredLockTimeout = parent.mDesiredLockTimeout; - mTimeoutUnit = parent.mTimeoutUnit; - } - } - - public void commit() throws PersistException { - TransactionScope scope = mScope; - scope.mLock.lock(); - try { - if (!mExited) { - if (mChild != null) { - mChild.commit(); - } - - closeCursors(); - - if (mTxn != null) { - if (mParent == null || mParent.mTxn != mTxn) { - try { - if (!scope.mTxnMgr.commitTxn(mTxn)) { - mTxn = null; - } - } catch (Throwable e) { - mTxn = null; - throw ExceptionTransformer.getInstance().toPersistException(e); - } - } else { - // Indicate fake nested transaction committed. - mTxn = null; - } - } - } - } finally { - scope.mLock.unlock(); - } - } - - public void exit() throws PersistException { - TransactionScope scope = mScope; - scope.mLock.lock(); - try { - if (!mExited) { - if (mChild != null) { - mChild.exit(); - } - - closeCursors(); - - if (mTxn != null) { - try { - if (mParent == null || mParent.mTxn != mTxn) { - try { - scope.mTxnMgr.abortTxn(mTxn); - } catch (Throwable e) { - throw ExceptionTransformer.getInstance().toPersistException(e); - } - } - } finally { - mTxn = null; - } - } - - scope.mActive = mParent; - - mExited = true; - } - } finally { - scope.mLock.unlock(); - } - } - - public void setForUpdate(boolean forUpdate) { - mForUpdate = forUpdate && mScope.mTxnMgr.supportsForUpdate(); - } - - public boolean isForUpdate() { - return mForUpdate; - } - - public void setDesiredLockTimeout(int timeout, TimeUnit unit) { - TransactionScope scope = mScope; - scope.mLock.lock(); - try { - if (timeout < 0) { - mDesiredLockTimeout = 0; - mTimeoutUnit = null; - } else { - mDesiredLockTimeout = timeout; - mTimeoutUnit = unit; - } - } finally { - scope.mLock.unlock(); - } - } - - public IsolationLevel getIsolationLevel() { - return mLevel; - } - - public void detach() { - mScope.detach(); - } - - public void attach() { - mScope.attach(); - } - - // Caller must hold mLock. - void register(Cursor cursor) { - if (mCursorList == null) { - mCursorList = new CursorList(); - } - mCursorList.register(cursor, null); - } - - // Caller must hold mLock. - void unregister(Cursor cursor) { - if (mCursorList != null) { - mCursorList.unregister(cursor); - } - } - - // Caller must hold mLock. - Txn getTxn() throws Exception { - TransactionScope scope = mScope; - if (mTxn != null) { - scope.mTxnMgr.reuseTxn(mTxn); - } else { - Txn parentTxn; - if (mParent == null || mTop) { - parentTxn = null; - } else if ((parentTxn = mParent.mTxn) == null) { - // No point in creating nested transaction if parent - // has never been used. Create parent transaction - // and use it in child transaction, just like a fake - // nested transaction. - if ((parentTxn = mParent.getTxn()) != null) { - return mTxn = parentTxn; - } - // Isolation level of parent is none, so proceed to create - // a real transaction. - } - if (mTimeoutUnit == null) { - mTxn = scope.mTxnMgr.createTxn(parentTxn, mLevel); - } else { - mTxn = scope.mTxnMgr.createTxn(parentTxn, mLevel, - mDesiredLockTimeout, mTimeoutUnit); - } - } - return mTxn; - } - - // Caller must hold mLock. - private void closeCursors() throws PersistException { - if (mCursorList != null) { - mCursorList.closeCursors(); - } - } - } - - /** - * Simple fast list/map for holding a small amount of cursors. - */ - static class CursorList { - private int mSize; - private Cursor[] mCursors; - private V[] mValues; - - CursorList() { - mCursors = new Cursor[8]; - } - - /** - * @param value optional value to associate - */ - @SuppressWarnings("unchecked") - void register(Cursor cursor, V value) { - int size = mSize; - Cursor[] cursors = mCursors; - - if (size == cursors.length) { - int newLength = size << 1; - - Cursor[] newCursors = new Cursor[newLength]; - System.arraycopy(cursors, 0, newCursors, 0, size); - mCursors = cursors = newCursors; - - if (mValues != null) { - V[] newValues = (V[]) new Object[newLength]; - System.arraycopy(mValues, 0, newValues, 0, size); - mValues = newValues; - } - } - - cursors[size] = cursor; - - if (value != null) { - V[] values = mValues; - if (values == null) { - mValues = values = (V[]) new Object[cursors.length]; - } - values[size] = value; - } - - mSize = size + 1; - } - - V unregister(Cursor cursor) { - // Assuming that cursors are opened and closed in LIFO order - // (stack order), search backwards to optimize. - Cursor[] cursors = mCursors; - int size = mSize; - int i = size; - search: { - while (--i >= 0) { - if (cursors[i] == cursor) { - break search; - } - } - // Not found. - return null; - } - - V[] values = mValues; - V value; - - if (values == null) { - value = null; - if (i == size - 1) { - // Clear reference so that it can be garbage collected. - cursors[i] = null; - } else { - // Shift array elements down. - System.arraycopy(cursors, i + 1, cursors, i, size - i - 1); - } - } else { - value = values[i]; - if (i == size - 1) { - // Clear references so that they can be garbage collected. - cursors[i] = null; - values[i] = null; - } else { - // Shift array elements down. - System.arraycopy(cursors, i + 1, cursors, i, size - i - 1); - System.arraycopy(values, i + 1, values, i, size - i - 1); - } - } - - mSize = size - 1; - return value; - } - - int size() { - return mSize; - } - - Cursor getCursor(int index) { - return mCursors[index]; - } - - V getValue(int index) { - V[] values = mValues; - return values == null ? null : values[index]; - } - - /** - * Closes all cursors and resets the size of this list to 0. - */ - void closeCursors() throws PersistException { - // Note: Iteration must be in reverse order. Calling close on the - // cursor should cause it to unregister from this list. This will - // cause only a modification to the end of the list, which is no - // longer needed by this method. - try { - Cursor[] cursors = mCursors; - V[] values = mValues; - int i = mSize; - if (values == null) { - while (--i >= 0) { - Cursor cursor = cursors[i]; - if (cursor != null) { - cursor.close(); - cursors[i] = null; - } - } - } else { - while (--i >= 0) { - Cursor cursor = cursors[i]; - if (cursor != null) { - cursor.close(); - cursors[i] = null; - values[i] = null; - } - } - } - } catch (FetchException e) { - throw e.toPersistException(); - } - mSize = 0; - } - } -} diff --git a/src/main/java/com/amazon/carbonado/txn/TransactionManager.java b/src/main/java/com/amazon/carbonado/txn/TransactionManager.java new file mode 100644 index 0000000..4914be9 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/txn/TransactionManager.java @@ -0,0 +1,212 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.txn; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.cojen.util.WeakIdentityMap; + +import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Transaction; + +/** + * Generic transaction manager for repositories. + * + * @param Transaction implementation + * @author Brian S O'Neill + */ +public abstract class TransactionManager { + private static final int OPEN = 0, CLOSED = 1, SUSPENDED = 2; + + private final ThreadLocal> mLocalScope; + private final Map, ?> mAllScopes; + + private int mState; + + public TransactionManager() { + mLocalScope = new ThreadLocal>(); + mAllScopes = new WeakIdentityMap(); + } + + /** + * Returns the thread-local TransactionScope, creating it if needed. + */ + public TransactionScope localScope() { + TransactionScope scope = mLocalScope.get(); + if (scope == null) { + int state; + synchronized (this) { + state = mState; + scope = new TransactionScope(this, state != OPEN); + mAllScopes.put(scope, null); + } + mLocalScope.set(scope); + if (state == SUSPENDED) { + // Immediately suspend new scope. + scope.getLock().lock(); + } + } + return scope; + } + + /** + * Detaches the thread-local TransactionScope from the current thread. It + * can be {@link TransactionScope#attach attached} later, and to any thread + * which does not currently have a TransactionScope. + * + * @return detached thread-local TransactionScope or null if none + * @since 1.2 + */ + public TransactionScope detachLocalScope() { + TransactionScope scope = mLocalScope.get(); + if (scope != null) { + scope.markDetached(); + mLocalScope.remove(); + } + return scope; + } + + // Called by TransactionScope. + boolean removeLocalScope(TransactionScope scope) { + TransactionScope existing = mLocalScope.get(); + if (existing == scope) { + mLocalScope.remove(); + return true; + } + return false; + } + + // Called by TransactionScope. + boolean setLocalScope(TransactionScope scope, boolean detached) { + TransactionScope existing = mLocalScope.get(); + if ((existing == null && detached) || existing == scope) { + mLocalScope.set(scope); + return true; + } + return false; + } + + /** + * Closes all transaction scopes. Should be called only when repository is + * closed. + * + * @param suspend when true, indefinitely suspend all threads interacting + * with transactions + */ + public synchronized void close(boolean suspend) throws RepositoryException { + if (mState == SUSPENDED) { + // If suspended, attempting to close again will likely deadlock. + return; + } + + if (suspend) { + for (TransactionScope scope : mAllScopes.keySet()) { + // Lock scope but don't release it. This prevents other threads + // from beginning work during shutdown, which will likely fail + // along the way. + scope.getLock().lock(); + } + } + + mState = suspend ? SUSPENDED : CLOSED; + + for (TransactionScope scope : mAllScopes.keySet()) { + scope.close(); + } + } + + /** + * Returns supported isolation level, which may be higher. If isolation + * level cannot go higher (or lower than parent) then return null. + * + * @param parent optional parent transaction + * @param level desired isolation level (may be null) + */ + protected abstract IsolationLevel selectIsolationLevel(Transaction parent, + IsolationLevel level); + + /** + * Return true if transactions support "for update" mode. + * + * @since 1.2 + */ + protected abstract boolean supportsForUpdate(); + + /** + * Creates an internal transaction representation, with the optional parent + * transaction. If parent is not null and real nested transactions are not + * supported, simply return parent transaction for supporting fake nested + * transactions. + * + * @param parent optional parent transaction + * @param level required isolation level + * @return new transaction, parent transaction, or possibly null if required + * isolation level is none + */ + protected abstract Txn createTxn(Txn parent, IsolationLevel level) throws Exception; + + /** + * Creates an internal transaction representation, with the optional parent + * transaction. If parent is not null and real nested transactions are not + * supported, simply return parent transaction for supporting fake nested + * transactions. + * + *

The default implementation of this method just calls the regular + * createTxn method, ignoring the timeout parameter. + * + * @param parent optional parent transaction + * @param level required isolation level + * @param timeout desired timeout for lock acquisition, never negative + * @param unit timeout unit, never null + * @return new transaction, parent transaction, or possibly null if required + * isolation level is none + */ + protected Txn createTxn(Txn parent, IsolationLevel level, + int timeout, TimeUnit unit) + throws Exception + { + return createTxn(parent, level); + } + + /** + * Called when a transaction is about to be reused. The default + * implementation of this method does nothing. Override if any preparation + * is required to ready a transaction for reuse. + * + * @param txn transaction to reuse, never null + * @since 1.1.3 + */ + protected void reuseTxn(Txn txn) throws Exception { + } + + /** + * Commits and closes the given internal transaction. + * + * @return true if transaction object is still valid + */ + protected abstract boolean commitTxn(Txn txn) throws PersistException; + + /** + * Aborts and closes the given internal transaction. + */ + protected abstract void abortTxn(Txn txn) throws PersistException; +} diff --git a/src/main/java/com/amazon/carbonado/txn/TransactionPair.java b/src/main/java/com/amazon/carbonado/txn/TransactionPair.java new file mode 100644 index 0000000..f07f57e --- /dev/null +++ b/src/main/java/com/amazon/carbonado/txn/TransactionPair.java @@ -0,0 +1,109 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.txn; + +import java.util.concurrent.TimeUnit; + +import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.Transaction; + +/** + * Pairs two transaction together into one. The transaction cannot be atomic, + * however. Inconsistencies can result if the primary transaction succeeds in + * committing, but the secondary fails. Therefore, the designated primary + * transaction should be the one that is more likely to fail. For example, the + * primary transaction might rely on the network, but the secondary operates + * locally. + * + * @author Don Schneider + * @author Brian S O'Neill + */ +public class TransactionPair implements Transaction { + private final Transaction mPrimaryTransaction; + private final Transaction mSecondaryTransaction; + + /** + * @param primaryTransaction is committed first, exited last + * @param secondaryTransaction is exited first, commited last + */ + public TransactionPair(Transaction primaryTransaction, Transaction secondaryTransaction) { + mPrimaryTransaction = primaryTransaction; + mSecondaryTransaction = secondaryTransaction; + } + + public void commit() throws PersistException { + mPrimaryTransaction.commit(); + try { + mSecondaryTransaction.commit(); + } catch (Exception e) { + throw new PersistException + ("Failure to commit secondary transaction has likely caused an inconsistency", e); + } + } + + public void exit() throws PersistException { + try { + mSecondaryTransaction.exit(); + } finally { + // Do this second so if there is an exception, the user sees the + // primary exception, which is presumably more important. + mPrimaryTransaction.exit(); + } + } + + public void setForUpdate(boolean forUpdate) { + mPrimaryTransaction.setForUpdate(forUpdate); + mSecondaryTransaction.setForUpdate(forUpdate); + } + + public boolean isForUpdate() { + return mPrimaryTransaction.isForUpdate() && mSecondaryTransaction.isForUpdate(); + } + + public void setDesiredLockTimeout(int timeout, TimeUnit unit) { + mPrimaryTransaction.setDesiredLockTimeout(timeout, unit); + mSecondaryTransaction.setDesiredLockTimeout(timeout, unit); + } + + public IsolationLevel getIsolationLevel() { + return mPrimaryTransaction.getIsolationLevel() + .lowestCommon(mSecondaryTransaction.getIsolationLevel()); + } + + public void detach() { + mPrimaryTransaction.detach(); + try { + mSecondaryTransaction.detach(); + } catch (IllegalStateException e) { + mPrimaryTransaction.attach(); + throw e; + } + } + + public void attach() { + mPrimaryTransaction.attach(); + try { + mSecondaryTransaction.attach(); + } catch (IllegalStateException e) { + mPrimaryTransaction.detach(); + throw e; + } + } +} diff --git a/src/main/java/com/amazon/carbonado/txn/TransactionScope.java b/src/main/java/com/amazon/carbonado/txn/TransactionScope.java new file mode 100644 index 0000000..62b62a3 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/txn/TransactionScope.java @@ -0,0 +1,621 @@ +/* + * Copyright 2008 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.txn; + +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Transaction; + +import com.amazon.carbonado.spi.ExceptionTransformer; + +/** + * Container of thread local, scoped transactions. + * + * @param Transaction implementation + * @author Brian S O'Neill + * @since 1.2 + * @see TransactionManager + */ +public class TransactionScope { + final TransactionManager mTxnMgr; + final Lock mLock; + + TransactionImpl mActive; + + // Tracks all registered cursors by storage type. + private Map, CursorList>> mCursors; + + private boolean mClosed; + private boolean mDetached; + + TransactionScope(TransactionManager txnMgr, boolean closed) { + mTxnMgr = txnMgr; + mLock = new ReentrantLock(true); + } + + /** + * Enters a new transaction scope which becomes the active transaction. + * + * @param level desired isolation level (may be null) + * @throws UnsupportedOperationException if isolation level higher than + * supported by repository + */ + public Transaction enter(IsolationLevel level) { + mLock.lock(); + try { + TransactionImpl parent = mActive; + IsolationLevel actualLevel = mTxnMgr.selectIsolationLevel(parent, level); + if (actualLevel == null) { + if (parent == null) { + throw new UnsupportedOperationException + ("Desired isolation level not supported: " + level); + } else { + throw new UnsupportedOperationException + ("Desired isolation level not supported: " + level + + "; parent isolation level: " + parent.getIsolationLevel()); + } + } + + return mActive = new TransactionImpl(this, parent, false, actualLevel); + } finally { + mLock.unlock(); + } + } + + /** + * Enters a new top-level transaction scope which becomes the active + * transaction. + * + * @param level desired isolation level (may be null) + * @throws UnsupportedOperationException if isolation level higher than + * supported by repository + */ + public Transaction enterTop(IsolationLevel level) { + mLock.lock(); + try { + IsolationLevel actualLevel = mTxnMgr.selectIsolationLevel(null, level); + if (actualLevel == null) { + throw new UnsupportedOperationException + ("Desired isolation level not supported: " + level); + } + + return mActive = new TransactionImpl(this, mActive, true, actualLevel); + } finally { + mLock.unlock(); + } + } + + /** + * Registers the given cursor against the active transaction, allowing it + * to be closed on transaction exit or transaction manager close. If there + * is no active transaction in scope, the cursor is registered as not part + * of a transaction. Cursors should register when created. + */ + public void register(Class type, Cursor cursor) { + mLock.lock(); + try { + checkClosed(); + if (mCursors == null) { + mCursors = new IdentityHashMap, CursorList>>(); + } + + CursorList> cursorList = mCursors.get(type); + if (cursorList == null) { + cursorList = new CursorList>(); + mCursors.put(type, cursorList); + } + + cursorList.register(cursor, mActive); + + if (mActive != null) { + mActive.register(cursor); + } + } finally { + mLock.unlock(); + } + } + + /** + * Unregisters a previously registered cursor. Cursors should unregister + * when closed. + */ + public void unregister(Class type, Cursor cursor) { + mLock.lock(); + try { + if (mCursors != null) { + CursorList> cursorList = mCursors.get(type); + if (cursorList != null) { + TransactionImpl txnImpl = cursorList.unregister(cursor); + if (txnImpl != null) { + txnImpl.unregister(cursor); + } + } + } + } finally { + mLock.unlock(); + } + } + + /** + * Returns lock used by TransactionScope. While holding lock, operations + * are suspended. + */ + public Lock getLock() { + return mLock; + } + + /** + * Returns the implementation for the active transaction, or null if there + * is no active transaction. + * + * @throws Exception thrown by createTxn or reuseTxn + */ + public Txn getTxn() throws Exception { + mLock.lock(); + try { + checkClosed(); + return mActive == null ? null : mActive.getTxn(); + } finally { + mLock.unlock(); + } + } + + /** + * Returns true if an active transaction exists and it is for update. + */ + public boolean isForUpdate() { + mLock.lock(); + try { + return (mClosed || mActive == null) ? false : mActive.isForUpdate(); + } finally { + mLock.unlock(); + } + } + + /** + * Returns the isolation level of the active transaction, or null if there + * is no active transaction. + */ + public IsolationLevel getIsolationLevel() { + mLock.lock(); + try { + return (mClosed || mActive == null) ? null : mActive.getIsolationLevel(); + } finally { + mLock.unlock(); + } + } + + /** + * Attach this scope to the current thread, if it has been {@link + * TransactionManager#detachLocalScope detached}. + * + * @throws IllegalStateException if current thread has a different + * transaction already attached + */ + public void attach() { + mLock.lock(); + try { + if (mTxnMgr.setLocalScope(this, mDetached)) { + mDetached = false; + } else if (!mDetached) { + throw new IllegalStateException("Transaction scope is not detached"); + } else { + throw new IllegalStateException + ("Current thread has a different transaction already attached"); + } + } finally { + mLock.unlock(); + } + } + + // Called by TransactionImpl. + void detach() { + mLock.lock(); + try { + if (mDetached || mTxnMgr.removeLocalScope(this)) { + mDetached = true; + } else { + throw new IllegalStateException("Transaction is attached to a different thread"); + } + } finally { + mLock.unlock(); + } + } + + // Called by TransactionManager. + void markDetached() { + mLock.lock(); + try { + mDetached = true; + } finally { + mLock.unlock(); + } + } + + /** + * Exits all transactions and closes all cursors. Should be called only + * when repository is closed. + */ + void close() throws RepositoryException { + mLock.lock(); + try { + if (!mClosed) { + while (mActive != null) { + mActive.exit(); + } + if (mCursors != null) { + for (CursorList> cursorList : mCursors.values()) { + cursorList.closeCursors(); + } + } + } + } finally { + mClosed = true; + mLock.unlock(); + } + } + + /** + * Caller must hold mLock. + */ + private void checkClosed() { + if (mClosed) { + throw new IllegalStateException("Repository is closed"); + } + } + + private static class TransactionImpl implements Transaction { + private final TransactionScope mScope; + private final TransactionImpl mParent; + private final boolean mTop; + private final IsolationLevel mLevel; + + private boolean mForUpdate; + private int mDesiredLockTimeout; + private TimeUnit mTimeoutUnit; + + private TransactionImpl mChild; + private boolean mExited; + private Txn mTxn; + + // Tracks all registered cursors. + private CursorList mCursorList; + + TransactionImpl(TransactionScope scope, + TransactionImpl parent, + boolean top, + IsolationLevel level) { + mScope = scope; + mParent = parent; + mTop = top; + mLevel = level; + if (!top && parent != null) { + parent.mChild = this; + mDesiredLockTimeout = parent.mDesiredLockTimeout; + mTimeoutUnit = parent.mTimeoutUnit; + } + } + + public void commit() throws PersistException { + TransactionScope scope = mScope; + scope.mLock.lock(); + try { + if (!mExited) { + if (mChild != null) { + mChild.commit(); + } + + closeCursors(); + + if (mTxn != null) { + if (mParent == null || mParent.mTxn != mTxn) { + try { + if (!scope.mTxnMgr.commitTxn(mTxn)) { + mTxn = null; + } + } catch (Throwable e) { + mTxn = null; + throw ExceptionTransformer.getInstance().toPersistException(e); + } + } else { + // Indicate fake nested transaction committed. + mTxn = null; + } + } + } + } finally { + scope.mLock.unlock(); + } + } + + public void exit() throws PersistException { + TransactionScope scope = mScope; + scope.mLock.lock(); + try { + if (!mExited) { + if (mChild != null) { + mChild.exit(); + } + + closeCursors(); + + if (mTxn != null) { + try { + if (mParent == null || mParent.mTxn != mTxn) { + try { + scope.mTxnMgr.abortTxn(mTxn); + } catch (Throwable e) { + throw ExceptionTransformer.getInstance().toPersistException(e); + } + } + } finally { + mTxn = null; + } + } + + scope.mActive = mParent; + + mExited = true; + } + } finally { + scope.mLock.unlock(); + } + } + + public void setForUpdate(boolean forUpdate) { + mForUpdate = forUpdate && mScope.mTxnMgr.supportsForUpdate(); + } + + public boolean isForUpdate() { + return mForUpdate; + } + + public void setDesiredLockTimeout(int timeout, TimeUnit unit) { + TransactionScope scope = mScope; + scope.mLock.lock(); + try { + if (timeout < 0) { + mDesiredLockTimeout = 0; + mTimeoutUnit = null; + } else { + mDesiredLockTimeout = timeout; + mTimeoutUnit = unit; + } + } finally { + scope.mLock.unlock(); + } + } + + public IsolationLevel getIsolationLevel() { + return mLevel; + } + + public void detach() { + mScope.detach(); + } + + public void attach() { + mScope.attach(); + } + + // Caller must hold mLock. + void register(Cursor cursor) { + if (mCursorList == null) { + mCursorList = new CursorList(); + } + mCursorList.register(cursor, null); + } + + // Caller must hold mLock. + void unregister(Cursor cursor) { + if (mCursorList != null) { + mCursorList.unregister(cursor); + } + } + + // Caller must hold mLock. + Txn getTxn() throws Exception { + TransactionScope scope = mScope; + if (mTxn != null) { + scope.mTxnMgr.reuseTxn(mTxn); + } else { + Txn parentTxn; + if (mParent == null || mTop) { + parentTxn = null; + } else if ((parentTxn = mParent.mTxn) == null) { + // No point in creating nested transaction if parent + // has never been used. Create parent transaction + // and use it in child transaction, just like a fake + // nested transaction. + if ((parentTxn = mParent.getTxn()) != null) { + return mTxn = parentTxn; + } + // Isolation level of parent is none, so proceed to create + // a real transaction. + } + if (mTimeoutUnit == null) { + mTxn = scope.mTxnMgr.createTxn(parentTxn, mLevel); + } else { + mTxn = scope.mTxnMgr.createTxn(parentTxn, mLevel, + mDesiredLockTimeout, mTimeoutUnit); + } + } + return mTxn; + } + + // Caller must hold mLock. + private void closeCursors() throws PersistException { + if (mCursorList != null) { + mCursorList.closeCursors(); + } + } + } + + /** + * Simple fast list/map for holding a small amount of cursors. + */ + static class CursorList { + private int mSize; + private Cursor[] mCursors; + private V[] mValues; + + CursorList() { + mCursors = new Cursor[8]; + } + + /** + * @param value optional value to associate + */ + @SuppressWarnings("unchecked") + void register(Cursor cursor, V value) { + int size = mSize; + Cursor[] cursors = mCursors; + + if (size == cursors.length) { + int newLength = size << 1; + + Cursor[] newCursors = new Cursor[newLength]; + System.arraycopy(cursors, 0, newCursors, 0, size); + mCursors = cursors = newCursors; + + if (mValues != null) { + V[] newValues = (V[]) new Object[newLength]; + System.arraycopy(mValues, 0, newValues, 0, size); + mValues = newValues; + } + } + + cursors[size] = cursor; + + if (value != null) { + V[] values = mValues; + if (values == null) { + mValues = values = (V[]) new Object[cursors.length]; + } + values[size] = value; + } + + mSize = size + 1; + } + + V unregister(Cursor cursor) { + // Assuming that cursors are opened and closed in LIFO order + // (stack order), search backwards to optimize. + Cursor[] cursors = mCursors; + int size = mSize; + int i = size; + search: { + while (--i >= 0) { + if (cursors[i] == cursor) { + break search; + } + } + // Not found. + return null; + } + + V[] values = mValues; + V value; + + if (values == null) { + value = null; + if (i == size - 1) { + // Clear reference so that it can be garbage collected. + cursors[i] = null; + } else { + // Shift array elements down. + System.arraycopy(cursors, i + 1, cursors, i, size - i - 1); + } + } else { + value = values[i]; + if (i == size - 1) { + // Clear references so that they can be garbage collected. + cursors[i] = null; + values[i] = null; + } else { + // Shift array elements down. + System.arraycopy(cursors, i + 1, cursors, i, size - i - 1); + System.arraycopy(values, i + 1, values, i, size - i - 1); + } + } + + mSize = size - 1; + return value; + } + + int size() { + return mSize; + } + + Cursor getCursor(int index) { + return mCursors[index]; + } + + V getValue(int index) { + V[] values = mValues; + return values == null ? null : values[index]; + } + + /** + * Closes all cursors and resets the size of this list to 0. + */ + void closeCursors() throws PersistException { + // Note: Iteration must be in reverse order. Calling close on the + // cursor should cause it to unregister from this list. This will + // cause only a modification to the end of the list, which is no + // longer needed by this method. + try { + Cursor[] cursors = mCursors; + V[] values = mValues; + int i = mSize; + if (values == null) { + while (--i >= 0) { + Cursor cursor = cursors[i]; + if (cursor != null) { + cursor.close(); + cursors[i] = null; + } + } + } else { + while (--i >= 0) { + Cursor cursor = cursors[i]; + if (cursor != null) { + cursor.close(); + cursors[i] = null; + values[i] = null; + } + } + } + } catch (FetchException e) { + throw e.toPersistException(); + } + mSize = 0; + } + } +} diff --git a/src/main/java/com/amazon/carbonado/txn/package-info.java b/src/main/java/com/amazon/carbonado/txn/package-info.java new file mode 100644 index 0000000..717592f --- /dev/null +++ b/src/main/java/com/amazon/carbonado/txn/package-info.java @@ -0,0 +1,22 @@ +/* + * Copyright 2008 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Provides support for managing transactions. + */ +package com.amazon.carbonado.txn; -- cgit v1.2.3