summaryrefslogtreecommitdiff
path: root/src/main/java/com/amazon/carbonado/repo/sleepycat
diff options
context:
space:
mode:
authorBrian S. O'Neill <bronee@gmail.com>2006-10-15 17:50:08 +0000
committerBrian S. O'Neill <bronee@gmail.com>2006-10-15 17:50:08 +0000
commitf0ec30fd9cc7fa19f9f9bf82d7d7449a65d90359 (patch)
treeffb5f5fecb4282f1bdb6e8bbb3e572f256310a70 /src/main/java/com/amazon/carbonado/repo/sleepycat
parent4ceddfc456e83a79e782599b5b86b68e38b6ef94 (diff)
Created StorageCollection.
More tests added.
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo/sleepycat')
-rw-r--r--src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java163
1 files changed, 97 insertions, 66 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java
index 5147361..b97ec14 100644
--- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java
+++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java
@@ -22,10 +22,10 @@ import java.io.File;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Map;
-import java.util.IdentityHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -43,6 +43,7 @@ import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Repository;
import com.amazon.carbonado.RepositoryException;
import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
import com.amazon.carbonado.Transaction;
import com.amazon.carbonado.TriggerFactory;
@@ -66,6 +67,7 @@ import com.amazon.carbonado.spi.ExceptionTransformer;
import com.amazon.carbonado.spi.LobEngine;
import com.amazon.carbonado.spi.SequenceValueGenerator;
import com.amazon.carbonado.spi.SequenceValueProducer;
+import com.amazon.carbonado.spi.StorageCollection;
/**
* Repository implementation backed by a Berkeley DB. Data is encoded in the
@@ -94,13 +96,13 @@ abstract class BDBRepository<Txn>
private final AtomicReference<Repository> mRootRef;
private final StorableCodecFactory mStorableCodecFactory;
private final ExceptionTransformer mExTransformer;
- private final Map<Class<?>, BDBStorage<Txn, ?>> mStorages;
+ private final StorageCollection mStorages;
private final Map<String, SequenceValueGenerator> mSequences;
private final ThreadLocal<BDBTransactionManager<Txn>> mCurrentTxnMgr;
private final Lock mShutdownLock;
- // Lock with a timeout value to recover from deadlock condition.
- private final int mLockTimeoutSeconds = 5;
+ private final Condition mShutdownCondition;
+ private int mShutdownBlockerCount;
// Weakly tracks all BDBTransactionManager instances for shutdown hook.
private final Map<BDBTransactionManager<Txn>, ?> mAllTxnMgrs;
@@ -155,10 +157,28 @@ abstract class BDBRepository<Txn>
mTriggerFactories = builder.getTriggerFactories();
mRootRef = rootRef;
mExTransformer = exTransformer;
- mStorages = new IdentityHashMap<Class<?>, BDBStorage<Txn, ?>>();
+
+ mStorages = new StorageCollection() {
+ protected <S extends Storable> Storage<S> createStorage(Class<S> type)
+ throws RepositoryException
+ {
+ lockoutShutdown();
+ try {
+ try {
+ return BDBRepository.this.createStorage(type);
+ } catch (Exception e) {
+ throw toRepositoryException(e);
+ }
+ } finally {
+ unlockoutShutdown();
+ }
+ }
+ };
+
mSequences = new ConcurrentHashMap<String, SequenceValueGenerator>();
mCurrentTxnMgr = new ThreadLocal<BDBTransactionManager<Txn>>();
mShutdownLock = new ReentrantLock();
+ mShutdownCondition = mShutdownLock.newCondition();
mAllTxnMgrs = new WeakIdentityMap();
mRunCheckpointer = !builder.getReadOnly() && builder.getRunCheckpointer();
mRunDeadlockDetector = builder.getRunDeadlockDetector();
@@ -179,34 +199,10 @@ abstract class BDBRepository<Txn>
return mName;
}
- @SuppressWarnings("unchecked")
public <S extends Storable> BDBStorage<Txn, S> storageFor(Class<S> type)
throws MalformedTypeException, RepositoryException
{
- // Acquire lock to prevent databases from being opened during shutdown.
- try {
- if (mShutdownLock.tryLock(mLockTimeoutSeconds, TimeUnit.SECONDS)) {
- try {
- BDBStorage<Txn, ?> storage = mStorages.get(type);
- if (storage == null) {
- // Examine and throw exception early if there is a problem.
- StorableIntrospector.examine(type);
-
- try {
- storage = createStorage(type);
- } catch (Exception e) {
- throw toRepositoryException(e);
- }
- mStorages.put(type, storage);
- }
- return (BDBStorage<Txn, S>) storage;
- } finally {
- mShutdownLock.unlock();
- }
- }
- } catch (InterruptedException e) {
- }
- throw new RepositoryException("Unable to acquire shutdown lock");
+ return (BDBStorage<Txn, S>) mStorages.storageFor(type);
}
public Transaction enterTransaction() {
@@ -427,29 +423,22 @@ abstract class BDBRepository<Txn>
SequenceValueProducer getSequenceValueProducer(String name) throws PersistException {
SequenceValueGenerator producer = mSequences.get(name);
if (producer == null) {
- // Acquire lock to prevent sequences from being created during shutdown.
+ lockoutShutdown();
try {
- if (mShutdownLock.tryLock(mLockTimeoutSeconds, TimeUnit.SECONDS)) {
+ producer = mSequences.get(name);
+ if (producer == null) {
+ Repository metaRepo = getRootRepository();
try {
- producer = mSequences.get(name);
- if (producer == null) {
- Repository metaRepo = getRootRepository();
- try {
- producer = new SequenceValueGenerator(metaRepo, name);
- } catch (RepositoryException e) {
- throw toPersistException(e);
- }
- mSequences.put(name, producer);
- }
- return producer;
- } finally {
- mShutdownLock.unlock();
+ producer = new SequenceValueGenerator(metaRepo, name);
+ } catch (RepositoryException e) {
+ throw toPersistException(e);
}
+ mSequences.put(name, producer);
}
- } catch (InterruptedException e) {
- e.printStackTrace();
+ return producer;
+ } finally {
+ unlockoutShutdown();
}
- throw new PersistException("Unable to acquire shutdown lock");
}
return producer;
}
@@ -472,21 +461,15 @@ abstract class BDBRepository<Txn>
LobEngine getLobEngine() throws RepositoryException {
LobEngine engine = mLobEngine;
if (engine == null) {
- // Acquire lock to prevent LobEngine from being created during shutdown.
+ lockoutShutdown();
try {
- if (mShutdownLock.tryLock(mLockTimeoutSeconds, TimeUnit.SECONDS)) {
- try {
- if ((engine = mLobEngine) == null) {
- mLobEngine = engine = new LobEngine(this);
- }
- return engine;
- } finally {
- mShutdownLock.unlock();
- }
+ if ((engine = mLobEngine) == null) {
+ mLobEngine = engine = new LobEngine(this);
}
- } catch (InterruptedException e) {
+ return engine;
+ } finally {
+ unlockoutShutdown();
}
- throw new RepositoryException("Unable to acquire shutdown lock");
}
return engine;
}
@@ -605,19 +588,67 @@ abstract class BDBRepository<Txn>
BDBTransactionManager<Txn> openTransactionManager() {
BDBTransactionManager<Txn> txnMgr = mCurrentTxnMgr.get();
if (txnMgr == null) {
- mShutdownLock.lock();
+ lockoutShutdown();
try {
txnMgr = new BDBTransactionManager<Txn>(mExTransformer, this);
mCurrentTxnMgr.set(txnMgr);
mAllTxnMgrs.put(txnMgr, null);
} finally {
- mShutdownLock.unlock();
+ unlockoutShutdown();
}
}
return txnMgr;
}
/**
+ * Call to prevent shutdown hook from running. Be sure to call
+ * unlockoutShutdown afterwards.
+ */
+ private void lockoutShutdown() {
+ mShutdownLock.lock();
+ try {
+ mShutdownBlockerCount++;
+ } finally {
+ mShutdownLock.unlock();
+ }
+ }
+
+ /**
+ * Only call this to release lockoutShutdown.
+ */
+ private void unlockoutShutdown() {
+ mShutdownLock.lock();
+ try {
+ if (--mShutdownBlockerCount == 0) {
+ mShutdownCondition.signalAll();
+ }
+ } finally {
+ mShutdownLock.unlock();
+ }
+ }
+
+ /**
+ * Only to be called by shutdown hook itself.
+ */
+ void lockForShutdown() {
+ mShutdownLock.lock();
+ while (mShutdownBlockerCount > 0) {
+ try {
+ mShutdownCondition.await();
+ } catch (InterruptedException e) {
+ mLog.warn("Ignoring interruption for shutdown");
+ }
+ }
+ }
+
+ /**
+ * Only to be called by shutdown hook itself.
+ */
+ void unlockForShutdown() {
+ mShutdownLock.unlock();
+ }
+
+ /**
* Periodically runs checkpoints on the environment.
*/
private static class Checkpointer extends Thread {
@@ -843,7 +874,7 @@ abstract class BDBRepository<Txn>
}
private void doShutdown(BDBRepository<?> repository, boolean suspendThreads) {
- repository.mShutdownLock.lock();
+ repository.lockForShutdown();
try {
// Return unused sequence values.
for (SequenceValueGenerator generator : repository.mSequences.values()) {
@@ -877,9 +908,9 @@ abstract class BDBRepository<Txn>
}
// Close database handles.
- for (BDBStorage storage : repository.mStorages.values()) {
+ for (Storage storage : repository.mStorages.allStorage()) {
try {
- storage.close();
+ ((BDBStorage) storage).close();
} catch (Throwable e) {
repository.getLog().error(null, e);
}
@@ -914,7 +945,7 @@ abstract class BDBRepository<Txn>
repository.mPostShutdownHook.run();
}
} finally {
- repository.mShutdownLock.unlock();
+ repository.unlockForShutdown();
}
}
}