summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/com/amazon/carbonado/repo/map/Key.java273
-rw-r--r--src/main/java/com/amazon/carbonado/repo/map/MapCursor.java173
-rw-r--r--src/main/java/com/amazon/carbonado/repo/map/MapRepository.java125
-rw-r--r--src/main/java/com/amazon/carbonado/repo/map/MapRepositoryBuilder.java151
-rw-r--r--src/main/java/com/amazon/carbonado/repo/map/MapStorage.java813
-rw-r--r--src/main/java/com/amazon/carbonado/repo/map/MapTransaction.java240
-rw-r--r--src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java98
-rw-r--r--src/main/java/com/amazon/carbonado/repo/map/UpgradableLock.java1161
-rw-r--r--src/main/java/com/amazon/carbonado/repo/map/package-info.java24
10 files changed, 3059 insertions, 1 deletions
diff --git a/pom.xml b/pom.xml
index 9538a01..d2ce95d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -191,7 +191,7 @@
</group>
<group>
<title>Standard Repositories</title>
- <packages>com.amazon.carbonado.repo.jdbc:com.amazon.carbonado.repo.replicated:com.amazon.carbonado.repo.logging:com.amazon.carbonado.repo.sleepycat</packages>
+ <packages>com.amazon.carbonado.repo.map:com.amazon.carbonado.repo.jdbc:com.amazon.carbonado.repo.replicated:com.amazon.carbonado.repo.logging:com.amazon.carbonado.repo.sleepycat</packages>
</group>
<group>
<title>Service Provider Interface</title>
diff --git a/src/main/java/com/amazon/carbonado/repo/map/Key.java b/src/main/java/com/amazon/carbonado/repo/map/Key.java
new file mode 100644
index 0000000..5032f66
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/map/Key.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.repo.map;
+
+import java.lang.reflect.UndeclaredThrowableException;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import org.cojen.classfile.ClassFile;
+import org.cojen.classfile.CodeBuilder;
+import org.cojen.classfile.Label;
+import org.cojen.classfile.LocalVariable;
+import org.cojen.classfile.MethodInfo;
+import org.cojen.classfile.Modifiers;
+import org.cojen.classfile.TypeDesc;
+
+import org.cojen.util.ClassInjector;
+import org.cojen.util.SoftValuedHashMap;
+
+import com.amazon.carbonado.Storable;
+
+import com.amazon.carbonado.info.OrderedProperty;
+import com.amazon.carbonado.info.StorableInfo;
+import com.amazon.carbonado.info.StorableIntrospector;
+import com.amazon.carbonado.info.StorableProperty;
+
+import com.amazon.carbonado.gen.CodeBuilderUtil;
+
+/**
+ *
+ *
+ * @author Brian S O'Neill
+ */
+class Key<S extends Storable> implements Comparable<Key<S>> {
+ protected final S mStorable;
+ protected final Comparator<S> mComparator;
+
+ Key(S storable, Comparator<S> comparator) {
+ mStorable = storable;
+ mComparator = comparator;
+ }
+
+ @Override
+ public String toString() {
+ return mStorable.toString();
+ }
+
+ public int compareTo(Key<S> other) {
+ int result = mComparator.compare(mStorable, other.mStorable);
+ if (result == 0) {
+ int t1 = tieBreaker();
+ int t2 = other.tieBreaker();
+ if (t1 < t2) {
+ result = -1;
+ } else if (t1 > t2) {
+ result = 1;
+ }
+ }
+ return result;
+ }
+
+ protected int tieBreaker() {
+ return 0;
+ }
+
+ public static interface Assigner<S extends Storable> {
+ void setKeyValues(S storable, Object[] identityValues);
+
+ void setKeyValues(S storable, Object[] identityValues, Object rangeValue);
+ }
+
+ private static final Map<Class, Assigner> mAssigners;
+
+ static {
+ mAssigners = new SoftValuedHashMap();
+ }
+
+ public static synchronized <S extends Storable> Assigner<S> getAssigner(Class<S> clazz) {
+ Assigner<S> assigner = mAssigners.get(clazz);
+ if (assigner == null) {
+ assigner = createAssigner(clazz);
+ mAssigners.put(clazz, assigner);
+ }
+ return assigner;
+ }
+
+ private static <S extends Storable> Assigner<S> createAssigner(Class<S> clazz) {
+ final StorableInfo<S> info = StorableIntrospector.examine(clazz);
+
+ ClassInjector ci = ClassInjector.create(clazz.getName(), clazz.getClassLoader());
+ ClassFile cf = new ClassFile(ci.getClassName());
+ cf.addInterface(Assigner.class);
+ cf.markSynthetic();
+ cf.setSourceFile(Key.class.getName());
+ cf.setTarget("1.5");
+
+ cf.addDefaultConstructor();
+
+ // Define required setKeyValues methods.
+
+ List<OrderedProperty<S>> pk =
+ new ArrayList<OrderedProperty<S>>(info.getPrimaryKey().getProperties());
+
+ TypeDesc storableType = TypeDesc.forClass(Storable.class);
+ TypeDesc storableArrayType = storableType.toArrayType();
+ TypeDesc userStorableType = TypeDesc.forClass(info.getStorableType());
+ TypeDesc objectArrayType = TypeDesc.OBJECT.toArrayType();
+
+ MethodInfo mi = cf.addMethod(Modifiers.PUBLIC, "setKeyValues", null,
+ new TypeDesc[] {storableType, objectArrayType});
+
+ CodeBuilder b = new CodeBuilder(mi);
+
+ LocalVariable userStorableVar = b.createLocalVariable(null, userStorableType);
+ b.loadLocal(b.getParameter(0));
+ b.checkCast(userStorableType);
+ b.storeLocal(userStorableVar);
+
+ // Switch on the number of values supplied.
+
+ /* Switch looks like this for two pk properties:
+
+ switch(identityValues.length) {
+ default:
+ throw new IllegalArgumentException();
+ case 2:
+ storable.setPkProp2(identityValues[1]);
+ case 1:
+ storable.setPkProp1(identityValues[0]);
+ case 0:
+ }
+
+ */
+
+ b.loadLocal(b.getParameter(1));
+ b.arrayLength();
+
+ int[] cases = new int[pk.size() + 1];
+ Label[] labels = new Label[pk.size() + 1];
+
+ for (int i=0; i<labels.length; i++) {
+ cases[i] = pk.size() - i;
+ labels[i] = b.createLabel();
+ }
+
+ Label defaultLabel = b.createLabel();
+
+ b.switchBranch(cases, labels, defaultLabel);
+
+ for (int i=0; i<labels.length; i++) {
+ labels[i].setLocation();
+ int prop = cases[i] - 1;
+ if (prop >= 0) {
+ b.loadLocal(userStorableVar);
+ b.loadLocal(b.getParameter(1));
+ b.loadConstant(prop);
+ b.loadFromArray(storableArrayType);
+ callSetPropertyValue(b, pk.get(prop));
+ }
+ // Fall through to next case.
+ }
+
+ b.returnVoid();
+
+ defaultLabel.setLocation();
+ CodeBuilderUtil.throwException(b, IllegalArgumentException.class, null);
+
+ // The setKeyValues method that takes a range value calls the other
+ // setKeyValues method first, to take care of the identityValues.
+
+ mi = cf.addMethod(Modifiers.PUBLIC, "setKeyValues", null,
+ new TypeDesc[] {storableType, objectArrayType, TypeDesc.OBJECT});
+
+ b = new CodeBuilder(mi);
+
+ b.loadThis();
+ b.loadLocal(b.getParameter(0));
+ b.loadLocal(b.getParameter(1));
+ b.invokeVirtual("setKeyValues", null, new TypeDesc[] {storableType, objectArrayType});
+
+ userStorableVar = b.createLocalVariable(null, userStorableType);
+ b.loadLocal(b.getParameter(0));
+ b.checkCast(userStorableType);
+ b.storeLocal(userStorableVar);
+
+ // Switch on the number of values supplied.
+
+ /* Switch looks like this for two pk properties:
+
+ switch(identityValues.length) {
+ default:
+ throw new IllegalArgumentException();
+ case 0:
+ storable.setPkProp1(rangeValue);
+ return;
+ case 1:
+ storable.setPkProp2(rangeValue);
+ return;
+ }
+
+ */
+
+ b.loadLocal(b.getParameter(1));
+ b.arrayLength();
+
+ cases = new int[pk.size()];
+ labels = new Label[pk.size()];
+
+ for (int i=0; i<labels.length; i++) {
+ cases[i] = i;
+ labels[i] = b.createLabel();
+ }
+
+ defaultLabel = b.createLabel();
+
+ b.switchBranch(cases, labels, defaultLabel);
+
+ for (int i=0; i<labels.length; i++) {
+ labels[i].setLocation();
+ int prop = cases[i];
+ b.loadLocal(userStorableVar);
+ b.loadLocal(b.getParameter(2));
+ callSetPropertyValue(b, pk.get(prop));
+ b.returnVoid();
+ }
+
+ defaultLabel.setLocation();
+ CodeBuilderUtil.throwException(b, IllegalArgumentException.class, null);
+
+ try {
+ return (Assigner<S>) ci.defineClass(cf).newInstance();
+ } catch (IllegalAccessException e) {
+ throw new UndeclaredThrowableException(e);
+ } catch (InstantiationException e) {
+ throw new UndeclaredThrowableException(e);
+ }
+ }
+
+ /**
+ * Creates code to call set method. Assumes Storable and property value
+ * are already on the stack.
+ */
+ private static void callSetPropertyValue(CodeBuilder b, OrderedProperty<?> op) {
+ StorableProperty<?> property = op.getChainedProperty().getLastProperty();
+ TypeDesc propType = TypeDesc.forClass(property.getType());
+ if (propType != TypeDesc.OBJECT) {
+ TypeDesc objectType = propType.toObjectType();
+ b.checkCast(objectType);
+ // Potentially unbox primitive.
+ b.convert(objectType, propType);
+ }
+ b.invoke(property.getWriteMethod());
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java b/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java
new file mode 100644
index 0000000..cd0f7e3
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.repo.map;
+
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.Storable;
+
+import com.amazon.carbonado.cursor.AbstractCursor;
+
+import com.amazon.carbonado.spi.TransactionScope;
+
+/**
+ * Returns copies of Storables that it iterates over.
+ *
+ * @author Brian S O'Neill
+ */
+class MapCursor<S extends Storable> extends AbstractCursor<S> {
+ private static final AtomicReferenceFieldUpdater<MapCursor, Iterator> cIteratorRef =
+ AtomicReferenceFieldUpdater.newUpdater
+ (MapCursor.class, Iterator.class, "mIterator");
+
+ private final MapStorage<S> mStorage;
+ private final TransactionScope<MapTransaction> mScope;
+ private final MapTransaction mTxn;
+ private final boolean mIsForUpdate;
+
+ private volatile Iterator<S> mIterator;
+
+ MapCursor(MapStorage<S> storage,
+ TransactionScope<MapTransaction> scope,
+ Iterable<S> iterable)
+ throws Exception
+ {
+ MapTransaction txn = scope.getTxn();
+
+ mStorage = storage;
+ mScope = scope;
+ mTxn = txn;
+
+ if (txn == null) {
+ mStorage.mLock.lockForRead(scope);
+ mIsForUpdate = false;
+ } else {
+ // Since lock is so coarse, all reads in transaction scope are
+ // upgrade to avoid deadlocks.
+ txn.lockForUpgrade(mStorage.mLock, mIsForUpdate = scope.isForUpdate());
+ }
+
+ scope.register(storage.getStorableType(), this);
+ mIterator = iterable.iterator();
+ }
+
+ public void close() {
+ Iterator<S> it = mIterator;
+ if (it != null) {
+ if (cIteratorRef.compareAndSet(this, it, null)) {
+ UpgradableLock lock = mStorage.mLock;
+ if (mTxn == null) {
+ lock.unlockFromRead(mScope);
+ } else {
+ mTxn.unlockFromUpgrade(lock, mIsForUpdate);
+ }
+ mScope.unregister(mStorage.getStorableType(), this);
+ }
+ }
+ }
+
+ public boolean hasNext() throws FetchException {
+ Iterator<S> it = mIterator;
+ try {
+ if (it != null && it.hasNext()) {
+ return true;
+ } else {
+ close();
+ }
+ return false;
+ } catch (ConcurrentModificationException e) {
+ close();
+ throw new FetchException(e);
+ } catch (Error e) {
+ try {
+ close();
+ } catch (Error e2) {
+ // Ignore.
+ }
+ throw e;
+ }
+ }
+
+ public S next() throws FetchException {
+ Iterator<S> it = mIterator;
+ if (it == null) {
+ close();
+ throw new NoSuchElementException();
+ }
+ try {
+ S next = mStorage.copyAndFireLoadTrigger(it.next());
+ if (!hasNext()) {
+ close();
+ }
+ return next;
+ } catch (ConcurrentModificationException e) {
+ close();
+ throw new FetchException(e);
+ } catch (Error e) {
+ try {
+ close();
+ } catch (Error e2) {
+ // Ignore.
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public int skipNext(int amount) throws FetchException {
+ if (amount <= 0) {
+ if (amount < 0) {
+ throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
+ }
+ return 0;
+ }
+
+ // Skip over entries without copying them.
+
+ int count = 0;
+ Iterator<S> it = mIterator;
+
+ if (it != null) {
+ try {
+ while (--amount >= 0 && it.hasNext()) {
+ it.next();
+ count++;
+ }
+ } catch (ConcurrentModificationException e) {
+ close();
+ throw new FetchException(e);
+ } catch (Error e) {
+ try {
+ close();
+ } catch (Error e2) {
+ // Ignore.
+ }
+ throw e;
+ }
+ }
+
+ return count;
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java b/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java
new file mode 100644
index 0000000..2e82f89
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.repo.map;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+
+import com.amazon.carbonado.Repository;
+import com.amazon.carbonado.RepositoryException;
+import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
+import com.amazon.carbonado.TriggerFactory;
+
+import com.amazon.carbonado.capability.IndexInfo;
+import com.amazon.carbonado.capability.IndexInfoCapability;
+
+import com.amazon.carbonado.sequence.SequenceValueGenerator;
+import com.amazon.carbonado.sequence.SequenceValueProducer;
+
+import com.amazon.carbonado.qe.RepositoryAccess;
+import com.amazon.carbonado.qe.StorageAccess;
+
+import com.amazon.carbonado.spi.AbstractRepository;
+import com.amazon.carbonado.spi.LobEngine;
+import com.amazon.carbonado.spi.TransactionManager;
+import com.amazon.carbonado.spi.TransactionScope;
+
+/**
+ *
+ *
+ * @author Brian S O'Neill
+ * @see MapRepositoryBuilder
+ */
+class MapRepository extends AbstractRepository<MapTransaction>
+ implements RepositoryAccess, IndexInfoCapability
+{
+ private final AtomicReference<Repository> mRootRef;
+ private final boolean mIsMaster;
+ private final int mLockTimeout;
+ private final TimeUnit mLockTimeoutUnit;
+
+ final Iterable<TriggerFactory> mTriggerFactories;
+ private final MapTransactionManager mTxnManager;
+ private LobEngine mLobEngine;
+
+ MapRepository(AtomicReference<Repository> rootRef, MapRepositoryBuilder builder) {
+ super(builder.getName());
+ mRootRef = rootRef;
+ mIsMaster = builder.isMaster();
+ mLockTimeout = builder.getLockTimeout();
+ mLockTimeoutUnit = builder.getLockTimeoutUnit();
+
+ mTriggerFactories = builder.getTriggerFactories();
+ mTxnManager = new MapTransactionManager(mLockTimeout, mLockTimeoutUnit);
+ }
+
+ public Repository getRootRepository() {
+ return mRootRef.get();
+ }
+
+ public <S extends Storable> StorageAccess<S> storageAccessFor(Class<S> type)
+ throws RepositoryException
+ {
+ return (StorageAccess<S>) storageFor(type);
+ }
+
+ public <S extends Storable> IndexInfo[] getIndexInfo(Class<S> storableType)
+ throws RepositoryException
+ {
+ return ((MapStorage) storageFor(storableType)).getIndexInfo();
+ }
+
+ protected Log getLog() {
+ return null;
+ }
+
+ protected TransactionManager<MapTransaction> transactionManager() {
+ return mTxnManager;
+ }
+
+ protected TransactionScope<MapTransaction> localTransactionScope() {
+ return mTxnManager.localScope();
+ }
+
+ protected <S extends Storable> Storage<S> createStorage(Class<S> type)
+ throws RepositoryException
+ {
+ return new MapStorage<S>(this, type, mLockTimeout, mLockTimeoutUnit);
+ }
+
+ protected SequenceValueProducer createSequenceValueProducer(String name)
+ throws RepositoryException
+ {
+ return new SequenceValueGenerator(this, name);
+ }
+
+ LobEngine getLobEngine() throws RepositoryException {
+ if (mLobEngine == null) {
+ mLobEngine = new LobEngine(this, getRootRepository());
+ }
+ return mLobEngine;
+ }
+
+ boolean isMaster() {
+ return mIsMaster;
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/map/MapRepositoryBuilder.java
new file mode 100644
index 0000000..8d5d399
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/map/MapRepositoryBuilder.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.repo.map;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.TimeUnit;
+
+import com.amazon.carbonado.Repository;
+import com.amazon.carbonado.RepositoryException;
+
+import com.amazon.carbonado.repo.indexed.IndexedRepositoryBuilder;
+
+import com.amazon.carbonado.spi.AbstractRepositoryBuilder;
+
+/**
+ * Volatile repository implementation backed by a concurrent map. Locks used by
+ * repository are coarse, much like <i>table locks</i>. Loads and queries
+ * acquire read locks, and modifications acquire write locks. Within
+ * transactions, loads and queries always acquire upgradable locks, to reduce
+ * the likelihood of deadlock.
+ *
+ * <p>This repository supports transactions, which also may be
+ * nested. Supported isolation levels are read committed and serializable. Read
+ * uncommitted is promoted to read committed, and repeatable read is promoted
+ * to serializable.
+ *
+ * <p>
+ * The following extra capabilities are supported:
+ * <ul>
+ * <li>{@link com.amazon.carbonado.capability.IndexInfoCapability IndexInfoCapability}
+ * <li>{@link com.amazon.carbonado.capability.ShutdownCapability ShutdownCapability}
+ * <li>{@link com.amazon.carbonado.sequence.SequenceCapability SequenceCapability}
+ * </ul>
+ *
+ * <p>Note: This repository uses concurrent navigable map classes, which became
+ * available in JDK1.6.
+ *
+ * @author Brian S O'Neill
+ * @since 1.2
+ */
+public class MapRepositoryBuilder extends AbstractRepositoryBuilder {
+ /**
+ * Convenience method to build a new MapRepository.
+ */
+ public static Repository newRepository() {
+ try {
+ MapRepositoryBuilder builder = new MapRepositoryBuilder();
+ return builder.build();
+ } catch (RepositoryException e) {
+ // Not expected.
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String mName = "";
+ private boolean mIsMaster = true;
+ private boolean mIndexSupport = true;
+ private int mLockTimeout;
+ private TimeUnit mLockTimeoutUnit;
+
+ public MapRepositoryBuilder() {
+ setLockTimeoutMillis(500);
+ }
+
+ public Repository build(AtomicReference<Repository> rootRef) throws RepositoryException {
+ if (mIndexSupport) {
+ // Temporarily set to false to avoid infinite recursion.
+ mIndexSupport = false;
+ try {
+ IndexedRepositoryBuilder ixBuilder = new IndexedRepositoryBuilder();
+ ixBuilder.setWrappedRepository(this);
+ ixBuilder.setMaster(isMaster());
+ ixBuilder.setAllClustered(true);
+ return ixBuilder.build(rootRef);
+ } finally {
+ mIndexSupport = true;
+ }
+ }
+
+ assertReady();
+
+ Repository repo = new MapRepository(rootRef, this);
+
+ rootRef.set(repo);
+ return repo;
+ }
+
+ public String getName() {
+ return mName;
+ }
+
+ public void setName(String name) {
+ mName = name;
+ }
+
+ public boolean isMaster() {
+ return mIsMaster;
+ }
+
+ public void setMaster(boolean b) {
+ mIsMaster = b;
+ }
+
+ /**
+ * Set the lock timeout, in milliseconds. Default value is 500 milliseconds.
+ */
+ public void setLockTimeoutMillis(int timeout) {
+ setLockTimeout(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Set the lock timeout. Default value is 500 milliseconds.
+ */
+ public void setLockTimeout(int timeout, TimeUnit unit) {
+ if (timeout < 0 || unit == null) {
+ throw new IllegalArgumentException();
+ }
+ mLockTimeout = timeout;
+ mLockTimeoutUnit = unit;
+ }
+
+ /**
+ * Returns the lock timeout. Call getLockTimeoutUnit to get the unit.
+ */
+ public int getLockTimeout() {
+ return mLockTimeout;
+ }
+
+ /**
+ * Returns the lock timeout unit. Call getLockTimeout to get the timeout.
+ */
+ public TimeUnit getLockTimeoutUnit() {
+ return mLockTimeoutUnit;
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java
new file mode 100644
index 0000000..520d18c
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java
@@ -0,0 +1,813 @@
+/*
+ * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.repo.map;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Set;
+
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+
+import org.cojen.classfile.TypeDesc;
+
+import com.amazon.carbonado.Cursor;
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+import com.amazon.carbonado.FetchTimeoutException;
+import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.PersistInterruptedException;
+import com.amazon.carbonado.PersistTimeoutException;
+import com.amazon.carbonado.Query;
+import com.amazon.carbonado.Repository;
+import com.amazon.carbonado.RepositoryException;
+import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
+import com.amazon.carbonado.SupportException;
+import com.amazon.carbonado.Trigger;
+
+import com.amazon.carbonado.capability.IndexInfo;
+
+import com.amazon.carbonado.cursor.ArraySortBuffer;
+import com.amazon.carbonado.cursor.EmptyCursor;
+import com.amazon.carbonado.cursor.FilteredCursor;
+import com.amazon.carbonado.cursor.SingletonCursor;
+import com.amazon.carbonado.cursor.SortBuffer;
+import com.amazon.carbonado.cursor.SortedCursor;
+
+import com.amazon.carbonado.filter.Filter;
+import com.amazon.carbonado.filter.FilterValues;
+import com.amazon.carbonado.filter.RelOp;
+
+import com.amazon.carbonado.sequence.SequenceValueProducer;
+
+import com.amazon.carbonado.gen.DelegateStorableGenerator;
+import com.amazon.carbonado.gen.DelegateSupport;
+import com.amazon.carbonado.gen.MasterFeature;
+
+import com.amazon.carbonado.util.QuickConstructorGenerator;
+
+import com.amazon.carbonado.filter.Filter;
+
+import com.amazon.carbonado.info.ChainedProperty;
+import com.amazon.carbonado.info.Direction;
+import com.amazon.carbonado.info.OrderedProperty;
+import com.amazon.carbonado.info.StorableIndex;
+import com.amazon.carbonado.info.StorableInfo;
+import com.amazon.carbonado.info.StorableIntrospector;
+import com.amazon.carbonado.info.StorableProperty;
+
+import com.amazon.carbonado.qe.BoundaryType;
+import com.amazon.carbonado.qe.QueryExecutorFactory;
+import com.amazon.carbonado.qe.QueryEngine;
+import com.amazon.carbonado.qe.StorageAccess;
+
+import com.amazon.carbonado.spi.IndexInfoImpl;
+import com.amazon.carbonado.spi.LobEngine;
+import com.amazon.carbonado.spi.TransactionScope;
+import com.amazon.carbonado.spi.TriggerManager;
+
+/**
+ *
+ *
+ * @author Brian S O'Neill
+ */
+class MapStorage<S extends Storable>
+ implements Storage<S>, DelegateSupport<S>, StorageAccess<S>
+{
+ private static final int DEFAULT_LOB_BLOCK_SIZE = 1000;
+ private static final Object[] NO_VALUES = new Object[0];
+
+ private final MapRepository mRepo;
+ private final StorableInfo<S> mInfo;
+ private final TriggerManager<S> mTriggers;
+ private final InstanceFactory mInstanceFactory;
+ private final StorableIndex<S> mPrimaryKeyIndex;
+ private final QueryEngine<S> mQueryEngine;
+
+ private final int mLockTimeout;
+ private final TimeUnit mLockTimeoutUnit;
+
+ private final ConcurrentNavigableMap<Key<S>, S> mMap;
+ private final Comparator<S> mFullComparator;
+ private final Comparator<S>[] mSearchComparators;
+
+ private final Key.Assigner<S> mKeyAssigner;
+
+ /**
+ * Simple lock which is reentrant for transactions, but auto-commit does not
+ * need to support reentrancy. Read lock requests in transactions can starve
+ * write lock requests, but auto-commit cannot cause starvation. In practice
+ * starvation is not possible since transactions always lock for upgrade.
+ */
+ final UpgradableLock<Object> mLock = new UpgradableLock<Object>() {
+ @Override
+ protected boolean isReadLockHeld(Object locker) {
+ return locker instanceof MapTransaction;
+ }
+ };
+
+ MapStorage(MapRepository repo, Class<S> type, int lockTimeout, TimeUnit lockTimeoutUnit)
+ throws SupportException
+ {
+ mRepo = repo;
+ mInfo = StorableIntrospector.examine(type);
+ mTriggers = new TriggerManager<S>();
+
+ EnumSet<MasterFeature> features;
+ if (repo.isMaster()) {
+ features = EnumSet.of(MasterFeature.INSERT_CHECK_REQUIRED,
+ MasterFeature.VERSIONING,
+ MasterFeature.INSERT_SEQUENCES);
+ } else {
+ features = EnumSet.of(MasterFeature.INSERT_CHECK_REQUIRED);
+ }
+
+ Class<? extends S> delegateStorableClass =
+ DelegateStorableGenerator.getDelegateClass(type, features);
+
+ mInstanceFactory = QuickConstructorGenerator
+ .getInstance(delegateStorableClass, InstanceFactory.class);
+
+ mPrimaryKeyIndex =
+ new StorableIndex<S>(mInfo.getPrimaryKey(), Direction.ASCENDING).clustered(true);
+
+ mQueryEngine = new QueryEngine<S>(type, repo);
+
+ mLockTimeout = lockTimeout;
+ mLockTimeoutUnit = lockTimeoutUnit;
+
+ mMap = new ConcurrentSkipListMap<Key<S>, S>();
+ List<OrderedProperty<S>> propList = createPkPropList();
+ mFullComparator = SortedCursor.createComparator(propList);
+ mSearchComparators = new Comparator[propList.size() + 1];
+ mSearchComparators[propList.size()] = mFullComparator;
+
+ mKeyAssigner = Key.getAssigner(type);
+
+ try {
+ if (LobEngine.hasLobs(type)) {
+ Trigger<S> lobTrigger = repo.getLobEngine()
+ .getSupportTrigger(type, DEFAULT_LOB_BLOCK_SIZE);
+ addTrigger(lobTrigger);
+ }
+
+ // Don't install automatic triggers until we're completely ready.
+ mTriggers.addTriggers(type, repo.mTriggerFactories);
+ } catch (SupportException e) {
+ throw e;
+ } catch (RepositoryException e) {
+ throw new SupportException(e);
+ }
+ }
+
+ public Class<S> getStorableType() {
+ return mInfo.getStorableType();
+ }
+
+ public S prepare() {
+ return (S) mInstanceFactory.instantiate(this);
+ }
+
+ public Query<S> query() throws FetchException {
+ return mQueryEngine.query();
+ }
+
+ public Query<S> query(String filter) throws FetchException {
+ return mQueryEngine.query(filter);
+ }
+
+ public Query<S> query(Filter<S> filter) throws FetchException {
+ return mQueryEngine.query(filter);
+ }
+
+ public void truncate() throws PersistException {
+ try {
+ Object locker = mRepo.localTransactionScope().getTxn();
+ if (locker == null) {
+ locker = Thread.currentThread();
+ }
+ doLockForWrite(locker);
+ try {
+ mMap.clear();
+ } finally {
+ mLock.unlockFromWrite(locker);
+ }
+ } catch (PersistException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new PersistException(e);
+ }
+ }
+
+ public boolean addTrigger(Trigger<? super S> trigger) {
+ return mTriggers.addTrigger(trigger);
+ }
+
+ public boolean removeTrigger(Trigger<? super S> trigger) {
+ return mTriggers.removeTrigger(trigger);
+ }
+
+ public IndexInfo[] getIndexInfo() {
+ StorableIndex<S> pkIndex = mPrimaryKeyIndex;
+
+ if (pkIndex == null) {
+ return new IndexInfo[0];
+ }
+
+ int i = pkIndex.getPropertyCount();
+ String[] propertyNames = new String[i];
+ Direction[] directions = new Direction[i];
+ while (--i >= 0) {
+ propertyNames[i] = pkIndex.getProperty(i).getName();
+ directions[i] = pkIndex.getPropertyDirection(i);
+ }
+
+ return new IndexInfo[] {
+ new IndexInfoImpl(getStorableType().getName(), true, true, propertyNames, directions)
+ };
+ }
+
+ public boolean doTryLoad(S storable) throws FetchException {
+ try {
+ TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
+ MapTransaction txn = scope.getTxn();
+ if (txn == null) {
+ doLockForRead(scope);
+ try {
+ return doTryLoadNoLock(storable);
+ } finally {
+ mLock.unlockFromRead(scope);
+ }
+ } else {
+ // Since lock is so coarse, all reads in transaction scope are
+ // upgrade to avoid deadlocks.
+ final boolean isForUpdate = scope.isForUpdate();
+ txn.lockForUpgrade(mLock, isForUpdate);
+ try {
+ return doTryLoadNoLock(storable);
+ } finally {
+ txn.unlockFromUpgrade(mLock, isForUpdate);
+ }
+ }
+ } catch (FetchException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new FetchException(e);
+ }
+ }
+
+ // Caller must hold lock.
+ boolean doTryLoadNoLock(S storable) {
+ S existing = mMap.get(new Key<S>(storable, mFullComparator));
+ if (existing == null) {
+ return false;
+ } else {
+ storable.markAllPropertiesDirty();
+ existing.copyAllProperties(storable);
+ storable.markAllPropertiesClean();
+ return true;
+ }
+ }
+
+ public boolean doTryInsert(S storable) throws PersistException {
+ try {
+ TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
+ MapTransaction txn = scope.getTxn();
+ if (txn == null) {
+ // No need to acquire full write lock since map is concurrent
+ // and existing storable (if any) is not being
+ // modified. Upgrade lock is required because a concurrent
+ // transaction might be in progress, and so insert should wait.
+ doLockForUpgrade(scope);
+ try {
+ return doTryInsertNoLock(storable);
+ } finally {
+ mLock.unlockFromUpgrade(scope);
+ }
+ } else {
+ txn.lockForWrite(mLock);
+ if (doTryInsertNoLock(storable)) {
+ txn.inserted(this, storable);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ } catch (PersistException e) {
+ throw e;
+ } catch (FetchException e) {
+ throw e.toPersistException();
+ } catch (Exception e) {
+ throw new PersistException(e);
+ }
+ }
+
+ // Caller must hold upgrade or write lock.
+ private boolean doTryInsertNoLock(S storable) {
+ Key<S> key = new Key<S>(storable, mFullComparator);
+ S existing = mMap.get(key);
+ if (existing != null) {
+ return false;
+ }
+ // Create a fresh copy to ensure that custom fields are not saved.
+ S copy = (S) storable.prepare();
+ storable.copyAllProperties(copy);
+ copy.markAllPropertiesClean();
+ mMap.put(key, copy);
+ storable.markAllPropertiesClean();
+ return true;
+ }
+
+ public boolean doTryUpdate(S storable) throws PersistException {
+ try {
+ TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
+ MapTransaction txn = scope.getTxn();
+ if (txn == null) {
+ // Full write lock is required since existing storable is being
+ // modified. Readers cannot be allowed to see modifications
+ // until they are complete. In addtion, a concurrent
+ // transaction might be in progress, and so update should wait.
+ doLockForWrite(scope);
+ try {
+ return doTryUpdateNoLock(storable);
+ } finally {
+ mLock.unlockFromWrite(scope);
+ }
+ } else {
+ txn.lockForWrite(mLock);
+ S existing = mMap.get(new Key<S>(storable, mFullComparator));
+ if (existing == null) {
+ return false;
+ } else {
+ // Copy existing object to undo log.
+ txn.updated(this, (S) existing.copy());
+
+ // Copy altered values to existing object.
+ existing.markAllPropertiesDirty();
+ storable.copyDirtyProperties(existing);
+ existing.markAllPropertiesClean();
+
+ // Copy all values to user object, to simulate a reload.
+ storable.markAllPropertiesDirty();
+ existing.copyAllProperties(storable);
+ storable.markAllPropertiesClean();
+
+ return true;
+ }
+ }
+ } catch (PersistException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new PersistException(e);
+ }
+ }
+
+ // Caller must hold write lock.
+ private boolean doTryUpdateNoLock(S storable) {
+ S existing = mMap.get(new Key<S>(storable, mFullComparator));
+ if (existing == null) {
+ return false;
+ } else {
+ // Copy altered values to existing object.
+ existing.markAllPropertiesDirty();
+ storable.copyDirtyProperties(existing);
+ existing.markAllPropertiesClean();
+
+ // Copy all values to user object, to simulate a reload.
+ storable.markAllPropertiesDirty();
+ existing.copyAllProperties(storable);
+ storable.markAllPropertiesClean();
+
+ return true;
+ }
+ }
+
+ public boolean doTryDelete(S storable) throws PersistException {
+ try {
+ TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
+ MapTransaction txn = scope.getTxn();
+ if (txn == null) {
+ // No need to acquire full write lock since map is concurrent
+ // and existing storable (if any) is not being
+ // modified. Upgrade lock is required because a concurrent
+ // transaction might be in progress, and so delete should wait.
+ doLockForUpgrade(scope);
+ try {
+ return doTryDeleteNoLock(storable);
+ } finally {
+ mLock.unlockFromUpgrade(scope);
+ }
+ } else {
+ txn.lockForWrite(mLock);
+ S existing = mMap.remove(new Key<S>(storable, mFullComparator));
+ if (existing == null) {
+ return false;
+ } else {
+ txn.deleted(this, existing);
+ return true;
+ }
+ }
+ } catch (PersistException e) {
+ throw e;
+ } catch (FetchException e) {
+ throw e.toPersistException();
+ } catch (Exception e) {
+ throw new PersistException(e);
+ }
+ }
+
+ // Caller must hold upgrade or write lock.
+ private boolean doTryDeleteNoLock(S storable) {
+ return mMap.remove(new Key<S>(storable, mFullComparator)) != null;
+ }
+
+ // Called by MapTransaction, which implicitly holds lock.
+ void mapPut(S storable) {
+ mMap.put(new Key<S>(storable, mFullComparator), storable);
+ }
+
+ // Called by MapTransaction, which implicitly holds lock.
+ void mapRemove(S storable) {
+ mMap.remove(new Key<S>(storable, mFullComparator));
+ }
+
+ private void doLockForRead(Object locker) throws FetchException {
+ try {
+ if (!mLock.tryLockForRead(locker, mLockTimeout, mLockTimeoutUnit)) {
+ throw new FetchTimeoutException("" + mLockTimeout + ' ' +
+ mLockTimeoutUnit.toString().toLowerCase());
+ }
+ } catch (InterruptedException e) {
+ throw new FetchInterruptedException(e);
+ }
+ }
+
+ private void doLockForUpgrade(Object locker) throws FetchException {
+ try {
+ if (!mLock.tryLockForUpgrade(locker, mLockTimeout, mLockTimeoutUnit)) {
+ throw new FetchTimeoutException("" + mLockTimeout + ' ' +
+ mLockTimeoutUnit.toString().toLowerCase());
+ }
+ } catch (InterruptedException e) {
+ throw new FetchInterruptedException(e);
+ }
+ }
+
+ private void doLockForWrite(Object locker) throws PersistException {
+ try {
+ if (!mLock.tryLockForWrite(locker, mLockTimeout, mLockTimeoutUnit)) {
+ throw new PersistTimeoutException("" + mLockTimeout + ' ' +
+ mLockTimeoutUnit.toString().toLowerCase());
+ }
+ } catch (InterruptedException e) {
+ throw new PersistInterruptedException(e);
+ }
+ }
+
+ public Repository getRootRepository() {
+ return mRepo.getRootRepository();
+ }
+
+ public boolean isPropertySupported(String propertyName) {
+ return mInfo.getAllProperties().containsKey(propertyName);
+ }
+
+ public Trigger<? super S> getInsertTrigger() {
+ return mTriggers.getInsertTrigger();
+ }
+
+ public Trigger<? super S> getUpdateTrigger() {
+ return mTriggers.getUpdateTrigger();
+ }
+
+ public Trigger<? super S> getDeleteTrigger() {
+ return mTriggers.getDeleteTrigger();
+ }
+
+ public Trigger<? super S> getLoadTrigger() {
+ return mTriggers.getLoadTrigger();
+ }
+
+ public void locallyDisableLoadTrigger() {
+ mTriggers.locallyDisableLoad();
+ }
+
+ public void locallyEnableLoadTrigger() {
+ mTriggers.locallyEnableLoad();
+ }
+
+ public SequenceValueProducer getSequenceValueProducer(String name) throws PersistException {
+ try {
+ return mRepo.getSequenceValueProducer(name);
+ } catch (RepositoryException e) {
+ throw e.toPersistException();
+ }
+ }
+
+ public QueryExecutorFactory<S> getQueryExecutorFactory() {
+ return mQueryEngine;
+ }
+
+ public Collection<StorableIndex<S>> getAllIndexes() {
+ return Collections.singletonList(mPrimaryKeyIndex);
+ }
+
+ public Storage<S> storageDelegate(StorableIndex<S> index) {
+ // We're the grunt and don't delegate.
+ return null;
+ }
+
+ public long countAll() throws FetchException {
+ try {
+ Object locker = mRepo.localTransactionScope().getTxn();
+ if (locker == null) {
+ locker = Thread.currentThread();
+ }
+ doLockForRead(locker);
+ try {
+ return mMap.size();
+ } finally {
+ mLock.unlockFromRead(locker);
+ }
+ } catch (FetchException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new FetchException(e);
+ }
+ }
+
+ public Cursor<S> fetchAll() throws FetchException {
+ try {
+ return new MapCursor<S>(this, mRepo.localTransactionScope(), mMap.values());
+ } catch (FetchException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new FetchException(e);
+ }
+ }
+
+ public Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues)
+ throws FetchException
+ {
+ try {
+ S key = prepare();
+ for (int i=0; i<identityValues.length; i++) {
+ key.setPropertyValue(index.getProperty(i).getName(), identityValues[i]);
+ }
+
+ TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
+ MapTransaction txn = scope.getTxn();
+ if (txn == null) {
+ doLockForRead(scope);
+ try {
+ S value = mMap.get(new Key<S>(key, mFullComparator));
+ if (value == null) {
+ return EmptyCursor.the();
+ } else {
+ return new SingletonCursor<S>(copyAndFireLoadTrigger(value));
+ }
+ } finally {
+ mLock.unlockFromRead(scope);
+ }
+ } else {
+ // Since lock is so coarse, all reads in transaction scope are
+ // upgrade to avoid deadlocks.
+ final boolean isForUpdate = scope.isForUpdate();
+ txn.lockForUpgrade(mLock, isForUpdate);
+ try {
+ S value = mMap.get(new Key<S>(key, mFullComparator));
+ if (value == null) {
+ return EmptyCursor.the();
+ } else {
+ return new SingletonCursor<S>(copyAndFireLoadTrigger(value));
+ }
+ } finally {
+ txn.unlockFromUpgrade(mLock, isForUpdate);
+ }
+ }
+ } catch (FetchException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new FetchException(e);
+ }
+ }
+
+ S copyAndFireLoadTrigger(S storable) throws FetchException {
+ storable = (S) storable.copy();
+ Trigger<? super S> trigger = getLoadTrigger();
+ if (trigger != null) {
+ trigger.afterLoad(storable);
+ // In case trigger modified the properties, make sure they're still clean.
+ storable.markAllPropertiesClean();
+ }
+ return storable;
+ }
+
+ public Query<?> indexEntryQuery(StorableIndex<S> index) {
+ return null;
+ }
+
+ public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery) {
+ return null;
+ }
+
+ public Cursor<S> fetchSubset(StorableIndex<S> index,
+ Object[] identityValues,
+ BoundaryType rangeStartBoundary,
+ Object rangeStartValue,
+ BoundaryType rangeEndBoundary,
+ Object rangeEndValue,
+ boolean reverseRange,
+ boolean reverseOrder)
+ throws FetchException
+ {
+ if (identityValues == null) {
+ identityValues = NO_VALUES;
+ }
+
+ NavigableMap<Key<S>, S> map = mMap;
+
+ int tieBreaker = 1;
+ if (reverseOrder) {
+ map = map.descendingMap();
+ reverseRange = !reverseRange;
+ tieBreaker = -tieBreaker;
+ }
+
+ if (reverseRange) {
+ BoundaryType t1 = rangeStartBoundary;
+ rangeStartBoundary = rangeEndBoundary;
+ rangeEndBoundary = t1;
+ Object t2 = rangeStartValue;
+ rangeStartValue = rangeEndValue;
+ rangeEndValue = t2;
+ }
+
+ tail: {
+ Key<S> startKey;
+ switch (rangeStartBoundary) {
+ case OPEN: default:
+ if (identityValues.length == 0) {
+ break tail;
+ } else {
+ // Tie breaker of -1 puts search key right before first actual
+ // match, thus forming an inclusive start match.
+ startKey = searchKey(-tieBreaker, identityValues);
+ }
+ break;
+ case INCLUSIVE:
+ // Tie breaker of -1 puts search key right before first actual
+ // match, thus forming an inclusive start match.
+ startKey = searchKey(-tieBreaker, identityValues, rangeStartValue);
+ break;
+ case EXCLUSIVE:
+ // Tie breaker of +1 puts search key right after first actual
+ // match, thus forming an exlusive start match.
+ startKey = searchKey(tieBreaker, identityValues, rangeStartValue);
+ break;
+ }
+
+ map = map.tailMap(startKey, true);
+ }
+
+ Cursor<S> cursor;
+ try {
+ cursor = new MapCursor<S>(this, mRepo.localTransactionScope(), map.values());
+ } catch (FetchException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new FetchException(e);
+ }
+
+ // Use filter to stop cursor at desired ending position.
+
+ // FIXME: Let query engine do this so that filter can be
+ // cached. Somehow indicate this at a high level so that query plan
+ // shows a filter.
+ Filter<S> filter;
+ FilterValues<S> filterValues;
+
+ if (rangeEndBoundary == BoundaryType.OPEN) {
+ if (identityValues.length == 0) {
+ filter = null;
+ filterValues = null;
+ } else {
+ filter = Filter.getOpenFilter(getStorableType());
+ for (int i=0; i<identityValues.length; i++) {
+ filter = filter.and(index.getProperty(i).getName(), RelOp.EQ);
+ }
+ filterValues = filter.initialFilterValues();
+ for (int i=0; i<identityValues.length; i++) {
+ filterValues = filterValues.with(identityValues[i]);
+ }
+ }
+ } else {
+ filter = Filter.getOpenFilter(getStorableType());
+ int i = 0;
+ for (; i<identityValues.length; i++) {
+ filter = filter.and(index.getProperty(i).getName(), RelOp.EQ);
+ }
+
+ filter = filter.and(index.getProperty(i).getName(),
+ rangeEndBoundary == BoundaryType.INCLUSIVE ? RelOp.LE : RelOp.LT);
+
+ filterValues = filter.initialFilterValues();
+
+ for (i=0; i<identityValues.length; i++) {
+ filterValues = filterValues.with(identityValues[i]);
+ }
+
+ filterValues = filterValues.with(rangeEndValue);
+ }
+
+ if (filter != null) {
+ cursor = FilteredCursor.applyFilter(filter, filterValues, cursor);
+ }
+
+ return cursor;
+ }
+
+ private List<OrderedProperty<S>> createPkPropList() {
+ return new ArrayList<OrderedProperty<S>>(mInfo.getPrimaryKey().getProperties());
+ }
+
+ private Key<S> searchKey(int tieBreaker, Object[] identityValues) {
+ S storable = prepare();
+ mKeyAssigner.setKeyValues(storable, identityValues);
+ Comparator<S> c = getSearchComparator(identityValues.length);
+ return new SearchKey<S>(tieBreaker, storable, c);
+ }
+
+ private Key<S> searchKey(int tieBreaker, Object[] identityValues, Object rangeValue) {
+ S storable = prepare();
+ mKeyAssigner.setKeyValues(storable, identityValues, rangeValue);
+ Comparator<S> c = getSearchComparator(identityValues.length + 1);
+ return new SearchKey<S>(tieBreaker, storable, c);
+ }
+
+ private Comparator<S> getSearchComparator(int propertyCount) {
+ Comparator<S> comparator = mSearchComparators[propertyCount];
+ if (comparator == null) {
+ List<OrderedProperty<S>> propList = createPkPropList().subList(0, propertyCount);
+ if (propList.size() > 0) {
+ comparator = SortedCursor.createComparator(propList);
+ } else {
+ comparator = SortedCursor.createComparator(getStorableType());
+ }
+ mSearchComparators[propertyCount] = comparator;
+ }
+ return comparator;
+ }
+
+ public SortBuffer<S> createSortBuffer() {
+ return new ArraySortBuffer<S>();
+ }
+
+ public static interface InstanceFactory {
+ Storable instantiate(DelegateSupport support);
+ }
+
+ private static class SearchKey<S extends Storable> extends Key<S> {
+ private final int mTieBreaker;
+
+ SearchKey(int tieBreaker, S storable, Comparator<S> comparator) {
+ super(storable, comparator);
+ mTieBreaker = tieBreaker;
+ }
+
+ @Override
+ protected int tieBreaker() {
+ return mTieBreaker;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + ", tieBreaker=" + mTieBreaker;
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapTransaction.java b/src/main/java/com/amazon/carbonado/repo/map/MapTransaction.java
new file mode 100644
index 0000000..d2cfe87
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/map/MapTransaction.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.repo.map;
+
+import java.util.concurrent.TimeUnit;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.amazon.carbonado.FetchException;
+import com.amazon.carbonado.FetchInterruptedException;
+import com.amazon.carbonado.FetchTimeoutException;
+import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.PersistInterruptedException;
+import com.amazon.carbonado.PersistTimeoutException;
+import com.amazon.carbonado.Storable;
+import com.amazon.carbonado.Storage;
+
+/**
+ *
+ *
+ * @author Brian S O'Neill
+ */
+class MapTransaction {
+ private final MapTransaction mParent;
+ private final IsolationLevel mLevel;
+ private final Object mLocker;
+ private final int mLockTimeout;
+ private final TimeUnit mLockTimeoutUnit;
+
+ private Set<UpgradableLock> mUpgradeLocks;
+ private Set<UpgradableLock> mWriteLocks;
+
+ private List<Undoable> mUndoLog;
+
+ MapTransaction(MapTransaction parent, IsolationLevel level,
+ int lockTimeout, TimeUnit lockTimeoutUnit)
+ {
+ mParent = parent;
+ mLevel = level;
+ mLocker = parent == null ? this : parent.mLocker;
+ mLockTimeout = lockTimeout;
+ mLockTimeoutUnit = lockTimeoutUnit;
+ }
+
+ void lockForUpgrade(UpgradableLock lock, boolean isForUpdate) throws FetchException {
+ if (!isForUpdate && mLevel.isAtMost(IsolationLevel.READ_COMMITTED)) {
+ doLockForUpgrade(lock);
+ } else {
+ Set<UpgradableLock> locks = mUpgradeLocks;
+ if (locks == null) {
+ mUpgradeLocks = locks = new HashSet<UpgradableLock>();
+ doLockForUpgrade(lock);
+ locks.add(lock);
+ } else if (!locks.contains(lock)) {
+ doLockForUpgrade(lock);
+ locks.add(lock);
+ }
+ }
+ }
+
+ private void doLockForUpgrade(UpgradableLock lock) throws FetchException {
+ try {
+ if (!lock.tryLockForUpgrade(mLocker, mLockTimeout, mLockTimeoutUnit)) {
+ throw new FetchTimeoutException("" + mLockTimeout + ' ' +
+ mLockTimeoutUnit.toString().toLowerCase());
+ }
+ } catch (InterruptedException e) {
+ throw new FetchInterruptedException(e);
+ }
+ }
+
+ void unlockFromUpgrade(UpgradableLock lock, boolean isForUpdate) {
+ if (!isForUpdate && mLevel.isAtMost(IsolationLevel.READ_COMMITTED)) {
+ lock.unlockFromUpgrade(mLocker);
+ }
+ }
+
+ void lockForWrite(UpgradableLock lock) throws PersistException {
+ Set<UpgradableLock> locks = mWriteLocks;
+ if (locks == null) {
+ mWriteLocks = locks = new HashSet<UpgradableLock>();
+ doLockForWrite(lock);
+ locks.add(lock);
+ } else if (!locks.contains(lock)) {
+ doLockForWrite(lock);
+ locks.add(lock);
+ }
+ }
+
+ private void doLockForWrite(UpgradableLock lock) throws PersistException {
+ try {
+ if (!lock.tryLockForWrite(mLocker, mLockTimeout, mLockTimeoutUnit)) {
+ throw new PersistTimeoutException("" + mLockTimeout + ' ' +
+ mLockTimeoutUnit.toString().toLowerCase());
+ }
+ } catch (InterruptedException e) {
+ throw new PersistInterruptedException(e);
+ }
+ }
+
+ /**
+ * Add to undo log.
+ */
+ <S extends Storable> void inserted(final MapStorage<S> storage, final S key) {
+ addToUndoLog(new Undoable() {
+ public void undo() {
+ storage.mapRemove(key);
+ }
+ });
+ }
+
+ /**
+ * Add to undo log.
+ */
+ <S extends Storable> void updated(final MapStorage<S> storage, final S old) {
+ addToUndoLog(new Undoable() {
+ public void undo() {
+ storage.mapPut(old);
+ }
+ });
+ }
+
+ /**
+ * Add to undo log.
+ */
+ <S extends Storable> void deleted(final MapStorage<S> storage, final S old) {
+ addToUndoLog(new Undoable() {
+ public void undo() {
+ storage.mapPut(old);
+ }
+ });
+ }
+
+ void commit() {
+ MapTransaction parent = mParent;
+
+ if (parent == null) {
+ releaseLocks();
+ return;
+ }
+
+ // Pass undo log to parent.
+ if (parent.mUndoLog == null) {
+ parent.mUndoLog = mUndoLog;
+ } else if (mUndoLog != null) {
+ parent.mUndoLog.addAll(mUndoLog);
+ }
+ mUndoLog = null;
+
+ // Pass write locks to parent or release if parent already has the lock.
+ {
+ Set<UpgradableLock> locks = mWriteLocks;
+ if (locks != null) {
+ Set<UpgradableLock> parentLocks = parent.mWriteLocks;
+ if (parentLocks == null) {
+ parent.mWriteLocks = locks;
+ } else {
+ for (UpgradableLock lock : locks) {
+ if (!parentLocks.add(lock)) {
+ lock.unlockFromWrite(mLocker);
+ }
+ }
+ }
+ mWriteLocks = null;
+ }
+ }
+
+ // Upgrade locks can simply be released.
+ releaseUpgradeLocks();
+ }
+
+ void abort() {
+ List<Undoable> log = mUndoLog;
+ if (log != null) {
+ for (int i=log.size(); --i>=0; ) {
+ log.get(i).undo();
+ }
+ }
+ mUndoLog = null;
+
+ releaseLocks();
+ }
+
+ private void addToUndoLog(Undoable entry) {
+ List<Undoable> log = mUndoLog;
+ if (log == null) {
+ mUndoLog = log = new ArrayList<Undoable>();
+ }
+ log.add(entry);
+ }
+
+ private void releaseLocks() {
+ releaseWriteLocks();
+ releaseUpgradeLocks();
+ }
+
+ private void releaseWriteLocks() {
+ Set<UpgradableLock> locks = mWriteLocks;
+ if (locks != null) {
+ for (UpgradableLock lock : locks) {
+ lock.unlockFromWrite(mLocker);
+ }
+ mWriteLocks = null;
+ }
+ }
+
+ private void releaseUpgradeLocks() {
+ Set<UpgradableLock> locks = mUpgradeLocks;
+ if (locks != null) {
+ for (UpgradableLock lock : locks) {
+ lock.unlockFromUpgrade(mLocker);
+ }
+ mUpgradeLocks = null;
+ }
+ }
+
+ private static interface Undoable {
+ void undo();
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java b/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java
new file mode 100644
index 0000000..5598353
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.repo.map;
+
+import java.util.concurrent.TimeUnit;
+
+import com.amazon.carbonado.IsolationLevel;
+import com.amazon.carbonado.PersistException;
+import com.amazon.carbonado.Transaction;
+
+import com.amazon.carbonado.spi.TransactionManager;
+
+/**
+ *
+ *
+ * @author Brian S O'Neill
+ */
+class MapTransactionManager extends TransactionManager<MapTransaction> {
+ private final int mLockTimeout;
+ private final TimeUnit mLockTimeoutUnit;
+
+ MapTransactionManager(int lockTimeout, TimeUnit lockTimeoutUnit) {
+ mLockTimeout = lockTimeout;
+ mLockTimeoutUnit = lockTimeoutUnit;
+ }
+
+ protected IsolationLevel selectIsolationLevel(Transaction parent, IsolationLevel level) {
+ if (level == null) {
+ if (parent == null) {
+ return IsolationLevel.READ_COMMITTED;
+ }
+ return parent.getIsolationLevel();
+ }
+
+ switch (level) {
+ case NONE:
+ return IsolationLevel.NONE;
+ case READ_UNCOMMITTED:
+ case READ_COMMITTED:
+ return IsolationLevel.READ_COMMITTED;
+ case REPEATABLE_READ:
+ case SERIALIZABLE:
+ return IsolationLevel.SERIALIZABLE;
+ default:
+ // Not supported.
+ return null;
+ }
+ }
+
+ protected boolean supportsForUpdate() {
+ return true;
+ }
+
+ protected MapTransaction createTxn(MapTransaction parent, IsolationLevel level)
+ throws Exception
+ {
+ if (level == IsolationLevel.NONE) {
+ return null;
+ }
+ return new MapTransaction(parent, level, mLockTimeout, mLockTimeoutUnit);
+ }
+
+ @Override
+ protected MapTransaction createTxn(MapTransaction parent, IsolationLevel level,
+ int timeout, TimeUnit unit)
+ throws Exception
+ {
+ if (level == IsolationLevel.NONE) {
+ return null;
+ }
+ return new MapTransaction(parent, level, timeout, unit);
+ }
+
+ protected boolean commitTxn(MapTransaction txn) throws PersistException {
+ txn.commit();
+ return false;
+ }
+
+ protected void abortTxn(MapTransaction txn) throws PersistException {
+ txn.abort();
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/map/UpgradableLock.java b/src/main/java/com/amazon/carbonado/repo/map/UpgradableLock.java
new file mode 100644
index 0000000..9862791
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/map/UpgradableLock.java
@@ -0,0 +1,1161 @@
+/*
+ * Copyright 2007 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.amazon.carbonado.repo.map;
+
+import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * Partially reentrant, upgradable read/write lock. Up to 1,073,741,824 read
+ * locks can be held. Upgraders and writers may re-enter the lock up to
+ * 2,147,483,648 times. Attempts by readers to re-enter the lock is not
+ * detected and is deadlock prone, unless locker already holds an upgrade or
+ * write lock. Subclasses can support full reentrancy by overriding the
+ * protected read lock adjust and hold check methods.
+ *
+ * <p>This lock implementation differs from the usual Java lock with respect to
+ * lock ownership. Locks are not owned by threads, but by arbitrary locker
+ * objects. A thread which attempts to acquire an upgrade or write lock twice
+ * with different locker objects will deadlock on the second attempt.
+ *
+ * <p>As is typical of read/write lock implementations, a read lock blocks
+ * waiting writers, but it doesn't block other readers. A write lock is
+ * exclusive and can be held by at most one locker. Attempting to acquire a
+ * write lock while a read lock is held by the same locker is inherently
+ * deadlock prone, and some read/write lock implementations will always
+ * deadlock.
+ *
+ * <p>An upgrade lock allows a read lock to be safely upgraded to a write
+ * lock. Instead of acquiring a read lock, an upgrade lock is acquired. This
+ * acts like a shared read lock in that readers are not blocked, but it also
+ * acts like an exclusive write lock -- writers and upgraders are blocked. With
+ * an upgrade lock held, the locker may acquire a write lock without deadlock.
+ *
+ * <pre>
+ * Locks held Locks safely Locks acquirable
+ * by owner acquirable by owner by other lockers
+ * --------------------------------------------------
+ * - - - R U W R U W
+ * R - - - - - R U -
+ * R U - - U - R - -
+ * - U - R U W R - -
+ * - U W R U W - - -
+ * R U W R U W - - -
+ * R - W R U W - - -
+ * - - W R U W - - -
+ * </pre>
+ *
+ * @author Brian S O'Neill
+ * @param <L> Locker type
+ */
+class UpgradableLock<L> {
+ // Design note: This class borrows heavily from AbstractQueuedSynchronizer.
+ // Consult that class for understanding the locking mechanics.
+
+ private static enum Result {
+ /** Lock acquisition failed */
+ FAILED,
+ /** Lock has just been acquired by locker and can be safely released later */
+ ACQUIRED,
+ /** Lock is already owned by locker and should not be released more than once */
+ OWNED
+ }
+
+ private static final AtomicReferenceFieldUpdater<UpgradableLock, Node> cRWHeadRef =
+ AtomicReferenceFieldUpdater.newUpdater
+ (UpgradableLock.class, Node.class, "mRWHead");
+
+ private static final AtomicReferenceFieldUpdater<UpgradableLock, Node> cRWTailRef =
+ AtomicReferenceFieldUpdater.newUpdater
+ (UpgradableLock.class, Node.class, "mRWTail");
+
+ private static final AtomicReferenceFieldUpdater<UpgradableLock, Node> cUHeadRef =
+ AtomicReferenceFieldUpdater.newUpdater
+ (UpgradableLock.class, Node.class, "mUHead");
+
+ private static final AtomicReferenceFieldUpdater<UpgradableLock, Node> cUTailRef =
+ AtomicReferenceFieldUpdater.newUpdater
+ (UpgradableLock.class, Node.class, "mUTail");
+
+ private static final AtomicIntegerFieldUpdater<UpgradableLock> cStateRef =
+ AtomicIntegerFieldUpdater.newUpdater
+ (UpgradableLock.class, "mState");
+
+ // State mask bits for held locks. Read lock count is stored in lower 30 bits of state.
+ private static final int LOCK_STATE_UPGRADE = 0x40000000;
+ // Write state must be this value in order for quick sign check to work.
+ private static final int LOCK_STATE_WRITE = 0x80000000;
+
+ private static final int LOCK_STATE_MASK = LOCK_STATE_UPGRADE | LOCK_STATE_WRITE;
+
+ /**
+ * The number of nanoseconds for which it is faster to spin
+ * rather than to use timed park. A rough estimate suffices
+ * to improve responsiveness with very short timeouts.
+ */
+ private static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
+
+ // Head of read-write queue.
+ private transient volatile Node mRWHead;
+
+ // Tail of read-write queue.
+ private transient volatile Node mRWTail;
+
+ // Head of write upgrade queue.
+ private transient volatile Node mUHead;
+
+ // Tail of write upgrade queue.
+ private transient volatile Node mUTail;
+
+ private transient volatile int mState;
+
+ // Owner holds an upgradable lock and possibly a write lock too.
+ private transient L mOwner;
+
+ // Counts number of times that owner has entered an upgradable or write lock.
+ private transient int mUpgradeCount;
+ private transient int mWriteCount;
+
+ public UpgradableLock() {
+ }
+
+ /**
+ * Acquire a shared read lock, possibly blocking indefinitely.
+ *
+ * @param locker object which might be write or upgrade lock owner
+ */
+ public final void lockForRead(L locker) {
+ if (!tryLockForRead(locker)) {
+ lockForReadQueued(locker, addReadWaiter());
+ }
+ }
+
+ /**
+ * Acquire a shared read lock, possibly blocking until interrupted.
+ *
+ * @param locker object which might be write or upgrade lock owner
+ */
+ public final void lockForReadInterruptibly(L locker) throws InterruptedException {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ if (!tryLockForRead(locker)) {
+ lockForReadQueuedInterruptibly(locker, addReadWaiter());
+ }
+ }
+
+ /**
+ * Attempt to immediately acquire a shared read lock.
+ *
+ * @param locker object which might be write or upgrade lock owner
+ * @return true if acquired
+ */
+ public final boolean tryLockForRead(L locker) {
+ int state = mState;
+ if (state >= 0) { // no write lock is held
+ if (isReadWriteFirst() || isReadLockHeld(locker)) {
+ do {
+ if (incrementReadLocks(state)) {
+ adjustReadLockCount(locker, 1);
+ return true;
+ }
+ // keep looping on CAS failure if a reader or upgrader mucked with the state
+ } while ((state = mState) >= 0);
+ }
+ } else if (mOwner == locker) {
+ // keep looping on CAS failure if a reader or upgrader mucked with the state
+ while (!incrementReadLocks(state)) {
+ state = mState;
+ }
+ adjustReadLockCount(locker, 1);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Attempt to acquire a shared read lock, waiting a maximum amount of
+ * time.
+ *
+ * @param locker object which might be write or upgrade lock owner
+ * @return true if acquired
+ */
+ public final boolean tryLockForRead(L locker, long timeout, TimeUnit unit)
+ throws InterruptedException
+ {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ if (!tryLockForRead(locker)) {
+ return lockForReadQueuedInterruptibly(locker, addReadWaiter(), unit.toNanos(timeout));
+ }
+ return true;
+ }
+
+ /**
+ * Release a previously acquired read lock.
+ */
+ public final void unlockFromRead(L locker) {
+ adjustReadLockCount(locker, -1);
+
+ int readLocks;
+ while ((readLocks = decrementReadLocks(mState)) < 0) {}
+
+ if (readLocks == 0) {
+ Node h = mRWHead;
+ if (h != null && h.mWaitStatus != 0) {
+ unparkReadWriteSuccessor(h);
+ }
+ }
+ }
+
+ /**
+ * Acquire an upgrade lock, possibly blocking indefinitely.
+ *
+ * @param locker object trying to become lock owner
+ * @return true if acquired
+ */
+ public final boolean lockForUpgrade(L locker) {
+ return lockForUpgrade_(locker) != Result.FAILED;
+ }
+
+ /**
+ * Acquire an upgrade lock, possibly blocking indefinitely.
+ *
+ * @param locker object trying to become lock owner
+ * @return ACQUIRED or OWNED
+ */
+ private final Result lockForUpgrade_(L locker) {
+ Result result;
+ if ((result = tryLockForUpgrade_(locker)) == Result.FAILED) {
+ result = lockForUpgradeQueued(locker, addUpgradeWaiter());
+ }
+ return result;
+ }
+
+ /**
+ * Acquire an upgrade lock, possibly blocking until interrupted.
+ *
+ * @param locker object trying to become lock owner
+ * @return true if acquired
+ */
+ public final boolean lockForUpgradeInterruptibly(L locker) throws InterruptedException {
+ return lockForUpgradeInterruptibly_(locker) != Result.FAILED;
+ }
+
+ /**
+ * Acquire an upgrade lock, possibly blocking until interrupted.
+ *
+ * @param locker object trying to become lock owner
+ * @return ACQUIRED or OWNED
+ */
+ private final Result lockForUpgradeInterruptibly_(L locker) throws InterruptedException {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ Result result;
+ if ((result = tryLockForUpgrade_(locker)) == Result.FAILED) {
+ result = lockForUpgradeQueuedInterruptibly(locker, addUpgradeWaiter());
+ }
+ return result;
+ }
+
+ /**
+ * Attempt to immediately acquire an upgrade lock.
+ *
+ * @param locker object trying to become lock owner
+ * @return true if acquired
+ */
+ public final boolean tryLockForUpgrade(L locker) {
+ return tryLockForUpgrade_(locker) != Result.FAILED;
+ }
+
+ /**
+ * Attempt to immediately acquire an upgrade lock.
+ *
+ * @param locker object trying to become lock owner
+ * @return FAILED, ACQUIRED or OWNED
+ */
+ private final Result tryLockForUpgrade_(L locker) {
+ int state = mState;
+ if ((state & LOCK_STATE_MASK) == 0) { // no write or upgrade lock is held
+ if (isUpgradeFirst()) {
+ do {
+ if (setUpgradeLock(state)) {
+ mOwner = locker;
+ incrementUpgradeCount();
+ return Result.ACQUIRED;
+ }
+ // keep looping on CAS failure if a reader mucked with the state
+ } while (((state = mState) & LOCK_STATE_MASK) == 0);
+ }
+ } else if (mOwner == locker) {
+ incrementUpgradeCount();
+ return Result.OWNED;
+ }
+ return Result.FAILED;
+ }
+
+ /**
+ * Attempt to acquire an upgrade lock, waiting a maximum amount of time.
+ *
+ * @param locker object trying to become lock owner
+ * @return true if acquired
+ */
+ public final boolean tryLockForUpgrade(L locker, long timeout, TimeUnit unit)
+ throws InterruptedException
+ {
+ return tryLockForUpgrade_(locker, timeout, unit) != Result.FAILED;
+ }
+
+ /**
+ * Attempt to acquire an upgrade lock, waiting a maximum amount of time.
+ *
+ * @param locker object trying to become lock owner
+ * @return FAILED, ACQUIRED or OWNED
+ */
+ private final Result tryLockForUpgrade_(L locker, long timeout, TimeUnit unit)
+ throws InterruptedException
+ {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ Result result;
+ if ((result = tryLockForUpgrade_(locker)) == Result.FAILED) {
+ result = lockForUpgradeQueuedInterruptibly(locker, addUpgradeWaiter(),
+ unit.toNanos(timeout));
+ }
+ return result;
+ }
+
+ /**
+ * Release a previously acquired upgrade lock.
+ */
+ public final void unlockFromUpgrade(L locker) {
+ int upgradeCount = mUpgradeCount - 1;
+ if (upgradeCount < 0) {
+ throw new IllegalMonitorStateException("Too many upgrade locks released");
+ }
+ if (upgradeCount == 0 && mWriteCount > 0) {
+ // Don't release last upgrade lock and switch write lock to
+ // automatic upgrade mode.
+ clearUpgradeLock(mState);
+ return;
+ }
+ mUpgradeCount = upgradeCount;
+ if (upgradeCount > 0) {
+ return;
+ }
+
+ mOwner = null;
+
+ // keep looping on CAS failure if reader mucked with state
+ while (!clearUpgradeLock(mState)) {}
+
+ Node h = mUHead;
+ if (h != null && h.mWaitStatus != 0) {
+ unparkUpgradeSuccessor(h);
+ }
+ }
+
+ /**
+ * Acquire an exclusive write lock, possibly blocking indefinitely.
+ *
+ * @param locker object trying to become lock owner
+ */
+ public final void lockForWrite(L locker) {
+ Result writeResult;
+ if (!tryLockForWrite(locker)) {
+ Result upgradeResult = lockForUpgrade_(locker);
+ if (!tryLockForWrite(locker)) {
+ lockForWriteQueued(locker, addWriteWaiter());
+ }
+ if (upgradeResult == Result.ACQUIRED) {
+ // clear upgrade state bit to indicate automatic upgrade
+ while (!clearUpgradeLock(mState)) {}
+ } else {
+ // undo automatic upgrade count increment
+ mUpgradeCount--;
+ }
+ }
+ }
+
+ /**
+ * Acquire an exclusive write lock, possibly blocking until interrupted.
+ *
+ * @param locker object trying to become lock owner
+ */
+ public final void lockForWriteInterruptibly(L locker) throws InterruptedException {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ if (!tryLockForWrite(locker)) {
+ Result upgradeResult = lockForUpgradeInterruptibly_(locker);
+ if (!tryLockForWrite(locker)) {
+ lockForWriteQueuedInterruptibly(locker, addWriteWaiter());
+ }
+ if (upgradeResult == Result.ACQUIRED) {
+ // clear upgrade state bit to indicate automatic upgrade
+ while (!clearUpgradeLock(mState)) {}
+ } else {
+ // undo automatic upgrade count increment
+ mUpgradeCount--;
+ }
+ }
+ }
+
+ /**
+ * Attempt to immediately acquire an exclusive lock.
+ *
+ * @param locker object trying to become lock owner
+ * @return true if acquired
+ */
+ public final boolean tryLockForWrite(L locker) {
+ int state = mState;
+ if (state == 0) {
+ // no locks are held
+ if (isUpgradeOrReadWriteFirst() && setWriteLock(state)) {
+ // keep upgrade state bit clear to indicate automatic upgrade
+ mOwner = locker;
+ incrementUpgradeCount();
+ incrementWriteCount();
+ return true;
+ }
+ } else if (state == LOCK_STATE_UPGRADE) {
+ // only upgrade lock is held; upgrade to full write lock
+ if (mOwner == locker && setWriteLock(state)) {
+ incrementWriteCount();
+ return true;
+ }
+ } else if (state < 0) {
+ // write lock is held, and upgrade lock might be held too
+ if (mOwner == locker) {
+ incrementWriteCount();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Attempt to acquire an exclusive lock, waiting a maximum amount of time.
+ *
+ * @param locker object trying to become lock owner
+ * @return true if acquired
+ */
+ public final boolean tryLockForWrite(L locker, long timeout, TimeUnit unit)
+ throws InterruptedException
+ {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ if (!tryLockForWrite(locker)) {
+ return lockForWriteQueuedInterruptibly(locker, addWriteWaiter(),
+ unit.toNanos(timeout));
+ }
+ return true;
+ }
+
+ /**
+ * Release a previously acquired write lock.
+ */
+ public final void unlockFromWrite(L locker) {
+ int writeCount = mWriteCount - 1;
+ if (writeCount < 0) {
+ throw new IllegalMonitorStateException("Too many write locks released");
+ }
+ mWriteCount = writeCount;
+ if (writeCount > 0) {
+ return;
+ }
+
+ // copy original state to check if upgrade lock was automatic
+ final int state = mState;
+
+ // make sure upgrade lock is still held after releasing write lock
+ mState = LOCK_STATE_UPGRADE;
+
+ Node h = mRWHead;
+ if (h != null && h.mWaitStatus != 0) {
+ unparkReadWriteSuccessor(h);
+ }
+
+ if (state == LOCK_STATE_WRITE) {
+ // upgrade owner was automatically set, so automatically clear it
+ unlockFromUpgrade(locker);
+ }
+ }
+
+ public String toString() {
+ int state = mState;
+ int readLocks = state & ~LOCK_STATE_MASK;
+ int upgradeLocks = mUpgradeCount;
+ int writeLocks = mWriteCount;
+
+ return super.toString()
+ + "[Read locks = " + readLocks
+ + ", Upgrade locks = " + upgradeLocks
+ + ", Write locks = " + writeLocks
+ + ", Owner = " + mOwner
+ + ']';
+ }
+
+ /**
+ * Add or subtract to the count of read locks held for the given
+ * locker. Default implementation does nothing, and so read locks are not
+ * reentrant.
+ *
+ * @throws IllegalMonitorStateException if count overflows or underflows
+ */
+ protected void adjustReadLockCount(L locker, int amount) {
+ }
+
+ /**
+ * Default implementation does nothing and always returns false, and so
+ * read locks are not reentrant. Overridden implementation may choose to
+ * always returns true, in which case read lock requests can starve upgrade
+ * and write lock requests.
+ */
+ protected boolean isReadLockHeld(L locker) {
+ return false;
+ }
+
+ private Node enqForReadWrite(final Node node) {
+ for (;;) {
+ Node t = mRWTail;
+ if (t == null) { // Must initialize
+ Node h = new Node(); // Dummy header
+ h.mNext = node;
+ node.mPrev = h;
+ if (cRWHeadRef.compareAndSet(this, null, h)) {
+ mRWTail = node;
+ return h;
+ }
+ } else {
+ node.mPrev = t;
+ if (cRWTailRef.compareAndSet(this, t, node)) {
+ t.mNext = node;
+ return t;
+ }
+ }
+ }
+ }
+
+ private Node enqForUpgrade(final Node node) {
+ for (;;) {
+ Node t = mUTail;
+ if (t == null) { // Must initialize
+ Node h = new Node(); // Dummy header
+ h.mNext = node;
+ node.mPrev = h;
+ if (cUHeadRef.compareAndSet(this, null, h)) {
+ mUTail = node;
+ return h;
+ }
+ } else {
+ node.mPrev = t;
+ if (cUTailRef.compareAndSet(this, t, node)) {
+ t.mNext = node;
+ return t;
+ }
+ }
+ }
+ }
+
+ private Node addReadWaiter() {
+ return addReadWriteWaiter(true);
+ }
+
+ private Node addWriteWaiter() {
+ return addReadWriteWaiter(false);
+ }
+
+ private Node addReadWriteWaiter(boolean shared) {
+ Node node = new Node(Thread.currentThread(), shared);
+ // Try the fast path of enq; backup to full enq on failure
+ Node pred = mRWTail;
+ if (pred != null) {
+ node.mPrev = pred;
+ if (cRWTailRef.compareAndSet(this, pred, node)) {
+ pred.mNext = node;
+ return node;
+ }
+ }
+ enqForReadWrite(node);
+ return node;
+ }
+
+ private Node addUpgradeWaiter() {
+ Node node = new Node(Thread.currentThread(), false);
+ // Try the fast path of enq; backup to full enq on failure
+ Node pred = mUTail;
+ if (pred != null) {
+ node.mPrev = pred;
+ if (cUTailRef.compareAndSet(this, pred, node)) {
+ pred.mNext = node;
+ return node;
+ }
+ }
+ enqForUpgrade(node);
+ return node;
+ }
+
+ private void setReadWriteHead(Node node) {
+ mRWHead = node;
+ node.mThread = null;
+ node.mPrev = null;
+ }
+
+ private void setUpgradeHead(Node node) {
+ mUHead = node;
+ node.mThread = null;
+ node.mPrev = null;
+ }
+
+ private void unparkReadWriteSuccessor(Node node) {
+ Node.cWaitStatusRef.compareAndSet(node, Node.SIGNAL, 0);
+
+ Node s = node.mNext;
+ if (s == null || s.mWaitStatus > 0) {
+ s = null;
+ for (Node t = mRWTail; t != null && t != node; t = t.mPrev) {
+ if (t.mWaitStatus <= 0) {
+ s = t;
+ }
+ }
+ }
+ if (s != null) {
+ LockSupport.unpark(s.mThread);
+ }
+ }
+
+ private void unparkUpgradeSuccessor(Node node) {
+ Node.cWaitStatusRef.compareAndSet(node, Node.SIGNAL, 0);
+
+ Node s = node.mNext;
+ if (s == null || s.mWaitStatus > 0) {
+ s = null;
+ for (Node t = mUTail; t != null && t != node; t = t.mPrev) {
+ if (t.mWaitStatus <= 0) {
+ s = t;
+ }
+ }
+ }
+ if (s != null) {
+ LockSupport.unpark(s.mThread);
+ }
+ }
+
+ private void setReadWriteHeadAndPropagate(Node node) {
+ setReadWriteHead(node);
+ if (node.mWaitStatus != 0) {
+ Node s = node.mNext;
+ if (s == null || s.mShared) {
+ unparkReadWriteSuccessor(node);
+ }
+ }
+ }
+
+ private void cancelAcquireReadWrite(Node node) {
+ if (node != null) {
+ node.mThread = null;
+ node.mWaitStatus = Node.CANCELLED;
+ unparkReadWriteSuccessor(node);
+ }
+ }
+
+ private void cancelAcquireUpgrade(Node node) {
+ if (node != null) {
+ node.mThread = null;
+ node.mWaitStatus = Node.CANCELLED;
+ unparkUpgradeSuccessor(node);
+ }
+ }
+
+ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
+ int s = pred.mWaitStatus;
+ if (s < 0) {
+ return true;
+ }
+ if (s > 0) {
+ node.mPrev = pred.mPrev;
+ } else {
+ Node.cWaitStatusRef.compareAndSet(pred, 0, Node.SIGNAL);
+ }
+ return false;
+ }
+
+ private static void selfInterrupt() {
+ Thread.currentThread().interrupt();
+ }
+
+ private final boolean parkAndCheckInterrupt() {
+ LockSupport.park(this);
+ return Thread.interrupted();
+ }
+
+ /**
+ * @return ACQUIRED or OWNED
+ */
+ private final Result lockForUpgradeQueued(L locker, final Node node) {
+ try {
+ boolean interrupted = false;
+ for (;;) {
+ final Node p = node.predecessor();
+ Result result;
+ if (p == mUHead && (result = tryLockForUpgrade_(locker)) != Result.FAILED) {
+ setUpgradeHead(node);
+ p.mNext = null; // help GC
+ if (interrupted) {
+ selfInterrupt();
+ }
+ return result;
+ }
+ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+ interrupted = true;
+ }
+ }
+ } catch (RuntimeException e) {
+ cancelAcquireUpgrade(node);
+ throw e;
+ }
+ }
+
+ /**
+ * @return ACQUIRED or OWNED
+ */
+ private final Result lockForUpgradeQueuedInterruptibly(L locker, final Node node)
+ throws InterruptedException
+ {
+ try {
+ for (;;) {
+ final Node p = node.predecessor();
+ Result result;
+ if (p == mUHead && (result = tryLockForUpgrade_(locker)) != Result.FAILED) {
+ setUpgradeHead(node);
+ p.mNext = null; // help GC
+ return result;
+ }
+ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+ break;
+ }
+ }
+ } catch (RuntimeException e) {
+ cancelAcquireUpgrade(node);
+ throw e;
+ }
+ // Arrive here only if interrupted
+ cancelAcquireUpgrade(node);
+ throw new InterruptedException();
+ }
+
+ /**
+ * @return FAILED, ACQUIRED or OWNED
+ */
+ private final Result lockForUpgradeQueuedInterruptibly(L locker, final Node node,
+ long nanosTimeout)
+ throws InterruptedException
+ {
+ long lastTime = System.nanoTime();
+ try {
+ for (;;) {
+ final Node p = node.predecessor();
+ Result result;
+ if (p == mUHead && (result = tryLockForUpgrade_(locker)) != Result.FAILED) {
+ setUpgradeHead(node);
+ p.mNext = null; // help GC
+ return result;
+ }
+ if (nanosTimeout <= 0) {
+ cancelAcquireUpgrade(node);
+ return Result.FAILED;
+ }
+ if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD
+ && shouldParkAfterFailedAcquire(p, node))
+ {
+ LockSupport.parkNanos(this, nanosTimeout);
+ }
+ long now = System.nanoTime();
+ nanosTimeout -= now - lastTime;
+ lastTime = now;
+ if (Thread.interrupted()) {
+ break;
+ }
+ }
+ } catch (RuntimeException e) {
+ cancelAcquireUpgrade(node);
+ throw e;
+ }
+ // Arrive here only if interrupted
+ cancelAcquireUpgrade(node);
+ throw new InterruptedException();
+ }
+
+ private final void lockForReadQueued(L locker, final Node node) {
+ try {
+ boolean interrupted = false;
+ for (;;) {
+ final Node p = node.predecessor();
+ if (p == mRWHead && tryLockForRead(locker)) {
+ setReadWriteHeadAndPropagate(node);
+ p.mNext = null; // help GC
+ if (interrupted) {
+ selfInterrupt();
+ }
+ return;
+ }
+ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+ interrupted = true;
+ }
+ }
+ } catch (RuntimeException e) {
+ cancelAcquireReadWrite(node);
+ throw e;
+ }
+ }
+
+ private final void lockForReadQueuedInterruptibly(L locker, final Node node)
+ throws InterruptedException
+ {
+ try {
+ for (;;) {
+ final Node p = node.predecessor();
+ if (p == mRWHead && tryLockForRead(locker)) {
+ setReadWriteHeadAndPropagate(node);
+ p.mNext = null; // help GC
+ return;
+ }
+ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+ break;
+ }
+ }
+ } catch (RuntimeException e) {
+ cancelAcquireReadWrite(node);
+ throw e;
+ }
+ // Arrive here only if interrupted
+ cancelAcquireReadWrite(node);
+ throw new InterruptedException();
+ }
+
+ /**
+ * @return true if acquired
+ */
+ private final boolean lockForReadQueuedInterruptibly(L locker, final Node node,
+ long nanosTimeout)
+ throws InterruptedException
+ {
+ long lastTime = System.nanoTime();
+ try {
+ for (;;) {
+ final Node p = node.predecessor();
+ if (p == mRWHead && tryLockForRead(locker)) {
+ setReadWriteHeadAndPropagate(node);
+ p.mNext = null; // help GC
+ return true;
+ }
+ if (nanosTimeout <= 0) {
+ cancelAcquireReadWrite(node);
+ return false;
+ }
+ if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD
+ && shouldParkAfterFailedAcquire(p, node))
+ {
+ LockSupport.parkNanos(this, nanosTimeout);
+ }
+ long now = System.nanoTime();
+ nanosTimeout -= now - lastTime;
+ lastTime = now;
+ if (Thread.interrupted()) {
+ break;
+ }
+ }
+ } catch (RuntimeException e) {
+ cancelAcquireReadWrite(node);
+ throw e;
+ }
+ // Arrive here only if interrupted
+ cancelAcquireReadWrite(node);
+ throw new InterruptedException();
+ }
+
+ private final void lockForWriteQueued(L locker, final Node node) {
+ try {
+ boolean interrupted = false;
+ for (;;) {
+ final Node p = node.predecessor();
+ if (p == mRWHead && tryLockForWrite(locker)) {
+ setReadWriteHead(node);
+ p.mNext = null; // help GC
+ if (interrupted) {
+ selfInterrupt();
+ }
+ return;
+ }
+ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+ interrupted = true;
+ }
+ }
+ } catch (RuntimeException e) {
+ cancelAcquireReadWrite(node);
+ throw e;
+ }
+ }
+
+ private final void lockForWriteQueuedInterruptibly(L locker, final Node node)
+ throws InterruptedException
+ {
+ try {
+ for (;;) {
+ final Node p = node.predecessor();
+ if (p == mRWHead && tryLockForWrite(locker)) {
+ setReadWriteHead(node);
+ p.mNext = null; // help GC
+ return;
+ }
+ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+ break;
+ }
+ }
+ } catch (RuntimeException e) {
+ cancelAcquireReadWrite(node);
+ throw e;
+ }
+ // Arrive here only if interrupted
+ cancelAcquireReadWrite(node);
+ throw new InterruptedException();
+ }
+
+ /**
+ * @return true if acquired
+ */
+ private final boolean lockForWriteQueuedInterruptibly(L locker, final Node node,
+ long nanosTimeout)
+ throws InterruptedException
+ {
+ long lastTime = System.nanoTime();
+ try {
+ for (;;) {
+ final Node p = node.predecessor();
+ if (p == mRWHead && tryLockForWrite(locker)) {
+ setReadWriteHead(node);
+ p.mNext = null; // help GC
+ return true;
+ }
+ if (nanosTimeout <= 0) {
+ cancelAcquireReadWrite(node);
+ return false;
+ }
+ if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD
+ && shouldParkAfterFailedAcquire(p, node))
+ {
+ LockSupport.parkNanos(this, nanosTimeout);
+ }
+ long now = System.nanoTime();
+ nanosTimeout -= now - lastTime;
+ lastTime = now;
+ if (Thread.interrupted()) {
+ break;
+ }
+ }
+ } catch (RuntimeException e) {
+ cancelAcquireReadWrite(node);
+ throw e;
+ }
+ // Arrive here only if interrupted
+ cancelAcquireReadWrite(node);
+ throw new InterruptedException();
+ }
+
+ private final boolean isReadWriteFirst() {
+ Node h;
+ if ((h = mRWHead) == null) {
+ return true;
+ }
+ Thread current = Thread.currentThread();
+ Node s;
+ return ((s = h.mNext) != null && s.mThread == current) || fullIsReadWriteFirst(current);
+ }
+
+ private final boolean fullIsReadWriteFirst(Thread current) {
+ Node h, s;
+ Thread firstThread = null;
+ if (((h = mRWHead) != null && (s = h.mNext) != null &&
+ s.mPrev == mRWHead && (firstThread = s.mThread) != null))
+ {
+ return firstThread == current;
+ }
+ Node t = mRWTail;
+ while (t != null && t != mRWHead) {
+ Thread tt = t.mThread;
+ if (tt != null) {
+ firstThread = tt;
+ }
+ t = t.mPrev;
+ }
+ return firstThread == current || firstThread == null;
+ }
+
+ private final boolean isUpgradeFirst() {
+ Node h;
+ if ((h = mUHead) == null) {
+ return true;
+ }
+ Thread current = Thread.currentThread();
+ Node s;
+ return ((s = h.mNext) != null && s.mThread == current) || fullIsUpgradeFirst(current);
+ }
+
+ private final boolean fullIsUpgradeFirst(Thread current) {
+ Node h, s;
+ Thread firstThread = null;
+ if (((h = mUHead) != null && (s = h.mNext) != null &&
+ s.mPrev == mUHead && (firstThread = s.mThread) != null))
+ {
+ return firstThread == current;
+ }
+ Node t = mUTail;
+ while (t != null && t != mUHead) {
+ Thread tt = t.mThread;
+ if (tt != null) {
+ firstThread = tt;
+ }
+ t = t.mPrev;
+ }
+ return firstThread == current || firstThread == null;
+ }
+
+ private final boolean isUpgradeOrReadWriteFirst() {
+ Node uh, rwh;
+ if ((uh = mUHead) == null || (rwh = mRWHead) == null) {
+ return true;
+ }
+ Thread current = Thread.currentThread();
+ Node us, rws;
+ return ((us = uh.mNext) != null && us.mThread == current)
+ || ((rws = rwh.mNext) != null && rws.mThread == current)
+ || fullIsUpgradeFirst(current)
+ || fullIsReadWriteFirst(current);
+ }
+
+ /**
+ * @return false if state changed
+ */
+ private boolean incrementReadLocks(int state) {
+ int readLocks = (state & ~LOCK_STATE_MASK) + 1;
+ if (readLocks == LOCK_STATE_MASK) {
+ throw new IllegalMonitorStateException("Maximum read lock count exceeded");
+ }
+ return cStateRef.compareAndSet(this, state, state & LOCK_STATE_MASK | readLocks);
+ }
+
+ /**
+ * @return number of remaining read locks or negative if concurrent
+ * modification prevented operation
+ */
+ private int decrementReadLocks(int state) {
+ int readLocks = (state & ~LOCK_STATE_MASK) - 1;
+ if (readLocks < 0) {
+ throw new IllegalMonitorStateException("Too many read locks released");
+ }
+ if (cStateRef.compareAndSet(this, state, state & LOCK_STATE_MASK | readLocks)) {
+ return readLocks;
+ }
+ return -1;
+ }
+
+ /**
+ * @return false if concurrent modification prevented operation
+ */
+ private boolean setUpgradeLock(int state) {
+ return cStateRef.compareAndSet(this, state, state | LOCK_STATE_UPGRADE);
+ }
+
+ /**
+ * @return false if concurrent modification prevented operation
+ */
+ private boolean clearUpgradeLock(int state) {
+ return cStateRef.compareAndSet(this, state, state & ~LOCK_STATE_UPGRADE);
+ }
+
+ private void incrementUpgradeCount() {
+ int upgradeCount = mUpgradeCount + 1;
+ if (upgradeCount < 0) {
+ throw new IllegalMonitorStateException("Maximum upgrade lock count exceeded");
+ }
+ mUpgradeCount = upgradeCount;
+ }
+
+ /**
+ * @return false if concurrent modification prevented operation
+ */
+ private boolean setWriteLock(int state) {
+ return cStateRef.compareAndSet(this, state, state | LOCK_STATE_WRITE);
+ }
+
+ private void incrementWriteCount() {
+ int writeCount = mWriteCount + 1;
+ if (writeCount < 0) {
+ throw new IllegalMonitorStateException("Maximum write lock count exceeded");
+ }
+ mWriteCount = writeCount;
+ }
+
+ /**
+ * Node class ripped off from AbstractQueuedSynchronizer and modified
+ * slightly. Read the comments in that class for better understanding.
+ */
+ static final class Node {
+ static final AtomicIntegerFieldUpdater<Node> cWaitStatusRef =
+ AtomicIntegerFieldUpdater.newUpdater(Node.class, "mWaitStatus");
+
+ static final int CANCELLED = 1;
+ static final int SIGNAL = -1;
+
+ volatile int mWaitStatus;
+ volatile Node mPrev;
+ volatile Node mNext;
+ volatile Thread mThread;
+
+ final boolean mShared;
+
+ // Used to establish initial head
+ Node() {
+ mShared = false;
+ }
+
+ Node(Thread thread, boolean shared) {
+ mThread = thread;
+ mShared = shared;
+ }
+
+ final Node predecessor() throws NullPointerException {
+ Node p = mPrev;
+ if (p == null) {
+ throw new NullPointerException();
+ } else {
+ return p;
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/carbonado/repo/map/package-info.java b/src/main/java/com/amazon/carbonado/repo/map/package-info.java
new file mode 100644
index 0000000..b386251
--- /dev/null
+++ b/src/main/java/com/amazon/carbonado/repo/map/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
+ * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
+ * of Amazon Technologies, Inc. or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Volatile repository implementation backed by a concurrent map.
+ *
+ * @see com.amazon.carbonado.repo.map.MapRepositoryBuilder
+ */
+package com.amazon.carbonado.repo.map;