From dc708480b286decd4328d52d742cac000faca3f8 Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Sun, 4 May 2008 17:36:39 +0000 Subject: Merged map repository implementation from map branch. --- pom.xml | 2 +- .../java/com/amazon/carbonado/repo/map/Key.java | 273 +++++ .../com/amazon/carbonado/repo/map/MapCursor.java | 173 +++ .../amazon/carbonado/repo/map/MapRepository.java | 125 +++ .../carbonado/repo/map/MapRepositoryBuilder.java | 151 +++ .../com/amazon/carbonado/repo/map/MapStorage.java | 813 ++++++++++++++ .../amazon/carbonado/repo/map/MapTransaction.java | 240 ++++ .../carbonado/repo/map/MapTransactionManager.java | 98 ++ .../amazon/carbonado/repo/map/UpgradableLock.java | 1161 ++++++++++++++++++++ .../amazon/carbonado/repo/map/package-info.java | 24 + 10 files changed, 3059 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/amazon/carbonado/repo/map/Key.java create mode 100644 src/main/java/com/amazon/carbonado/repo/map/MapCursor.java create mode 100644 src/main/java/com/amazon/carbonado/repo/map/MapRepository.java create mode 100644 src/main/java/com/amazon/carbonado/repo/map/MapRepositoryBuilder.java create mode 100644 src/main/java/com/amazon/carbonado/repo/map/MapStorage.java create mode 100644 src/main/java/com/amazon/carbonado/repo/map/MapTransaction.java create mode 100644 src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java create mode 100644 src/main/java/com/amazon/carbonado/repo/map/UpgradableLock.java create mode 100644 src/main/java/com/amazon/carbonado/repo/map/package-info.java diff --git a/pom.xml b/pom.xml index 9538a01..d2ce95d 100644 --- a/pom.xml +++ b/pom.xml @@ -191,7 +191,7 @@ Standard Repositories - com.amazon.carbonado.repo.jdbc:com.amazon.carbonado.repo.replicated:com.amazon.carbonado.repo.logging:com.amazon.carbonado.repo.sleepycat + com.amazon.carbonado.repo.map:com.amazon.carbonado.repo.jdbc:com.amazon.carbonado.repo.replicated:com.amazon.carbonado.repo.logging:com.amazon.carbonado.repo.sleepycat Service Provider Interface diff --git a/src/main/java/com/amazon/carbonado/repo/map/Key.java b/src/main/java/com/amazon/carbonado/repo/map/Key.java new file mode 100644 index 0000000..5032f66 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/Key.java @@ -0,0 +1,273 @@ +/* + * Copyright 2008 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.amazon.carbonado.repo.map; + +import java.lang.reflect.UndeclaredThrowableException; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import org.cojen.classfile.ClassFile; +import org.cojen.classfile.CodeBuilder; +import org.cojen.classfile.Label; +import org.cojen.classfile.LocalVariable; +import org.cojen.classfile.MethodInfo; +import org.cojen.classfile.Modifiers; +import org.cojen.classfile.TypeDesc; + +import org.cojen.util.ClassInjector; +import org.cojen.util.SoftValuedHashMap; + +import com.amazon.carbonado.Storable; + +import com.amazon.carbonado.info.OrderedProperty; +import com.amazon.carbonado.info.StorableInfo; +import com.amazon.carbonado.info.StorableIntrospector; +import com.amazon.carbonado.info.StorableProperty; + +import com.amazon.carbonado.gen.CodeBuilderUtil; + +/** + * + * + * @author Brian S O'Neill + */ +class Key implements Comparable> { + protected final S mStorable; + protected final Comparator mComparator; + + Key(S storable, Comparator comparator) { + mStorable = storable; + mComparator = comparator; + } + + @Override + public String toString() { + return mStorable.toString(); + } + + public int compareTo(Key other) { + int result = mComparator.compare(mStorable, other.mStorable); + if (result == 0) { + int t1 = tieBreaker(); + int t2 = other.tieBreaker(); + if (t1 < t2) { + result = -1; + } else if (t1 > t2) { + result = 1; + } + } + return result; + } + + protected int tieBreaker() { + return 0; + } + + public static interface Assigner { + void setKeyValues(S storable, Object[] identityValues); + + void setKeyValues(S storable, Object[] identityValues, Object rangeValue); + } + + private static final Map mAssigners; + + static { + mAssigners = new SoftValuedHashMap(); + } + + public static synchronized Assigner getAssigner(Class clazz) { + Assigner assigner = mAssigners.get(clazz); + if (assigner == null) { + assigner = createAssigner(clazz); + mAssigners.put(clazz, assigner); + } + return assigner; + } + + private static Assigner createAssigner(Class clazz) { + final StorableInfo info = StorableIntrospector.examine(clazz); + + ClassInjector ci = ClassInjector.create(clazz.getName(), clazz.getClassLoader()); + ClassFile cf = new ClassFile(ci.getClassName()); + cf.addInterface(Assigner.class); + cf.markSynthetic(); + cf.setSourceFile(Key.class.getName()); + cf.setTarget("1.5"); + + cf.addDefaultConstructor(); + + // Define required setKeyValues methods. + + List> pk = + new ArrayList>(info.getPrimaryKey().getProperties()); + + TypeDesc storableType = TypeDesc.forClass(Storable.class); + TypeDesc storableArrayType = storableType.toArrayType(); + TypeDesc userStorableType = TypeDesc.forClass(info.getStorableType()); + TypeDesc objectArrayType = TypeDesc.OBJECT.toArrayType(); + + MethodInfo mi = cf.addMethod(Modifiers.PUBLIC, "setKeyValues", null, + new TypeDesc[] {storableType, objectArrayType}); + + CodeBuilder b = new CodeBuilder(mi); + + LocalVariable userStorableVar = b.createLocalVariable(null, userStorableType); + b.loadLocal(b.getParameter(0)); + b.checkCast(userStorableType); + b.storeLocal(userStorableVar); + + // Switch on the number of values supplied. 
+ + /* Switch looks like this for two pk properties: + + switch(identityValues.length) { + default: + throw new IllegalArgumentException(); + case 2: + storable.setPkProp2(identityValues[1]); + case 1: + storable.setPkProp1(identityValues[0]); + case 0: + } + + */ + + b.loadLocal(b.getParameter(1)); + b.arrayLength(); + + int[] cases = new int[pk.size() + 1]; + Label[] labels = new Label[pk.size() + 1]; + + for (int i=0; i= 0) { + b.loadLocal(userStorableVar); + b.loadLocal(b.getParameter(1)); + b.loadConstant(prop); + b.loadFromArray(storableArrayType); + callSetPropertyValue(b, pk.get(prop)); + } + // Fall through to next case. + } + + b.returnVoid(); + + defaultLabel.setLocation(); + CodeBuilderUtil.throwException(b, IllegalArgumentException.class, null); + + // The setKeyValues method that takes a range value calls the other + // setKeyValues method first, to take care of the identityValues. + + mi = cf.addMethod(Modifiers.PUBLIC, "setKeyValues", null, + new TypeDesc[] {storableType, objectArrayType, TypeDesc.OBJECT}); + + b = new CodeBuilder(mi); + + b.loadThis(); + b.loadLocal(b.getParameter(0)); + b.loadLocal(b.getParameter(1)); + b.invokeVirtual("setKeyValues", null, new TypeDesc[] {storableType, objectArrayType}); + + userStorableVar = b.createLocalVariable(null, userStorableType); + b.loadLocal(b.getParameter(0)); + b.checkCast(userStorableType); + b.storeLocal(userStorableVar); + + // Switch on the number of values supplied. + + /* Switch looks like this for two pk properties: + + switch(identityValues.length) { + default: + throw new IllegalArgumentException(); + case 0: + storable.setPkProp1(rangeValue); + return; + case 1: + storable.setPkProp2(rangeValue); + return; + } + + */ + + b.loadLocal(b.getParameter(1)); + b.arrayLength(); + + cases = new int[pk.size()]; + labels = new Label[pk.size()]; + + for (int i=0; i) ci.defineClass(cf).newInstance(); + } catch (IllegalAccessException e) { + throw new UndeclaredThrowableException(e); + } catch (InstantiationException e) { + throw new UndeclaredThrowableException(e); + } + } + + /** + * Creates code to call set method. Assumes Storable and property value + * are already on the stack. + */ + private static void callSetPropertyValue(CodeBuilder b, OrderedProperty op) { + StorableProperty property = op.getChainedProperty().getLastProperty(); + TypeDesc propType = TypeDesc.forClass(property.getType()); + if (propType != TypeDesc.OBJECT) { + TypeDesc objectType = propType.toObjectType(); + b.checkCast(objectType); + // Potentially unbox primitive. + b.convert(objectType, propType); + } + b.invoke(property.getWriteMethod()); + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java b/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java new file mode 100644 index 0000000..cd0f7e3 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java @@ -0,0 +1,173 @@ +/* + * Copyright 2008 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.map; + +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.Storable; + +import com.amazon.carbonado.cursor.AbstractCursor; + +import com.amazon.carbonado.spi.TransactionScope; + +/** + * Returns copies of Storables that it iterates over. + * + * @author Brian S O'Neill + */ +class MapCursor extends AbstractCursor { + private static final AtomicReferenceFieldUpdater cIteratorRef = + AtomicReferenceFieldUpdater.newUpdater + (MapCursor.class, Iterator.class, "mIterator"); + + private final MapStorage mStorage; + private final TransactionScope mScope; + private final MapTransaction mTxn; + private final boolean mIsForUpdate; + + private volatile Iterator mIterator; + + MapCursor(MapStorage storage, + TransactionScope scope, + Iterable iterable) + throws Exception + { + MapTransaction txn = scope.getTxn(); + + mStorage = storage; + mScope = scope; + mTxn = txn; + + if (txn == null) { + mStorage.mLock.lockForRead(scope); + mIsForUpdate = false; + } else { + // Since lock is so coarse, all reads in transaction scope are + // upgrade to avoid deadlocks. + txn.lockForUpgrade(mStorage.mLock, mIsForUpdate = scope.isForUpdate()); + } + + scope.register(storage.getStorableType(), this); + mIterator = iterable.iterator(); + } + + public void close() { + Iterator it = mIterator; + if (it != null) { + if (cIteratorRef.compareAndSet(this, it, null)) { + UpgradableLock lock = mStorage.mLock; + if (mTxn == null) { + lock.unlockFromRead(mScope); + } else { + mTxn.unlockFromUpgrade(lock, mIsForUpdate); + } + mScope.unregister(mStorage.getStorableType(), this); + } + } + } + + public boolean hasNext() throws FetchException { + Iterator it = mIterator; + try { + if (it != null && it.hasNext()) { + return true; + } else { + close(); + } + return false; + } catch (ConcurrentModificationException e) { + close(); + throw new FetchException(e); + } catch (Error e) { + try { + close(); + } catch (Error e2) { + // Ignore. + } + throw e; + } + } + + public S next() throws FetchException { + Iterator it = mIterator; + if (it == null) { + close(); + throw new NoSuchElementException(); + } + try { + S next = mStorage.copyAndFireLoadTrigger(it.next()); + if (!hasNext()) { + close(); + } + return next; + } catch (ConcurrentModificationException e) { + close(); + throw new FetchException(e); + } catch (Error e) { + try { + close(); + } catch (Error e2) { + // Ignore. + } + throw e; + } + } + + @Override + public int skipNext(int amount) throws FetchException { + if (amount <= 0) { + if (amount < 0) { + throw new IllegalArgumentException("Cannot skip negative amount: " + amount); + } + return 0; + } + + // Skip over entries without copying them. 
+ + int count = 0; + Iterator it = mIterator; + + if (it != null) { + try { + while (--amount >= 0 && it.hasNext()) { + it.next(); + count++; + } + } catch (ConcurrentModificationException e) { + close(); + throw new FetchException(e); + } catch (Error e) { + try { + close(); + } catch (Error e2) { + // Ignore. + } + throw e; + } + } + + return count; + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java b/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java new file mode 100644 index 0000000..2e82f89 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java @@ -0,0 +1,125 @@ +/* + * Copyright 2008 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.map; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; + +import com.amazon.carbonado.Repository; +import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Storage; +import com.amazon.carbonado.TriggerFactory; + +import com.amazon.carbonado.capability.IndexInfo; +import com.amazon.carbonado.capability.IndexInfoCapability; + +import com.amazon.carbonado.sequence.SequenceValueGenerator; +import com.amazon.carbonado.sequence.SequenceValueProducer; + +import com.amazon.carbonado.qe.RepositoryAccess; +import com.amazon.carbonado.qe.StorageAccess; + +import com.amazon.carbonado.spi.AbstractRepository; +import com.amazon.carbonado.spi.LobEngine; +import com.amazon.carbonado.spi.TransactionManager; +import com.amazon.carbonado.spi.TransactionScope; + +/** + * + * + * @author Brian S O'Neill + * @see MapRepositoryBuilder + */ +class MapRepository extends AbstractRepository + implements RepositoryAccess, IndexInfoCapability +{ + private final AtomicReference mRootRef; + private final boolean mIsMaster; + private final int mLockTimeout; + private final TimeUnit mLockTimeoutUnit; + + final Iterable mTriggerFactories; + private final MapTransactionManager mTxnManager; + private LobEngine mLobEngine; + + MapRepository(AtomicReference rootRef, MapRepositoryBuilder builder) { + super(builder.getName()); + mRootRef = rootRef; + mIsMaster = builder.isMaster(); + mLockTimeout = builder.getLockTimeout(); + mLockTimeoutUnit = builder.getLockTimeoutUnit(); + + mTriggerFactories = builder.getTriggerFactories(); + mTxnManager = new MapTransactionManager(mLockTimeout, mLockTimeoutUnit); + } + + public Repository getRootRepository() { + return mRootRef.get(); + } + + public StorageAccess storageAccessFor(Class type) + throws RepositoryException + { + return (StorageAccess) storageFor(type); + } + + public IndexInfo[] getIndexInfo(Class storableType) + throws RepositoryException + { + return ((MapStorage) 
storageFor(storableType)).getIndexInfo(); + } + + protected Log getLog() { + return null; + } + + protected TransactionManager transactionManager() { + return mTxnManager; + } + + protected TransactionScope localTransactionScope() { + return mTxnManager.localScope(); + } + + protected Storage createStorage(Class type) + throws RepositoryException + { + return new MapStorage(this, type, mLockTimeout, mLockTimeoutUnit); + } + + protected SequenceValueProducer createSequenceValueProducer(String name) + throws RepositoryException + { + return new SequenceValueGenerator(this, name); + } + + LobEngine getLobEngine() throws RepositoryException { + if (mLobEngine == null) { + mLobEngine = new LobEngine(this, getRootRepository()); + } + return mLobEngine; + } + + boolean isMaster() { + return mIsMaster; + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/map/MapRepositoryBuilder.java new file mode 100644 index 0000000..8d5d399 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/MapRepositoryBuilder.java @@ -0,0 +1,151 @@ +/* + * Copyright 2008 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.map; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.TimeUnit; + +import com.amazon.carbonado.Repository; +import com.amazon.carbonado.RepositoryException; + +import com.amazon.carbonado.repo.indexed.IndexedRepositoryBuilder; + +import com.amazon.carbonado.spi.AbstractRepositoryBuilder; + +/** + * Volatile repository implementation backed by a concurrent map. Locks used by + * repository are coarse, much like table locks. Loads and queries + * acquire read locks, and modifications acquire write locks. Within + * transactions, loads and queries always acquire upgradable locks, to reduce + * the likelihood of deadlock. + * + *

This repository supports transactions, which may also be + * nested. Supported isolation levels are read committed and serializable. Read + * uncommitted is promoted to read committed, and repeatable read is promoted + * to serializable. + * + *
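 * <p>A minimal transaction sketch (illustrative only: it relies on the standard
 * Carbonado {@code Repository.enterTransaction} and {@code Transaction} API, which
 * is outside this change, and the {@code UserInfo} storable with its setters is
 * hypothetical; checked exception handling is elided):
 * <pre>
 * Transaction txn = repo.enterTransaction();
 * try {
 *     Storage&lt;UserInfo&gt; users = repo.storageFor(UserInfo.class);
 *     UserInfo user = users.prepare();
 *     user.setUserID(1);
 *     user.setName("example");
 *     user.insert();
 *     txn.commit();
 * } finally {
 *     txn.exit();
 * }
 * </pre>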

+ * The following extra capabilities are supported: + *

    + *
 * <ul>
 * <li>{@link com.amazon.carbonado.capability.IndexInfoCapability IndexInfoCapability}
 * <li>{@link com.amazon.carbonado.capability.ShutdownCapability ShutdownCapability}
 * <li>{@link com.amazon.carbonado.sequence.SequenceCapability SequenceCapability}
 * </ul>
+ * + *

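 * <p>A minimal usage sketch (the repository name is arbitrary; every method shown
 * is defined by this builder):
 * <pre>
 * // All defaults, via the convenience method:
 * Repository repo = MapRepositoryBuilder.newRepository();
 *
 * // Or configured explicitly:
 * MapRepositoryBuilder builder = new MapRepositoryBuilder();
 * builder.setName("volatile");
 * builder.setLockTimeoutMillis(500);
 * Repository configured = builder.build();
 * </pre>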
Note: This repository uses concurrent navigable map classes, which became + * available in JDK1.6. + * + * @author Brian S O'Neill + * @since 1.2 + */ +public class MapRepositoryBuilder extends AbstractRepositoryBuilder { + /** + * Convenience method to build a new MapRepository. + */ + public static Repository newRepository() { + try { + MapRepositoryBuilder builder = new MapRepositoryBuilder(); + return builder.build(); + } catch (RepositoryException e) { + // Not expected. + throw new RuntimeException(e); + } + } + + private String mName = ""; + private boolean mIsMaster = true; + private boolean mIndexSupport = true; + private int mLockTimeout; + private TimeUnit mLockTimeoutUnit; + + public MapRepositoryBuilder() { + setLockTimeoutMillis(500); + } + + public Repository build(AtomicReference rootRef) throws RepositoryException { + if (mIndexSupport) { + // Temporarily set to false to avoid infinite recursion. + mIndexSupport = false; + try { + IndexedRepositoryBuilder ixBuilder = new IndexedRepositoryBuilder(); + ixBuilder.setWrappedRepository(this); + ixBuilder.setMaster(isMaster()); + ixBuilder.setAllClustered(true); + return ixBuilder.build(rootRef); + } finally { + mIndexSupport = true; + } + } + + assertReady(); + + Repository repo = new MapRepository(rootRef, this); + + rootRef.set(repo); + return repo; + } + + public String getName() { + return mName; + } + + public void setName(String name) { + mName = name; + } + + public boolean isMaster() { + return mIsMaster; + } + + public void setMaster(boolean b) { + mIsMaster = b; + } + + /** + * Set the lock timeout, in milliseconds. Default value is 500 milliseconds. + */ + public void setLockTimeoutMillis(int timeout) { + setLockTimeout(timeout, TimeUnit.MILLISECONDS); + } + + /** + * Set the lock timeout. Default value is 500 milliseconds. + */ + public void setLockTimeout(int timeout, TimeUnit unit) { + if (timeout < 0 || unit == null) { + throw new IllegalArgumentException(); + } + mLockTimeout = timeout; + mLockTimeoutUnit = unit; + } + + /** + * Returns the lock timeout. Call getLockTimeoutUnit to get the unit. + */ + public int getLockTimeout() { + return mLockTimeout; + } + + /** + * Returns the lock timeout unit. Call getLockTimeout to get the timeout. + */ + public TimeUnit getLockTimeoutUnit() { + return mLockTimeoutUnit; + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java new file mode 100644 index 0000000..520d18c --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java @@ -0,0 +1,813 @@ +/* + * Copyright 2008 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.amazon.carbonado.repo.map; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableMap; +import java.util.Set; + +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; + +import org.cojen.classfile.TypeDesc; + +import com.amazon.carbonado.Cursor; +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.FetchInterruptedException; +import com.amazon.carbonado.FetchTimeoutException; +import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.PersistInterruptedException; +import com.amazon.carbonado.PersistTimeoutException; +import com.amazon.carbonado.Query; +import com.amazon.carbonado.Repository; +import com.amazon.carbonado.RepositoryException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Storage; +import com.amazon.carbonado.SupportException; +import com.amazon.carbonado.Trigger; + +import com.amazon.carbonado.capability.IndexInfo; + +import com.amazon.carbonado.cursor.ArraySortBuffer; +import com.amazon.carbonado.cursor.EmptyCursor; +import com.amazon.carbonado.cursor.FilteredCursor; +import com.amazon.carbonado.cursor.SingletonCursor; +import com.amazon.carbonado.cursor.SortBuffer; +import com.amazon.carbonado.cursor.SortedCursor; + +import com.amazon.carbonado.filter.Filter; +import com.amazon.carbonado.filter.FilterValues; +import com.amazon.carbonado.filter.RelOp; + +import com.amazon.carbonado.sequence.SequenceValueProducer; + +import com.amazon.carbonado.gen.DelegateStorableGenerator; +import com.amazon.carbonado.gen.DelegateSupport; +import com.amazon.carbonado.gen.MasterFeature; + +import com.amazon.carbonado.util.QuickConstructorGenerator; + +import com.amazon.carbonado.filter.Filter; + +import com.amazon.carbonado.info.ChainedProperty; +import com.amazon.carbonado.info.Direction; +import com.amazon.carbonado.info.OrderedProperty; +import com.amazon.carbonado.info.StorableIndex; +import com.amazon.carbonado.info.StorableInfo; +import com.amazon.carbonado.info.StorableIntrospector; +import com.amazon.carbonado.info.StorableProperty; + +import com.amazon.carbonado.qe.BoundaryType; +import com.amazon.carbonado.qe.QueryExecutorFactory; +import com.amazon.carbonado.qe.QueryEngine; +import com.amazon.carbonado.qe.StorageAccess; + +import com.amazon.carbonado.spi.IndexInfoImpl; +import com.amazon.carbonado.spi.LobEngine; +import com.amazon.carbonado.spi.TransactionScope; +import com.amazon.carbonado.spi.TriggerManager; + +/** + * + * + * @author Brian S O'Neill + */ +class MapStorage + implements Storage, DelegateSupport, StorageAccess +{ + private static final int DEFAULT_LOB_BLOCK_SIZE = 1000; + private static final Object[] NO_VALUES = new Object[0]; + + private final MapRepository mRepo; + private final StorableInfo mInfo; + private final TriggerManager mTriggers; + private final InstanceFactory mInstanceFactory; + private final StorableIndex mPrimaryKeyIndex; + private final QueryEngine mQueryEngine; + + private final int mLockTimeout; + private final TimeUnit mLockTimeoutUnit; + + private final ConcurrentNavigableMap, S> mMap; + private final Comparator mFullComparator; + private final Comparator[] mSearchComparators; + + private final Key.Assigner mKeyAssigner; + + /** + * Simple lock which is reentrant 
for transactions, but auto-commit does not + * need to support reentrancy. Read lock requests in transactions can starve + * write lock requests, but auto-commit cannot cause starvation. In practice + * starvation is not possible since transactions always lock for upgrade. + */ + final UpgradableLock mLock = new UpgradableLock() { + @Override + protected boolean isReadLockHeld(Object locker) { + return locker instanceof MapTransaction; + } + }; + + MapStorage(MapRepository repo, Class type, int lockTimeout, TimeUnit lockTimeoutUnit) + throws SupportException + { + mRepo = repo; + mInfo = StorableIntrospector.examine(type); + mTriggers = new TriggerManager(); + + EnumSet features; + if (repo.isMaster()) { + features = EnumSet.of(MasterFeature.INSERT_CHECK_REQUIRED, + MasterFeature.VERSIONING, + MasterFeature.INSERT_SEQUENCES); + } else { + features = EnumSet.of(MasterFeature.INSERT_CHECK_REQUIRED); + } + + Class delegateStorableClass = + DelegateStorableGenerator.getDelegateClass(type, features); + + mInstanceFactory = QuickConstructorGenerator + .getInstance(delegateStorableClass, InstanceFactory.class); + + mPrimaryKeyIndex = + new StorableIndex(mInfo.getPrimaryKey(), Direction.ASCENDING).clustered(true); + + mQueryEngine = new QueryEngine(type, repo); + + mLockTimeout = lockTimeout; + mLockTimeoutUnit = lockTimeoutUnit; + + mMap = new ConcurrentSkipListMap, S>(); + List> propList = createPkPropList(); + mFullComparator = SortedCursor.createComparator(propList); + mSearchComparators = new Comparator[propList.size() + 1]; + mSearchComparators[propList.size()] = mFullComparator; + + mKeyAssigner = Key.getAssigner(type); + + try { + if (LobEngine.hasLobs(type)) { + Trigger lobTrigger = repo.getLobEngine() + .getSupportTrigger(type, DEFAULT_LOB_BLOCK_SIZE); + addTrigger(lobTrigger); + } + + // Don't install automatic triggers until we're completely ready. 
+ mTriggers.addTriggers(type, repo.mTriggerFactories); + } catch (SupportException e) { + throw e; + } catch (RepositoryException e) { + throw new SupportException(e); + } + } + + public Class getStorableType() { + return mInfo.getStorableType(); + } + + public S prepare() { + return (S) mInstanceFactory.instantiate(this); + } + + public Query query() throws FetchException { + return mQueryEngine.query(); + } + + public Query query(String filter) throws FetchException { + return mQueryEngine.query(filter); + } + + public Query query(Filter filter) throws FetchException { + return mQueryEngine.query(filter); + } + + public void truncate() throws PersistException { + try { + Object locker = mRepo.localTransactionScope().getTxn(); + if (locker == null) { + locker = Thread.currentThread(); + } + doLockForWrite(locker); + try { + mMap.clear(); + } finally { + mLock.unlockFromWrite(locker); + } + } catch (PersistException e) { + throw e; + } catch (Exception e) { + throw new PersistException(e); + } + } + + public boolean addTrigger(Trigger trigger) { + return mTriggers.addTrigger(trigger); + } + + public boolean removeTrigger(Trigger trigger) { + return mTriggers.removeTrigger(trigger); + } + + public IndexInfo[] getIndexInfo() { + StorableIndex pkIndex = mPrimaryKeyIndex; + + if (pkIndex == null) { + return new IndexInfo[0]; + } + + int i = pkIndex.getPropertyCount(); + String[] propertyNames = new String[i]; + Direction[] directions = new Direction[i]; + while (--i >= 0) { + propertyNames[i] = pkIndex.getProperty(i).getName(); + directions[i] = pkIndex.getPropertyDirection(i); + } + + return new IndexInfo[] { + new IndexInfoImpl(getStorableType().getName(), true, true, propertyNames, directions) + }; + } + + public boolean doTryLoad(S storable) throws FetchException { + try { + TransactionScope scope = mRepo.localTransactionScope(); + MapTransaction txn = scope.getTxn(); + if (txn == null) { + doLockForRead(scope); + try { + return doTryLoadNoLock(storable); + } finally { + mLock.unlockFromRead(scope); + } + } else { + // Since lock is so coarse, all reads in transaction scope are + // upgrade to avoid deadlocks. + final boolean isForUpdate = scope.isForUpdate(); + txn.lockForUpgrade(mLock, isForUpdate); + try { + return doTryLoadNoLock(storable); + } finally { + txn.unlockFromUpgrade(mLock, isForUpdate); + } + } + } catch (FetchException e) { + throw e; + } catch (Exception e) { + throw new FetchException(e); + } + } + + // Caller must hold lock. + boolean doTryLoadNoLock(S storable) { + S existing = mMap.get(new Key(storable, mFullComparator)); + if (existing == null) { + return false; + } else { + storable.markAllPropertiesDirty(); + existing.copyAllProperties(storable); + storable.markAllPropertiesClean(); + return true; + } + } + + public boolean doTryInsert(S storable) throws PersistException { + try { + TransactionScope scope = mRepo.localTransactionScope(); + MapTransaction txn = scope.getTxn(); + if (txn == null) { + // No need to acquire full write lock since map is concurrent + // and existing storable (if any) is not being + // modified. Upgrade lock is required because a concurrent + // transaction might be in progress, and so insert should wait. 
+ doLockForUpgrade(scope); + try { + return doTryInsertNoLock(storable); + } finally { + mLock.unlockFromUpgrade(scope); + } + } else { + txn.lockForWrite(mLock); + if (doTryInsertNoLock(storable)) { + txn.inserted(this, storable); + return true; + } else { + return false; + } + } + } catch (PersistException e) { + throw e; + } catch (FetchException e) { + throw e.toPersistException(); + } catch (Exception e) { + throw new PersistException(e); + } + } + + // Caller must hold upgrade or write lock. + private boolean doTryInsertNoLock(S storable) { + Key key = new Key(storable, mFullComparator); + S existing = mMap.get(key); + if (existing != null) { + return false; + } + // Create a fresh copy to ensure that custom fields are not saved. + S copy = (S) storable.prepare(); + storable.copyAllProperties(copy); + copy.markAllPropertiesClean(); + mMap.put(key, copy); + storable.markAllPropertiesClean(); + return true; + } + + public boolean doTryUpdate(S storable) throws PersistException { + try { + TransactionScope scope = mRepo.localTransactionScope(); + MapTransaction txn = scope.getTxn(); + if (txn == null) { + // Full write lock is required since existing storable is being + // modified. Readers cannot be allowed to see modifications + // until they are complete. In addtion, a concurrent + // transaction might be in progress, and so update should wait. + doLockForWrite(scope); + try { + return doTryUpdateNoLock(storable); + } finally { + mLock.unlockFromWrite(scope); + } + } else { + txn.lockForWrite(mLock); + S existing = mMap.get(new Key(storable, mFullComparator)); + if (existing == null) { + return false; + } else { + // Copy existing object to undo log. + txn.updated(this, (S) existing.copy()); + + // Copy altered values to existing object. + existing.markAllPropertiesDirty(); + storable.copyDirtyProperties(existing); + existing.markAllPropertiesClean(); + + // Copy all values to user object, to simulate a reload. + storable.markAllPropertiesDirty(); + existing.copyAllProperties(storable); + storable.markAllPropertiesClean(); + + return true; + } + } + } catch (PersistException e) { + throw e; + } catch (Exception e) { + throw new PersistException(e); + } + } + + // Caller must hold write lock. + private boolean doTryUpdateNoLock(S storable) { + S existing = mMap.get(new Key(storable, mFullComparator)); + if (existing == null) { + return false; + } else { + // Copy altered values to existing object. + existing.markAllPropertiesDirty(); + storable.copyDirtyProperties(existing); + existing.markAllPropertiesClean(); + + // Copy all values to user object, to simulate a reload. + storable.markAllPropertiesDirty(); + existing.copyAllProperties(storable); + storable.markAllPropertiesClean(); + + return true; + } + } + + public boolean doTryDelete(S storable) throws PersistException { + try { + TransactionScope scope = mRepo.localTransactionScope(); + MapTransaction txn = scope.getTxn(); + if (txn == null) { + // No need to acquire full write lock since map is concurrent + // and existing storable (if any) is not being + // modified. Upgrade lock is required because a concurrent + // transaction might be in progress, and so delete should wait. 
+ doLockForUpgrade(scope); + try { + return doTryDeleteNoLock(storable); + } finally { + mLock.unlockFromUpgrade(scope); + } + } else { + txn.lockForWrite(mLock); + S existing = mMap.remove(new Key(storable, mFullComparator)); + if (existing == null) { + return false; + } else { + txn.deleted(this, existing); + return true; + } + } + } catch (PersistException e) { + throw e; + } catch (FetchException e) { + throw e.toPersistException(); + } catch (Exception e) { + throw new PersistException(e); + } + } + + // Caller must hold upgrade or write lock. + private boolean doTryDeleteNoLock(S storable) { + return mMap.remove(new Key(storable, mFullComparator)) != null; + } + + // Called by MapTransaction, which implicitly holds lock. + void mapPut(S storable) { + mMap.put(new Key(storable, mFullComparator), storable); + } + + // Called by MapTransaction, which implicitly holds lock. + void mapRemove(S storable) { + mMap.remove(new Key(storable, mFullComparator)); + } + + private void doLockForRead(Object locker) throws FetchException { + try { + if (!mLock.tryLockForRead(locker, mLockTimeout, mLockTimeoutUnit)) { + throw new FetchTimeoutException("" + mLockTimeout + ' ' + + mLockTimeoutUnit.toString().toLowerCase()); + } + } catch (InterruptedException e) { + throw new FetchInterruptedException(e); + } + } + + private void doLockForUpgrade(Object locker) throws FetchException { + try { + if (!mLock.tryLockForUpgrade(locker, mLockTimeout, mLockTimeoutUnit)) { + throw new FetchTimeoutException("" + mLockTimeout + ' ' + + mLockTimeoutUnit.toString().toLowerCase()); + } + } catch (InterruptedException e) { + throw new FetchInterruptedException(e); + } + } + + private void doLockForWrite(Object locker) throws PersistException { + try { + if (!mLock.tryLockForWrite(locker, mLockTimeout, mLockTimeoutUnit)) { + throw new PersistTimeoutException("" + mLockTimeout + ' ' + + mLockTimeoutUnit.toString().toLowerCase()); + } + } catch (InterruptedException e) { + throw new PersistInterruptedException(e); + } + } + + public Repository getRootRepository() { + return mRepo.getRootRepository(); + } + + public boolean isPropertySupported(String propertyName) { + return mInfo.getAllProperties().containsKey(propertyName); + } + + public Trigger getInsertTrigger() { + return mTriggers.getInsertTrigger(); + } + + public Trigger getUpdateTrigger() { + return mTriggers.getUpdateTrigger(); + } + + public Trigger getDeleteTrigger() { + return mTriggers.getDeleteTrigger(); + } + + public Trigger getLoadTrigger() { + return mTriggers.getLoadTrigger(); + } + + public void locallyDisableLoadTrigger() { + mTriggers.locallyDisableLoad(); + } + + public void locallyEnableLoadTrigger() { + mTriggers.locallyEnableLoad(); + } + + public SequenceValueProducer getSequenceValueProducer(String name) throws PersistException { + try { + return mRepo.getSequenceValueProducer(name); + } catch (RepositoryException e) { + throw e.toPersistException(); + } + } + + public QueryExecutorFactory getQueryExecutorFactory() { + return mQueryEngine; + } + + public Collection> getAllIndexes() { + return Collections.singletonList(mPrimaryKeyIndex); + } + + public Storage storageDelegate(StorableIndex index) { + // We're the grunt and don't delegate. 
+ return null; + } + + public long countAll() throws FetchException { + try { + Object locker = mRepo.localTransactionScope().getTxn(); + if (locker == null) { + locker = Thread.currentThread(); + } + doLockForRead(locker); + try { + return mMap.size(); + } finally { + mLock.unlockFromRead(locker); + } + } catch (FetchException e) { + throw e; + } catch (Exception e) { + throw new FetchException(e); + } + } + + public Cursor fetchAll() throws FetchException { + try { + return new MapCursor(this, mRepo.localTransactionScope(), mMap.values()); + } catch (FetchException e) { + throw e; + } catch (Exception e) { + throw new FetchException(e); + } + } + + public Cursor fetchOne(StorableIndex index, Object[] identityValues) + throws FetchException + { + try { + S key = prepare(); + for (int i=0; i scope = mRepo.localTransactionScope(); + MapTransaction txn = scope.getTxn(); + if (txn == null) { + doLockForRead(scope); + try { + S value = mMap.get(new Key(key, mFullComparator)); + if (value == null) { + return EmptyCursor.the(); + } else { + return new SingletonCursor(copyAndFireLoadTrigger(value)); + } + } finally { + mLock.unlockFromRead(scope); + } + } else { + // Since lock is so coarse, all reads in transaction scope are + // upgrade to avoid deadlocks. + final boolean isForUpdate = scope.isForUpdate(); + txn.lockForUpgrade(mLock, isForUpdate); + try { + S value = mMap.get(new Key(key, mFullComparator)); + if (value == null) { + return EmptyCursor.the(); + } else { + return new SingletonCursor(copyAndFireLoadTrigger(value)); + } + } finally { + txn.unlockFromUpgrade(mLock, isForUpdate); + } + } + } catch (FetchException e) { + throw e; + } catch (Exception e) { + throw new FetchException(e); + } + } + + S copyAndFireLoadTrigger(S storable) throws FetchException { + storable = (S) storable.copy(); + Trigger trigger = getLoadTrigger(); + if (trigger != null) { + trigger.afterLoad(storable); + // In case trigger modified the properties, make sure they're still clean. + storable.markAllPropertiesClean(); + } + return storable; + } + + public Query indexEntryQuery(StorableIndex index) { + return null; + } + + public Cursor fetchFromIndexEntryQuery(StorableIndex index, Query indexEntryQuery) { + return null; + } + + public Cursor fetchSubset(StorableIndex index, + Object[] identityValues, + BoundaryType rangeStartBoundary, + Object rangeStartValue, + BoundaryType rangeEndBoundary, + Object rangeEndValue, + boolean reverseRange, + boolean reverseOrder) + throws FetchException + { + if (identityValues == null) { + identityValues = NO_VALUES; + } + + NavigableMap, S> map = mMap; + + int tieBreaker = 1; + if (reverseOrder) { + map = map.descendingMap(); + reverseRange = !reverseRange; + tieBreaker = -tieBreaker; + } + + if (reverseRange) { + BoundaryType t1 = rangeStartBoundary; + rangeStartBoundary = rangeEndBoundary; + rangeEndBoundary = t1; + Object t2 = rangeStartValue; + rangeStartValue = rangeEndValue; + rangeEndValue = t2; + } + + tail: { + Key startKey; + switch (rangeStartBoundary) { + case OPEN: default: + if (identityValues.length == 0) { + break tail; + } else { + // Tie breaker of -1 puts search key right before first actual + // match, thus forming an inclusive start match. + startKey = searchKey(-tieBreaker, identityValues); + } + break; + case INCLUSIVE: + // Tie breaker of -1 puts search key right before first actual + // match, thus forming an inclusive start match. 
+ startKey = searchKey(-tieBreaker, identityValues, rangeStartValue); + break; + case EXCLUSIVE: + // Tie breaker of +1 puts search key right after first actual + // match, thus forming an exlusive start match. + startKey = searchKey(tieBreaker, identityValues, rangeStartValue); + break; + } + + map = map.tailMap(startKey, true); + } + + Cursor cursor; + try { + cursor = new MapCursor(this, mRepo.localTransactionScope(), map.values()); + } catch (FetchException e) { + throw e; + } catch (Exception e) { + throw new FetchException(e); + } + + // Use filter to stop cursor at desired ending position. + + // FIXME: Let query engine do this so that filter can be + // cached. Somehow indicate this at a high level so that query plan + // shows a filter. + Filter filter; + FilterValues filterValues; + + if (rangeEndBoundary == BoundaryType.OPEN) { + if (identityValues.length == 0) { + filter = null; + filterValues = null; + } else { + filter = Filter.getOpenFilter(getStorableType()); + for (int i=0; i> createPkPropList() { + return new ArrayList>(mInfo.getPrimaryKey().getProperties()); + } + + private Key searchKey(int tieBreaker, Object[] identityValues) { + S storable = prepare(); + mKeyAssigner.setKeyValues(storable, identityValues); + Comparator c = getSearchComparator(identityValues.length); + return new SearchKey(tieBreaker, storable, c); + } + + private Key searchKey(int tieBreaker, Object[] identityValues, Object rangeValue) { + S storable = prepare(); + mKeyAssigner.setKeyValues(storable, identityValues, rangeValue); + Comparator c = getSearchComparator(identityValues.length + 1); + return new SearchKey(tieBreaker, storable, c); + } + + private Comparator getSearchComparator(int propertyCount) { + Comparator comparator = mSearchComparators[propertyCount]; + if (comparator == null) { + List> propList = createPkPropList().subList(0, propertyCount); + if (propList.size() > 0) { + comparator = SortedCursor.createComparator(propList); + } else { + comparator = SortedCursor.createComparator(getStorableType()); + } + mSearchComparators[propertyCount] = comparator; + } + return comparator; + } + + public SortBuffer createSortBuffer() { + return new ArraySortBuffer(); + } + + public static interface InstanceFactory { + Storable instantiate(DelegateSupport support); + } + + private static class SearchKey extends Key { + private final int mTieBreaker; + + SearchKey(int tieBreaker, S storable, Comparator comparator) { + super(storable, comparator); + mTieBreaker = tieBreaker; + } + + @Override + protected int tieBreaker() { + return mTieBreaker; + } + + @Override + public String toString() { + return super.toString() + ", tieBreaker=" + mTieBreaker; + } + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapTransaction.java b/src/main/java/com/amazon/carbonado/repo/map/MapTransaction.java new file mode 100644 index 0000000..d2cfe87 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/MapTransaction.java @@ -0,0 +1,240 @@ +/* + * Copyright 2008 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.map; + +import java.util.concurrent.TimeUnit; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.amazon.carbonado.FetchException; +import com.amazon.carbonado.FetchInterruptedException; +import com.amazon.carbonado.FetchTimeoutException; +import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.PersistInterruptedException; +import com.amazon.carbonado.PersistTimeoutException; +import com.amazon.carbonado.Storable; +import com.amazon.carbonado.Storage; + +/** + * + * + * @author Brian S O'Neill + */ +class MapTransaction { + private final MapTransaction mParent; + private final IsolationLevel mLevel; + private final Object mLocker; + private final int mLockTimeout; + private final TimeUnit mLockTimeoutUnit; + + private Set mUpgradeLocks; + private Set mWriteLocks; + + private List mUndoLog; + + MapTransaction(MapTransaction parent, IsolationLevel level, + int lockTimeout, TimeUnit lockTimeoutUnit) + { + mParent = parent; + mLevel = level; + mLocker = parent == null ? this : parent.mLocker; + mLockTimeout = lockTimeout; + mLockTimeoutUnit = lockTimeoutUnit; + } + + void lockForUpgrade(UpgradableLock lock, boolean isForUpdate) throws FetchException { + if (!isForUpdate && mLevel.isAtMost(IsolationLevel.READ_COMMITTED)) { + doLockForUpgrade(lock); + } else { + Set locks = mUpgradeLocks; + if (locks == null) { + mUpgradeLocks = locks = new HashSet(); + doLockForUpgrade(lock); + locks.add(lock); + } else if (!locks.contains(lock)) { + doLockForUpgrade(lock); + locks.add(lock); + } + } + } + + private void doLockForUpgrade(UpgradableLock lock) throws FetchException { + try { + if (!lock.tryLockForUpgrade(mLocker, mLockTimeout, mLockTimeoutUnit)) { + throw new FetchTimeoutException("" + mLockTimeout + ' ' + + mLockTimeoutUnit.toString().toLowerCase()); + } + } catch (InterruptedException e) { + throw new FetchInterruptedException(e); + } + } + + void unlockFromUpgrade(UpgradableLock lock, boolean isForUpdate) { + if (!isForUpdate && mLevel.isAtMost(IsolationLevel.READ_COMMITTED)) { + lock.unlockFromUpgrade(mLocker); + } + } + + void lockForWrite(UpgradableLock lock) throws PersistException { + Set locks = mWriteLocks; + if (locks == null) { + mWriteLocks = locks = new HashSet(); + doLockForWrite(lock); + locks.add(lock); + } else if (!locks.contains(lock)) { + doLockForWrite(lock); + locks.add(lock); + } + } + + private void doLockForWrite(UpgradableLock lock) throws PersistException { + try { + if (!lock.tryLockForWrite(mLocker, mLockTimeout, mLockTimeoutUnit)) { + throw new PersistTimeoutException("" + mLockTimeout + ' ' + + mLockTimeoutUnit.toString().toLowerCase()); + } + } catch (InterruptedException e) { + throw new PersistInterruptedException(e); + } + } + + /** + * Add to undo log. + */ + void inserted(final MapStorage storage, final S key) { + addToUndoLog(new Undoable() { + public void undo() { + storage.mapRemove(key); + } + }); + } + + /** + * Add to undo log. 
+ */ + void updated(final MapStorage storage, final S old) { + addToUndoLog(new Undoable() { + public void undo() { + storage.mapPut(old); + } + }); + } + + /** + * Add to undo log. + */ + void deleted(final MapStorage storage, final S old) { + addToUndoLog(new Undoable() { + public void undo() { + storage.mapPut(old); + } + }); + } + + void commit() { + MapTransaction parent = mParent; + + if (parent == null) { + releaseLocks(); + return; + } + + // Pass undo log to parent. + if (parent.mUndoLog == null) { + parent.mUndoLog = mUndoLog; + } else if (mUndoLog != null) { + parent.mUndoLog.addAll(mUndoLog); + } + mUndoLog = null; + + // Pass write locks to parent or release if parent already has the lock. + { + Set locks = mWriteLocks; + if (locks != null) { + Set parentLocks = parent.mWriteLocks; + if (parentLocks == null) { + parent.mWriteLocks = locks; + } else { + for (UpgradableLock lock : locks) { + if (!parentLocks.add(lock)) { + lock.unlockFromWrite(mLocker); + } + } + } + mWriteLocks = null; + } + } + + // Upgrade locks can simply be released. + releaseUpgradeLocks(); + } + + void abort() { + List log = mUndoLog; + if (log != null) { + for (int i=log.size(); --i>=0; ) { + log.get(i).undo(); + } + } + mUndoLog = null; + + releaseLocks(); + } + + private void addToUndoLog(Undoable entry) { + List log = mUndoLog; + if (log == null) { + mUndoLog = log = new ArrayList(); + } + log.add(entry); + } + + private void releaseLocks() { + releaseWriteLocks(); + releaseUpgradeLocks(); + } + + private void releaseWriteLocks() { + Set locks = mWriteLocks; + if (locks != null) { + for (UpgradableLock lock : locks) { + lock.unlockFromWrite(mLocker); + } + mWriteLocks = null; + } + } + + private void releaseUpgradeLocks() { + Set locks = mUpgradeLocks; + if (locks != null) { + for (UpgradableLock lock : locks) { + lock.unlockFromUpgrade(mLocker); + } + mUpgradeLocks = null; + } + } + + private static interface Undoable { + void undo(); + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java b/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java new file mode 100644 index 0000000..5598353 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java @@ -0,0 +1,98 @@ +/* + * Copyright 2008 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.amazon.carbonado.repo.map; + +import java.util.concurrent.TimeUnit; + +import com.amazon.carbonado.IsolationLevel; +import com.amazon.carbonado.PersistException; +import com.amazon.carbonado.Transaction; + +import com.amazon.carbonado.spi.TransactionManager; + +/** + * + * + * @author Brian S O'Neill + */ +class MapTransactionManager extends TransactionManager { + private final int mLockTimeout; + private final TimeUnit mLockTimeoutUnit; + + MapTransactionManager(int lockTimeout, TimeUnit lockTimeoutUnit) { + mLockTimeout = lockTimeout; + mLockTimeoutUnit = lockTimeoutUnit; + } + + protected IsolationLevel selectIsolationLevel(Transaction parent, IsolationLevel level) { + if (level == null) { + if (parent == null) { + return IsolationLevel.READ_COMMITTED; + } + return parent.getIsolationLevel(); + } + + switch (level) { + case NONE: + return IsolationLevel.NONE; + case READ_UNCOMMITTED: + case READ_COMMITTED: + return IsolationLevel.READ_COMMITTED; + case REPEATABLE_READ: + case SERIALIZABLE: + return IsolationLevel.SERIALIZABLE; + default: + // Not supported. + return null; + } + } + + protected boolean supportsForUpdate() { + return true; + } + + protected MapTransaction createTxn(MapTransaction parent, IsolationLevel level) + throws Exception + { + if (level == IsolationLevel.NONE) { + return null; + } + return new MapTransaction(parent, level, mLockTimeout, mLockTimeoutUnit); + } + + @Override + protected MapTransaction createTxn(MapTransaction parent, IsolationLevel level, + int timeout, TimeUnit unit) + throws Exception + { + if (level == IsolationLevel.NONE) { + return null; + } + return new MapTransaction(parent, level, timeout, unit); + } + + protected boolean commitTxn(MapTransaction txn) throws PersistException { + txn.commit(); + return false; + } + + protected void abortTxn(MapTransaction txn) throws PersistException { + txn.abort(); + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/map/UpgradableLock.java b/src/main/java/com/amazon/carbonado/repo/map/UpgradableLock.java new file mode 100644 index 0000000..9862791 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/UpgradableLock.java @@ -0,0 +1,1161 @@ +/* + * Copyright 2007 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.carbonado.repo.map; + +import java.util.concurrent.TimeUnit; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import java.util.concurrent.locks.LockSupport; + +/** + * Partially reentrant, upgradable read/write lock. Up to 1,073,741,824 read + * locks can be held. Upgraders and writers may re-enter the lock up to + * 2,147,483,648 times. 
Attempts by readers to re-enter the lock are not + * detected and are deadlock prone, unless the locker already holds an upgrade or + * write lock. Subclasses can support full reentrancy by overriding the + * protected read lock adjust and hold check methods. + * + *

This lock implementation differs from the usual Java lock with respect to + * lock ownership. Locks are not owned by threads, but by arbitrary locker + * objects. A thread which attempts to acquire an upgrade or write lock twice + * with different locker objects will deadlock on the second attempt. + * + *

As is typical of read/write lock implementations, a read lock blocks + * waiting writers, but it doesn't block other readers. A write lock is + * exclusive and can be held by at most one locker. Attempting to acquire a + * write lock while a read lock is held by the same locker is inherently + * deadlock prone, and some read/write lock implementations will always + * deadlock. + * + *

An upgrade lock allows a read lock to be safely upgraded to a write + * lock. Instead of acquiring a read lock, an upgrade lock is acquired. This + * acts like a shared read lock in that readers are not blocked, but it also + * acts like an exclusive write lock -- writers and upgraders are blocked. With + * an upgrade lock held, the locker may acquire a write lock without deadlock. + * + *
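 * <p>Typical upgrade pattern, shown as a sketch (the locker object, variable names,
 * and the one second timeout are illustrative; {@code tryLockForWrite} and
 * {@code unlockFromWrite} are exercised elsewhere in this change, and interrupt
 * handling is elided):
 * <pre>
 * lock.lockForUpgrade(locker);   // readers continue; other upgraders and writers wait
 * try {
 *     // examine shared state...
 *     if (lock.tryLockForWrite(locker, 1, TimeUnit.SECONDS)) {
 *         try {
 *             // modify shared state...
 *         } finally {
 *             lock.unlockFromWrite(locker);
 *         }
 *     }
 * } finally {
 *     lock.unlockFromUpgrade(locker);
 * }
 * </pre>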

+ * Locks held    Locks safely           Locks acquirable
+ * by owner      acquirable by owner    by other lockers
+ * --------------------------------------------------
+ *   - - -         R U W                  R U W
+ *   R - -         - - -                  R U -
+ *   R U -         - U -                  R - -
+ *   - U -         R U W                  R - -
+ *   - U W         R U W                  - - -
+ *   R U W         R U W                  - - -
+ *   R - W         R U W                  - - -
+ *   - - W         R U W                  - - -
+ * 
+ * + * @author Brian S O'Neill + * @param Locker type + */ +class UpgradableLock { + // Design note: This class borrows heavily from AbstractQueuedSynchronizer. + // Consult that class for understanding the locking mechanics. + + private static enum Result { + /** Lock acquisition failed */ + FAILED, + /** Lock has just been acquired by locker and can be safely released later */ + ACQUIRED, + /** Lock is already owned by locker and should not be released more than once */ + OWNED + } + + private static final AtomicReferenceFieldUpdater cRWHeadRef = + AtomicReferenceFieldUpdater.newUpdater + (UpgradableLock.class, Node.class, "mRWHead"); + + private static final AtomicReferenceFieldUpdater cRWTailRef = + AtomicReferenceFieldUpdater.newUpdater + (UpgradableLock.class, Node.class, "mRWTail"); + + private static final AtomicReferenceFieldUpdater cUHeadRef = + AtomicReferenceFieldUpdater.newUpdater + (UpgradableLock.class, Node.class, "mUHead"); + + private static final AtomicReferenceFieldUpdater cUTailRef = + AtomicReferenceFieldUpdater.newUpdater + (UpgradableLock.class, Node.class, "mUTail"); + + private static final AtomicIntegerFieldUpdater cStateRef = + AtomicIntegerFieldUpdater.newUpdater + (UpgradableLock.class, "mState"); + + // State mask bits for held locks. Read lock count is stored in lower 30 bits of state. + private static final int LOCK_STATE_UPGRADE = 0x40000000; + // Write state must be this value in order for quick sign check to work. + private static final int LOCK_STATE_WRITE = 0x80000000; + + private static final int LOCK_STATE_MASK = LOCK_STATE_UPGRADE | LOCK_STATE_WRITE; + + /** + * The number of nanoseconds for which it is faster to spin + * rather than to use timed park. A rough estimate suffices + * to improve responsiveness with very short timeouts. + */ + private static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; + + // Head of read-write queue. + private transient volatile Node mRWHead; + + // Tail of read-write queue. + private transient volatile Node mRWTail; + + // Head of write upgrade queue. + private transient volatile Node mUHead; + + // Tail of write upgrade queue. + private transient volatile Node mUTail; + + private transient volatile int mState; + + // Owner holds an upgradable lock and possibly a write lock too. + private transient L mOwner; + + // Counts number of times that owner has entered an upgradable or write lock. + private transient int mUpgradeCount; + private transient int mWriteCount; + + public UpgradableLock() { + } + + /** + * Acquire a shared read lock, possibly blocking indefinitely. + * + * @param locker object which might be write or upgrade lock owner + */ + public final void lockForRead(L locker) { + if (!tryLockForRead(locker)) { + lockForReadQueued(locker, addReadWaiter()); + } + } + + /** + * Acquire a shared read lock, possibly blocking until interrupted. + * + * @param locker object which might be write or upgrade lock owner + */ + public final void lockForReadInterruptibly(L locker) throws InterruptedException { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + if (!tryLockForRead(locker)) { + lockForReadQueuedInterruptibly(locker, addReadWaiter()); + } + } + + /** + * Attempt to immediately acquire a shared read lock. 
+ * + * @param locker object which might be write or upgrade lock owner + * @return true if acquired + */ + public final boolean tryLockForRead(L locker) { + int state = mState; + if (state >= 0) { // no write lock is held + if (isReadWriteFirst() || isReadLockHeld(locker)) { + do { + if (incrementReadLocks(state)) { + adjustReadLockCount(locker, 1); + return true; + } + // keep looping on CAS failure if a reader or upgrader mucked with the state + } while ((state = mState) >= 0); + } + } else if (mOwner == locker) { + // keep looping on CAS failure if a reader or upgrader mucked with the state + while (!incrementReadLocks(state)) { + state = mState; + } + adjustReadLockCount(locker, 1); + return true; + } + return false; + } + + /** + * Attempt to acquire a shared read lock, waiting a maximum amount of + * time. + * + * @param locker object which might be write or upgrade lock owner + * @return true if acquired + */ + public final boolean tryLockForRead(L locker, long timeout, TimeUnit unit) + throws InterruptedException + { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + if (!tryLockForRead(locker)) { + return lockForReadQueuedInterruptibly(locker, addReadWaiter(), unit.toNanos(timeout)); + } + return true; + } + + /** + * Release a previously acquired read lock. + */ + public final void unlockFromRead(L locker) { + adjustReadLockCount(locker, -1); + + int readLocks; + while ((readLocks = decrementReadLocks(mState)) < 0) {} + + if (readLocks == 0) { + Node h = mRWHead; + if (h != null && h.mWaitStatus != 0) { + unparkReadWriteSuccessor(h); + } + } + } + + /** + * Acquire an upgrade lock, possibly blocking indefinitely. + * + * @param locker object trying to become lock owner + * @return true if acquired + */ + public final boolean lockForUpgrade(L locker) { + return lockForUpgrade_(locker) != Result.FAILED; + } + + /** + * Acquire an upgrade lock, possibly blocking indefinitely. + * + * @param locker object trying to become lock owner + * @return ACQUIRED or OWNED + */ + private final Result lockForUpgrade_(L locker) { + Result result; + if ((result = tryLockForUpgrade_(locker)) == Result.FAILED) { + result = lockForUpgradeQueued(locker, addUpgradeWaiter()); + } + return result; + } + + /** + * Acquire an upgrade lock, possibly blocking until interrupted. + * + * @param locker object trying to become lock owner + * @return true if acquired + */ + public final boolean lockForUpgradeInterruptibly(L locker) throws InterruptedException { + return lockForUpgradeInterruptibly_(locker) != Result.FAILED; + } + + /** + * Acquire an upgrade lock, possibly blocking until interrupted. + * + * @param locker object trying to become lock owner + * @return ACQUIRED or OWNED + */ + private final Result lockForUpgradeInterruptibly_(L locker) throws InterruptedException { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + Result result; + if ((result = tryLockForUpgrade_(locker)) == Result.FAILED) { + result = lockForUpgradeQueuedInterruptibly(locker, addUpgradeWaiter()); + } + return result; + } + + /** + * Attempt to immediately acquire an upgrade lock. + * + * @param locker object trying to become lock owner + * @return true if acquired + */ + public final boolean tryLockForUpgrade(L locker) { + return tryLockForUpgrade_(locker) != Result.FAILED; + } + + /** + * Attempt to immediately acquire an upgrade lock. 
+ * + * @param locker object trying to become lock owner + * @return FAILED, ACQUIRED or OWNED + */ + private final Result tryLockForUpgrade_(L locker) { + int state = mState; + if ((state & LOCK_STATE_MASK) == 0) { // no write or upgrade lock is held + if (isUpgradeFirst()) { + do { + if (setUpgradeLock(state)) { + mOwner = locker; + incrementUpgradeCount(); + return Result.ACQUIRED; + } + // keep looping on CAS failure if a reader mucked with the state + } while (((state = mState) & LOCK_STATE_MASK) == 0); + } + } else if (mOwner == locker) { + incrementUpgradeCount(); + return Result.OWNED; + } + return Result.FAILED; + } + + /** + * Attempt to acquire an upgrade lock, waiting a maximum amount of time. + * + * @param locker object trying to become lock owner + * @return true if acquired + */ + public final boolean tryLockForUpgrade(L locker, long timeout, TimeUnit unit) + throws InterruptedException + { + return tryLockForUpgrade_(locker, timeout, unit) != Result.FAILED; + } + + /** + * Attempt to acquire an upgrade lock, waiting a maximum amount of time. + * + * @param locker object trying to become lock owner + * @return FAILED, ACQUIRED or OWNED + */ + private final Result tryLockForUpgrade_(L locker, long timeout, TimeUnit unit) + throws InterruptedException + { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + Result result; + if ((result = tryLockForUpgrade_(locker)) == Result.FAILED) { + result = lockForUpgradeQueuedInterruptibly(locker, addUpgradeWaiter(), + unit.toNanos(timeout)); + } + return result; + } + + /** + * Release a previously acquired upgrade lock. + */ + public final void unlockFromUpgrade(L locker) { + int upgradeCount = mUpgradeCount - 1; + if (upgradeCount < 0) { + throw new IllegalMonitorStateException("Too many upgrade locks released"); + } + if (upgradeCount == 0 && mWriteCount > 0) { + // Don't release last upgrade lock and switch write lock to + // automatic upgrade mode. + clearUpgradeLock(mState); + return; + } + mUpgradeCount = upgradeCount; + if (upgradeCount > 0) { + return; + } + + mOwner = null; + + // keep looping on CAS failure if reader mucked with state + while (!clearUpgradeLock(mState)) {} + + Node h = mUHead; + if (h != null && h.mWaitStatus != 0) { + unparkUpgradeSuccessor(h); + } + } + + /** + * Acquire an exclusive write lock, possibly blocking indefinitely. + * + * @param locker object trying to become lock owner + */ + public final void lockForWrite(L locker) { + Result writeResult; + if (!tryLockForWrite(locker)) { + Result upgradeResult = lockForUpgrade_(locker); + if (!tryLockForWrite(locker)) { + lockForWriteQueued(locker, addWriteWaiter()); + } + if (upgradeResult == Result.ACQUIRED) { + // clear upgrade state bit to indicate automatic upgrade + while (!clearUpgradeLock(mState)) {} + } else { + // undo automatic upgrade count increment + mUpgradeCount--; + } + } + } + + /** + * Acquire an exclusive write lock, possibly blocking until interrupted. 
+ * + * @param locker object trying to become lock owner + */ + public final void lockForWriteInterruptibly(L locker) throws InterruptedException { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + if (!tryLockForWrite(locker)) { + Result upgradeResult = lockForUpgradeInterruptibly_(locker); + if (!tryLockForWrite(locker)) { + lockForWriteQueuedInterruptibly(locker, addWriteWaiter()); + } + if (upgradeResult == Result.ACQUIRED) { + // clear upgrade state bit to indicate automatic upgrade + while (!clearUpgradeLock(mState)) {} + } else { + // undo automatic upgrade count increment + mUpgradeCount--; + } + } + } + + /** + * Attempt to immediately acquire an exclusive lock. + * + * @param locker object trying to become lock owner + * @return true if acquired + */ + public final boolean tryLockForWrite(L locker) { + int state = mState; + if (state == 0) { + // no locks are held + if (isUpgradeOrReadWriteFirst() && setWriteLock(state)) { + // keep upgrade state bit clear to indicate automatic upgrade + mOwner = locker; + incrementUpgradeCount(); + incrementWriteCount(); + return true; + } + } else if (state == LOCK_STATE_UPGRADE) { + // only upgrade lock is held; upgrade to full write lock + if (mOwner == locker && setWriteLock(state)) { + incrementWriteCount(); + return true; + } + } else if (state < 0) { + // write lock is held, and upgrade lock might be held too + if (mOwner == locker) { + incrementWriteCount(); + return true; + } + } + return false; + } + + /** + * Attempt to acquire an exclusive lock, waiting a maximum amount of time. + * + * @param locker object trying to become lock owner + * @return true if acquired + */ + public final boolean tryLockForWrite(L locker, long timeout, TimeUnit unit) + throws InterruptedException + { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + if (!tryLockForWrite(locker)) { + return lockForWriteQueuedInterruptibly(locker, addWriteWaiter(), + unit.toNanos(timeout)); + } + return true; + } + + /** + * Release a previously acquired write lock. + */ + public final void unlockFromWrite(L locker) { + int writeCount = mWriteCount - 1; + if (writeCount < 0) { + throw new IllegalMonitorStateException("Too many write locks released"); + } + mWriteCount = writeCount; + if (writeCount > 0) { + return; + } + + // copy original state to check if upgrade lock was automatic + final int state = mState; + + // make sure upgrade lock is still held after releasing write lock + mState = LOCK_STATE_UPGRADE; + + Node h = mRWHead; + if (h != null && h.mWaitStatus != 0) { + unparkReadWriteSuccessor(h); + } + + if (state == LOCK_STATE_WRITE) { + // upgrade owner was automatically set, so automatically clear it + unlockFromUpgrade(locker); + } + } + + public String toString() { + int state = mState; + int readLocks = state & ~LOCK_STATE_MASK; + int upgradeLocks = mUpgradeCount; + int writeLocks = mWriteCount; + + return super.toString() + + "[Read locks = " + readLocks + + ", Upgrade locks = " + upgradeLocks + + ", Write locks = " + writeLocks + + ", Owner = " + mOwner + + ']'; + } + + /** + * Add or subtract to the count of read locks held for the given + * locker. Default implementation does nothing, and so read locks are not + * reentrant. + * + * @throws IllegalMonitorStateException if count overflows or underflows + */ + protected void adjustReadLockCount(L locker, int amount) { + } + + /** + * Default implementation does nothing and always returns false, and so + * read locks are not reentrant. 
Overridden implementation may choose to + * always returns true, in which case read lock requests can starve upgrade + * and write lock requests. + */ + protected boolean isReadLockHeld(L locker) { + return false; + } + + private Node enqForReadWrite(final Node node) { + for (;;) { + Node t = mRWTail; + if (t == null) { // Must initialize + Node h = new Node(); // Dummy header + h.mNext = node; + node.mPrev = h; + if (cRWHeadRef.compareAndSet(this, null, h)) { + mRWTail = node; + return h; + } + } else { + node.mPrev = t; + if (cRWTailRef.compareAndSet(this, t, node)) { + t.mNext = node; + return t; + } + } + } + } + + private Node enqForUpgrade(final Node node) { + for (;;) { + Node t = mUTail; + if (t == null) { // Must initialize + Node h = new Node(); // Dummy header + h.mNext = node; + node.mPrev = h; + if (cUHeadRef.compareAndSet(this, null, h)) { + mUTail = node; + return h; + } + } else { + node.mPrev = t; + if (cUTailRef.compareAndSet(this, t, node)) { + t.mNext = node; + return t; + } + } + } + } + + private Node addReadWaiter() { + return addReadWriteWaiter(true); + } + + private Node addWriteWaiter() { + return addReadWriteWaiter(false); + } + + private Node addReadWriteWaiter(boolean shared) { + Node node = new Node(Thread.currentThread(), shared); + // Try the fast path of enq; backup to full enq on failure + Node pred = mRWTail; + if (pred != null) { + node.mPrev = pred; + if (cRWTailRef.compareAndSet(this, pred, node)) { + pred.mNext = node; + return node; + } + } + enqForReadWrite(node); + return node; + } + + private Node addUpgradeWaiter() { + Node node = new Node(Thread.currentThread(), false); + // Try the fast path of enq; backup to full enq on failure + Node pred = mUTail; + if (pred != null) { + node.mPrev = pred; + if (cUTailRef.compareAndSet(this, pred, node)) { + pred.mNext = node; + return node; + } + } + enqForUpgrade(node); + return node; + } + + private void setReadWriteHead(Node node) { + mRWHead = node; + node.mThread = null; + node.mPrev = null; + } + + private void setUpgradeHead(Node node) { + mUHead = node; + node.mThread = null; + node.mPrev = null; + } + + private void unparkReadWriteSuccessor(Node node) { + Node.cWaitStatusRef.compareAndSet(node, Node.SIGNAL, 0); + + Node s = node.mNext; + if (s == null || s.mWaitStatus > 0) { + s = null; + for (Node t = mRWTail; t != null && t != node; t = t.mPrev) { + if (t.mWaitStatus <= 0) { + s = t; + } + } + } + if (s != null) { + LockSupport.unpark(s.mThread); + } + } + + private void unparkUpgradeSuccessor(Node node) { + Node.cWaitStatusRef.compareAndSet(node, Node.SIGNAL, 0); + + Node s = node.mNext; + if (s == null || s.mWaitStatus > 0) { + s = null; + for (Node t = mUTail; t != null && t != node; t = t.mPrev) { + if (t.mWaitStatus <= 0) { + s = t; + } + } + } + if (s != null) { + LockSupport.unpark(s.mThread); + } + } + + private void setReadWriteHeadAndPropagate(Node node) { + setReadWriteHead(node); + if (node.mWaitStatus != 0) { + Node s = node.mNext; + if (s == null || s.mShared) { + unparkReadWriteSuccessor(node); + } + } + } + + private void cancelAcquireReadWrite(Node node) { + if (node != null) { + node.mThread = null; + node.mWaitStatus = Node.CANCELLED; + unparkReadWriteSuccessor(node); + } + } + + private void cancelAcquireUpgrade(Node node) { + if (node != null) { + node.mThread = null; + node.mWaitStatus = Node.CANCELLED; + unparkUpgradeSuccessor(node); + } + } + + private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + int s = pred.mWaitStatus; + if (s < 0) { + return 
true; + } + if (s > 0) { + node.mPrev = pred.mPrev; + } else { + Node.cWaitStatusRef.compareAndSet(pred, 0, Node.SIGNAL); + } + return false; + } + + private static void selfInterrupt() { + Thread.currentThread().interrupt(); + } + + private final boolean parkAndCheckInterrupt() { + LockSupport.park(this); + return Thread.interrupted(); + } + + /** + * @return ACQUIRED or OWNED + */ + private final Result lockForUpgradeQueued(L locker, final Node node) { + try { + boolean interrupted = false; + for (;;) { + final Node p = node.predecessor(); + Result result; + if (p == mUHead && (result = tryLockForUpgrade_(locker)) != Result.FAILED) { + setUpgradeHead(node); + p.mNext = null; // help GC + if (interrupted) { + selfInterrupt(); + } + return result; + } + if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { + interrupted = true; + } + } + } catch (RuntimeException e) { + cancelAcquireUpgrade(node); + throw e; + } + } + + /** + * @return ACQUIRED or OWNED + */ + private final Result lockForUpgradeQueuedInterruptibly(L locker, final Node node) + throws InterruptedException + { + try { + for (;;) { + final Node p = node.predecessor(); + Result result; + if (p == mUHead && (result = tryLockForUpgrade_(locker)) != Result.FAILED) { + setUpgradeHead(node); + p.mNext = null; // help GC + return result; + } + if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { + break; + } + } + } catch (RuntimeException e) { + cancelAcquireUpgrade(node); + throw e; + } + // Arrive here only if interrupted + cancelAcquireUpgrade(node); + throw new InterruptedException(); + } + + /** + * @return FAILED, ACQUIRED or OWNED + */ + private final Result lockForUpgradeQueuedInterruptibly(L locker, final Node node, + long nanosTimeout) + throws InterruptedException + { + long lastTime = System.nanoTime(); + try { + for (;;) { + final Node p = node.predecessor(); + Result result; + if (p == mUHead && (result = tryLockForUpgrade_(locker)) != Result.FAILED) { + setUpgradeHead(node); + p.mNext = null; // help GC + return result; + } + if (nanosTimeout <= 0) { + cancelAcquireUpgrade(node); + return Result.FAILED; + } + if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD + && shouldParkAfterFailedAcquire(p, node)) + { + LockSupport.parkNanos(this, nanosTimeout); + } + long now = System.nanoTime(); + nanosTimeout -= now - lastTime; + lastTime = now; + if (Thread.interrupted()) { + break; + } + } + } catch (RuntimeException e) { + cancelAcquireUpgrade(node); + throw e; + } + // Arrive here only if interrupted + cancelAcquireUpgrade(node); + throw new InterruptedException(); + } + + private final void lockForReadQueued(L locker, final Node node) { + try { + boolean interrupted = false; + for (;;) { + final Node p = node.predecessor(); + if (p == mRWHead && tryLockForRead(locker)) { + setReadWriteHeadAndPropagate(node); + p.mNext = null; // help GC + if (interrupted) { + selfInterrupt(); + } + return; + } + if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { + interrupted = true; + } + } + } catch (RuntimeException e) { + cancelAcquireReadWrite(node); + throw e; + } + } + + private final void lockForReadQueuedInterruptibly(L locker, final Node node) + throws InterruptedException + { + try { + for (;;) { + final Node p = node.predecessor(); + if (p == mRWHead && tryLockForRead(locker)) { + setReadWriteHeadAndPropagate(node); + p.mNext = null; // help GC + return; + } + if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { + break; + } + } + } catch (RuntimeException 
e) { + cancelAcquireReadWrite(node); + throw e; + } + // Arrive here only if interrupted + cancelAcquireReadWrite(node); + throw new InterruptedException(); + } + + /** + * @return true if acquired + */ + private final boolean lockForReadQueuedInterruptibly(L locker, final Node node, + long nanosTimeout) + throws InterruptedException + { + long lastTime = System.nanoTime(); + try { + for (;;) { + final Node p = node.predecessor(); + if (p == mRWHead && tryLockForRead(locker)) { + setReadWriteHeadAndPropagate(node); + p.mNext = null; // help GC + return true; + } + if (nanosTimeout <= 0) { + cancelAcquireReadWrite(node); + return false; + } + if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD + && shouldParkAfterFailedAcquire(p, node)) + { + LockSupport.parkNanos(this, nanosTimeout); + } + long now = System.nanoTime(); + nanosTimeout -= now - lastTime; + lastTime = now; + if (Thread.interrupted()) { + break; + } + } + } catch (RuntimeException e) { + cancelAcquireReadWrite(node); + throw e; + } + // Arrive here only if interrupted + cancelAcquireReadWrite(node); + throw new InterruptedException(); + } + + private final void lockForWriteQueued(L locker, final Node node) { + try { + boolean interrupted = false; + for (;;) { + final Node p = node.predecessor(); + if (p == mRWHead && tryLockForWrite(locker)) { + setReadWriteHead(node); + p.mNext = null; // help GC + if (interrupted) { + selfInterrupt(); + } + return; + } + if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { + interrupted = true; + } + } + } catch (RuntimeException e) { + cancelAcquireReadWrite(node); + throw e; + } + } + + private final void lockForWriteQueuedInterruptibly(L locker, final Node node) + throws InterruptedException + { + try { + for (;;) { + final Node p = node.predecessor(); + if (p == mRWHead && tryLockForWrite(locker)) { + setReadWriteHead(node); + p.mNext = null; // help GC + return; + } + if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { + break; + } + } + } catch (RuntimeException e) { + cancelAcquireReadWrite(node); + throw e; + } + // Arrive here only if interrupted + cancelAcquireReadWrite(node); + throw new InterruptedException(); + } + + /** + * @return true if acquired + */ + private final boolean lockForWriteQueuedInterruptibly(L locker, final Node node, + long nanosTimeout) + throws InterruptedException + { + long lastTime = System.nanoTime(); + try { + for (;;) { + final Node p = node.predecessor(); + if (p == mRWHead && tryLockForWrite(locker)) { + setReadWriteHead(node); + p.mNext = null; // help GC + return true; + } + if (nanosTimeout <= 0) { + cancelAcquireReadWrite(node); + return false; + } + if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD + && shouldParkAfterFailedAcquire(p, node)) + { + LockSupport.parkNanos(this, nanosTimeout); + } + long now = System.nanoTime(); + nanosTimeout -= now - lastTime; + lastTime = now; + if (Thread.interrupted()) { + break; + } + } + } catch (RuntimeException e) { + cancelAcquireReadWrite(node); + throw e; + } + // Arrive here only if interrupted + cancelAcquireReadWrite(node); + throw new InterruptedException(); + } + + private final boolean isReadWriteFirst() { + Node h; + if ((h = mRWHead) == null) { + return true; + } + Thread current = Thread.currentThread(); + Node s; + return ((s = h.mNext) != null && s.mThread == current) || fullIsReadWriteFirst(current); + } + + private final boolean fullIsReadWriteFirst(Thread current) { + Node h, s; + Thread firstThread = null; + if (((h = mRWHead) != null && (s = h.mNext) != null 
&& + s.mPrev == mRWHead && (firstThread = s.mThread) != null)) + { + return firstThread == current; + } + Node t = mRWTail; + while (t != null && t != mRWHead) { + Thread tt = t.mThread; + if (tt != null) { + firstThread = tt; + } + t = t.mPrev; + } + return firstThread == current || firstThread == null; + } + + private final boolean isUpgradeFirst() { + Node h; + if ((h = mUHead) == null) { + return true; + } + Thread current = Thread.currentThread(); + Node s; + return ((s = h.mNext) != null && s.mThread == current) || fullIsUpgradeFirst(current); + } + + private final boolean fullIsUpgradeFirst(Thread current) { + Node h, s; + Thread firstThread = null; + if (((h = mUHead) != null && (s = h.mNext) != null && + s.mPrev == mUHead && (firstThread = s.mThread) != null)) + { + return firstThread == current; + } + Node t = mUTail; + while (t != null && t != mUHead) { + Thread tt = t.mThread; + if (tt != null) { + firstThread = tt; + } + t = t.mPrev; + } + return firstThread == current || firstThread == null; + } + + private final boolean isUpgradeOrReadWriteFirst() { + Node uh, rwh; + if ((uh = mUHead) == null || (rwh = mRWHead) == null) { + return true; + } + Thread current = Thread.currentThread(); + Node us, rws; + return ((us = uh.mNext) != null && us.mThread == current) + || ((rws = rwh.mNext) != null && rws.mThread == current) + || fullIsUpgradeFirst(current) + || fullIsReadWriteFirst(current); + } + + /** + * @return false if state changed + */ + private boolean incrementReadLocks(int state) { + int readLocks = (state & ~LOCK_STATE_MASK) + 1; + if (readLocks == LOCK_STATE_MASK) { + throw new IllegalMonitorStateException("Maximum read lock count exceeded"); + } + return cStateRef.compareAndSet(this, state, state & LOCK_STATE_MASK | readLocks); + } + + /** + * @return number of remaining read locks or negative if concurrent + * modification prevented operation + */ + private int decrementReadLocks(int state) { + int readLocks = (state & ~LOCK_STATE_MASK) - 1; + if (readLocks < 0) { + throw new IllegalMonitorStateException("Too many read locks released"); + } + if (cStateRef.compareAndSet(this, state, state & LOCK_STATE_MASK | readLocks)) { + return readLocks; + } + return -1; + } + + /** + * @return false if concurrent modification prevented operation + */ + private boolean setUpgradeLock(int state) { + return cStateRef.compareAndSet(this, state, state | LOCK_STATE_UPGRADE); + } + + /** + * @return false if concurrent modification prevented operation + */ + private boolean clearUpgradeLock(int state) { + return cStateRef.compareAndSet(this, state, state & ~LOCK_STATE_UPGRADE); + } + + private void incrementUpgradeCount() { + int upgradeCount = mUpgradeCount + 1; + if (upgradeCount < 0) { + throw new IllegalMonitorStateException("Maximum upgrade lock count exceeded"); + } + mUpgradeCount = upgradeCount; + } + + /** + * @return false if concurrent modification prevented operation + */ + private boolean setWriteLock(int state) { + return cStateRef.compareAndSet(this, state, state | LOCK_STATE_WRITE); + } + + private void incrementWriteCount() { + int writeCount = mWriteCount + 1; + if (writeCount < 0) { + throw new IllegalMonitorStateException("Maximum write lock count exceeded"); + } + mWriteCount = writeCount; + } + + /** + * Node class ripped off from AbstractQueuedSynchronizer and modified + * slightly. Read the comments in that class for better understanding. 
+ */ + static final class Node { + static final AtomicIntegerFieldUpdater cWaitStatusRef = + AtomicIntegerFieldUpdater.newUpdater(Node.class, "mWaitStatus"); + + static final int CANCELLED = 1; + static final int SIGNAL = -1; + + volatile int mWaitStatus; + volatile Node mPrev; + volatile Node mNext; + volatile Thread mThread; + + final boolean mShared; + + // Used to establish initial head + Node() { + mShared = false; + } + + Node(Thread thread, boolean shared) { + mThread = thread; + mShared = shared; + } + + final Node predecessor() throws NullPointerException { + Node p = mPrev; + if (p == null) { + throw new NullPointerException(); + } else { + return p; + } + } + } +} diff --git a/src/main/java/com/amazon/carbonado/repo/map/package-info.java b/src/main/java/com/amazon/carbonado/repo/map/package-info.java new file mode 100644 index 0000000..b386251 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/package-info.java @@ -0,0 +1,24 @@ +/* + * Copyright 2008 Amazon Technologies, Inc. or its affiliates. + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks + * of Amazon Technologies, Inc. or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Volatile repository implementation backed by a concurrent map. + * + * @see com.amazon.carbonado.repo.map.MapRepositoryBuilder + */ +package com.amazon.carbonado.repo.map; -- cgit v1.2.3