From f32365f070c20c48fc369edce0e356274aa1b6d3 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Sun, 8 Oct 2006 15:25:04 +0000 Subject: Folded in projects that have no special dependencies. --- .../amazon/carbonado/repo/sleepycat/BDBCursor.java | 300 +++++++ .../carbonado/repo/sleepycat/BDBProduct.java | 52 ++ .../carbonado/repo/sleepycat/BDBRepository.java | 922 +++++++++++++++++++++ .../repo/sleepycat/BDBRepositoryBuilder.java | 717 ++++++++++++++++ .../carbonado/repo/sleepycat/BDBStorage.java | 889 ++++++++++++++++++++ .../repo/sleepycat/BDBTransactionManager.java | 82 ++ .../repo/sleepycat/CheckpointCapability.java | 59 ++ .../repo/sleepycat/CompactionCapability.java | 52 ++ .../repo/sleepycat/EnvironmentCapability.java | 34 + .../repo/sleepycat/StoredDatabaseInfo.java | 102 +++ .../carbonado/repo/sleepycat/package-info.java | 28 + 11 files changed, 3237 insertions(+) create mode 100644 src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java create mode 100644 src/main/java/com/amazon/carbonado/repo/sleepycat/BDBProduct.java create mode 100644 src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java create mode 100644 src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java create mode 100644 src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java create mode 100644 src/main/java/com/amazon/carbonado/repo/sleepycat/BDBTransactionManager.java create mode 100644 src/main/java/com/amazon/carbonado/repo/sleepycat/CheckpointCapability.java create mode 100644 src/main/java/com/amazon/carbonado/repo/sleepycat/CompactionCapability.java create mode 100644 src/main/java/com/amazon/carbonado/repo/sleepycat/EnvironmentCapability.java create mode 100644 src/main/java/com/amazon/carbonado/repo/sleepycat/StoredDatabaseInfo.java create mode 100644 src/main/java/com/amazon/carbonado/repo/sleepycat/package-info.java (limited to 'src/main/java/com/amazon/carbonado/repo/sleepycat') diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java new file mode 100644 index 0000000..81a517b --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBCursor.java @@ -0,0 +1,300 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.sleepycat; + +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.Storable; + +import com.amazon.carbonado.raw.RawCursor; +import com.amazon.carbonado.raw.RawUtil; + +/** + * + * + * @author Brian S O'Neill + */ +abstract class BDBCursor extends RawCursor { + private static final byte[] NO_DATA = new byte[0]; + + private final BDBTransactionManager mTxnMgr; + private final BDBStorage mStorage; + /** + * @param txnMgr + * @param startBound specify the starting key for the cursor, or null if first + * @param inclusiveStart true if start bound is inclusive + * @param endBound specify the ending key for the cursor, or null if last + * @param inclusiveEnd true if end bound is inclusive + * @param maxPrefix maximum expected common initial bytes in start and end bound + * @param reverse when true, iteration is reversed + * @param storage + * @throws IllegalArgumentException if any bound is null but is not inclusive + * @throws ClassCastException if lock is not an object passed by + * {@link BDBStorage#openCursor BDBStorage.openCursor} + */ + protected BDBCursor(BDBTransactionManager txnMgr, + byte[] startBound, boolean inclusiveStart, + byte[] endBound, boolean inclusiveEnd, + int maxPrefix, + boolean reverse, + BDBStorage storage) + throws FetchException + { + super(txnMgr.getLock(), + startBound, inclusiveStart, + endBound, inclusiveEnd, + maxPrefix, reverse); + + mTxnMgr = txnMgr; + mStorage = storage; + txnMgr.register(storage.getStorableType(), this); + } + + void open() throws FetchException { + try { + cursor_open(mTxnMgr.getTxn(), mTxnMgr.getIsolationLevel()); + } catch (Exception e) { + throw mStorage.toFetchException(e); + } + } + + public void close() throws FetchException { + try { + super.close(); + } finally { + mTxnMgr.unregister(mStorage.getStorableType(), this); + } + } + + protected void release() throws FetchException { + try { + cursor_close(); + } catch (Exception e) { + throw mStorage.toFetchException(e); + } + } + + protected byte[] getCurrentKey() throws FetchException { + if (searchKey_getPartial()) { + throw new IllegalStateException(); + } + return searchKey_getDataCopy(); + } + + protected byte[] getCurrentValue() throws FetchException { + if (data_getPartial()) { + throw new IllegalStateException(); + } + return data_getDataCopy(); + } + + protected void disableKeyAndValue() { + searchKey_setPartial(true); + data_setPartial(true); + } + + protected void disableValue() { + data_setPartial(true); + } + + protected void enableKeyAndValue() throws FetchException { + searchKey_setPartial(false); + data_setPartial(false); + if (!hasCurrent()) { + throw new FetchException("Current key and value missing"); + } + } + + protected boolean hasCurrent() throws FetchException { + try { + return cursor_getCurrent(); + } catch (Exception e) { + throw mStorage.toFetchException(e); + } + } + + protected S instantiateCurrent() throws FetchException { + return mStorage.instantiate(primaryKey_getData(), data_getData()); + } + + protected boolean toFirst() throws FetchException { + try { + return cursor_getFirst(); + } catch (Exception e) { + throw mStorage.toFetchException(e); + } + } + + protected boolean toFirst(byte[] key) throws FetchException { + try { + searchKey_setData(key); + return cursor_getSearchKeyRange(); + } catch (Exception e) { + throw mStorage.toFetchException(e); + } + } + + protected boolean toLast() throws FetchException { + try { + return cursor_getLast(); + } catch (Exception e) { + throw mStorage.toFetchException(e); + } + } + + protected boolean toLast(byte[] key) throws FetchException { + try { + // BDB cursor doesn't support "search for exact or less than", so + // emulate it here. Add one to the key value, search, and then + // back up. + + // This destroys the caller's key value. This method's + // documentation indicates that the byte array may be altered. + if (!RawUtil.increment(key)) { + // This point is reached upon overflow, because key looked like: + // 0xff, 0xff, 0xff, 0xff... + // So moving to the absolute last is just fine. + return cursor_getLast(); + } + + // Search for next record... + searchKey_setData(key); + if (cursor_getSearchKeyRange()) { + // ...and back up. + if (!cursor_getPrev()) { + return false; + } + } else { + // Search found nothing, so go to the end. + if (!cursor_getLast()) { + return false; + } + } + + // No guarantee that the currently matched key is correct, since + // additional records may have been inserted after the search + // operation finished. + + key = searchKey_getData(); + + do { + if (compareKeysPartially(searchKey_getData(), key) <= 0) { + return true; + } + // Keep backing up until the found key is equal or smaller than + // what was requested. + } while (cursor_getPrevNoDup()); + + return false; + } catch (Exception e) { + throw mStorage.toFetchException(e); + } + } + + protected boolean toNext() throws FetchException { + try { + return cursor_getNext(); + } catch (Exception e) { + throw mStorage.toFetchException(e); + } + } + + protected boolean toPrevious() throws FetchException { + try { + return cursor_getPrev(); + } catch (Exception e) { + throw mStorage.toFetchException(e); + } + } + + /** + * If the given byte array is less than or equal to given size, it is + * simply returned. Otherwise, a new array is allocated and the data is + * copied. + */ + protected static byte[] getData(byte[] data, int size) { + if (data == null) { + return NO_DATA; + } + if (data.length <= size) { + return data; + } + byte[] newData = new byte[size]; + System.arraycopy(data, 0, newData, 0, size); + return newData; + } + + /** + * Returns a copy of the data array. + */ + protected static byte[] getDataCopy(byte[] data, int size) { + if (data == null) { + return NO_DATA; + } + byte[] newData = new byte[size]; + System.arraycopy(data, 0, newData, 0, size); + return newData; + } + + protected void handleNoSuchElement() throws FetchException { + // Might not be any more elements because storage is closed. + mStorage.checkClosed(); + } + + protected abstract byte[] searchKey_getData(); + + protected abstract byte[] searchKey_getDataCopy(); + + protected abstract void searchKey_setData(byte[] data); + + protected abstract void searchKey_setPartial(boolean partial); + + protected abstract boolean searchKey_getPartial(); + + protected abstract byte[] data_getData(); + + protected abstract byte[] data_getDataCopy(); + + protected abstract void data_setPartial(boolean partial); + + protected abstract boolean data_getPartial(); + + protected abstract byte[] primaryKey_getData(); + + protected abstract void cursor_open(Txn txn, IsolationLevel level) throws Exception; + + protected abstract void cursor_close() throws Exception; + + protected abstract boolean cursor_getCurrent() throws Exception; + + protected abstract boolean cursor_getFirst() throws Exception; + + protected abstract boolean cursor_getLast() throws Exception; + + protected abstract boolean cursor_getSearchKeyRange() throws Exception; + + protected abstract boolean cursor_getNext() throws Exception; + + protected abstract boolean cursor_getNextDup() throws Exception; + + protected abstract boolean cursor_getPrev() throws Exception; + + protected abstract boolean cursor_getPrevNoDup() throws Exception; +} diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBProduct.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBProduct.java new file mode 100644 index 0000000..1b46495 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBProduct.java @@ -0,0 +1,52 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.sleepycat; + +/** + * Set of supported BDB products. + * + * @author Brian S O'Neill + */ +public enum BDBProduct { + /** BDB Native, legacy API */ + DB_Legacy, + + /** BDB Native */ + DB, + + /** BDB Native, High Availability */ + DB_HA, + + /** BDB Java Edition */ + JE; + + public static BDBProduct forString(String name) { + name = name.toLowerCase(); + if (name.equals("db_legacy")) { + return DB_Legacy; + } else if (name.equals("db")) { + return DB; + } else if (name.equals("db_ha")) { + return DB_HA; + } else if (name.equals("je")) { + return JE; + } + throw new IllegalArgumentException("Unsupported product: " + name); + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java new file mode 100644 index 0000000..4618c7d --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepository.java @@ -0,0 +1,922 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.sleepycat; + +import java.io.File; +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.Map; +import java.util.IdentityHashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.cojen.util.WeakIdentityMap; + +import com.amazon.carbonado.ConfigurationException; +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.MalformedTypeException; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.Repository; +import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Storage; +import com.amazon.carbonado.Transaction; +import com.amazon.carbonado.TriggerFactory; + +import com.amazon.carbonado.capability.Capability; +import com.amazon.carbonado.capability.IndexInfo; +import com.amazon.carbonado.capability.IndexInfoCapability; +import com.amazon.carbonado.capability.ShutdownCapability; +import com.amazon.carbonado.capability.StorableInfoCapability; + +import com.amazon.carbonado.info.StorableIntrospector; + +import com.amazon.carbonado.layout.LayoutCapability; +import com.amazon.carbonado.layout.LayoutFactory; + +import com.amazon.carbonado.qe.RepositoryAccess; +import com.amazon.carbonado.qe.StorageAccess; + +import com.amazon.carbonado.raw.StorableCodecFactory; + +import com.amazon.carbonado.spi.ExceptionTransformer; +import com.amazon.carbonado.spi.LobEngine; +import com.amazon.carbonado.spi.SequenceValueGenerator; +import com.amazon.carbonado.spi.SequenceValueProducer; + +/** + * Repository implementation backed by a Berkeley DB. Data is encoded in the + * BDB in a specialized format, and so this repository should not be used to + * open arbitrary Berkeley databases. BDBRepository has total schema ownership, + * and so it updates type definitions in the storage layer automatically. + * + * @author Brian S O'Neill + * @author Vidya Iyer + * @author Nicole Deflaux + */ +abstract class BDBRepository + implements Repository, + RepositoryAccess, + IndexInfoCapability, + CheckpointCapability, + EnvironmentCapability, + ShutdownCapability, + StorableInfoCapability +{ + private final Log mLog = LogFactory.getLog(getClass()); + + private final String mName; + private final boolean mIsMaster; + final Iterable mTriggerFactories; + private final AtomicReference mRootRef; + private final StorableCodecFactory mStorableCodecFactory; + private final ExceptionTransformer mExTransformer; + private final Map, BDBStorage> mStorages; + private final Map mSequences; + private final ThreadLocal> mCurrentTxnMgr; + + private final Lock mShutdownLock; + // Lock with a timeout value to recover from deadlock condition. + private final int mLockTimeoutSeconds = 5; + + // Weakly tracks all BDBTransactionManager instances for shutdown hook. + private final Map, ?> mAllTxnMgrs; + + Checkpointer mCheckpointer; + DeadlockDetector mDeadlockDetector; + + private ShutdownHook mShutdownHook; + final Runnable mPreShutdownHook; + final Runnable mPostShutdownHook; + volatile boolean mHasShutdown; + + private final Object mInitialDBConfig; + private final BDBRepositoryBuilder.DatabaseHook mDatabaseHook; + private final Map, Integer> mDatabasePageSizes; + + final boolean mRunCheckpointer; + final boolean mRunDeadlockDetector; + + final File mDataHome; + final File mEnvHome; + final String mSingleFileName; + + private final String mMergeSortTempDir; + + private LayoutFactory mLayoutFactory; + + private LobEngine mLobEngine; + + /** + * Subclass must call protected start method to fully initialize + * BDBRepository. + * + * @param builder repository configuration + * @param exTransformer transformer for exceptions + * @throws IllegalArgumentException if name or environment home is null + */ + @SuppressWarnings("unchecked") + BDBRepository(AtomicReference rootRef, + BDBRepositoryBuilder builder, + ExceptionTransformer exTransformer) + throws ConfigurationException + { + builder.assertReady(); + + if (exTransformer == null) { + throw new IllegalArgumentException("Exception transformer must not be null"); + } + + mName = builder.getName(); + mIsMaster = builder.isMaster(); + mTriggerFactories = builder.getTriggerFactories(); + mRootRef = rootRef; + mExTransformer = exTransformer; + mStorages = new IdentityHashMap, BDBStorage>(); + mSequences = new ConcurrentHashMap(); + mCurrentTxnMgr = new ThreadLocal>(); + mShutdownLock = new ReentrantLock(); + mAllTxnMgrs = new WeakIdentityMap(); + mRunCheckpointer = !builder.getReadOnly() && builder.getRunCheckpointer(); + mRunDeadlockDetector = builder.getRunDeadlockDetector(); + mStorableCodecFactory = builder.getStorableCodecFactory(); + mPreShutdownHook = builder.getPreShutdownHook(); + mPostShutdownHook = builder.getShutdownHook(); + mInitialDBConfig = builder.getInitialDatabaseConfig(); + mDatabaseHook = builder.getDatabaseHook(); + mDatabasePageSizes = builder.getDatabasePagesMap(); + mDataHome = builder.getDataHomeFile(); + mEnvHome = builder.getEnvironmentHomeFile(); + mSingleFileName = builder.getSingleFileName(); + // FIXME: see comments in builder + mMergeSortTempDir = null; //builder.getMergeSortTempDirectory(); + } + + public String getName() { + return mName; + } + + @SuppressWarnings("unchecked") + public BDBStorage storageFor(Class type) + throws MalformedTypeException, RepositoryException + { + // Acquire lock to prevent databases from being opened during shutdown. + try { + if (mShutdownLock.tryLock(mLockTimeoutSeconds, TimeUnit.SECONDS)) { + try { + BDBStorage storage = mStorages.get(type); + if (storage == null) { + // Examine and throw exception early if there is a problem. + StorableIntrospector.examine(type); + + try { + storage = createStorage(type); + } catch (Exception e) { + throw toRepositoryException(e); + } + mStorages.put(type, storage); + } + return (BDBStorage) storage; + } finally { + mShutdownLock.unlock(); + } + } + } catch (InterruptedException e) { + } + throw new RepositoryException("Unable to acquire shutdown lock"); + } + + public Transaction enterTransaction() { + return openTransactionManager().enter(null); + } + + public Transaction enterTransaction(IsolationLevel level) { + return openTransactionManager().enter(level); + } + + public Transaction enterTopTransaction(IsolationLevel level) { + return openTransactionManager().enterTop(level); + } + + public IsolationLevel getTransactionIsolationLevel() { + return openTransactionManager().getIsolationLevel(); + } + + @SuppressWarnings("unchecked") + public C getCapability(Class capabilityType) { + if (capabilityType.isInstance(this)) { + return (C) this; + } + if (capabilityType == LayoutCapability.class) { + return (C) mLayoutFactory; + } + return null; + } + + public IndexInfo[] getIndexInfo(Class storableType) + throws RepositoryException + { + return ((BDBStorage) storageFor(storableType)).getIndexInfo(); + } + + public String[] getUserStorableTypeNames() throws RepositoryException { + Repository metaRepo = getRootRepository(); + + Cursor cursor = + metaRepo.storageFor(StoredDatabaseInfo.class) + .query().orderBy("databaseName").fetch(); + + ArrayList names = new ArrayList(); + while (cursor.hasNext()) { + StoredDatabaseInfo info = cursor.next(); + // Ordinary user types support evolution. + if (info.getEvolutionStrategy() != StoredDatabaseInfo.EVOLUTION_NONE) { + names.add(info.getDatabaseName()); + } + } + + return names.toArray(new String[names.size()]); + } + + public boolean isSupported(Class type) { + if (type == null) { + return false; + } + StorableIntrospector.examine(type); + return true; + } + + public boolean isPropertySupported(Class type, String name) { + if (type == null || name == null) { + return false; + } + return StorableIntrospector.examine(type).getAllProperties().get(name) != null; + } + + public void close() { + shutdown(false); + } + + public boolean isAutoShutdownEnabled() { + return mShutdownHook != null; + } + + public void setAutoShutdownEnabled(boolean enabled) { + if (mShutdownHook == null) { + if (enabled) { + mShutdownHook = new ShutdownHook(this); + try { + Runtime.getRuntime().addShutdownHook(mShutdownHook); + } catch (IllegalStateException e) { + // Shutdown in progress, so immediately run hook. + mShutdownHook.run(); + } + } + } else { + if (!enabled) { + try { + Runtime.getRuntime().removeShutdownHook(mShutdownHook); + } catch (IllegalStateException e) { + // Shutdown in progress, hook is running. + } + mShutdownHook = null; + } + } + } + + public void shutdown() { + shutdown(true); + } + + private void shutdown(boolean suspendThreads) { + if (!mHasShutdown) { + // Since this repository is being closed before system shutdown, + // remove shutdown hook and run it now. + ShutdownHook hook = mShutdownHook; + if (hook != null) { + try { + Runtime.getRuntime().removeShutdownHook(hook); + } catch (IllegalStateException e) { + // Shutdown in progress, hook is running. + hook = null; + } + } else { + // If hook is null, auto-shutdown was disabled. Make a new + // instance to use, but don't register it. + hook = new ShutdownHook(this); + } + if (hook != null) { + hook.run(suspendThreads); + } + mHasShutdown = true; + } + } + + /** + * Suspend the checkpointer until the suspension time has expired or until + * manually resumed. If a checkpoint is in progress, this method will block + * until it is finished. If checkpointing is disabled, calling this method + * has no effect. + * + *

Calling this method repeatedly resets the suspension time. This + * technique should be used by hot backup processes to ensure that its + * failure does not leave the checkpointer permanently suspended. Each + * invocation of suspendCheckpointer is like a lease renewal or heartbeat. + * + * @param suspensionTime minimum length of suspension, in milliseconds, + * unless checkpointer is manually resumed + */ + public void suspendCheckpointer(long suspensionTime) { + if (mCheckpointer != null) { + mCheckpointer.suspendCheckpointer(suspensionTime); + } + } + + /** + * Resumes the checkpointer if it was suspended. If checkpointing is + * disabled or if not suspended, calling this method has no effect. + */ + public void resumeCheckpointer() { + if (mCheckpointer != null) { + mCheckpointer.resumeCheckpointer(); + } + } + + /** + * Forces a checkpoint to run now, even if checkpointer is suspended or + * disabled. If a checkpoint is in progress, then this method will block + * until it is finished, and then run another checkpoint. This method does + * not return until the requested checkpoint has finished. + */ + public void forceCheckpoint() throws PersistException { + if (mCheckpointer != null) { + mCheckpointer.forceCheckpoint(); + } else { + try { + env_checkpoint(); + } catch (Exception e) { + throw toPersistException(e); + } + } + } + + public Repository getRootRepository() { + return mRootRef.get(); + } + + public StorageAccess storageAccessFor(Class type) + throws RepositoryException + { + return storageFor(type); + } + + @Override + protected void finalize() { + close(); + } + + /** + * @see com.amazon.carbonado.spi.RepositoryBuilder#isMaster + */ + boolean isMaster() { + return mIsMaster; + } + + String getDatabaseFileName(String dbName) { + if (mSingleFileName != null) { + dbName = mSingleFileName; + } + + if (mDataHome != null && !mDataHome.equals(mEnvHome)) { + dbName = new File(mDataHome, dbName).getPath(); + } + + return dbName; + } + + String getMergeSortTempDirectory() { + if (mMergeSortTempDir != null) { + new File(mMergeSortTempDir).mkdirs(); + } + return mMergeSortTempDir; + } + + SequenceValueProducer getSequenceValueProducer(String name) throws PersistException { + SequenceValueGenerator producer = mSequences.get(name); + if (producer == null) { + // Acquire lock to prevent sequences from being created during shutdown. + try { + if (mShutdownLock.tryLock(mLockTimeoutSeconds, TimeUnit.SECONDS)) { + try { + producer = mSequences.get(name); + if (producer == null) { + Repository metaRepo = getRootRepository(); + try { + producer = new SequenceValueGenerator(metaRepo, name); + } catch (RepositoryException e) { + throw toPersistException(e); + } + mSequences.put(name, producer); + } + return producer; + } finally { + mShutdownLock.unlock(); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + throw new PersistException("Unable to acquire shutdown lock"); + } + return producer; + } + + Log getLog() { + return mLog; + } + + StorableCodecFactory getStorableCodecFactory() { + return mStorableCodecFactory; + } + + LayoutFactory getLayoutFactory() throws RepositoryException { + if (mLayoutFactory == null) { + mLayoutFactory = new LayoutFactory(getRootRepository()); + } + return mLayoutFactory; + } + + LobEngine getLobEngine() throws RepositoryException { + LobEngine engine = mLobEngine; + if (engine == null) { + // Acquire lock to prevent LobEngine from being created during shutdown. + try { + if (mShutdownLock.tryLock(mLockTimeoutSeconds, TimeUnit.SECONDS)) { + try { + if ((engine = mLobEngine) == null) { + mLobEngine = engine = new LobEngine(this); + } + return engine; + } finally { + mShutdownLock.unlock(); + } + } + } catch (InterruptedException e) { + } + throw new RepositoryException("Unable to acquire shutdown lock"); + } + return engine; + } + + /** + * Returns the optional BDB specific database configuration to use + * for all databases created. + */ + public Object getInitialDatabaseConfig() { + return mInitialDBConfig; + } + + /** + * Returns the desired page size for the given type, or null for default. + */ + Integer getDatabasePageSize(Class type) { + if (mDatabasePageSizes == null) { + return null; + } + Integer size = mDatabasePageSizes.get(type); + if (size == null && type != null) { + size = mDatabasePageSizes.get(null); + } + return size; + } + + void runDatabasePrepareForOpeningHook(Object database) throws RepositoryException { + if (mDatabaseHook != null) { + mDatabaseHook.prepareForOpening(database); + } + } + + /** + * @param checkpointInterval how often to run checkpoints, in milliseconds, + * or zero if never. Ignored if builder has checkpoints disabled. + * @param deadlockDetectorInterval how often to run deadlock detector, in + * milliseconds, or zero if never. + */ + void start(long checkpointInterval, long deadlockDetectorInterval) { + getLog().info("Opening repository \"" + getName() + '"'); + + if (mRunCheckpointer && checkpointInterval > 0) { + mCheckpointer = new Checkpointer(this, checkpointInterval); + mCheckpointer.start(); + } else { + mCheckpointer = null; + } + + if (mRunDeadlockDetector && deadlockDetectorInterval > 0) { + mDeadlockDetector = new DeadlockDetector(this, deadlockDetectorInterval); + mDeadlockDetector.start(); + } else { + mDeadlockDetector = null; + } + + setAutoShutdownEnabled(true); + } + + abstract String getVersionMajor(); + + abstract String getVersionMajorMinor(); + + abstract String getVersionMajorMinorPatch(); + + abstract IsolationLevel selectIsolationLevel(Transaction parent, IsolationLevel level); + + abstract Txn txn_begin(Txn parent, IsolationLevel level) throws Exception; + + abstract Txn txn_begin_nowait(Txn parent, IsolationLevel level) throws Exception; + + abstract void txn_commit(Txn txn) throws Exception; + + abstract void txn_abort(Txn txn) throws Exception; + + /** + * Force a checkpoint to run. + */ + abstract void env_checkpoint() throws Exception; + + /** + * @param kBytes run checkpoint if at least this many kilobytes in log + * @param minutes run checkpoint if at least this many minutes passed since + * last checkpoint + */ + abstract void env_checkpoint(int kBytes, int minutes) throws Exception; + + /** + * Run the deadlock detector. + */ + abstract void env_detectDeadlocks() throws Exception; + + /** + * Close the environment. + */ + abstract void env_close() throws Exception; + + abstract BDBStorage createStorage(Class type) + throws Exception; + + FetchException toFetchException(Throwable e) { + return mExTransformer.toFetchException(e); + } + + PersistException toPersistException(Throwable e) { + return mExTransformer.toPersistException(e); + } + + RepositoryException toRepositoryException(Throwable e) { + return mExTransformer.toRepositoryException(e); + } + + /** + * Returns the thread-local BDBTransactionManager instance, creating it if + * needed. + */ + BDBTransactionManager openTransactionManager() { + BDBTransactionManager txnMgr = mCurrentTxnMgr.get(); + if (txnMgr == null) { + mShutdownLock.lock(); + try { + txnMgr = new BDBTransactionManager(mExTransformer, this); + mCurrentTxnMgr.set(txnMgr); + mAllTxnMgrs.put(txnMgr, null); + } finally { + mShutdownLock.unlock(); + } + } + return txnMgr; + } + + /** + * Periodically runs checkpoints on the environment. + */ + private static class Checkpointer extends Thread { + private final WeakReference mRepository; + private final long mSleepInterval; + private final int mKBytes; + private final int mMinutes; + + private boolean mInProgress; + private long mSuspendUntil = Long.MIN_VALUE; + + /** + * + * @param repository outer class + * @param sleepInterval milliseconds to sleep before running checkpoint + */ + Checkpointer(BDBRepository repository, long sleepInterval) { + this(repository, sleepInterval, 1024, 5); + } + + /** + * + * @param repository outer class + * @param sleepInterval milliseconds to sleep before running checkpoint + * @param kBytes run checkpoint if at least this many kilobytes in log + * @param minutes run checkpoint if at least this many minutes passed + * since last checkpoint + */ + Checkpointer(BDBRepository repository, long sleepInterval, int kBytes, int minutes) { + super("BDBRepository checkpointer (" + repository.getName() + ')'); + setDaemon(true); + mRepository = new WeakReference(repository); + mSleepInterval = sleepInterval; + mKBytes = kBytes; + mMinutes = minutes; + } + + public void run() { + try { + while (true) { + synchronized (this) { + if (!mInProgress) { + try { + wait(mSleepInterval); + } catch (InterruptedException e) { + break; + } + } + } + + BDBRepository repository = mRepository.get(); + if (repository == null) { + break; + } + + if (mSuspendUntil != Long.MIN_VALUE) { + if (System.currentTimeMillis() < mSuspendUntil) { + continue; + } + } + + Log log = repository.getLog(); + + if (log.isDebugEnabled()) { + log.debug("Running checkpoint on repository \"" + + repository.getName() + '"'); + } + + try { + synchronized (this) { + mInProgress = true; + } + repository.env_checkpoint(mKBytes, mMinutes); + if (log.isDebugEnabled()) { + log.debug("Finished running checkpoint on repository \"" + + repository.getName() + '"'); + } + } catch (ThreadDeath e) { + break; + } catch (Throwable e) { + log.error("Checkpoint failed", e); + } finally { + synchronized (this) { + mInProgress = false; + notify(); + } + repository = null; + } + } + } finally { + synchronized (this) { + mInProgress = false; + notify(); + } + } + } + + /** + * Blocks until checkpoint has finished. + */ + synchronized void suspendCheckpointer(long suspensionTime) { + while (mInProgress) { + try { + wait(); + } catch (InterruptedException e) { + } + } + + if (suspensionTime <= 0) { + return; + } + + long now = System.currentTimeMillis(); + long suspendUntil = now + suspensionTime; + if (now >= 0 && suspendUntil < 0) { + // Overflow. + suspendUntil = Long.MAX_VALUE; + } + mSuspendUntil = suspendUntil; + } + + synchronized void resumeCheckpointer() { + mSuspendUntil = Long.MIN_VALUE; + } + + /** + * Blocks until checkpoint has finished. + */ + synchronized void forceCheckpoint() throws PersistException { + while (mInProgress) { + try { + wait(); + } catch (InterruptedException e) { + return; + } + } + + BDBRepository repository = mRepository.get(); + if (repository != null) { + try { + repository.env_checkpoint(); + } catch (Exception e) { + throw repository.toPersistException(e); + } + } + } + } + + /** + * Periodically runs deadlock detection on the environment. + */ + private static class DeadlockDetector extends Thread { + private final WeakReference mRepository; + private final long mSleepInterval; + + /** + * @param repository outer class + * @param sleepInterval milliseconds to sleep before running deadlock detection + */ + DeadlockDetector(BDBRepository repository, long sleepInterval) { + super("BDBRepository deadlock detector (" + repository.getName() + ')'); + setDaemon(true); + mRepository = new WeakReference(repository); + mSleepInterval = sleepInterval; + } + + public void run() { + while (true) { + synchronized (this) { + try { + wait(mSleepInterval); + } catch (InterruptedException e) { + break; + } + } + + BDBRepository repository = mRepository.get(); + if (repository == null) { + break; + } + + try { + repository.env_detectDeadlocks(); + } catch (ThreadDeath e) { + break; + } catch (Throwable e) { + repository.getLog().error("Deadlock detection failed", e); + } finally { + repository = null; + } + } + } + } + + private static class ShutdownHook extends Thread { + private final WeakReference> mRepository; + + ShutdownHook(BDBRepository repository) { + super("BDBRepository shutdown (" + repository.getName() + ')'); + mRepository = new WeakReference>(repository); + } + + public void run() { + run(true); + } + + public void run(boolean suspendThreads) { + BDBRepository repository = mRepository.get(); + if (repository == null) { + return; + } + + repository.getLog().info("Closing repository \"" + repository.getName() + '"'); + + try { + doShutdown(repository, suspendThreads); + } finally { + repository.mHasShutdown = true; + mRepository.clear(); + repository.getLog().info + ("Finished closing repository \"" + repository.getName() + '"'); + } + } + + private void doShutdown(BDBRepository repository, boolean suspendThreads) { + repository.mShutdownLock.lock(); + try { + // Return unused sequence values. + for (SequenceValueGenerator generator : repository.mSequences.values()) { + try { + generator.returnReservedValues(); + } catch (RepositoryException e) { + repository.getLog().warn(null, e); + } + } + + // Close transactions and cursors. + for (BDBTransactionManager txnMgr : repository.mAllTxnMgrs.keySet()) { + if (suspendThreads) { + // Lock transaction manager but don't release it. This + // prevents other threads from beginning work during + // shutdown, which will likely fail along the way. + txnMgr.getLock().lock(); + } + try { + txnMgr.close(); + } catch (Throwable e) { + repository.getLog().error(null, e); + } + } + + // Run any external shutdown logic that needs to + // happen before the databases and the environment are + // actually closed + if (repository.mPreShutdownHook != null) { + repository.mPreShutdownHook.run(); + } + + // Close database handles. + for (BDBStorage storage : repository.mStorages.values()) { + try { + storage.close(); + } catch (Throwable e) { + repository.getLog().error(null, e); + } + } + + // Wait for checkpointer to finish. + if (repository.mCheckpointer != null) { + repository.mCheckpointer.interrupt(); + try { + repository.mCheckpointer.join(); + } catch (InterruptedException e) { + } + } + + // Wait for deadlock detector to finish. + if (repository.mDeadlockDetector != null) { + repository.mDeadlockDetector.interrupt(); + try { + repository.mDeadlockDetector.join(); + } catch (InterruptedException e) { + } + } + + // Close environment. + try { + repository.env_close(); + } catch (Throwable e) { + repository.getLog().error(null, e); + } + + if (repository.mPostShutdownHook != null) { + repository.mPostShutdownHook.run(); + } + } finally { + repository.mShutdownLock.unlock(); + } + } + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java new file mode 100644 index 0000000..c63445a --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBRepositoryBuilder.java @@ -0,0 +1,717 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.sleepycat; + +import java.lang.reflect.Constructor; + +import java.io.File; +import java.io.IOException; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import java.util.concurrent.atomic.AtomicReference; + +import com.amazon.carbonado.Repository; +import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.TriggerFactory; + +import com.amazon.carbonado.repo.indexed.IndexedRepositoryBuilder; + +import com.amazon.carbonado.raw.GenericStorableCodecFactory; +import com.amazon.carbonado.raw.StorableCodecFactory; + +import com.amazon.carbonado.spi.AbstractRepositoryBuilder; + +import com.amazon.carbonado.util.ThrowUnchecked; + +import com.amazon.carbonado.ConfigurationException; + +/** + * Builder and configuration options for BDBRepository. + * + *

+ * BDBRepositoryBuilder builder = new BDBRepositoryBuilder();
+ *
+ * builder.setProduct("JE");
+ * builder.setName("test");
+ * builder.setEnvironmentHome("/tmp/testRepo");
+ * builder.setTransactionNoSync(true);
+ *
+ * Repository repo = builder.build();
+ * 
+ * + *

+ * The following extra capabilities are supported: + *

    + *
  • {@link com.amazon.carbonado.capability.IndexInfoCapability IndexInfoCapability} + *
  • {@link com.amazon.carbonado.capability.StorableInfoCapability StorableInfoCapability} + *
  • {@link com.amazon.carbonado.capability.ShutdownCapability ShutdownCapability} + *
  • {@link com.amazon.carbonado.layout.LayoutCapability LayoutCapability} + *
  • {@link CheckpointCapability CheckpointCapability} + *
  • {@link EnvironmentCapability EnvironmentCapability} + *
+ * + * @author Brian S O'Neill + * @author Vidya Iyer + * @author Nicole Deflaux + */ +public class BDBRepositoryBuilder extends AbstractRepositoryBuilder { + + private static final BDBProduct DEFAULT_PRODUCT = BDBProduct.JE; + + private static final int DEFAULT_CHECKPOINT_INTERVAL = 10000; + + private String mName; + private boolean mIsMaster = true; + private BDBProduct mProduct = DEFAULT_PRODUCT; + private File mEnvHome; + private String mMergeSortTempDir; + private File mDataHome; + private String mSingleFileName; + private boolean mIndexSupport = true; + private boolean mReadOnly; + private Long mCacheSize; + private double mLockTimeout = 0.5; + private double mTxnTimeout = 300.0; + private boolean mTxnNoSync; + private Boolean mDatabasesTransactional = null; + private Map, Integer> mDatabasePageSizes; + private boolean mPrivate; + private boolean mMultiversion; + private boolean mRunCheckpointer = true; + private int mCheckpointInterval = DEFAULT_CHECKPOINT_INTERVAL; + private boolean mRunDeadlockDetector = true; + private Object mInitialEnvConfig = null; + private Object mInitialDBConfig = null; + private StorableCodecFactory mStorableCodecFactory = new GenericStorableCodecFactory(); + private Runnable mPreShutdownHook; + private Runnable mPostShutdownHook; + private DatabaseHook mDatabaseHook; + + public BDBRepositoryBuilder() { + } + + public Repository build(AtomicReference rootRef) throws RepositoryException { + if (mIndexSupport) { + // Wrap BDBRepository with IndexedRepository. + + // Temporarily set to false to avoid infinite recursion. + mIndexSupport = false; + try { + IndexedRepositoryBuilder ixBuilder = new IndexedRepositoryBuilder(); + ixBuilder.setWrappedRepository(this); + ixBuilder.setMaster(isMaster()); + return ixBuilder.build(rootRef); + } finally { + mIndexSupport = true; + } + } + + assertReady(); + + // Make environment directory if it doesn't exist. + File homeFile = getEnvironmentHomeFile(); + if (!homeFile.exists()) { + if (!homeFile.mkdirs()) { + throw new RepositoryException + ("Unable to make environment home directory: " + homeFile); + } + } + + BDBRepository repo; + + try { + repo = getRepositoryConstructor().newInstance(rootRef, this); + } catch (Exception e) { + ThrowUnchecked.fireFirstDeclaredCause(e, RepositoryException.class); + // Not reached. + return null; + } + + rootRef.set(repo); + return repo; + } + + public String getName() { + return mName; + } + + public void setName(String name) { + mName = name; + } + + public boolean isMaster() { + return mIsMaster; + } + + public void setMaster(boolean b) { + mIsMaster = b; + } + + /** + * Sets the BDB product to use, which defaults to JE. Also supported is DB + * and DB_HA. If not supported, an IllegalArgumentException is thrown. + */ + public void setProduct(String product) { + mProduct = product == null ? DEFAULT_PRODUCT : BDBProduct.forString(product); + } + + /** + * Returns the BDB product to use, which is JE by default. + */ + public String getProduct() { + return mProduct.toString(); + } + + /** + * Sets the BDB product to use, which defaults to JE. + */ + public void setBDBProduct(BDBProduct product) { + mProduct = product == null ? DEFAULT_PRODUCT : product; + } + + /** + * Returns the BDB product to use, which is JE by default. + */ + public BDBProduct getBDBProduct() { + return mProduct; + } + + /** + * Sets the repository environment home directory, which is required. + */ + public void setEnvironmentHomeFile(File envHome) { + try { + // Switch to canonical for more detailed error messages. + envHome = envHome.getCanonicalFile(); + } catch (IOException e) { + } + mEnvHome = envHome; + } + + /** + * Returns the repository environment home directory. + */ + public File getEnvironmentHomeFile() { + return mEnvHome; + } + + /** + * Sets the repository environment home directory, which is required. + * + * @throws RepositoryException if environment home is not valid + */ + public void setEnvironmentHome(String envHome) { + setEnvironmentHomeFile(new File(envHome)); + } + + /** + * Returns the repository environment home directory. + */ + public String getEnvironmentHome() { + return mEnvHome.getPath(); + } + + /** + * By default, data files are stored relative to the environment home. Call + * this method to override. For BDBRepositories that are log files only, + * this configuration is ignored. + */ + public void setDataHomeFile(File dir) { + if (dir != null) { + try { + // Switch to canonical for more detailed error messages. + dir = dir.getCanonicalFile(); + } catch (IOException e) { + } + } + mDataHome = dir; + } + + /** + * Returns the optional directory to store data files. Returns null if data + * files are expected to be relative to the environment home. + */ + public File getDataHomeFile() { + if (mDataHome == null) { + return getEnvironmentHomeFile(); + } + return mDataHome; + } + + /** + * By default, data files are stored relative to the environment home. Call + * this method to override. For BDBRepositories that are log files only, + * this configuration is ignored. + */ + public void setDataHome(String dir) { + if (dir == null) { + mDataHome = null; + } else { + setDataHomeFile(new File(dir)); + } + } + + /** + * Returns the directory to store data files. + */ + public String getDataHome() { + return getDataHomeFile().getPath(); + } + + /** + * Sets the directory to use for creating temporary files needed for merge + * sorting. If null or not specified, the default temporary file directory is used. + * + * @param tempDir directory to store temp files for merge sorting, or null + * for default + */ + /* FIXME: use common config somehow, since indexed repo needs this too + public void setMergeSortTempDirectory(String tempDir) { + mMergeSortTempDir = tempDir; + } + */ + + /** + * Returns the directory to use for creating temporary files needed for + * merge sorting. If null, the default temporary file directory is used. + */ + /* FIXME: use common config somehow, since indexed repo needs this too + public String getMergeSortTempDirectory() { + return mMergeSortTempDir; + } + */ + + /** + * Specify that all BDB databases should reside in one file, except for log + * files and caches. The filename is relative to the environment home, + * unless data directories have been specified. For BDBRepositories that + * are log files only, this configuration is ignored. + * + *

Note: When setting this option, the storable codec factory must also + * be changed, since the default storable codec factory is unable to + * distinguish storable types that reside in a single database file. + */ + public void setSingleFileName(String filename) { + mSingleFileName = filename; + } + + /** + * Returns the single file that all BDB databases should reside in. + */ + public String getSingleFileName() { + return mSingleFileName; + } + + /** + * By default, user specified indexes are supported. Pass false to disable + * this, and no indexes will be built. Another consequence of this option + * is that no unique constraint checks will be applied to alternate keys. + */ + public void setIndexSupport(boolean indexSupport) { + mIndexSupport = indexSupport; + } + + /** + * Returns true if indexes are supported, which is true by default. + */ + public boolean getIndexSupport() { + return mIndexSupport; + } + + /** + * Sets the repository to read-only mode. By default, repository is opened + * for reads and writes. + */ + public void setReadOnly(boolean readOnly) { + mReadOnly = readOnly; + } + + /** + * Returns true if repository should be opened read-only. + */ + public boolean getReadOnly() { + return mReadOnly; + } + + /** + * Set the repository cache size, in bytes. Actual BDB implementation will + * select a suitable default if this is not set. + */ + public void setCacheSize(long cacheSize) { + mCacheSize = cacheSize; + } + + /** + * Set the repository cache size, in bytes. Actual BDB implementation will + * select a suitable default if this is not set. + * + * @param cacheSize cache size to use, or null for default + */ + public void setCacheSize(Long cacheSize) { + mCacheSize = cacheSize; + } + + /** + * Returns the repository cache size, or null if default should be + * selected. + */ + public Long getCacheSize() { + return mCacheSize; + } + + /** + * Set the lock timeout, in seconds. Default value is 0.5 seconds. + */ + public void setLockTimeout(double lockTimeout) { + mLockTimeout = lockTimeout; + } + + /** + * Returns the lock timeout, in seconds. + */ + public double getLockTimeout() { + return mLockTimeout; + } + + /** + * Returns the lock timeout, in microseconds, limited to max long value. + */ + public long getLockTimeoutInMicroseconds() { + return inMicros(mLockTimeout); + } + + /** + * Set the transaction timeout, in seconds. Default value is 300 seconds. + */ + public void setTransactionTimeout(double txnTimeout) { + mTxnTimeout = txnTimeout; + } + + /** + * Returns the repository transaction timeout, in seconds. + */ + public double getTransactionTimeout() { + return mTxnTimeout; + } + + /** + * Returns the repository transaction timeout, in microseconds, limited to + * max long value. + */ + public long getTransactionTimeoutInMicroseconds() { + return inMicros(mTxnTimeout); + } + + /** + * When true, commits are not forcibly flushed to disk. This improves + * performance, but there is a chance of losing the most recent commits if + * the machine crashes. + */ + public void setTransactionNoSync(boolean noSync) { + mTxnNoSync = noSync; + } + + /** + * Returns true if transactions are forcibly flushed to disk. + */ + public boolean getTransactionNoSync() { + return mTxnNoSync; + } + + /** + * When true, allows databases to be transactional. This setting affects + * the databases, not the environment. If this is not explicitly set, the + * environment getTransactional is used. + */ + public void setDatabasesTransactional(Boolean transactional) { + mDatabasesTransactional = transactional; + } + + /** + * Returns true if the databases are configured to be transactional, + * false if configured to not be transactional, null if this override was never set + */ + public Boolean getDatabasesTransactional() { + return mDatabasesTransactional; + } + + /** + * Sets the desired page size for a given type. If not specified, the page + * size applies to all types. + */ + public void setDatabasePageSize(Integer bytes, Class type) { + if (mDatabasePageSizes == null) { + mDatabasePageSizes = new HashMap, Integer>(); + } + mDatabasePageSizes.put(type, bytes); + } + + Map, Integer> getDatabasePagesMap() { + if (mDatabasePageSizes == null) { + return null; + } + return new HashMap, Integer>(mDatabasePageSizes); + } + + /** + * When true, BDB environment cannot be shared by other processes, and + * region files are not created. By default, environment is shared, if + * supported. + */ + public void setPrivate(boolean b) { + mPrivate = b; + } + + /** + * Returns true if BDB environment is private. By default, environment is + * shared, if supported. + */ + public boolean isPrivate() { + return mPrivate; + } + + /** + * Set true to enable multiversion concurrency control (MVCC) on BDB + * environment. This enables snapshot isolation, and is it is not supported + * by all BDB products and versions. + */ + public void setMultiversion(boolean multiversion) { + mMultiversion = multiversion; + } + + /** + * Returns false by default because multiversion concurrency control (MVCC) + * is not enabled. + */ + public boolean isMultiversion() { + return mMultiversion; + } + + /** + * Disable automatic checkpointing of database if another process is + * responsible for that. The false setting is implied for read-only + * databases. + */ + public void setRunCheckpointer(boolean runCheckpointer) { + mRunCheckpointer = runCheckpointer; + } + + /** + * Returns true if checkpointer is run automatically. + */ + public boolean getRunCheckpointer() { + return mRunCheckpointer; + } + + /** + * Set the interval to run checkpoints. This setting is ignored if the + * checkpointer is not configured to run. + * + * @param intervalMillis interval between checkpoints, in milliseconds + */ + public void setCheckpointInterval(int intervalMillis) { + mCheckpointInterval = intervalMillis; + } + + /** + * @return interval between checkpoints, in milliseconds + */ + public int getCheckpointInterval() { + return mCheckpointInterval; + } + + /** + * Disable automatic deadlock detection of database if another thread is + * responsible for that. + */ + public void setRunDeadlockDetector(boolean runDeadlockDetector) { + mRunDeadlockDetector = runDeadlockDetector; + } + + /** + * Returns true if deadlock detector is configured to run. + */ + public boolean getRunDeadlockDetector() { + return mRunDeadlockDetector; + } + + /** + * Optionally set the BDB specific environment configuration to + * use. The builder will verify that needed configuration values are set. + */ + public void setInitialEnvironmentConfig(Object envConfig) { + mInitialEnvConfig = envConfig; + } + + /** + * Returns the optional BDB specific environment configuration to use. + */ + public Object getInitialEnvironmentConfig() { + return mInitialEnvConfig; + } + + /** + * Optionally set the BDB specific database configuration to use + * for all databases created. The storage will verify that needed + * configuration values are set. + */ + public void setInitialDatabaseConfig(Object dbConfig) { + mInitialDBConfig = dbConfig; + } + + /** + * Returns the optional BDB specific database configuration to use + * for all databases created. + */ + public Object getInitialDatabaseConfig() { + return mInitialDBConfig; + } + + /** + * Override the default storable codec factory. + */ + public void setStorableCodecFactory(StorableCodecFactory factory) { + mStorableCodecFactory = factory; + } + + /** + * Returns the storable codec factory used. + */ + public StorableCodecFactory getStorableCodecFactory() { + return mStorableCodecFactory; + } + + /** + * Sets a callback to be invoked before the repository has finished running + * its own shutdown hooks. This method is also invoked when repository is + * manually closed. + */ + public void setPreShutdownHook(Runnable hook) { + mPreShutdownHook = hook; + } + + /** + * Returns the custom shutdown hook that runs before the repository has + * finished running its own shutdown hooks, or null if none. + */ + public Runnable getPreShutdownHook() { + return mPreShutdownHook; + } + + /** + * Sets a callback to be invoked after repository has finished running its + * own shutdown hooks. This method is also invoked when repository is + * manually closed. + */ + public void setShutdownHook(Runnable hook) { + mPostShutdownHook = hook; + } + + /** + * Returns the custom shutdown hook that runs after the repository has + * finished running its own shutdown hooks, or null if none. + */ + public Runnable getShutdownHook() { + return mPostShutdownHook; + } + + /** + * Sets a hook to be called whenever a database is opened. + */ + public void setDatabaseHook(DatabaseHook hook) { + mDatabaseHook = hook; + } + + /** + * Returns the custom open database hook, or null if none. + */ + public DatabaseHook getDatabaseHook() { + return mDatabaseHook; + } + + private long inMicros(double seconds) { + if (seconds >= Long.MAX_VALUE) { + return Long.MAX_VALUE; + } + if (seconds <= 0 || Double.isNaN(seconds)) { + return 0L; + } + return (long) (seconds * 1000000); + } + + public void errorCheck(Collection messages) throws ConfigurationException { + super.errorCheck(messages); + + checkClass: { + Exception error; + try { + getRepositoryConstructor(); + break checkClass; + } catch (ClassCastException e) { + error = e; + } catch (ClassNotFoundException e) { + error = e; + } catch (NoSuchMethodException e) { + error = e; + } + messages.add("BDB product \"" + getProduct() + "\" not supported: " + error); + } + + File envHome = getEnvironmentHomeFile(); + if (envHome == null) { + messages.add("environmentHome missing"); + } else { + if (envHome.exists() && !envHome.isDirectory()) { + messages.add("environment home is not a directory: " + envHome); + } + } + } + + /** + * Looks up appropriate repository via reflection, whose name is derived + * from the BDB product string. + */ + @SuppressWarnings("unchecked") + private Constructor getRepositoryConstructor() + throws ClassCastException, ClassNotFoundException, NoSuchMethodException + { + String className = getClass().getPackage().getName() + '.' + + getBDBProduct().name() + "_Repository"; + Class repoClass = Class.forName(className); + if (BDBRepository.class.isAssignableFrom(repoClass)) { + return repoClass.getDeclaredConstructor + (AtomicReference.class, BDBRepositoryBuilder.class); + } + throw new ClassCastException("Not an instance of BDBRepository: " + repoClass.getName()); + } + + public static interface DatabaseHook { + /** + * Called right before database is opened. + * + * @param db reference to database or config - actual type depends on BDB + * implementation. + */ + void prepareForOpening(Object db) throws RepositoryException; + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java new file mode 100644 index 0000000..76cb83c --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBStorage.java @@ -0,0 +1,889 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.sleepycat; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.MalformedFilterException; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.Query; +import com.amazon.carbonado.Repository; +import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Storage; +import com.amazon.carbonado.SupportException; +import com.amazon.carbonado.Trigger; + +import com.amazon.carbonado.capability.IndexInfo; + +import com.amazon.carbonado.cursor.ArraySortBuffer; +import com.amazon.carbonado.cursor.EmptyCursor; +import com.amazon.carbonado.cursor.MergeSortBuffer; +import com.amazon.carbonado.cursor.SingletonCursor; +import com.amazon.carbonado.cursor.SortBuffer; + +import com.amazon.carbonado.filter.Filter; + +import com.amazon.carbonado.info.Direction; +import com.amazon.carbonado.info.StorableIndex; +import com.amazon.carbonado.info.StorableInfo; +import com.amazon.carbonado.info.StorableIntrospector; +import com.amazon.carbonado.info.StorableProperty; + +import com.amazon.carbonado.layout.Layout; +import com.amazon.carbonado.layout.LayoutFactory; +import com.amazon.carbonado.layout.Unevolvable; + +import com.amazon.carbonado.lob.Blob; +import com.amazon.carbonado.lob.Clob; + +import com.amazon.carbonado.qe.BoundaryType; +import com.amazon.carbonado.qe.QueryEngine; +import com.amazon.carbonado.qe.QueryExecutorFactory; +import com.amazon.carbonado.qe.StorageAccess; + +import com.amazon.carbonado.raw.StorableCodec; +import com.amazon.carbonado.raw.StorableCodecFactory; +import com.amazon.carbonado.raw.RawSupport; +import com.amazon.carbonado.raw.RawUtil; + +import com.amazon.carbonado.spi.IndexInfoImpl; +import com.amazon.carbonado.spi.LobEngine; +import com.amazon.carbonado.spi.SequenceValueProducer; +import com.amazon.carbonado.spi.StorableIndexSet; +import com.amazon.carbonado.spi.TriggerManager; + +/** + * + * @author Brian S O'Neill + */ +abstract class BDBStorage implements Storage, StorageAccess { + /** Constant indicating success */ + protected static final byte[] SUCCESS = new byte[0]; + + /** Constant indicating an entry was not found */ + protected static final byte[] NOT_FOUND = new byte[0]; + + /** Constant indicating an entry already exists */ + protected static final Object KEY_EXIST = new Object(); + + private static final int DEFAULT_LOB_BLOCK_SIZE = 1000; + + final BDBRepository mRepository; + /** Reference to the type of storable */ + private final Class mType; + + /** Does most of the work in generating storables, used for preparing and querying */ + private StorableCodec mStorableCodec; + + /** + * Reference to an instance of Proxy, defined in this class, which binds + * the storables to our own implementation. Handed off to mStorableFactory. + */ + private final RawSupport mRawSupport; + + /** Primary key index is required, and is the only one supported. */ + private StorableIndex mPrimaryKeyIndex; + + /** Reference to primary database. */ + private Object mPrimaryDatabase; + + /** Reference to query engine, defined later in this class */ + private QueryEngine mQueryEngine; + + private Storage mRootStorage; + + final TriggerManager mTriggerManager; + + /** + * Constructs a storage instance, but subclass must call open before it can + * be used. + * + * @param repository repository this storage came from + * @throws SupportException if storable type is not supported + */ + protected BDBStorage(BDBRepository repository, Class type) + throws SupportException + { + mRepository = repository; + mType = type; + mRawSupport = new Support(repository, this); + mTriggerManager = new TriggerManager(type, repository.mTriggerFactories); + try { + // Ask if any lobs via static method first, to prevent stack + // overflow that occurs when creating BDBStorage instances for + // metatypes. These metatypes cannot support Lobs. + if (LobEngine.hasLobs(type)) { + Trigger lobTrigger = repository.getLobEngine() + .getSupportTrigger(type, DEFAULT_LOB_BLOCK_SIZE); + addTrigger(lobTrigger); + } + } catch (SupportException e) { + throw e; + } catch (RepositoryException e) { + throw new SupportException(e); + } + } + + public Class getStorableType() { + return mType; + } + + public S prepare() { + return mStorableCodec.instantiate(mRawSupport); + } + + public Query query() throws FetchException { + return mQueryEngine.query(); + } + + public Query query(String filter) throws FetchException { + return mQueryEngine.query(filter); + } + + public Query query(Filter filter) throws FetchException { + return mQueryEngine.query(filter); + } + + public boolean addTrigger(Trigger trigger) { + return mTriggerManager.addTrigger(trigger); + } + + public boolean removeTrigger(Trigger trigger) { + return mTriggerManager.removeTrigger(trigger); + } + + public IndexInfo[] getIndexInfo() { + StorableIndex pkIndex = mPrimaryKeyIndex; + + if (pkIndex == null) { + return new IndexInfo[0]; + } + + int i = pkIndex.getPropertyCount(); + String[] propertyNames = new String[i]; + Direction[] directions = new Direction[i]; + while (--i >= 0) { + propertyNames[i] = pkIndex.getProperty(i).getName(); + directions[i] = pkIndex.getPropertyDirection(i); + } + + return new IndexInfo[] { + new IndexInfoImpl(getStorableType().getName(), true, true, propertyNames, directions) + }; + } + + public QueryExecutorFactory getQueryExecutorFactory() { + return mQueryEngine; + } + + public Collection> getAllIndexes() { + return Collections.singletonList(mPrimaryKeyIndex); + } + + public Storage storageDelegate(StorableIndex index) { + // We're the grunt and don't delegate. + return null; + } + + public SortBuffer createSortBuffer() { + // FIXME: This is messy. If Storables had built-in serialization + // support, then MergeSortBuffer would not need a root storage. + if (mRootStorage == null) { + try { + mRootStorage = mRepository.getRootRepository().storageFor(getStorableType()); + } catch (RepositoryException e) { + LogFactory.getLog(BDBStorage.class).warn(null, e); + return new ArraySortBuffer(); + } + } + + // FIXME: sort buffer should be on repository access. Also, create abstract + // repository access that creates the correct merge sort buffer. And more: + // create capability for managing merge sort buffers. + return new MergeSortBuffer(mRootStorage); + } + + public long countAll() throws FetchException { + // Return -1 to indicate default algorithmn should be used. + return -1; + } + + public Cursor fetchAll() throws FetchException { + return fetchSubset(null, null, + BoundaryType.OPEN, null, + BoundaryType.OPEN, null, + false, false); + } + + public Cursor fetchOne(StorableIndex index, + Object[] identityValues) + throws FetchException + { + byte[] key = mStorableCodec.encodePrimaryKey(identityValues); + byte[] value = mRawSupport.tryLoad(key); + if (value == null) { + return EmptyCursor.the(); + } + return new SingletonCursor(instantiate(key, value)); + } + + public Cursor fetchSubset(StorableIndex index, + Object[] identityValues, + BoundaryType rangeStartBoundary, + Object rangeStartValue, + BoundaryType rangeEndBoundary, + Object rangeEndValue, + boolean reverseRange, + boolean reverseOrder) + throws FetchException + { + BDBTransactionManager txnMgr = openTransactionManager(); + + if (reverseRange) { + { + BoundaryType temp = rangeStartBoundary; + rangeStartBoundary = rangeEndBoundary; + rangeEndBoundary = temp; + } + + { + Object temp = rangeStartValue; + rangeStartValue = rangeEndValue; + rangeEndValue = temp; + } + } + + // Lock out shutdown task. + txnMgr.getLock().lock(); + try { + StorableCodec codec = mStorableCodec; + + final byte[] identityKey; + if (identityValues == null || identityValues.length == 0) { + identityKey = codec.encodePrimaryKeyPrefix(); + } else { + identityKey = codec.encodePrimaryKey(identityValues, 0, identityValues.length); + } + + final byte[] startBound; + if (rangeStartBoundary == BoundaryType.OPEN) { + startBound = identityKey; + } else { + startBound = createBound(identityValues, identityKey, rangeStartValue, codec); + if (!reverseOrder && rangeStartBoundary == BoundaryType.EXCLUSIVE) { + // If key is composite and partial, need to skip trailing + // unspecified keys by adding one and making inclusive. + if (!RawUtil.increment(startBound)) { + return EmptyCursor.the(); + } + rangeStartBoundary = BoundaryType.INCLUSIVE; + } + } + + final byte[] endBound; + if (rangeEndBoundary == BoundaryType.OPEN) { + endBound = identityKey; + } else { + endBound = createBound(identityValues, identityKey, rangeEndValue, codec); + if (reverseOrder && rangeEndBoundary == BoundaryType.EXCLUSIVE) { + // If key is composite and partial, need to skip trailing + // unspecified keys by subtracting one and making + // inclusive. + if (!RawUtil.decrement(endBound)) { + return EmptyCursor.the(); + } + rangeEndBoundary = BoundaryType.INCLUSIVE; + } + } + + final boolean inclusiveStart = rangeStartBoundary != BoundaryType.EXCLUSIVE; + final boolean inclusiveEnd = rangeEndBoundary != BoundaryType.EXCLUSIVE; + + try { + BDBCursor cursor = openCursor + (txnMgr, + startBound, inclusiveStart, + endBound, inclusiveEnd, + mStorableCodec.getPrimaryKeyPrefixLength(), + reverseOrder, + getPrimaryDatabase()); + + cursor.open(); + return cursor; + } catch (Exception e) { + throw toFetchException(e); + } + } finally { + txnMgr.getLock().unlock(); + } + } + + private byte[] createBound(Object[] exactValues, byte[] exactKey, Object rangeValue, + StorableCodec codec) { + Object[] values = {rangeValue}; + if (exactValues == null || exactValues.length == 0) { + return codec.encodePrimaryKey(values, 0, 1); + } + + byte[] rangeKey = codec.encodePrimaryKey + (values, exactValues.length, exactValues.length + 1); + byte[] bound = new byte[exactKey.length + rangeKey.length]; + System.arraycopy(exactKey, 0, bound, 0, exactKey.length); + System.arraycopy(rangeKey, 0, bound, exactKey.length, rangeKey.length); + return bound; + } + + protected BDBRepository getRepository() { + return mRepository; + } + + /** + * @param readOnly when true, this method will not attempt to reconcile + * differences between the current index set and the desired index set. + */ + protected void open(boolean readOnly) throws RepositoryException { + final Layout layout = getLayout(); + + StorableInfo info = StorableIntrospector.examine(getStorableType()); + + StorableCodecFactory codecFactory = mRepository.getStorableCodecFactory(); + + // Open primary database. + Object primaryDatabase; + + String databaseName = codecFactory.getStorageName(getStorableType()); + if (databaseName == null) { + databaseName = getStorableType().getName(); + } + + // Primary info may be null for StoredDatabaseInfo itself. + StoredDatabaseInfo primaryInfo; + boolean isPrimaryEmpty; + + try { + BDBTransactionManager txnMgr = mRepository.openTransactionManager(); + // Lock out shutdown task. + txnMgr.getLock().lock(); + try { + primaryDatabase = env_openPrimaryDatabase(null, databaseName); + primaryInfo = registerPrimaryDatabase(readOnly, layout); + isPrimaryEmpty = db_isEmpty(null, primaryDatabase, txnMgr.isForUpdate()); + } finally { + txnMgr.getLock().unlock(); + } + } catch (Exception e) { + throw toRepositoryException(e); + } + + StorableIndex pkIndex; + + if (!isPrimaryEmpty && primaryInfo != null + && primaryInfo.getIndexNameDescriptor() != null) { + + // Entries already exist, so primary key format is locked in. + pkIndex = StorableIndex.parseNameDescriptor + (primaryInfo.getIndexNameDescriptor(), info); + // TODO: Verify index types match and throw error if not. + } else { + // In order to select the best index for the primary key, allow all + // indexes to be considered. + StorableIndexSet indexSet = new StorableIndexSet(); + indexSet.addIndexes(info); + indexSet.addAlternateKeys(info); + indexSet.addPrimaryKey(info); + + indexSet.reduce(Direction.ASCENDING); + + pkIndex = indexSet.findPrimaryKeyIndex(info); + if (primaryInfo != null) { + if (!pkIndex.getNameDescriptor().equals(primaryInfo.getIndexNameDescriptor()) || + !pkIndex.getTypeDescriptor().equals(primaryInfo.getIndexTypeDescriptor())) { + + primaryInfo.setIndexNameDescriptor(pkIndex.getNameDescriptor()); + primaryInfo.setIndexTypeDescriptor(pkIndex.getTypeDescriptor()); + + if (!readOnly) { + primaryInfo.update(); + } + } + } + } + + // Indicate that primary key is clustered, which can affect query analysis. + pkIndex = pkIndex.clustered(true); + + try { + mStorableCodec = codecFactory + .createCodec(getStorableType(), pkIndex, mRepository.isMaster(), layout); + } catch (SupportException e) { + // We've opened the database prematurely, since type isn't + // supported by encoding strategy. Close it down and unregister. + try { + db_close(primaryDatabase); + } catch (Exception e2) { + // Don't care. + } + try { + unregisterDatabase(readOnly, getStorableType().getName()); + } catch (Exception e2) { + // Don't care. + } + throw e; + } + + mPrimaryKeyIndex = mStorableCodec.getPrimaryKeyIndex(); + mPrimaryDatabase = primaryDatabase; + + mQueryEngine = new QueryEngine(getStorableType(), mRepository); + } + + protected S instantiate(byte[] key, byte[] value) throws FetchException { + return mStorableCodec.instantiate(mRawSupport, key, value); + } + + protected CompactionCapability.Result compact() throws RepositoryException { + byte[] start = mStorableCodec.encodePrimaryKeyPrefix(); + if (start != null && start.length == 0) { + start = null; + } + + byte[] end; + if (start == null) { + end = null; + } else { + end = start.clone(); + if (!RawUtil.increment(end)) { + end = null; + } + } + + try { + Txn txn = mRepository.openTransactionManager().getTxn(); + return db_compact(txn, mPrimaryDatabase, start, end); + } catch (Exception e) { + throw mRepository.toRepositoryException(e); + } + } + + /** + * @return true if record with given key exists + */ + protected abstract boolean db_exists(Txn txn, byte[] key, boolean rmw) throws Exception; + + /** + * @return NOT_FOUND, any byte[], or null (if empty result) + */ + protected abstract byte[] db_get(Txn txn, byte[] key, boolean rmw) throws Exception; + + /** + * @return SUCCESS, KEY_EXIST, or NOT_FOUND otherwise + */ + protected abstract Object db_putNoOverwrite(Txn txn, byte[] key, byte[] value) + throws Exception; + + /** + * @return true if successful + */ + protected abstract boolean db_put(Txn txn, byte[] key, byte[] value) + throws Exception; + + /** + * @return true if successful + */ + protected abstract boolean db_delete(Txn txn, byte[] key) + throws Exception; + + protected abstract void db_truncate(Txn txn) throws Exception; + + /** + * @return true if database has no entries. + */ + protected abstract boolean db_isEmpty(Txn txn, Object database, boolean rmw) throws Exception; + + protected CompactionCapability.Result db_compact + (Txn txn, Object database, byte[] start, byte[] end) + throws Exception + { + throw new UnsupportedOperationException(); + } + + protected abstract void db_close(Object database) throws Exception; + + /** + * Implementation should call runDatabasePrepareForOpeningHook on database + * before opening. + */ + protected abstract Object env_openPrimaryDatabase(Txn txn, String name) throws Exception; + + protected void runDatabasePrepareForOpeningHook(Object database) throws RepositoryException { + mRepository.runDatabasePrepareForOpeningHook(database); + } + + protected abstract void env_removeDatabase(Txn txn, String databaseName) throws Exception; + + /** + * @param txn optional transaction to commit when cursor is closed + * @param txnMgr + * @param startBound specify the starting key for the cursor, or null if first + * @param inclusiveStart true if start bound is inclusive + * @param endBound specify the ending key for the cursor, or null if last + * @param inclusiveEnd true if end bound is inclusive + * @param maxPrefix maximum expected common initial bytes in start and end bound + * @param reverse when true, iteration is reversed + * @param database database to use + */ + protected abstract BDBCursor openCursor + (BDBTransactionManager txnMgr, + byte[] startBound, boolean inclusiveStart, + byte[] endBound, boolean inclusiveEnd, + int maxPrefix, + boolean reverse, + Object database) + throws Exception; + + FetchException toFetchException(Throwable e) { + return mRepository.toFetchException(e); + } + + PersistException toPersistException(Throwable e) { + return mRepository.toPersistException(e); + } + + RepositoryException toRepositoryException(Throwable e) { + return mRepository.toRepositoryException(e); + } + + BDBTransactionManager openTransactionManager() { + return mRepository.openTransactionManager(); + } + + /** + * Caller must hold transaction lock. May throw FetchException if storage + * is closed. + */ + Object getPrimaryDatabase() throws FetchException { + Object database = mPrimaryDatabase; + if (database == null) { + checkClosed(); + throw new IllegalStateException("BDBStorage not opened"); + } + return database; + } + + Blob getBlob(long locator) throws FetchException { + try { + return mRepository.getLobEngine().getBlobValue(locator); + } catch (RepositoryException e) { + throw e.toFetchException(); + } + } + + long getLocator(Blob blob) throws PersistException { + try { + return mRepository.getLobEngine().getLocator(blob); + } catch (ClassCastException e) { + throw new PersistException(e); + } catch (RepositoryException e) { + throw e.toPersistException(); + } + } + + Clob getClob(long locator) throws FetchException { + try { + return mRepository.getLobEngine().getClobValue(locator); + } catch (RepositoryException e) { + throw e.toFetchException(); + } + } + + long getLocator(Clob clob) throws PersistException { + try { + return mRepository.getLobEngine().getLocator(clob); + } catch (ClassCastException e) { + throw new PersistException(e); + } catch (RepositoryException e) { + throw e.toPersistException(); + } + } + + /** + * If open, returns normally. If shutting down, blocks forever. Otherwise, + * if closed, throws FetchException. Method blocks forever on shutdown to + * prevent threads from starting work that will likely fail along the way. + */ + void checkClosed() throws FetchException { + BDBTransactionManager txnMgr = openTransactionManager(); + + // Lock out shutdown task. + txnMgr.getLock().lock(); + try { + if (mPrimaryDatabase == null) { + // If shuting down, this will force us to block forever. + try { + txnMgr.getTxn(); + } catch (Exception e) { + // Don't care. + } + // Okay, not shutting down, so throw exception. + throw new FetchException("Repository closed"); + } + } finally { + txnMgr.getLock().unlock(); + } + } + + void close() throws Exception { + BDBTransactionManager txnMgr = mRepository.openTransactionManager(); + txnMgr.getLock().lock(); + try { + if (mPrimaryDatabase != null) { + db_close(mPrimaryDatabase); + mPrimaryDatabase = null; + } + } finally { + txnMgr.getLock().unlock(); + } + } + + private Layout getLayout() throws RepositoryException { + if (Unevolvable.class.isAssignableFrom(getStorableType())) { + // Don't record generation for storables marked as unevolvable. + return null; + } + + LayoutFactory factory; + try { + factory = mRepository.getLayoutFactory(); + } catch (SupportException e) { + // Metadata repository does not support layout storables, so it + // cannot support generations. + return null; + } + + return factory.layoutFor(getStorableType()); + } + + /** + * Note: returned StoredDatabaseInfo does not have name and type + * descriptors saved yet. + * + * @return null if type cannot be registered + */ + private StoredDatabaseInfo registerPrimaryDatabase(boolean readOnly, Layout layout) + throws Exception + { + if (getStorableType() == StoredDatabaseInfo.class) { + // Can't register itself in itself. + return null; + } + StoredDatabaseInfo info; + try { + info = prepareStoredDatabaseInfo(); + } catch (SupportException e) { + return null; + } + info.setDatabaseName(getStorableType().getName()); + if (!info.tryLoad()) { + if (layout == null) { + info.setEvolutionStrategy(StoredDatabaseInfo.EVOLUTION_NONE); + } else { + info.setEvolutionStrategy(StoredDatabaseInfo.EVOLUTION_STANDARD); + } + info.setCreationTimestamp(System.currentTimeMillis()); + info.setVersionNumber(0); + if (!readOnly) { + info.insert(); + } + } + return info; + } + + private void unregisterDatabase(boolean readOnly, String name) throws RepositoryException { + if (getStorableType() == StoredDatabaseInfo.class) { + // Can't unregister when register wasn't allowed. + return; + } + if (!readOnly) { + StoredDatabaseInfo info; + try { + info = prepareStoredDatabaseInfo(); + } catch (SupportException e) { + return; + } + info.setDatabaseName(name); + info.delete(); + } + } + + /** + * @throws SupportException if StoredDatabaseInfo is not supported by codec factory + */ + private StoredDatabaseInfo prepareStoredDatabaseInfo() throws RepositoryException { + return mRepository.getRootRepository().storageFor(StoredDatabaseInfo.class).prepare(); + } + + // Note: BDBStorage could just implement the RawSupport interface, but + // then these hidden methods would be public. A simple cast of Storage to + // RawSupport would expose them. + private static class Support implements RawSupport { + private final BDBRepository mRepository; + private final BDBStorage mStorage; + private Map> mProperties; + + Support(BDBRepository repo, BDBStorage storage) { + mRepository = repo; + mStorage = storage; + } + + public Repository getRootRepository() { + return mRepository.getRootRepository(); + } + + public boolean isPropertySupported(String name) { + if (name == null) { + return false; + } + if (mProperties == null) { + mProperties = StorableIntrospector + .examine(mStorage.getStorableType()).getAllProperties(); + } + return mProperties.containsKey(name); + } + + public byte[] tryLoad(byte[] key) throws FetchException { + BDBTransactionManager txnMgr = mStorage.openTransactionManager(); + byte[] result; + // Lock out shutdown task. + txnMgr.getLock().lock(); + try { + try { + result = mStorage.db_get(txnMgr.getTxn(), key, txnMgr.isForUpdate()); + } catch (Throwable e) { + throw mStorage.toFetchException(e); + } + } finally { + txnMgr.getLock().unlock(); + } + if (result == NOT_FOUND) { + return null; + } + if (result == null) { + result = SUCCESS; + } + return result; + } + + public boolean tryInsert(S storable, byte[] key, byte[] value) throws PersistException { + BDBTransactionManager txnMgr = mStorage.openTransactionManager(); + Object result; + // Lock out shutdown task. + txnMgr.getLock().lock(); + try { + try { + result = mStorage.db_putNoOverwrite(txnMgr.getTxn(), key, value); + } catch (Throwable e) { + throw mStorage.toPersistException(e); + } + } finally { + txnMgr.getLock().unlock(); + } + if (result == KEY_EXIST) { + return false; + } + if (result != SUCCESS) { + throw new PersistException("Failed"); + } + return true; + } + + public void store(S storable, byte[] key, byte[] value) throws PersistException { + BDBTransactionManager txnMgr = mStorage.openTransactionManager(); + // Lock out shutdown task. + txnMgr.getLock().lock(); + try { + try { + if (!mStorage.db_put(txnMgr.getTxn(), key, value)) { + throw new PersistException("Failed"); + } + } catch (Throwable e) { + throw mStorage.toPersistException(e); + } + } finally { + txnMgr.getLock().unlock(); + } + } + + public boolean tryDelete(byte[] key) throws PersistException { + BDBTransactionManager txnMgr = mStorage.openTransactionManager(); + // Lock out shutdown task. + txnMgr.getLock().lock(); + try { + try { + return mStorage.db_delete(txnMgr.getTxn(), key); + } catch (Throwable e) { + throw mStorage.toPersistException(e); + } + } finally { + txnMgr.getLock().unlock(); + } + } + + public Blob getBlob(long locator) throws FetchException { + return mStorage.getBlob(locator); + } + + public long getLocator(Blob blob) throws PersistException { + return mStorage.getLocator(blob); + } + + public Clob getClob(long locator) throws FetchException { + return mStorage.getClob(locator); + } + + public long getLocator(Clob clob) throws PersistException { + return mStorage.getLocator(clob); + } + + public SequenceValueProducer getSequenceValueProducer(String name) + throws PersistException + { + return mStorage.mRepository.getSequenceValueProducer(name); + } + + public Trigger getInsertTrigger() { + return mStorage.mTriggerManager.getInsertTrigger(); + } + + public Trigger getUpdateTrigger() { + return mStorage.mTriggerManager.getUpdateTrigger(); + } + + public Trigger getDeleteTrigger() { + return mStorage.mTriggerManager.getDeleteTrigger(); + } + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBTransactionManager.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBTransactionManager.java new file mode 100644 index 0000000..4c6105f --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/BDBTransactionManager.java @@ -0,0 +1,82 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.sleepycat; + +import java.lang.ref.WeakReference; +import java.util.concurrent.TimeUnit; + +import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.Transaction; + +import com.amazon.carbonado.spi.ExceptionTransformer; +import com.amazon.carbonado.spi.TransactionManager; + +/** + * This class is used for tracking transactions and open cursors. Each + * thread that uses the BDBRepository instance is assigned at most one + * BDBTransactionManager instance. + * + * @author Brian S O'Neill + */ +class BDBTransactionManager extends TransactionManager { + // Weakly reference repository because thread locals are not cleaned up + // very quickly and BDB environments hang on to a ton of memory. + private final WeakReference> mRepositoryRef; + + BDBTransactionManager(ExceptionTransformer exTransformer, BDBRepository repository) { + super(exTransformer); + mRepositoryRef = new WeakReference>(repository); + } + + protected IsolationLevel selectIsolationLevel(Transaction parent, IsolationLevel level) { + return repository().selectIsolationLevel(parent, level); + } + + protected Txn createTxn(Txn parent, IsolationLevel level) throws Exception { + return repository().txn_begin(parent, level); + } + + @Override + protected Txn createTxn(Txn parent, IsolationLevel level, int timeout, TimeUnit unit) + throws Exception + { + if (timeout == 0) { + return repository().txn_begin_nowait(parent, level); + } else { + return repository().txn_begin(parent, level); + } + } + + protected boolean commitTxn(Txn txn) throws Exception { + repository().txn_commit(txn); + return false; + } + + protected void abortTxn(Txn txn) throws Exception { + repository().txn_abort(txn); + } + + private BDBRepository repository() { + BDBRepository repo = mRepositoryRef.get(); + if (repo == null) { + throw new IllegalStateException("Repository closed"); + } + return repo; + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/CheckpointCapability.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/CheckpointCapability.java new file mode 100644 index 0000000..dc4b3d7 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/CheckpointCapability.java @@ -0,0 +1,59 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.sleepycat; + +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.capability.Capability; + +/** + * Capability to control BDB checkpointing. Useful when performing hot backups. + * + * @author Brian S O'Neill + */ +public interface CheckpointCapability extends Capability { + /** + * Suspend the checkpointer until the suspension time has expired or until + * manually resumed. If a checkpoint is in progress, this method will block + * until it is finished. If checkpointing is disabled, calling this method + * has no effect. + * + *

Calling this method repeatedly resets the suspension time. This + * technique should be used by hot backup processes to ensure that its + * failure does not leave the checkpointer permanently suspended. Each + * invocation of suspendCheckpointer is like a lease renewal or heartbeat. + * + * @param suspensionTime minimum length of suspension, in milliseconds, + * unless checkpointer is manually resumed + */ + void suspendCheckpointer(long suspensionTime); + + /** + * Resumes the checkpointer if it was suspended. If checkpointing is + * disabled or if not suspended, calling this method has no effect. + */ + void resumeCheckpointer(); + + /** + * Forces a checkpoint to run now, even if checkpointer is suspended or + * disabled. If a checkpoint is in progress, then this method will block + * until it is finished, and then run another checkpoint. This method does + * not return until the requested checkpoint has finished. + */ + void forceCheckpoint() throws PersistException; +} diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/CompactionCapability.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/CompactionCapability.java new file mode 100644 index 0000000..021f62d --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/CompactionCapability.java @@ -0,0 +1,52 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.sleepycat; + +import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.capability.Capability; + +/** + * Capability to compact a BDB database. This capability is not supported by + * all versions of BDB. + * + * @author Brian S O'Neill + */ +public interface CompactionCapability extends Capability { + /** + * Compact an entire BDB backed storage. This call may be made within a + * transaction scope. + * + * @param storableType required storable type + */ + Result compact(Class storableType) + throws RepositoryException; + + public interface Result { + int getPagesExamine(); + + int getPagesFree(); + + int getPagesTruncated(); + + int getLevels(); + + int getDeadlockCount(); + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/EnvironmentCapability.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/EnvironmentCapability.java new file mode 100644 index 0000000..1244d14 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/EnvironmentCapability.java @@ -0,0 +1,34 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.sleepycat; + +import com.amazon.carbonado.capability.Capability; + +/** + * Capability to provide direct access to the underlying BDB environment. + * + * @author Brian S O'Neill + */ +public interface EnvironmentCapability extends Capability { + /** + * Returns the BDB environment object, which must be cast to the expected + * type, depending on the BDB product and version being used. + */ + Object getEnvironment(); +} diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/StoredDatabaseInfo.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/StoredDatabaseInfo.java new file mode 100644 index 0000000..9925e01 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/StoredDatabaseInfo.java @@ -0,0 +1,102 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.sleepycat; + +import com.amazon.carbonado.Nullable; +import com.amazon.carbonado.PrimaryKey; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Version; + +import com.amazon.carbonado.layout.Unevolvable; +import com.amazon.carbonado.repo.indexed.Unindexed; + +/** + * Stores basic information about the BDB databases managed by BDBRepository. + * + *

Note: This storable cannot have indexes defined, since it is used to + * discover information about indexes. It would create a cyclic dependency. + * + * @author Brian S O'Neill + */ +@PrimaryKey("databaseName") +public abstract class StoredDatabaseInfo implements Storable, Unevolvable, Unindexed { + /** Evolution strategy code */ + public static final int EVOLUTION_NONE = 0, EVOLUTION_STANDARD = 1; + + public StoredDatabaseInfo() { + } + + public abstract String getDatabaseName(); + + public abstract void setDatabaseName(String name); + + /** + * Returns the index name descriptor for the keys of this database. This + * descriptor is defined by {@link com.amazon.carbonado.info.StorableIndex}, and + * it does not contain type information. + */ + @Nullable + public abstract String getIndexNameDescriptor(); + + public abstract void setIndexNameDescriptor(String descriptor); + + /** + * Returns the types of the index properties. This descriptor is defined by + * {@link com.amazon.carbonado.info.StorableIndex}. + */ + @Nullable + public abstract String getIndexTypeDescriptor(); + + public abstract void setIndexTypeDescriptor(String descriptor); + + /** + * Returns EVOLUTION_NONE if evolution of records is not supported. + */ + public abstract int getEvolutionStrategy(); + + public abstract void setEvolutionStrategy(int strategy); + + /** + * Returns the milliseconds from 1970-01-01T00:00:00Z when this record was + * created. + */ + public abstract long getCreationTimestamp(); + + public abstract void setCreationTimestamp(long timestamp); + + /** + * Record version number for this StoredDatabaseInfo instance. Some + * encoding strategies require a version number. + */ + @Version + public abstract int getVersionNumber(); + + public abstract void setVersionNumber(int version); + + /** + * Since this record cannot evolve, this property allows it to be extended + * without conflicting with existing records. This record cannot evolve + * because an evolution strategy likely depends on this interface remaining + * stable, avoiding a cyclic dependency. + */ + @Nullable + public abstract byte[] getExtraData(); + + public abstract void setExtraData(byte[] data); +} diff --git a/src/main/java/com/amazon/carbonado/repo/sleepycat/package-info.java b/src/main/java/com/amazon/carbonado/repo/sleepycat/package-info.java new file mode 100644 index 0000000..ba03e57 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/sleepycat/package-info.java @@ -0,0 +1,28 @@ +/* + * Copyright 2006 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Repository implementation backed by a Sleepycat (BDB, Berkeley) + * database. Data is encoded in a specialized format, and so this repository + * should not be used to open arbitrary Berkeley databases. BDBRepository has + * total schema ownership, and so it updates type definitions in the storage + * layer automatically. + * + * @see com.amazon.carbonado.repo.sleepycat.BDBRepositoryBuilder + */ +package com.amazon.carbonado.repo.sleepycat; -- cgit v1.2.3