diff options
Diffstat (limited to 'src/main/java/com/amazon/carbonado/repo/sleepycat')
4 files changed, 182 insertions, 400 deletions
diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java index 5d21dad..f16d2f7 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java @@ -25,6 +25,8 @@ import com.amazon.carbonado.Storable; import com.amazon.carbonado.raw.RawCursor;
import com.amazon.carbonado.raw.RawUtil;
+import com.amazon.carbonado.spi.TransactionManager;
+
/**
*
*
@@ -33,7 +35,7 @@ import com.amazon.carbonado.raw.RawUtil; abstract class BDBCursor<Txn, S extends Storable> extends RawCursor<S> {
private static final byte[] NO_DATA = new byte[0];
- private final BDBTransactionManager<Txn> mTxnMgr;
+ private final TransactionManager<Txn> mTxnMgr;
private final BDBStorage<Txn, S> mStorage;
/**
* @param txnMgr
@@ -48,7 +50,7 @@ abstract class BDBCursor<Txn, S extends Storable> extends RawCursor<S> { * @throws ClassCastException if lock is not an object passed by
* {@link BDBStorage#openCursor BDBStorage.openCursor}
*/
- protected BDBCursor(BDBTransactionManager<Txn> txnMgr,
+ protected BDBCursor(TransactionManager<Txn> txnMgr,
byte[] startBound, boolean inclusiveStart,
byte[] endBound, boolean inclusiveEnd,
int maxPrefix,
diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java index 6fc7cdd..0e2ed96 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java @@ -22,23 +22,15 @@ import java.io.File; import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.cojen.util.WeakIdentityMap;
-
import com.amazon.carbonado.ConfigurationException;
import com.amazon.carbonado.Cursor;
import com.amazon.carbonado.FetchException;
import com.amazon.carbonado.IsolationLevel;
-import com.amazon.carbonado.MalformedTypeException;
import com.amazon.carbonado.PersistException;
import com.amazon.carbonado.Repository;
import com.amazon.carbonado.RepositoryException;
@@ -63,11 +55,14 @@ import com.amazon.carbonado.qe.StorageAccess; import com.amazon.carbonado.raw.StorableCodecFactory;
+import com.amazon.carbonado.sequence.SequenceCapability;
+import com.amazon.carbonado.sequence.SequenceValueGenerator;
+import com.amazon.carbonado.sequence.SequenceValueProducer;
+
+import com.amazon.carbonado.spi.AbstractRepository;
import com.amazon.carbonado.spi.ExceptionTransformer;
import com.amazon.carbonado.spi.LobEngine;
-import com.amazon.carbonado.spi.SequenceValueGenerator;
-import com.amazon.carbonado.spi.SequenceValueProducer;
-import com.amazon.carbonado.spi.StorageCollection;
+import com.amazon.carbonado.spi.TransactionManager;
/**
* Repository implementation backed by a Berkeley DB. Data is encoded in the
@@ -78,42 +73,31 @@ import com.amazon.carbonado.spi.StorageCollection; * @author Brian S O'Neill
* @author Vidya Iyer
* @author Nicole Deflaux
+ * @author bcastill
*/
-abstract class BDBRepository<Txn>
+abstract class BDBRepository<Txn> extends AbstractRepository<Txn>
implements Repository,
RepositoryAccess,
IndexInfoCapability,
CheckpointCapability,
EnvironmentCapability,
ShutdownCapability,
- StorableInfoCapability
+ StorableInfoCapability,
+ SequenceCapability
{
private final Log mLog = LogFactory.getLog(getClass());
- private final String mName;
private final boolean mIsMaster;
final Iterable<TriggerFactory> mTriggerFactories;
private final AtomicReference<Repository> mRootRef;
private final StorableCodecFactory mStorableCodecFactory;
private final ExceptionTransformer mExTransformer;
- private final StorageCollection mStorages;
- private final Map<String, SequenceValueGenerator> mSequences;
- private final ThreadLocal<BDBTransactionManager<Txn>> mCurrentTxnMgr;
-
- private final Lock mShutdownLock;
- private final Condition mShutdownCondition;
- private int mShutdownBlockerCount;
-
- // Weakly tracks all BDBTransactionManager instances for shutdown hook.
- private final Map<BDBTransactionManager<Txn>, ?> mAllTxnMgrs;
Checkpointer mCheckpointer;
DeadlockDetector mDeadlockDetector;
- private ShutdownHook mShutdownHook;
- final Runnable mPreShutdownHook;
- final Runnable mPostShutdownHook;
- volatile boolean mHasShutdown;
+ private final Runnable mPreShutdownHook;
+ private final Runnable mPostShutdownHook;
private final Object mInitialDBConfig;
private final BDBRepositoryBuilder.DatabaseHook mDatabaseHook;
@@ -126,8 +110,6 @@ abstract class BDBRepository<Txn> final File mEnvHome;
final String mSingleFileName;
- private final String mMergeSortTempDir;
-
private LayoutFactory mLayoutFactory;
private LobEngine mLobEngine;
@@ -146,40 +128,19 @@ abstract class BDBRepository<Txn> ExceptionTransformer exTransformer)
throws ConfigurationException
{
+ super(builder.getName());
+
builder.assertReady();
if (exTransformer == null) {
throw new IllegalArgumentException("Exception transformer must not be null");
}
- mName = builder.getName();
mIsMaster = builder.isMaster();
mTriggerFactories = builder.getTriggerFactories();
mRootRef = rootRef;
mExTransformer = exTransformer;
- mStorages = new StorageCollection() {
- protected <S extends Storable> Storage<S> createStorage(Class<S> type)
- throws RepositoryException
- {
- lockoutShutdown();
- try {
- try {
- return BDBRepository.this.createStorage(type);
- } catch (Exception e) {
- throw toRepositoryException(e);
- }
- } finally {
- unlockoutShutdown();
- }
- }
- };
-
- mSequences = new ConcurrentHashMap<String, SequenceValueGenerator>();
- mCurrentTxnMgr = new ThreadLocal<BDBTransactionManager<Txn>>();
- mShutdownLock = new ReentrantLock();
- mShutdownCondition = mShutdownLock.newCondition();
- mAllTxnMgrs = new WeakIdentityMap();
mRunCheckpointer = !builder.getReadOnly() && builder.getRunCheckpointer();
mRunDeadlockDetector = builder.getRunDeadlockDetector();
mStorableCodecFactory = builder.getStorableCodecFactory();
@@ -191,40 +152,13 @@ abstract class BDBRepository<Txn> mDataHome = builder.getDataHomeFile();
mEnvHome = builder.getEnvironmentHomeFile();
mSingleFileName = builder.getSingleFileName();
- // FIXME: see comments in builder
- mMergeSortTempDir = null; //builder.getMergeSortTempDirectory();
- }
-
- public String getName() {
- return mName;
- }
-
- public <S extends Storable> BDBStorage<Txn, S> storageFor(Class<S> type)
- throws MalformedTypeException, RepositoryException
- {
- return (BDBStorage<Txn, S>) mStorages.storageFor(type);
- }
-
- public Transaction enterTransaction() {
- return openTransactionManager().enter(null);
- }
-
- public Transaction enterTransaction(IsolationLevel level) {
- return openTransactionManager().enter(level);
- }
-
- public Transaction enterTopTransaction(IsolationLevel level) {
- return openTransactionManager().enterTop(level);
- }
-
- public IsolationLevel getTransactionIsolationLevel() {
- return openTransactionManager().getIsolationLevel();
}
@SuppressWarnings("unchecked")
public <C extends Capability> C getCapability(Class<C> capabilityType) {
- if (capabilityType.isInstance(this)) {
- return (C) this;
+ C cap = super.getCapability(capabilityType);
+ if (cap != null) {
+ return cap;
}
if (capabilityType == LayoutCapability.class) {
return (C) mLayoutFactory;
@@ -276,65 +210,6 @@ abstract class BDBRepository<Txn> return StorableIntrospector.examine(type).getAllProperties().get(name) != null;
}
- public void close() {
- shutdown(false);
- }
-
- public boolean isAutoShutdownEnabled() {
- return mShutdownHook != null;
- }
-
- public void setAutoShutdownEnabled(boolean enabled) {
- if (mShutdownHook == null) {
- if (enabled) {
- mShutdownHook = new ShutdownHook(this);
- try {
- Runtime.getRuntime().addShutdownHook(mShutdownHook);
- } catch (IllegalStateException e) {
- // Shutdown in progress, so immediately run hook.
- mShutdownHook.run();
- }
- }
- } else {
- if (!enabled) {
- try {
- Runtime.getRuntime().removeShutdownHook(mShutdownHook);
- } catch (IllegalStateException e) {
- // Shutdown in progress, hook is running.
- }
- mShutdownHook = null;
- }
- }
- }
-
- public void shutdown() {
- shutdown(true);
- }
-
- private void shutdown(boolean suspendThreads) {
- if (!mHasShutdown) {
- // Since this repository is being closed before system shutdown,
- // remove shutdown hook and run it now.
- ShutdownHook hook = mShutdownHook;
- if (hook != null) {
- try {
- Runtime.getRuntime().removeShutdownHook(hook);
- } catch (IllegalStateException e) {
- // Shutdown in progress, hook is running.
- hook = null;
- }
- } else {
- // If hook is null, auto-shutdown was disabled. Make a new
- // instance to use, but don't register it.
- hook = new ShutdownHook(this);
- }
- if (hook != null) {
- hook.run(suspendThreads);
- }
- mHasShutdown = true;
- }
- }
-
/**
* Suspend the checkpointer until the suspension time has expired or until
* manually resumed. If a checkpoint is in progress, this method will block
@@ -390,7 +265,7 @@ abstract class BDBRepository<Txn> public <S extends Storable> StorageAccess<S> storageAccessFor(Class<S> type)
throws RepositoryException
{
- return storageFor(type);
+ return (BDBStorage<Txn, S>) storageFor(type);
}
@Override
@@ -398,6 +273,78 @@ abstract class BDBRepository<Txn> close();
}
+ protected void shutdownHook() {
+ // Run any external shutdown logic that needs to happen before the
+ // databases and the environment are actually closed
+ if (mPreShutdownHook != null) {
+ mPreShutdownHook.run();
+ }
+
+ // Close database handles.
+ for (Storage storage : allStorage()) {
+ try {
+ if (storage instanceof BDBStorage) {
+ ((BDBStorage) storage).close();
+ }
+ } catch (Throwable e) {
+ getLog().error(null, e);
+ }
+ }
+
+ // Wait for checkpointer to finish.
+ if (mCheckpointer != null) {
+ mCheckpointer.interrupt();
+ try {
+ mCheckpointer.join();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // Wait for deadlock detector to finish.
+ if (mDeadlockDetector != null) {
+ mDeadlockDetector.interrupt();
+ try {
+ mDeadlockDetector.join();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // Close environment.
+ try {
+ env_close();
+ } catch (Throwable e) {
+ getLog().error(null, e);
+ }
+
+ if (mPostShutdownHook != null) {
+ mPostShutdownHook.run();
+ }
+ }
+
+ protected Log getLog() {
+ return mLog;
+ }
+
+ protected TransactionManager createTransactionManager() {
+ return new BDBTransactionManager(mExTransformer, this);
+ }
+
+ protected <S extends Storable> Storage createStorage(Class<S> type)
+ throws RepositoryException
+ {
+ try {
+ return createBDBStorage(type);
+ } catch (Exception e) {
+ throw toRepositoryException(e);
+ }
+ }
+
+ protected SequenceValueProducer createSequenceValueProducer(String name)
+ throws RepositoryException
+ {
+ return new SequenceValueGenerator(BDBRepository.this, name);
+ }
+
/**
* @see com.amazon.carbonado.spi.RepositoryBuilder#isMaster
*/
@@ -417,40 +364,6 @@ abstract class BDBRepository<Txn> return dbName;
}
- String getMergeSortTempDirectory() {
- if (mMergeSortTempDir != null) {
- new File(mMergeSortTempDir).mkdirs();
- }
- return mMergeSortTempDir;
- }
-
- SequenceValueProducer getSequenceValueProducer(String name) throws PersistException {
- SequenceValueGenerator producer = mSequences.get(name);
- if (producer == null) {
- lockoutShutdown();
- try {
- producer = mSequences.get(name);
- if (producer == null) {
- Repository metaRepo = getRootRepository();
- try {
- producer = new SequenceValueGenerator(metaRepo, name);
- } catch (RepositoryException e) {
- throw toPersistException(e);
- }
- mSequences.put(name, producer);
- }
- return producer;
- } finally {
- unlockoutShutdown();
- }
- }
- return producer;
- }
-
- Log getLog() {
- return mLog;
- }
-
StorableCodecFactory getStorableCodecFactory() {
return mStorableCodecFactory;
}
@@ -464,7 +377,7 @@ abstract class BDBRepository<Txn> LobEngine getLobEngine() throws RepositoryException {
if (mLobEngine == null) {
- mLobEngine = new LobEngine(getRootRepository());
+ mLobEngine = new LobEngine(this, getRootRepository());
}
return mLobEngine;
}
@@ -555,7 +468,7 @@ abstract class BDBRepository<Txn> */
abstract void env_close() throws Exception;
- abstract <S extends Storable> BDBStorage<Txn, S> createStorage(Class<S> type)
+ abstract <S extends Storable> BDBStorage<Txn, S> createBDBStorage(Class<S> type)
throws Exception;
FetchException toFetchException(Throwable e) {
@@ -571,70 +484,11 @@ abstract class BDBRepository<Txn> }
/**
- * Returns the thread-local BDBTransactionManager instance, creating it if
- * needed.
- */
- BDBTransactionManager<Txn> openTransactionManager() {
- BDBTransactionManager<Txn> txnMgr = mCurrentTxnMgr.get();
- if (txnMgr == null) {
- lockoutShutdown();
- try {
- txnMgr = new BDBTransactionManager<Txn>(mExTransformer, this);
- mCurrentTxnMgr.set(txnMgr);
- mAllTxnMgrs.put(txnMgr, null);
- } finally {
- unlockoutShutdown();
- }
- }
- return txnMgr;
- }
-
- /**
- * Call to prevent shutdown hook from running. Be sure to call
- * unlockoutShutdown afterwards.
- */
- private void lockoutShutdown() {
- mShutdownLock.lock();
- try {
- mShutdownBlockerCount++;
- } finally {
- mShutdownLock.unlock();
- }
- }
-
- /**
- * Only call this to release lockoutShutdown.
+ * Returns the thread-local BDBTransactionManager, creating it if needed.
*/
- private void unlockoutShutdown() {
- mShutdownLock.lock();
- try {
- if (--mShutdownBlockerCount == 0) {
- mShutdownCondition.signalAll();
- }
- } finally {
- mShutdownLock.unlock();
- }
- }
-
- /**
- * Only to be called by shutdown hook itself.
- */
- void lockForShutdown() {
- mShutdownLock.lock();
- while (mShutdownBlockerCount > 0) {
- try {
- mShutdownCondition.await();
- } catch (InterruptedException e) {
- mLog.warn("Ignoring interruption for shutdown");
- }
- }
- }
-
- /**
- * Only to be called by shutdown hook itself.
- */
- void unlockForShutdown() {
- mShutdownLock.unlock();
+ // Provides access to transaction manager from other classes.
+ TransactionManager<Txn> localTxnManager() {
+ return localTransactionManager();
}
/**
@@ -667,7 +521,8 @@ abstract class BDBRepository<Txn> * since last checkpoint
*/
Checkpointer(BDBRepository repository, long sleepInterval, int kBytes, int minutes) {
- super("BDBRepository checkpointer (" + repository.getName() + ')');
+ super(repository.getClass().getSimpleName() + " checkpointer (" +
+ repository.getName() + ')');
setDaemon(true);
mRepository = new WeakReference<BDBRepository>(repository);
mSleepInterval = sleepInterval;
@@ -800,7 +655,8 @@ abstract class BDBRepository<Txn> * @param sleepInterval milliseconds to sleep before running deadlock detection
*/
DeadlockDetector(BDBRepository repository, long sleepInterval) {
- super("BDBRepository deadlock detector (" + repository.getName() + ')');
+ super(repository.getClass().getSimpleName() + " deadlock detector (" +
+ repository.getName() + ')');
setDaemon(true);
mRepository = new WeakReference<BDBRepository>(repository);
mSleepInterval = sleepInterval;
@@ -831,111 +687,4 @@ abstract class BDBRepository<Txn> }
}
}
-
- private static class ShutdownHook extends Thread {
- private final WeakReference<BDBRepository<?>> mRepository;
-
- ShutdownHook(BDBRepository repository) {
- super("BDBRepository shutdown (" + repository.getName() + ')');
- mRepository = new WeakReference<BDBRepository<?>>(repository);
- }
-
- public void run() {
- run(true);
- }
-
- public void run(boolean suspendThreads) {
- BDBRepository<?> repository = mRepository.get();
- if (repository == null) {
- return;
- }
-
- repository.getLog().info("Closing repository \"" + repository.getName() + '"');
-
- try {
- doShutdown(repository, suspendThreads);
- } finally {
- repository.mHasShutdown = true;
- mRepository.clear();
- repository.getLog().info
- ("Finished closing repository \"" + repository.getName() + '"');
- }
- }
-
- private void doShutdown(BDBRepository<?> repository, boolean suspendThreads) {
- repository.lockForShutdown();
- try {
- // Return unused sequence values.
- for (SequenceValueGenerator generator : repository.mSequences.values()) {
- try {
- generator.returnReservedValues();
- } catch (RepositoryException e) {
- repository.getLog().warn(null, e);
- }
- }
-
- // Close transactions and cursors.
- for (BDBTransactionManager<?> txnMgr : repository.mAllTxnMgrs.keySet()) {
- if (suspendThreads) {
- // Lock transaction manager but don't release it. This
- // prevents other threads from beginning work during
- // shutdown, which will likely fail along the way.
- txnMgr.getLock().lock();
- }
- try {
- txnMgr.close();
- } catch (Throwable e) {
- repository.getLog().error(null, e);
- }
- }
-
- // Run any external shutdown logic that needs to
- // happen before the databases and the environment are
- // actually closed
- if (repository.mPreShutdownHook != null) {
- repository.mPreShutdownHook.run();
- }
-
- // Close database handles.
- for (Storage storage : repository.mStorages.allStorage()) {
- try {
- ((BDBStorage) storage).close();
- } catch (Throwable e) {
- repository.getLog().error(null, e);
- }
- }
-
- // Wait for checkpointer to finish.
- if (repository.mCheckpointer != null) {
- repository.mCheckpointer.interrupt();
- try {
- repository.mCheckpointer.join();
- } catch (InterruptedException e) {
- }
- }
-
- // Wait for deadlock detector to finish.
- if (repository.mDeadlockDetector != null) {
- repository.mDeadlockDetector.interrupt();
- try {
- repository.mDeadlockDetector.join();
- } catch (InterruptedException e) {
- }
- }
-
- // Close environment.
- try {
- repository.env_close();
- } catch (Throwable e) {
- repository.getLog().error(null, e);
- }
-
- if (repository.mPostShutdownHook != null) {
- repository.mPostShutdownHook.run();
- }
- } finally {
- repository.unlockForShutdown();
- }
- }
- }
}
diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java index 1f60bfd..fa24624 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java @@ -64,6 +64,7 @@ import com.amazon.carbonado.ConfigurationException; * <li>{@link com.amazon.carbonado.capability.StorableInfoCapability StorableInfoCapability}
* <li>{@link com.amazon.carbonado.capability.ShutdownCapability ShutdownCapability}
* <li>{@link com.amazon.carbonado.layout.LayoutCapability LayoutCapability}
+ * <li>{@link com.amazon.carbonado.sequence.SequenceCapability SequenceCapability}
* <li>{@link CheckpointCapability CheckpointCapability}
* <li>{@link EnvironmentCapability EnvironmentCapability}
* </ul>
@@ -82,7 +83,6 @@ public class BDBRepositoryBuilder extends AbstractRepositoryBuilder { private boolean mIsMaster = true;
private BDBProduct mProduct = DEFAULT_PRODUCT;
private File mEnvHome;
- private String mMergeSortTempDir;
private File mDataHome;
private String mSingleFileName;
private boolean mIndexSupport = true;
@@ -283,29 +283,6 @@ public class BDBRepositoryBuilder extends AbstractRepositoryBuilder { }
/**
- * Sets the directory to use for creating temporary files needed for merge
- * sorting. If null or not specified, the default temporary file directory is used.
- *
- * @param tempDir directory to store temp files for merge sorting, or null
- * for default
- */
- /* FIXME: use common config somehow, since indexed repo needs this too
- public void setMergeSortTempDirectory(String tempDir) {
- mMergeSortTempDir = tempDir;
- }
- */
-
- /**
- * Returns the directory to use for creating temporary files needed for
- * merge sorting. If null, the default temporary file directory is used.
- */
- /* FIXME: use common config somehow, since indexed repo needs this too
- public String getMergeSortTempDirectory() {
- return mMergeSortTempDir;
- }
- */
-
- /**
* Specify that all BDB databases should reside in one file, except for log
* files and caches. The filename is relative to the environment home,
* unless data directories have been specified. For BDBRepositories that
diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java index d8a3066..82ba789 100644 --- a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java @@ -71,10 +71,12 @@ import com.amazon.carbonado.raw.StorableCodecFactory; import com.amazon.carbonado.raw.RawSupport;
import com.amazon.carbonado.raw.RawUtil;
+import com.amazon.carbonado.sequence.SequenceValueProducer;
+
import com.amazon.carbonado.spi.IndexInfoImpl;
import com.amazon.carbonado.spi.LobEngine;
-import com.amazon.carbonado.spi.SequenceValueProducer;
import com.amazon.carbonado.spi.StorableIndexSet;
+import com.amazon.carbonado.spi.TransactionManager;
import com.amazon.carbonado.spi.TriggerManager;
/**
@@ -154,7 +156,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag }
public S prepare() {
- return mStorableCodec.instantiate(mRawSupport);
+ return mStorableCodec.instantiate();
}
public Query<S> query() throws FetchException {
@@ -169,6 +171,48 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag return mQueryEngine.query(filter);
}
+ public void truncate() throws PersistException {
+ if (mTriggerManager.getDeleteTrigger() != null || mRepository.mSingleFileName != null) {
+ final int batchSize = 100;
+
+ while (true) {
+ Transaction txn = mRepository.enterTransaction(IsolationLevel.READ_COMMITTED);
+ txn.setForUpdate(true);
+ try {
+ Cursor<S> cursor = query().fetch();
+ if (!cursor.hasNext()) {
+ break;
+ }
+ int count = 0;
+ do {
+ cursor.next().tryDelete();
+ } while (count++ < batchSize && cursor.hasNext());
+ txn.commit();
+ } catch (FetchException e) {
+ throw e.toPersistException();
+ } finally {
+ txn.exit();
+ }
+
+ return;
+ }
+ }
+
+ TransactionManager<Txn> txnMgr = localTxnManager();
+
+ // Lock out shutdown task.
+ txnMgr.getLock().lock();
+ try {
+ try {
+ db_truncate(txnMgr.getTxn());
+ } catch (Exception e) {
+ throw toPersistException(e);
+ }
+ } finally {
+ txnMgr.getLock().unlock();
+ }
+ }
+
public boolean addTrigger(Trigger<? super S> trigger) {
return mTriggerManager.addTrigger(trigger);
}
@@ -222,9 +266,6 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag }
}
- // FIXME: sort buffer should be on repository access. Also, create abstract
- // repository access that creates the correct merge sort buffer. And more:
- // create capability for managing merge sort buffers.
return new MergeSortBuffer<S>(mRootStorage);
}
@@ -262,7 +303,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag boolean reverseOrder)
throws FetchException
{
- BDBTransactionManager<Txn> txnMgr = openTransactionManager();
+ TransactionManager<Txn> txnMgr = localTxnManager();
if (reverseRange) {
{
@@ -367,6 +408,12 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag * differences between the current index set and the desired index set.
*/
protected void open(boolean readOnly) throws RepositoryException {
+ open(readOnly, null, true);
+ }
+
+ protected void open(boolean readOnly, Txn openTxn, boolean installTriggers)
+ throws RepositoryException
+ {
final Layout layout = getLayout();
StorableInfo<S> info = StorableIntrospector.examine(getStorableType());
@@ -386,11 +433,11 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag boolean isPrimaryEmpty;
try {
- BDBTransactionManager<Txn> txnMgr = mRepository.openTransactionManager();
+ TransactionManager<Txn> txnMgr = mRepository.localTxnManager();
// Lock out shutdown task.
txnMgr.getLock().lock();
try {
- primaryDatabase = env_openPrimaryDatabase(null, databaseName);
+ primaryDatabase = env_openPrimaryDatabase(openTxn, databaseName);
primaryInfo = registerPrimaryDatabase(readOnly, layout);
isPrimaryEmpty = db_isEmpty(null, primaryDatabase, txnMgr.isForUpdate());
} finally {
@@ -446,7 +493,8 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag try {
mStorableCodec = codecFactory
- .createCodec(getStorableType(), pkIndex, mRepository.isMaster(), layout);
+ .createCodec(getStorableType(), pkIndex, mRepository.isMaster(), layout,
+ mRawSupport);
} catch (SupportException e) {
// We've opened the database prematurely, since type isn't
// supported by encoding strategy. Close it down and unregister.
@@ -468,12 +516,14 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag mQueryEngine = new QueryEngine<S>(getStorableType(), mRepository);
- // Don't install automatic triggers until we're completely ready.
- mTriggerManager.addTriggers(getStorableType(), mRepository.mTriggerFactories);
+ if (installTriggers) {
+ // Don't install automatic triggers until we're completely ready.
+ mTriggerManager.addTriggers(getStorableType(), mRepository.mTriggerFactories);
+ }
}
protected S instantiate(byte[] key, byte[] value) throws FetchException {
- return mStorableCodec.instantiate(mRawSupport, key, value);
+ return mStorableCodec.instantiate(key, value);
}
protected CompactionCapability.Result<S> compact() throws RepositoryException {
@@ -493,7 +543,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag }
try {
- Txn txn = mRepository.openTransactionManager().getTxn();
+ Txn txn = mRepository.localTxnManager().getTxn();
return db_compact(txn, mPrimaryDatabase, start, end);
} catch (Exception e) {
throw mRepository.toRepositoryException(e);
@@ -568,7 +618,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag * @param database database to use
*/
protected abstract BDBCursor<Txn, S> openCursor
- (BDBTransactionManager<Txn> txnMgr,
+ (TransactionManager<Txn> txnMgr,
byte[] startBound, boolean inclusiveStart,
byte[] endBound, boolean inclusiveEnd,
int maxPrefix,
@@ -588,8 +638,8 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag return mRepository.toRepositoryException(e);
}
- BDBTransactionManager<Txn> openTransactionManager() {
- return mRepository.openTransactionManager();
+ TransactionManager<Txn> localTxnManager() {
+ return mRepository.localTxnManager();
}
/**
@@ -647,7 +697,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag * prevent threads from starting work that will likely fail along the way.
*/
void checkClosed() throws FetchException {
- BDBTransactionManager<Txn> txnMgr = openTransactionManager();
+ TransactionManager<Txn> txnMgr = localTxnManager();
// Lock out shutdown task.
txnMgr.getLock().lock();
@@ -668,7 +718,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag }
void close() throws Exception {
- BDBTransactionManager<Txn> txnMgr = mRepository.openTransactionManager();
+ TransactionManager<Txn> txnMgr = mRepository.localTxnManager();
txnMgr.getLock().lock();
try {
if (mPrimaryDatabase != null) {
@@ -811,7 +861,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag }
public byte[] tryLoad(byte[] key) throws FetchException {
- BDBTransactionManager<Txn> txnMgr = mStorage.openTransactionManager();
+ TransactionManager<Txn> txnMgr = mStorage.localTxnManager();
byte[] result;
// Lock out shutdown task.
txnMgr.getLock().lock();
@@ -834,7 +884,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag }
public boolean tryInsert(S storable, byte[] key, byte[] value) throws PersistException {
- BDBTransactionManager<Txn> txnMgr = mStorage.openTransactionManager();
+ TransactionManager<Txn> txnMgr = mStorage.localTxnManager();
Object result;
// Lock out shutdown task.
txnMgr.getLock().lock();
@@ -857,7 +907,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag }
public void store(S storable, byte[] key, byte[] value) throws PersistException {
- BDBTransactionManager<Txn> txnMgr = mStorage.openTransactionManager();
+ TransactionManager<Txn> txnMgr = mStorage.localTxnManager();
// Lock out shutdown task.
txnMgr.getLock().lock();
try {
@@ -874,7 +924,7 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag }
public boolean tryDelete(byte[] key) throws PersistException {
- BDBTransactionManager<Txn> txnMgr = mStorage.openTransactionManager();
+ TransactionManager<Txn> txnMgr = mStorage.localTxnManager();
// Lock out shutdown task.
txnMgr.getLock().lock();
try {
@@ -907,7 +957,11 @@ abstract class BDBStorage<Txn, S extends Storable> implements Storage<S>, Storag public SequenceValueProducer getSequenceValueProducer(String name)
throws PersistException
{
- return mStorage.mRepository.getSequenceValueProducer(name);
+ try {
+ return mStorage.mRepository.getSequenceValueProducer(name);
+ } catch (RepositoryException e) {
+ throw e.toPersistException();
+ }
}
public Trigger<? super S> getInsertTrigger() {
|