diff options
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo')
5 files changed, 168 insertions, 152 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java index 59a359b..6fc6d57 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java @@ -20,8 +20,6 @@ package com.amazon.carbonado.repo.indexed; import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Map;
-import java.util.IdentityHashMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -45,6 +43,8 @@ import com.amazon.carbonado.info.StorableIntrospector; import com.amazon.carbonado.qe.RepositoryAccess;
import com.amazon.carbonado.qe.StorageAccess;
+import com.amazon.carbonado.spi.StorageCollection;
+
/**
* Wraps another repository in order to make it support indexes. The wrapped
* repository must support creation of new types.
@@ -60,30 +60,17 @@ class IndexedRepository implements Repository, private final AtomicReference<Repository> mRootRef;
private final Repository mRepository;
private final String mName;
- private final Map<Class<?>, IndexedStorage<?>> mStorages;
+ private final StorageCollection mStorages;
IndexedRepository(AtomicReference<Repository> rootRef, String name, Repository repository) {
mRootRef = rootRef;
mRepository = repository;
mName = name;
- mStorages = new IdentityHashMap<Class<?>, IndexedStorage<?>>();
- if (repository.getCapability(IndexInfoCapability.class) == null) {
- throw new UnsupportedOperationException
- ("Wrapped repository doesn't support being indexed");
- }
- }
-
- public String getName() {
- return mName;
- }
- @SuppressWarnings("unchecked")
- public <S extends Storable> Storage<S> storageFor(Class<S> type)
- throws MalformedTypeException, SupportException, RepositoryException
- {
- synchronized (mStorages) {
- IndexedStorage<S> storage = (IndexedStorage<S>) mStorages.get(type);
- if (storage == null) {
+ mStorages = new StorageCollection() {
+ protected <S extends Storable> Storage<S> createStorage(Class<S> type)
+ throws RepositoryException
+ {
Storage<S> masterStorage = mRepository.storageFor(type);
if (Unindexed.class.isAssignableFrom(type)) {
@@ -98,13 +85,27 @@ class IndexedRepository implements Repository, return masterStorage;
}
- storage = new IndexedStorage<S>(this, masterStorage);
- mStorages.put(type, storage);
+ return new IndexedStorage<S>(IndexedRepository.this, masterStorage);
}
- return storage;
+ };
+
+ if (repository.getCapability(IndexInfoCapability.class) == null) {
+ throw new UnsupportedOperationException
+ ("Wrapped repository doesn't support being indexed");
}
}
+ public String getName() {
+ return mName;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <S extends Storable> Storage<S> storageFor(Class<S> type)
+ throws MalformedTypeException, SupportException, RepositoryException
+ {
+ return mStorages.storageFor(type);
+ }
+
public Transaction enterTransaction() {
return mRepository.enterTransaction();
}
diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java index 6e7bf2f..3a6ec68 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java @@ -21,6 +21,8 @@ package com.amazon.carbonado.repo.jdbc; import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
+
+import java.util.ArrayList;
import java.util.Map;
import java.util.IdentityHashMap;
@@ -54,6 +56,8 @@ import com.amazon.carbonado.capability.StorableInfoCapability; import com.amazon.carbonado.info.StorableProperty;
+import com.amazon.carbonado.spi.StorageCollection;
+
/**
* Repository implementation backed by a JDBC accessible database.
* JDBCRepository is not independent of the underlying database schema, and so
@@ -142,7 +146,7 @@ public class JDBCRepository private final DataSource mDataSource;
private final String mCatalog;
private final String mSchema;
- private final Map<Class<?>, JDBCStorage<?>> mStorages;
+ private final StorageCollection mStorages;
// Track all open connections so that they can be closed when this
// repository is closed.
@@ -195,7 +199,22 @@ public class JDBCRepository mDataSource = dataSource;
mCatalog = catalog;
mSchema = schema;
- mStorages = new IdentityHashMap<Class<?>, JDBCStorage<?>>();
+
+ mStorages = new StorageCollection() {
+ protected <S extends Storable> Storage<S> createStorage(Class<S> type)
+ throws RepositoryException
+ {
+ // Lock on mAllTxnMgrs to prevent databases from being opened during shutdown.
+ synchronized (mAllTxnMgrs) {
+ JDBCStorableInfo<S> info = examineStorable(type);
+ if (!info.isSupported()) {
+ throw new UnsupportedTypeException(type);
+ }
+ return new JDBCStorage<S>(JDBCRepository.this, info);
+ }
+ }
+ };
+
mOpenConnections = new IdentityHashMap<Connection, Object>();
mCurrentTxnMgr = new ThreadLocal<JDBCTransactionManager>();
mAllTxnMgrs = new WeakIdentityMap();
@@ -267,22 +286,7 @@ public class JDBCRepository @SuppressWarnings("unchecked")
public <S extends Storable> Storage<S> storageFor(Class<S> type) throws RepositoryException {
- // Lock on mAllTxnMgrs to prevent databases from being opened during shutdown.
- synchronized (mAllTxnMgrs) {
- JDBCStorage<S> storage = (JDBCStorage<S>) mStorages.get(type);
- if (storage == null) {
- // Examine and throw exception early if there is a problem.
- JDBCStorableInfo<S> info = examineStorable(type);
-
- if (!info.isSupported()) {
- throw new UnsupportedTypeException(type);
- }
-
- storage = new JDBCStorage<S>(this, info);
- mStorages.put(type, storage);
- }
- return storage;
- }
+ return mStorages.storageFor(type);
}
public Transaction enterTransaction() {
@@ -343,14 +347,11 @@ public class JDBCRepository public String[] getUserStorableTypeNames() {
// We don't register Storable types persistently, so just return what
// we know right now.
- synchronized (mAllTxnMgrs) {
- String[] names = new String[mStorages.size()];
- int i = 0;
- for (Class<?> type : mStorages.keySet()) {
- names[i++] = type.getName();
- }
- return names;
+ ArrayList<String> names = new ArrayList<String>();
+ for (Storage storage : mStorages.allStorage()) {
+ names.add(storage.getStorableType().getName());
}
+ return names.toArray(new String[names.size()]);
}
public boolean isSupported(Class<Storable> type) {
diff --git a/src/main/java/com/amazon/carbonado/repo/logging/LoggingRepository.java b/src/main/java/com/amazon/carbonado/repo/logging/LoggingRepository.java index 99c462c..58639e2 100644 --- a/src/main/java/com/amazon/carbonado/repo/logging/LoggingRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/logging/LoggingRepository.java @@ -18,9 +18,6 @@ package com.amazon.carbonado.repo.logging;
-import java.util.IdentityHashMap;
-import java.util.Map;
-
import java.util.concurrent.atomic.AtomicReference;
import com.amazon.carbonado.IsolationLevel;
@@ -34,6 +31,8 @@ import com.amazon.carbonado.TriggerFactory; import com.amazon.carbonado.capability.Capability;
+import com.amazon.carbonado.spi.StorageCollection;
+
/**
*
*
@@ -45,8 +44,7 @@ class LoggingRepository implements Repository, LogAccessCapability { private final Repository mRepo;
private final Log mLog;
- // Map of storages by storable class
- private final Map<Class<?>, LoggingStorage<?>> mStorages;
+ private final StorageCollection mStorages;
LoggingRepository(AtomicReference<Repository> rootRef,
Iterable<TriggerFactory> triggerFactories,
@@ -57,7 +55,13 @@ class LoggingRepository implements Repository, LogAccessCapability { mRepo = actual;
mLog = log;
- mStorages = new IdentityHashMap<Class<?>, LoggingStorage<?>>();
+ mStorages = new StorageCollection() {
+ protected <S extends Storable> Storage<S> createStorage(Class<S> type)
+ throws RepositoryException
+ {
+ return new LoggingStorage(LoggingRepository.this, mRepo.storageFor(type));
+ }
+ };
}
public String getName() {
@@ -67,14 +71,7 @@ class LoggingRepository implements Repository, LogAccessCapability { public <S extends Storable> Storage<S> storageFor(Class<S> type)
throws SupportException, RepositoryException
{
- synchronized (mStorages) {
- LoggingStorage storage = mStorages.get(type);
- if (storage == null) {
- storage = new LoggingStorage(this, mRepo.storageFor(type));
- mStorages.put(type, storage);
- }
- return storage;
- }
+ return mStorages.storageFor(type);
}
public Transaction enterTransaction() {
diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 9a3711f..b07b853 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -18,9 +18,7 @@ package com.amazon.carbonado.repo.replicated;
import java.util.Comparator;
-import java.util.IdentityHashMap;
import java.util.LinkedHashSet;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@@ -53,6 +51,7 @@ import com.amazon.carbonado.capability.StorableInfoCapability; import com.amazon.carbonado.info.Direction;
import com.amazon.carbonado.info.StorableIntrospector;
+import com.amazon.carbonado.spi.StorageCollection;
import com.amazon.carbonado.spi.TransactionPair;
import com.amazon.carbonado.util.Throttle;
@@ -136,8 +135,7 @@ class ReplicatedRepository private Repository mReplicaRepository;
private Repository mMasterRepository;
- // Map of storages by storable class
- private final Map<Class<?>, ReplicatedStorage<?>> mStorages;
+ private final StorageCollection mStorages;
ReplicatedRepository(String aName,
Repository aReplicaRepository,
@@ -145,8 +143,13 @@ class ReplicatedRepository mName = aName;
mReplicaRepository = aReplicaRepository;
mMasterRepository = aMasterRepository;
-
- mStorages = new IdentityHashMap<Class<?>, ReplicatedStorage<?>>();
+ mStorages = new StorageCollection() {
+ protected <S extends Storable> Storage<S> createStorage(Class<S> type)
+ throws SupportException, RepositoryException
+ {
+ return new ReplicatedStorage<S>(ReplicatedRepository.this, type);
+ }
+ };
}
public String getName() {
@@ -161,27 +164,10 @@ class ReplicatedRepository return mMasterRepository;
}
- @SuppressWarnings("unchecked")
public <S extends Storable> Storage<S> storageFor(Class<S> type)
throws MalformedTypeException, SupportException, RepositoryException
{
- synchronized (mStorages) {
- ReplicatedStorage storage = mStorages.get(type);
- if (storage == null) {
- // Examine and throw exception if there is a problem.
- StorableIntrospector.examine(type);
-
- storage = createStorage(type);
- mStorages.put(type, storage);
- }
- return storage;
- }
- }
-
- private <S extends Storable> ReplicatedStorage<S> createStorage(Class<S> type)
- throws SupportException, RepositoryException
- {
- return new ReplicatedStorage<S>(this, type);
+ return mStorages.storageFor(type);
}
public Transaction enterTransaction() {
diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java index 5147361..b97ec14 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java @@ -22,10 +22,10 @@ import java.io.File; import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Map;
-import java.util.IdentityHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -43,6 +43,7 @@ import com.amazon.carbonado.PersistException; import com.amazon.carbonado.Repository;
import com.amazon.carbonado.RepositoryException;
import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
import com.amazon.carbonado.Transaction;
import com.amazon.carbonado.TriggerFactory;
@@ -66,6 +67,7 @@ import com.amazon.carbonado.spi.ExceptionTransformer; import com.amazon.carbonado.spi.LobEngine;
import com.amazon.carbonado.spi.SequenceValueGenerator;
import com.amazon.carbonado.spi.SequenceValueProducer;
+import com.amazon.carbonado.spi.StorageCollection;
/**
* Repository implementation backed by a Berkeley DB. Data is encoded in the
@@ -94,13 +96,13 @@ abstract class BDBRepository<Txn> private final AtomicReference<Repository> mRootRef;
private final StorableCodecFactory mStorableCodecFactory;
private final ExceptionTransformer mExTransformer;
- private final Map<Class<?>, BDBStorage<Txn, ?>> mStorages;
+ private final StorageCollection mStorages;
private final Map<String, SequenceValueGenerator> mSequences;
private final ThreadLocal<BDBTransactionManager<Txn>> mCurrentTxnMgr;
private final Lock mShutdownLock;
- // Lock with a timeout value to recover from deadlock condition.
- private final int mLockTimeoutSeconds = 5;
+ private final Condition mShutdownCondition;
+ private int mShutdownBlockerCount;
// Weakly tracks all BDBTransactionManager instances for shutdown hook.
private final Map<BDBTransactionManager<Txn>, ?> mAllTxnMgrs;
@@ -155,10 +157,28 @@ abstract class BDBRepository<Txn> mTriggerFactories = builder.getTriggerFactories();
mRootRef = rootRef;
mExTransformer = exTransformer;
- mStorages = new IdentityHashMap<Class<?>, BDBStorage<Txn, ?>>();
+
+ mStorages = new StorageCollection() {
+ protected <S extends Storable> Storage<S> createStorage(Class<S> type)
+ throws RepositoryException
+ {
+ lockoutShutdown();
+ try {
+ try {
+ return BDBRepository.this.createStorage(type);
+ } catch (Exception e) {
+ throw toRepositoryException(e);
+ }
+ } finally {
+ unlockoutShutdown();
+ }
+ }
+ };
+
mSequences = new ConcurrentHashMap<String, SequenceValueGenerator>();
mCurrentTxnMgr = new ThreadLocal<BDBTransactionManager<Txn>>();
mShutdownLock = new ReentrantLock();
+ mShutdownCondition = mShutdownLock.newCondition();
mAllTxnMgrs = new WeakIdentityMap();
mRunCheckpointer = !builder.getReadOnly() && builder.getRunCheckpointer();
mRunDeadlockDetector = builder.getRunDeadlockDetector();
@@ -179,34 +199,10 @@ abstract class BDBRepository<Txn> return mName;
}
- @SuppressWarnings("unchecked")
public <S extends Storable> BDBStorage<Txn, S> storageFor(Class<S> type)
throws MalformedTypeException, RepositoryException
{
- // Acquire lock to prevent databases from being opened during shutdown.
- try {
- if (mShutdownLock.tryLock(mLockTimeoutSeconds, TimeUnit.SECONDS)) {
- try {
- BDBStorage<Txn, ?> storage = mStorages.get(type);
- if (storage == null) {
- // Examine and throw exception early if there is a problem.
- StorableIntrospector.examine(type);
-
- try {
- storage = createStorage(type);
- } catch (Exception e) {
- throw toRepositoryException(e);
- }
- mStorages.put(type, storage);
- }
- return (BDBStorage<Txn, S>) storage;
- } finally {
- mShutdownLock.unlock();
- }
- }
- } catch (InterruptedException e) {
- }
- throw new RepositoryException("Unable to acquire shutdown lock");
+ return (BDBStorage<Txn, S>) mStorages.storageFor(type);
}
public Transaction enterTransaction() {
@@ -427,29 +423,22 @@ abstract class BDBRepository<Txn> SequenceValueProducer getSequenceValueProducer(String name) throws PersistException {
SequenceValueGenerator producer = mSequences.get(name);
if (producer == null) {
- // Acquire lock to prevent sequences from being created during shutdown.
+ lockoutShutdown();
try {
- if (mShutdownLock.tryLock(mLockTimeoutSeconds, TimeUnit.SECONDS)) {
+ producer = mSequences.get(name);
+ if (producer == null) {
+ Repository metaRepo = getRootRepository();
try {
- producer = mSequences.get(name);
- if (producer == null) {
- Repository metaRepo = getRootRepository();
- try {
- producer = new SequenceValueGenerator(metaRepo, name);
- } catch (RepositoryException e) {
- throw toPersistException(e);
- }
- mSequences.put(name, producer);
- }
- return producer;
- } finally {
- mShutdownLock.unlock();
+ producer = new SequenceValueGenerator(metaRepo, name);
+ } catch (RepositoryException e) {
+ throw toPersistException(e);
}
+ mSequences.put(name, producer);
}
- } catch (InterruptedException e) {
- e.printStackTrace();
+ return producer;
+ } finally {
+ unlockoutShutdown();
}
- throw new PersistException("Unable to acquire shutdown lock");
}
return producer;
}
@@ -472,21 +461,15 @@ abstract class BDBRepository<Txn> LobEngine getLobEngine() throws RepositoryException {
LobEngine engine = mLobEngine;
if (engine == null) {
- // Acquire lock to prevent LobEngine from being created during shutdown.
+ lockoutShutdown();
try {
- if (mShutdownLock.tryLock(mLockTimeoutSeconds, TimeUnit.SECONDS)) {
- try {
- if ((engine = mLobEngine) == null) {
- mLobEngine = engine = new LobEngine(this);
- }
- return engine;
- } finally {
- mShutdownLock.unlock();
- }
+ if ((engine = mLobEngine) == null) {
+ mLobEngine = engine = new LobEngine(this);
}
- } catch (InterruptedException e) {
+ return engine;
+ } finally {
+ unlockoutShutdown();
}
- throw new RepositoryException("Unable to acquire shutdown lock");
}
return engine;
}
@@ -605,19 +588,67 @@ abstract class BDBRepository<Txn> BDBTransactionManager<Txn> openTransactionManager() {
BDBTransactionManager<Txn> txnMgr = mCurrentTxnMgr.get();
if (txnMgr == null) {
- mShutdownLock.lock();
+ lockoutShutdown();
try {
txnMgr = new BDBTransactionManager<Txn>(mExTransformer, this);
mCurrentTxnMgr.set(txnMgr);
mAllTxnMgrs.put(txnMgr, null);
} finally {
- mShutdownLock.unlock();
+ unlockoutShutdown();
}
}
return txnMgr;
}
/**
+ * Call to prevent shutdown hook from running. Be sure to call
+ * unlockoutShutdown afterwards.
+ */
+ private void lockoutShutdown() {
+ mShutdownLock.lock();
+ try {
+ mShutdownBlockerCount++;
+ } finally {
+ mShutdownLock.unlock();
+ }
+ }
+
+ /**
+ * Only call this to release lockoutShutdown.
+ */
+ private void unlockoutShutdown() {
+ mShutdownLock.lock();
+ try {
+ if (--mShutdownBlockerCount == 0) {
+ mShutdownCondition.signalAll();
+ }
+ } finally {
+ mShutdownLock.unlock();
+ }
+ }
+
+ /**
+ * Only to be called by shutdown hook itself.
+ */
+ void lockForShutdown() {
+ mShutdownLock.lock();
+ while (mShutdownBlockerCount > 0) {
+ try {
+ mShutdownCondition.await();
+ } catch (InterruptedException e) {
+ mLog.warn("Ignoring interruption for shutdown");
+ }
+ }
+ }
+
+ /**
+ * Only to be called by shutdown hook itself.
+ */
+ void unlockForShutdown() {
+ mShutdownLock.unlock();
+ }
+
+ /**
* Periodically runs checkpoints on the environment.
*/
private static class Checkpointer extends Thread {
@@ -843,7 +874,7 @@ abstract class BDBRepository<Txn> }
private void doShutdown(BDBRepository<?> repository, boolean suspendThreads) {
- repository.mShutdownLock.lock();
+ repository.lockForShutdown();
try {
// Return unused sequence values.
for (SequenceValueGenerator generator : repository.mSequences.values()) {
@@ -877,9 +908,9 @@ abstract class BDBRepository<Txn> }
// Close database handles.
- for (BDBStorage storage : repository.mStorages.values()) {
+ for (Storage storage : repository.mStorages.allStorage()) {
try {
- storage.close();
+ ((BDBStorage) storage).close();
} catch (Throwable e) {
repository.getLog().error(null, e);
}
@@ -914,7 +945,7 @@ abstract class BDBRepository<Txn> repository.mPostShutdownHook.run();
}
} finally {
- repository.mShutdownLock.unlock();
+ repository.unlockForShutdown();
}
}
}
|