From f0ec30fd9cc7fa19f9f9bf82d7d7449a65d90359 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Sun, 15 Oct 2006 17:50:08 +0000 Subject: Created StorageCollection. More tests added. --- .../carbonado/repo/indexed/IndexedRepository.java | 47 +++--- .../amazon/carbonado/repo/jdbc/JDBCRepository.java | 51 +++---- .../carbonado/repo/logging/LoggingRepository.java | 25 ++-- .../repo/replicated/ReplicatedRepository.java | 34 ++--- .../carbonado/repo/sleepycat/BDBRepository.java | 163 ++++++++++++--------- 5 files changed, 168 insertions(+), 152 deletions(-) (limited to 'src/main/java/com/amazon/carbonado/repo') diff --git a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java index 59a359b..6fc6d57 100644 --- a/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/indexed/IndexedRepository.java @@ -20,8 +20,6 @@ package com.amazon.carbonado.repo.indexed; import java.util.ArrayList; import java.util.Arrays; -import java.util.Map; -import java.util.IdentityHashMap; import java.util.concurrent.atomic.AtomicReference; @@ -45,6 +43,8 @@ import com.amazon.carbonado.info.StorableIntrospector; import com.amazon.carbonado.qe.RepositoryAccess; import com.amazon.carbonado.qe.StorageAccess; +import com.amazon.carbonado.spi.StorageCollection; + /** * Wraps another repository in order to make it support indexes. The wrapped * repository must support creation of new types. @@ -60,30 +60,17 @@ class IndexedRepository implements Repository, private final AtomicReference mRootRef; private final Repository mRepository; private final String mName; - private final Map, IndexedStorage> mStorages; + private final StorageCollection mStorages; IndexedRepository(AtomicReference rootRef, String name, Repository repository) { mRootRef = rootRef; mRepository = repository; mName = name; - mStorages = new IdentityHashMap, IndexedStorage>(); - if (repository.getCapability(IndexInfoCapability.class) == null) { - throw new UnsupportedOperationException - ("Wrapped repository doesn't support being indexed"); - } - } - - public String getName() { - return mName; - } - @SuppressWarnings("unchecked") - public Storage storageFor(Class type) - throws MalformedTypeException, SupportException, RepositoryException - { - synchronized (mStorages) { - IndexedStorage storage = (IndexedStorage) mStorages.get(type); - if (storage == null) { + mStorages = new StorageCollection() { + protected Storage createStorage(Class type) + throws RepositoryException + { Storage masterStorage = mRepository.storageFor(type); if (Unindexed.class.isAssignableFrom(type)) { @@ -98,13 +85,27 @@ class IndexedRepository implements Repository, return masterStorage; } - storage = new IndexedStorage(this, masterStorage); - mStorages.put(type, storage); + return new IndexedStorage(IndexedRepository.this, masterStorage); } - return storage; + }; + + if (repository.getCapability(IndexInfoCapability.class) == null) { + throw new UnsupportedOperationException + ("Wrapped repository doesn't support being indexed"); } } + public String getName() { + return mName; + } + + @SuppressWarnings("unchecked") + public Storage storageFor(Class type) + throws MalformedTypeException, SupportException, RepositoryException + { + return mStorages.storageFor(type); + } + public Transaction enterTransaction() { return mRepository.enterTransaction(); } diff --git a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java index 6e7bf2f..3a6ec68 100644 --- a/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/jdbc/JDBCRepository.java @@ -21,6 +21,8 @@ package com.amazon.carbonado.repo.jdbc; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.SQLException; + +import java.util.ArrayList; import java.util.Map; import java.util.IdentityHashMap; @@ -54,6 +56,8 @@ import com.amazon.carbonado.capability.StorableInfoCapability; import com.amazon.carbonado.info.StorableProperty; +import com.amazon.carbonado.spi.StorageCollection; + /** * Repository implementation backed by a JDBC accessible database. * JDBCRepository is not independent of the underlying database schema, and so @@ -142,7 +146,7 @@ public class JDBCRepository private final DataSource mDataSource; private final String mCatalog; private final String mSchema; - private final Map, JDBCStorage> mStorages; + private final StorageCollection mStorages; // Track all open connections so that they can be closed when this // repository is closed. @@ -195,7 +199,22 @@ public class JDBCRepository mDataSource = dataSource; mCatalog = catalog; mSchema = schema; - mStorages = new IdentityHashMap, JDBCStorage>(); + + mStorages = new StorageCollection() { + protected Storage createStorage(Class type) + throws RepositoryException + { + // Lock on mAllTxnMgrs to prevent databases from being opened during shutdown. + synchronized (mAllTxnMgrs) { + JDBCStorableInfo info = examineStorable(type); + if (!info.isSupported()) { + throw new UnsupportedTypeException(type); + } + return new JDBCStorage(JDBCRepository.this, info); + } + } + }; + mOpenConnections = new IdentityHashMap(); mCurrentTxnMgr = new ThreadLocal(); mAllTxnMgrs = new WeakIdentityMap(); @@ -267,22 +286,7 @@ public class JDBCRepository @SuppressWarnings("unchecked") public Storage storageFor(Class type) throws RepositoryException { - // Lock on mAllTxnMgrs to prevent databases from being opened during shutdown. - synchronized (mAllTxnMgrs) { - JDBCStorage storage = (JDBCStorage) mStorages.get(type); - if (storage == null) { - // Examine and throw exception early if there is a problem. - JDBCStorableInfo info = examineStorable(type); - - if (!info.isSupported()) { - throw new UnsupportedTypeException(type); - } - - storage = new JDBCStorage(this, info); - mStorages.put(type, storage); - } - return storage; - } + return mStorages.storageFor(type); } public Transaction enterTransaction() { @@ -343,14 +347,11 @@ public class JDBCRepository public String[] getUserStorableTypeNames() { // We don't register Storable types persistently, so just return what // we know right now. - synchronized (mAllTxnMgrs) { - String[] names = new String[mStorages.size()]; - int i = 0; - for (Class type : mStorages.keySet()) { - names[i++] = type.getName(); - } - return names; + ArrayList names = new ArrayList(); + for (Storage storage : mStorages.allStorage()) { + names.add(storage.getStorableType().getName()); } + return names.toArray(new String[names.size()]); } public boolean isSupported(Class type) { diff --git a/src/main/java/com/amazon/carbonado/repo/logging/LoggingRepository.java b/src/main/java/com/amazon/carbonado/repo/logging/LoggingRepository.java index 99c462c..58639e2 100644 --- a/src/main/java/com/amazon/carbonado/repo/logging/LoggingRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/logging/LoggingRepository.java @@ -18,9 +18,6 @@ package com.amazon.carbonado.repo.logging; -import java.util.IdentityHashMap; -import java.util.Map; - import java.util.concurrent.atomic.AtomicReference; import com.amazon.carbonado.IsolationLevel; @@ -34,6 +31,8 @@ import com.amazon.carbonado.TriggerFactory; import com.amazon.carbonado.capability.Capability; +import com.amazon.carbonado.spi.StorageCollection; + /** * * @@ -45,8 +44,7 @@ class LoggingRepository implements Repository, LogAccessCapability { private final Repository mRepo; private final Log mLog; - // Map of storages by storable class - private final Map, LoggingStorage> mStorages; + private final StorageCollection mStorages; LoggingRepository(AtomicReference rootRef, Iterable triggerFactories, @@ -57,7 +55,13 @@ class LoggingRepository implements Repository, LogAccessCapability { mRepo = actual; mLog = log; - mStorages = new IdentityHashMap, LoggingStorage>(); + mStorages = new StorageCollection() { + protected Storage createStorage(Class type) + throws RepositoryException + { + return new LoggingStorage(LoggingRepository.this, mRepo.storageFor(type)); + } + }; } public String getName() { @@ -67,14 +71,7 @@ class LoggingRepository implements Repository, LogAccessCapability { public Storage storageFor(Class type) throws SupportException, RepositoryException { - synchronized (mStorages) { - LoggingStorage storage = mStorages.get(type); - if (storage == null) { - storage = new LoggingStorage(this, mRepo.storageFor(type)); - mStorages.put(type, storage); - } - return storage; - } + return mStorages.storageFor(type); } public Transaction enterTransaction() { diff --git a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java index 9a3711f..b07b853 100644 --- a/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/replicated/ReplicatedRepository.java @@ -18,9 +18,7 @@ package com.amazon.carbonado.repo.replicated; import java.util.Comparator; -import java.util.IdentityHashMap; import java.util.LinkedHashSet; -import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; @@ -53,6 +51,7 @@ import com.amazon.carbonado.capability.StorableInfoCapability; import com.amazon.carbonado.info.Direction; import com.amazon.carbonado.info.StorableIntrospector; +import com.amazon.carbonado.spi.StorageCollection; import com.amazon.carbonado.spi.TransactionPair; import com.amazon.carbonado.util.Throttle; @@ -136,8 +135,7 @@ class ReplicatedRepository private Repository mReplicaRepository; private Repository mMasterRepository; - // Map of storages by storable class - private final Map, ReplicatedStorage> mStorages; + private final StorageCollection mStorages; ReplicatedRepository(String aName, Repository aReplicaRepository, @@ -145,8 +143,13 @@ class ReplicatedRepository mName = aName; mReplicaRepository = aReplicaRepository; mMasterRepository = aMasterRepository; - - mStorages = new IdentityHashMap, ReplicatedStorage>(); + mStorages = new StorageCollection() { + protected Storage createStorage(Class type) + throws SupportException, RepositoryException + { + return new ReplicatedStorage(ReplicatedRepository.this, type); + } + }; } public String getName() { @@ -161,27 +164,10 @@ class ReplicatedRepository return mMasterRepository; } - @SuppressWarnings("unchecked") public Storage storageFor(Class type) throws MalformedTypeException, SupportException, RepositoryException { - synchronized (mStorages) { - ReplicatedStorage storage = mStorages.get(type); - if (storage == null) { - // Examine and throw exception if there is a problem. - StorableIntrospector.examine(type); - - storage = createStorage(type); - mStorages.put(type, storage); - } - return storage; - } - } - - private ReplicatedStorage createStorage(Class type) - throws SupportException, RepositoryException - { - return new ReplicatedStorage(this, type); + return mStorages.storageFor(type); } public Transaction enterTransaction() { diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java index 5147361..b97ec14 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java @@ -22,10 +22,10 @@ import java.io.File; import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Map; -import java.util.IdentityHashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -43,6 +43,7 @@ import com.amazon.carbonado.PersistException; import com.amazon.carbonado.Repository; import com.amazon.carbonado.RepositoryException; import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Storage; import com.amazon.carbonado.Transaction; import com.amazon.carbonado.TriggerFactory; @@ -66,6 +67,7 @@ import com.amazon.carbonado.spi.ExceptionTransformer; import com.amazon.carbonado.spi.LobEngine; import com.amazon.carbonado.spi.SequenceValueGenerator; import com.amazon.carbonado.spi.SequenceValueProducer; +import com.amazon.carbonado.spi.StorageCollection; /** * Repository implementation backed by a Berkeley DB. Data is encoded in the @@ -94,13 +96,13 @@ abstract class BDBRepository private final AtomicReference mRootRef; private final StorableCodecFactory mStorableCodecFactory; private final ExceptionTransformer mExTransformer; - private final Map, BDBStorage> mStorages; + private final StorageCollection mStorages; private final Map mSequences; private final ThreadLocal> mCurrentTxnMgr; private final Lock mShutdownLock; - // Lock with a timeout value to recover from deadlock condition. - private final int mLockTimeoutSeconds = 5; + private final Condition mShutdownCondition; + private int mShutdownBlockerCount; // Weakly tracks all BDBTransactionManager instances for shutdown hook. private final Map, ?> mAllTxnMgrs; @@ -155,10 +157,28 @@ abstract class BDBRepository mTriggerFactories = builder.getTriggerFactories(); mRootRef = rootRef; mExTransformer = exTransformer; - mStorages = new IdentityHashMap, BDBStorage>(); + + mStorages = new StorageCollection() { + protected Storage createStorage(Class type) + throws RepositoryException + { + lockoutShutdown(); + try { + try { + return BDBRepository.this.createStorage(type); + } catch (Exception e) { + throw toRepositoryException(e); + } + } finally { + unlockoutShutdown(); + } + } + }; + mSequences = new ConcurrentHashMap(); mCurrentTxnMgr = new ThreadLocal>(); mShutdownLock = new ReentrantLock(); + mShutdownCondition = mShutdownLock.newCondition(); mAllTxnMgrs = new WeakIdentityMap(); mRunCheckpointer = !builder.getReadOnly() && builder.getRunCheckpointer(); mRunDeadlockDetector = builder.getRunDeadlockDetector(); @@ -179,34 +199,10 @@ abstract class BDBRepository return mName; } - @SuppressWarnings("unchecked") public BDBStorage storageFor(Class type) throws MalformedTypeException, RepositoryException { - // Acquire lock to prevent databases from being opened during shutdown. - try { - if (mShutdownLock.tryLock(mLockTimeoutSeconds, TimeUnit.SECONDS)) { - try { - BDBStorage storage = mStorages.get(type); - if (storage == null) { - // Examine and throw exception early if there is a problem. - StorableIntrospector.examine(type); - - try { - storage = createStorage(type); - } catch (Exception e) { - throw toRepositoryException(e); - } - mStorages.put(type, storage); - } - return (BDBStorage) storage; - } finally { - mShutdownLock.unlock(); - } - } - } catch (InterruptedException e) { - } - throw new RepositoryException("Unable to acquire shutdown lock"); + return (BDBStorage) mStorages.storageFor(type); } public Transaction enterTransaction() { @@ -427,29 +423,22 @@ abstract class BDBRepository SequenceValueProducer getSequenceValueProducer(String name) throws PersistException { SequenceValueGenerator producer = mSequences.get(name); if (producer == null) { - // Acquire lock to prevent sequences from being created during shutdown. + lockoutShutdown(); try { - if (mShutdownLock.tryLock(mLockTimeoutSeconds, TimeUnit.SECONDS)) { + producer = mSequences.get(name); + if (producer == null) { + Repository metaRepo = getRootRepository(); try { - producer = mSequences.get(name); - if (producer == null) { - Repository metaRepo = getRootRepository(); - try { - producer = new SequenceValueGenerator(metaRepo, name); - } catch (RepositoryException e) { - throw toPersistException(e); - } - mSequences.put(name, producer); - } - return producer; - } finally { - mShutdownLock.unlock(); + producer = new SequenceValueGenerator(metaRepo, name); + } catch (RepositoryException e) { + throw toPersistException(e); } + mSequences.put(name, producer); } - } catch (InterruptedException e) { - e.printStackTrace(); + return producer; + } finally { + unlockoutShutdown(); } - throw new PersistException("Unable to acquire shutdown lock"); } return producer; } @@ -472,21 +461,15 @@ abstract class BDBRepository LobEngine getLobEngine() throws RepositoryException { LobEngine engine = mLobEngine; if (engine == null) { - // Acquire lock to prevent LobEngine from being created during shutdown. + lockoutShutdown(); try { - if (mShutdownLock.tryLock(mLockTimeoutSeconds, TimeUnit.SECONDS)) { - try { - if ((engine = mLobEngine) == null) { - mLobEngine = engine = new LobEngine(this); - } - return engine; - } finally { - mShutdownLock.unlock(); - } + if ((engine = mLobEngine) == null) { + mLobEngine = engine = new LobEngine(this); } - } catch (InterruptedException e) { + return engine; + } finally { + unlockoutShutdown(); } - throw new RepositoryException("Unable to acquire shutdown lock"); } return engine; } @@ -605,18 +588,66 @@ abstract class BDBRepository BDBTransactionManager openTransactionManager() { BDBTransactionManager txnMgr = mCurrentTxnMgr.get(); if (txnMgr == null) { - mShutdownLock.lock(); + lockoutShutdown(); try { txnMgr = new BDBTransactionManager(mExTransformer, this); mCurrentTxnMgr.set(txnMgr); mAllTxnMgrs.put(txnMgr, null); } finally { - mShutdownLock.unlock(); + unlockoutShutdown(); } } return txnMgr; } + /** + * Call to prevent shutdown hook from running. Be sure to call + * unlockoutShutdown afterwards. + */ + private void lockoutShutdown() { + mShutdownLock.lock(); + try { + mShutdownBlockerCount++; + } finally { + mShutdownLock.unlock(); + } + } + + /** + * Only call this to release lockoutShutdown. + */ + private void unlockoutShutdown() { + mShutdownLock.lock(); + try { + if (--mShutdownBlockerCount == 0) { + mShutdownCondition.signalAll(); + } + } finally { + mShutdownLock.unlock(); + } + } + + /** + * Only to be called by shutdown hook itself. + */ + void lockForShutdown() { + mShutdownLock.lock(); + while (mShutdownBlockerCount > 0) { + try { + mShutdownCondition.await(); + } catch (InterruptedException e) { + mLog.warn("Ignoring interruption for shutdown"); + } + } + } + + /** + * Only to be called by shutdown hook itself. + */ + void unlockForShutdown() { + mShutdownLock.unlock(); + } + /** * Periodically runs checkpoints on the environment. */ @@ -843,7 +874,7 @@ abstract class BDBRepository } private void doShutdown(BDBRepository repository, boolean suspendThreads) { - repository.mShutdownLock.lock(); + repository.lockForShutdown(); try { // Return unused sequence values. for (SequenceValueGenerator generator : repository.mSequences.values()) { @@ -877,9 +908,9 @@ abstract class BDBRepository } // Close database handles. - for (BDBStorage storage : repository.mStorages.values()) { + for (Storage storage : repository.mStorages.allStorage()) { try { - storage.close(); + ((BDBStorage) storage).close(); } catch (Throwable e) { repository.getLog().error(null, e); } @@ -914,7 +945,7 @@ abstract class BDBRepository repository.mPostShutdownHook.run(); } } finally { - repository.mShutdownLock.unlock(); + repository.unlockForShutdown(); } } } -- cgit v1.2.3