diff options
Diffstat (limited to 'src/main/java')
9 files changed, 3058 insertions, 0 deletions
| diff --git a/src/main/java/com/amazon/carbonado/repo/map/Key.java b/src/main/java/com/amazon/carbonado/repo/map/Key.java new file mode 100644 index 0000000..5032f66 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/Key.java @@ -0,0 +1,273 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.repo.map;
 +
 +import java.lang.reflect.UndeclaredThrowableException;
 +
 +import java.util.ArrayList;
 +import java.util.Comparator;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.cojen.classfile.ClassFile;
 +import org.cojen.classfile.CodeBuilder;
 +import org.cojen.classfile.Label;
 +import org.cojen.classfile.LocalVariable;
 +import org.cojen.classfile.MethodInfo;
 +import org.cojen.classfile.Modifiers;
 +import org.cojen.classfile.TypeDesc;
 +
 +import org.cojen.util.ClassInjector;
 +import org.cojen.util.SoftValuedHashMap;
 +
 +import com.amazon.carbonado.Storable;
 +
 +import com.amazon.carbonado.info.OrderedProperty;
 +import com.amazon.carbonado.info.StorableInfo;
 +import com.amazon.carbonado.info.StorableIntrospector;
 +import com.amazon.carbonado.info.StorableProperty;
 +
 +import com.amazon.carbonado.gen.CodeBuilderUtil;
 +
 +/**
 + * 
 + *
 + * @author Brian S O'Neill
 + */
 +class Key<S extends Storable> implements Comparable<Key<S>> {
 +    protected final S mStorable;
 +    protected final Comparator<S> mComparator;
 +
 +    Key(S storable, Comparator<S> comparator) {
 +        mStorable = storable;
 +        mComparator = comparator;
 +    }
 +
 +    @Override
 +    public String toString() {
 +        return mStorable.toString();
 +    }
 +
 +    public int compareTo(Key<S> other) {
 +        int result = mComparator.compare(mStorable, other.mStorable);
 +        if (result == 0) {
 +            int t1 = tieBreaker();
 +            int t2 = other.tieBreaker();
 +            if (t1 < t2) {
 +                result = -1;
 +            } else if (t1 > t2) {
 +                result = 1;
 +            }
 +        }
 +        return result;
 +    }
 +
 +    protected int tieBreaker() {
 +        return 0;
 +    }
 +
 +    public static interface Assigner<S extends Storable> {
 +        void setKeyValues(S storable, Object[] identityValues);
 +
 +        void setKeyValues(S storable, Object[] identityValues, Object rangeValue);
 +    }
 +
 +    private static final Map<Class, Assigner> mAssigners;
 +
 +    static {
 +        mAssigners = new SoftValuedHashMap();
 +    }
 +
 +    public static synchronized <S extends Storable> Assigner<S> getAssigner(Class<S> clazz) {
 +        Assigner<S> assigner = mAssigners.get(clazz);
 +        if (assigner == null) {
 +            assigner = createAssigner(clazz);
 +            mAssigners.put(clazz, assigner);
 +        }
 +        return assigner;
 +    }
 +
 +    private static <S extends Storable> Assigner<S> createAssigner(Class<S> clazz) {
 +        final StorableInfo<S> info = StorableIntrospector.examine(clazz);
 +
 +        ClassInjector ci = ClassInjector.create(clazz.getName(), clazz.getClassLoader());
 +        ClassFile cf = new ClassFile(ci.getClassName());
 +        cf.addInterface(Assigner.class);
 +        cf.markSynthetic();
 +        cf.setSourceFile(Key.class.getName());
 +        cf.setTarget("1.5");
 +
 +        cf.addDefaultConstructor();
 +
 +        // Define required setKeyValues methods.
 +
 +        List<OrderedProperty<S>> pk =
 +            new ArrayList<OrderedProperty<S>>(info.getPrimaryKey().getProperties());
 +
 +        TypeDesc storableType = TypeDesc.forClass(Storable.class);
 +        TypeDesc storableArrayType = storableType.toArrayType();
 +        TypeDesc userStorableType = TypeDesc.forClass(info.getStorableType());
 +        TypeDesc objectArrayType = TypeDesc.OBJECT.toArrayType();
 +
 +        MethodInfo mi = cf.addMethod(Modifiers.PUBLIC, "setKeyValues", null,
 +                                     new TypeDesc[] {storableType, objectArrayType});
 +
 +        CodeBuilder b = new CodeBuilder(mi);
 +
 +        LocalVariable userStorableVar = b.createLocalVariable(null, userStorableType);
 +        b.loadLocal(b.getParameter(0));
 +        b.checkCast(userStorableType);
 +        b.storeLocal(userStorableVar);
 +
 +        // Switch on the number of values supplied.
 +
 +        /* Switch looks like this for two pk properties:
 +
 +        switch(identityValues.length) {
 +        default:
 +            throw new IllegalArgumentException();
 +        case 2:
 +            storable.setPkProp2(identityValues[1]);
 +        case 1:
 +            storable.setPkProp1(identityValues[0]);
 +        case 0:
 +        }
 +
 +        */
 +
 +        b.loadLocal(b.getParameter(1));
 +        b.arrayLength();
 +
 +        int[] cases = new int[pk.size() + 1];
 +        Label[] labels = new Label[pk.size() + 1];
 +
 +        for (int i=0; i<labels.length; i++) {
 +            cases[i] = pk.size() - i;
 +            labels[i] = b.createLabel();
 +        }
 +
 +        Label defaultLabel = b.createLabel();
 +
 +        b.switchBranch(cases, labels, defaultLabel);
 +
 +        for (int i=0; i<labels.length; i++) {
 +            labels[i].setLocation();
 +            int prop = cases[i] - 1;
 +            if (prop >= 0) {
 +                b.loadLocal(userStorableVar);
 +                b.loadLocal(b.getParameter(1));
 +                b.loadConstant(prop);
 +                b.loadFromArray(storableArrayType);
 +                callSetPropertyValue(b, pk.get(prop));
 +            }
 +            // Fall through to next case.
 +        }
 +
 +        b.returnVoid();
 +
 +        defaultLabel.setLocation();
 +        CodeBuilderUtil.throwException(b, IllegalArgumentException.class, null);
 +
 +        // The setKeyValues method that takes a range value calls the other
 +        // setKeyValues method first, to take care of the identityValues.
 +
 +        mi = cf.addMethod(Modifiers.PUBLIC, "setKeyValues", null,
 +                          new TypeDesc[] {storableType, objectArrayType, TypeDesc.OBJECT});
 +
 +        b = new CodeBuilder(mi);
 +
 +        b.loadThis();
 +        b.loadLocal(b.getParameter(0));
 +        b.loadLocal(b.getParameter(1));
 +        b.invokeVirtual("setKeyValues", null, new TypeDesc[] {storableType, objectArrayType});
 +
 +        userStorableVar = b.createLocalVariable(null, userStorableType);
 +        b.loadLocal(b.getParameter(0));
 +        b.checkCast(userStorableType);
 +        b.storeLocal(userStorableVar);
 +
 +        // Switch on the number of values supplied.
 +
 +        /* Switch looks like this for two pk properties:
 +
 +        switch(identityValues.length) {
 +        default:
 +            throw new IllegalArgumentException();
 +        case 0:
 +            storable.setPkProp1(rangeValue);
 +            return;
 +        case 1:
 +            storable.setPkProp2(rangeValue);
 +            return;
 +        }
 +
 +        */
 +
 +        b.loadLocal(b.getParameter(1));
 +        b.arrayLength();
 +
 +        cases = new int[pk.size()];
 +        labels = new Label[pk.size()];
 +
 +        for (int i=0; i<labels.length; i++) {
 +            cases[i] = i;
 +            labels[i] = b.createLabel();
 +        }
 +
 +        defaultLabel = b.createLabel();
 +
 +        b.switchBranch(cases, labels, defaultLabel);
 +
 +        for (int i=0; i<labels.length; i++) {
 +            labels[i].setLocation();
 +            int prop = cases[i];
 +            b.loadLocal(userStorableVar);
 +            b.loadLocal(b.getParameter(2));
 +            callSetPropertyValue(b, pk.get(prop));
 +            b.returnVoid();
 +        }
 +
 +        defaultLabel.setLocation();
 +        CodeBuilderUtil.throwException(b, IllegalArgumentException.class, null);
 +
 +        try {
 +            return (Assigner<S>) ci.defineClass(cf).newInstance();
 +        } catch (IllegalAccessException e) {
 +            throw new UndeclaredThrowableException(e);
 +        } catch (InstantiationException e) {
 +            throw new UndeclaredThrowableException(e);
 +        }
 +    }
 +
 +    /**
 +     * Creates code to call set method. Assumes Storable and property value
 +     * are already on the stack.
 +     */
 +    private static void callSetPropertyValue(CodeBuilder b, OrderedProperty<?> op) {
 +        StorableProperty<?> property = op.getChainedProperty().getLastProperty();
 +        TypeDesc propType = TypeDesc.forClass(property.getType());
 +        if (propType != TypeDesc.OBJECT) {
 +            TypeDesc objectType = propType.toObjectType();
 +            b.checkCast(objectType);
 +            // Potentially unbox primitive.
 +            b.convert(objectType, propType);
 +        }
 +        b.invoke(property.getWriteMethod());
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java b/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java new file mode 100644 index 0000000..cd0f7e3 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/MapCursor.java @@ -0,0 +1,173 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.repo.map;
 +
 +import java.util.ConcurrentModificationException;
 +import java.util.Iterator;
 +import java.util.NoSuchElementException;
 +
 +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 +
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.IsolationLevel;
 +import com.amazon.carbonado.Storable;
 +
 +import com.amazon.carbonado.cursor.AbstractCursor;
 +
 +import com.amazon.carbonado.spi.TransactionScope;
 +
 +/**
 + * Returns copies of Storables that it iterates over.
 + *
 + * @author Brian S O'Neill
 + */
 +class MapCursor<S extends Storable> extends AbstractCursor<S> {
 +    private static final AtomicReferenceFieldUpdater<MapCursor, Iterator> cIteratorRef =
 +        AtomicReferenceFieldUpdater.newUpdater
 +        (MapCursor.class, Iterator.class, "mIterator");
 +
 +    private final MapStorage<S> mStorage;
 +    private final TransactionScope<MapTransaction> mScope;
 +    private final MapTransaction mTxn;
 +    private final boolean mIsForUpdate;
 +
 +    private volatile Iterator<S> mIterator;
 +
 +    MapCursor(MapStorage<S> storage,
 +              TransactionScope<MapTransaction> scope,
 +              Iterable<S> iterable)
 +        throws Exception
 +    {
 +        MapTransaction txn = scope.getTxn();
 +
 +        mStorage = storage;
 +        mScope = scope;
 +        mTxn = txn;
 +
 +        if (txn == null) {
 +            mStorage.mLock.lockForRead(scope);
 +            mIsForUpdate = false;
 +        } else {
 +            // Since lock is so coarse, all reads in transaction scope are
 +            // upgrade to avoid deadlocks.
 +            txn.lockForUpgrade(mStorage.mLock, mIsForUpdate = scope.isForUpdate());
 +        }
 +
 +        scope.register(storage.getStorableType(), this);
 +        mIterator = iterable.iterator();
 +    }
 +
 +    public void close() {
 +        Iterator<S> it = mIterator;
 +        if (it != null) {
 +            if (cIteratorRef.compareAndSet(this, it, null)) {
 +                UpgradableLock lock = mStorage.mLock;
 +                if (mTxn == null) {
 +                    lock.unlockFromRead(mScope);
 +                } else {
 +                    mTxn.unlockFromUpgrade(lock, mIsForUpdate);
 +                }
 +                mScope.unregister(mStorage.getStorableType(), this);
 +            }
 +        }
 +    }
 +
 +    public boolean hasNext() throws FetchException {
 +        Iterator<S> it = mIterator;
 +        try {
 +            if (it != null && it.hasNext()) {
 +                return true;
 +            } else {
 +                close();
 +            }
 +            return false;
 +        } catch (ConcurrentModificationException e) {
 +            close();
 +            throw new FetchException(e);
 +        } catch (Error e) {
 +            try {
 +                close();
 +            } catch (Error e2) {
 +                // Ignore.
 +            }
 +            throw e;
 +        }
 +    }
 +
 +    public S next() throws FetchException {
 +        Iterator<S> it = mIterator;
 +        if (it == null) {
 +            close();
 +            throw new NoSuchElementException();
 +        }
 +        try {
 +            S next = mStorage.copyAndFireLoadTrigger(it.next());
 +            if (!hasNext()) {
 +                close();
 +            }
 +            return next;
 +        } catch (ConcurrentModificationException e) {
 +            close();
 +            throw new FetchException(e);
 +        } catch (Error e) {
 +            try {
 +                close();
 +            } catch (Error e2) {
 +                // Ignore.
 +            }
 +            throw e;
 +        }
 +    }
 +
 +    @Override
 +    public int skipNext(int amount) throws FetchException {
 +        if (amount <= 0) {
 +            if (amount < 0) {
 +                throw new IllegalArgumentException("Cannot skip negative amount: " + amount);
 +            }
 +            return 0;
 +        }
 +
 +        // Skip over entries without copying them.
 +
 +        int count = 0;
 +        Iterator<S> it = mIterator;
 +
 +        if (it != null) {
 +            try {
 +                while (--amount >= 0 && it.hasNext()) {
 +                    it.next();
 +                    count++;
 +                }
 +            } catch (ConcurrentModificationException e) {
 +                close();
 +                throw new FetchException(e);
 +            } catch (Error e) {
 +                try {
 +                    close();
 +                } catch (Error e2) {
 +                    // Ignore.
 +                }
 +                throw e;
 +            }
 +        }
 +
 +        return count;
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java b/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java new file mode 100644 index 0000000..2e82f89 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/MapRepository.java @@ -0,0 +1,125 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.repo.map;
 +
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.commons.logging.Log;
 +
 +import com.amazon.carbonado.Repository;
 +import com.amazon.carbonado.RepositoryException;
 +import com.amazon.carbonado.Storable;
 +import com.amazon.carbonado.Storage;
 +import com.amazon.carbonado.TriggerFactory;
 +
 +import com.amazon.carbonado.capability.IndexInfo;
 +import com.amazon.carbonado.capability.IndexInfoCapability;
 +
 +import com.amazon.carbonado.sequence.SequenceValueGenerator;
 +import com.amazon.carbonado.sequence.SequenceValueProducer;
 +
 +import com.amazon.carbonado.qe.RepositoryAccess;
 +import com.amazon.carbonado.qe.StorageAccess;
 +
 +import com.amazon.carbonado.spi.AbstractRepository;
 +import com.amazon.carbonado.spi.LobEngine;
 +import com.amazon.carbonado.spi.TransactionManager;
 +import com.amazon.carbonado.spi.TransactionScope;
 +
 +/**
 + * 
 + *
 + * @author Brian S O'Neill
 + * @see MapRepositoryBuilder
 + */
 +class MapRepository extends AbstractRepository<MapTransaction>
 +    implements RepositoryAccess, IndexInfoCapability
 +{
 +    private final AtomicReference<Repository> mRootRef;
 +    private final boolean mIsMaster;
 +    private final int mLockTimeout;
 +    private final TimeUnit mLockTimeoutUnit;
 +
 +    final Iterable<TriggerFactory> mTriggerFactories;
 +    private final MapTransactionManager mTxnManager;
 +    private LobEngine mLobEngine;
 +
 +    MapRepository(AtomicReference<Repository> rootRef, MapRepositoryBuilder builder) {
 +        super(builder.getName());
 +        mRootRef = rootRef;
 +        mIsMaster = builder.isMaster();
 +        mLockTimeout = builder.getLockTimeout();
 +        mLockTimeoutUnit = builder.getLockTimeoutUnit();
 +
 +        mTriggerFactories = builder.getTriggerFactories();
 +        mTxnManager = new MapTransactionManager(mLockTimeout, mLockTimeoutUnit);
 +    }
 +
 +    public Repository getRootRepository() {
 +        return mRootRef.get();
 +    }
 +
 +    public <S extends Storable> StorageAccess<S> storageAccessFor(Class<S> type)
 +        throws RepositoryException
 +    {
 +        return (StorageAccess<S>) storageFor(type);
 +    }
 +
 +    public <S extends Storable> IndexInfo[] getIndexInfo(Class<S> storableType)
 +        throws RepositoryException
 +    {
 +        return ((MapStorage) storageFor(storableType)).getIndexInfo();
 +    }
 +
 +    protected Log getLog() {
 +        return null;
 +    }
 +
 +    protected TransactionManager<MapTransaction> transactionManager() {
 +        return mTxnManager;
 +    }
 +
 +    protected TransactionScope<MapTransaction> localTransactionScope() {
 +        return mTxnManager.localScope();
 +    }
 +
 +    protected <S extends Storable> Storage<S> createStorage(Class<S> type)
 +        throws RepositoryException
 +    {
 +        return new MapStorage<S>(this, type, mLockTimeout, mLockTimeoutUnit);
 +    }
 +
 +    protected SequenceValueProducer createSequenceValueProducer(String name)
 +        throws RepositoryException
 +    {
 +        return new SequenceValueGenerator(this, name);
 +    }
 +
 +    LobEngine getLobEngine() throws RepositoryException {
 +        if (mLobEngine == null) {
 +            mLobEngine = new LobEngine(this, getRootRepository());
 +        }
 +        return mLobEngine;
 +    }
 +
 +    boolean isMaster() {
 +        return mIsMaster;
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapRepositoryBuilder.java b/src/main/java/com/amazon/carbonado/repo/map/MapRepositoryBuilder.java new file mode 100644 index 0000000..8d5d399 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/MapRepositoryBuilder.java @@ -0,0 +1,151 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.repo.map;
 +
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.TimeUnit;
 +
 +import com.amazon.carbonado.Repository;
 +import com.amazon.carbonado.RepositoryException;
 +
 +import com.amazon.carbonado.repo.indexed.IndexedRepositoryBuilder;
 +
 +import com.amazon.carbonado.spi.AbstractRepositoryBuilder;
 +
 +/**
 + * Volatile repository implementation backed by a concurrent map. Locks used by
 + * repository are coarse, much like <i>table locks</i>. Loads and queries
 + * acquire read locks, and modifications acquire write locks. Within
 + * transactions, loads and queries always acquire upgradable locks, to reduce
 + * the likelihood of deadlock.
 + *
 + * <p>This repository supports transactions, which also may be
 + * nested. Supported isolation levels are read committed and serializable. Read
 + * uncommitted is promoted to read committed, and repeatable read is promoted
 + * to serializable.
 + *
 + * <p>
 + * The following extra capabilities are supported:
 + * <ul>
 + * <li>{@link com.amazon.carbonado.capability.IndexInfoCapability IndexInfoCapability}
 + * <li>{@link com.amazon.carbonado.capability.ShutdownCapability ShutdownCapability}
 + * <li>{@link com.amazon.carbonado.sequence.SequenceCapability SequenceCapability}
 + * </ul>
 + *
 + * <p>Note: This repository uses concurrent navigable map classes, which became
 + * available in JDK1.6.
 + *
 + * @author Brian S O'Neill
 + * @since 1.2
 + */
 +public class MapRepositoryBuilder extends AbstractRepositoryBuilder {
 +    /**
 +     * Convenience method to build a new MapRepository.
 +     */
 +    public static Repository newRepository() {
 +        try {
 +            MapRepositoryBuilder builder = new MapRepositoryBuilder();
 +            return builder.build();
 +        } catch (RepositoryException e) {
 +            // Not expected.
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    private String mName = "";
 +    private boolean mIsMaster = true;
 +    private boolean mIndexSupport = true;
 +    private int mLockTimeout;
 +    private TimeUnit mLockTimeoutUnit;
 +
 +    public MapRepositoryBuilder() {
 +        setLockTimeoutMillis(500);
 +    }
 +
 +    public Repository build(AtomicReference<Repository> rootRef) throws RepositoryException {
 +        if (mIndexSupport) {
 +            // Temporarily set to false to avoid infinite recursion.
 +            mIndexSupport = false;
 +            try {
 +                IndexedRepositoryBuilder ixBuilder = new IndexedRepositoryBuilder();
 +                ixBuilder.setWrappedRepository(this);
 +                ixBuilder.setMaster(isMaster());
 +                ixBuilder.setAllClustered(true);
 +                return ixBuilder.build(rootRef);
 +            } finally {
 +                mIndexSupport = true;
 +            }
 +        }
 +
 +        assertReady();
 +
 +        Repository repo = new MapRepository(rootRef, this);
 +
 +        rootRef.set(repo);
 +        return repo;
 +    }
 +
 +    public String getName() {
 +        return mName;
 +    }
 +
 +    public void setName(String name) {
 +        mName = name;
 +    }
 +
 +    public boolean isMaster() {
 +        return mIsMaster;
 +    }
 +
 +    public void setMaster(boolean b) {
 +        mIsMaster = b;
 +    }
 +
 +    /**
 +     * Set the lock timeout, in milliseconds. Default value is 500 milliseconds.
 +     */
 +    public void setLockTimeoutMillis(int timeout) {
 +        setLockTimeout(timeout, TimeUnit.MILLISECONDS);
 +    }
 +
 +    /**
 +     * Set the lock timeout. Default value is 500 milliseconds.
 +     */
 +    public void setLockTimeout(int timeout, TimeUnit unit) {
 +        if (timeout < 0 || unit == null) {
 +            throw new IllegalArgumentException();
 +        }
 +        mLockTimeout = timeout;
 +        mLockTimeoutUnit = unit;
 +    }
 +
 +    /**
 +     * Returns the lock timeout. Call getLockTimeoutUnit to get the unit.
 +     */
 +    public int getLockTimeout() {
 +        return mLockTimeout;
 +    }
 +
 +    /**
 +     * Returns the lock timeout unit. Call getLockTimeout to get the timeout.
 +     */
 +    public TimeUnit getLockTimeoutUnit() {
 +        return mLockTimeoutUnit;
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java new file mode 100644 index 0000000..520d18c --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/MapStorage.java @@ -0,0 +1,813 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.repo.map;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.EnumSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.NavigableMap;
 +import java.util.Set;
 +
 +import java.util.concurrent.ConcurrentNavigableMap;
 +import java.util.concurrent.ConcurrentSkipListMap;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.cojen.classfile.TypeDesc;
 +
 +import com.amazon.carbonado.Cursor;
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.FetchInterruptedException;
 +import com.amazon.carbonado.FetchTimeoutException;
 +import com.amazon.carbonado.IsolationLevel;
 +import com.amazon.carbonado.PersistException;
 +import com.amazon.carbonado.PersistInterruptedException;
 +import com.amazon.carbonado.PersistTimeoutException;
 +import com.amazon.carbonado.Query;
 +import com.amazon.carbonado.Repository;
 +import com.amazon.carbonado.RepositoryException;
 +import com.amazon.carbonado.Storable;
 +import com.amazon.carbonado.Storage;
 +import com.amazon.carbonado.SupportException;
 +import com.amazon.carbonado.Trigger;
 +
 +import com.amazon.carbonado.capability.IndexInfo;
 +
 +import com.amazon.carbonado.cursor.ArraySortBuffer;
 +import com.amazon.carbonado.cursor.EmptyCursor;
 +import com.amazon.carbonado.cursor.FilteredCursor;
 +import com.amazon.carbonado.cursor.SingletonCursor;
 +import com.amazon.carbonado.cursor.SortBuffer;
 +import com.amazon.carbonado.cursor.SortedCursor;
 +
 +import com.amazon.carbonado.filter.Filter;
 +import com.amazon.carbonado.filter.FilterValues;
 +import com.amazon.carbonado.filter.RelOp;
 +
 +import com.amazon.carbonado.sequence.SequenceValueProducer;
 +
 +import com.amazon.carbonado.gen.DelegateStorableGenerator;
 +import com.amazon.carbonado.gen.DelegateSupport;
 +import com.amazon.carbonado.gen.MasterFeature;
 +
 +import com.amazon.carbonado.util.QuickConstructorGenerator;
 +
 +import com.amazon.carbonado.filter.Filter;
 +
 +import com.amazon.carbonado.info.ChainedProperty;
 +import com.amazon.carbonado.info.Direction;
 +import com.amazon.carbonado.info.OrderedProperty;
 +import com.amazon.carbonado.info.StorableIndex;
 +import com.amazon.carbonado.info.StorableInfo;
 +import com.amazon.carbonado.info.StorableIntrospector;
 +import com.amazon.carbonado.info.StorableProperty;
 +
 +import com.amazon.carbonado.qe.BoundaryType;
 +import com.amazon.carbonado.qe.QueryExecutorFactory;
 +import com.amazon.carbonado.qe.QueryEngine;
 +import com.amazon.carbonado.qe.StorageAccess;
 +
 +import com.amazon.carbonado.spi.IndexInfoImpl;
 +import com.amazon.carbonado.spi.LobEngine;
 +import com.amazon.carbonado.spi.TransactionScope;
 +import com.amazon.carbonado.spi.TriggerManager;
 +
 +/**
 + * 
 + *
 + * @author Brian S O'Neill
 + */
 +class MapStorage<S extends Storable> 
 +    implements Storage<S>, DelegateSupport<S>, StorageAccess<S>
 +{
 +    private static final int DEFAULT_LOB_BLOCK_SIZE = 1000;
 +    private static final Object[] NO_VALUES = new Object[0];
 +
 +    private final MapRepository mRepo;
 +    private final StorableInfo<S> mInfo;
 +    private final TriggerManager<S> mTriggers;
 +    private final InstanceFactory mInstanceFactory;
 +    private final StorableIndex<S> mPrimaryKeyIndex;
 +    private final QueryEngine<S> mQueryEngine;
 +
 +    private final int mLockTimeout;
 +    private final TimeUnit mLockTimeoutUnit;
 +
 +    private final ConcurrentNavigableMap<Key<S>, S> mMap;
 +    private final Comparator<S> mFullComparator;
 +    private final Comparator<S>[] mSearchComparators;
 +
 +    private final Key.Assigner<S> mKeyAssigner;
 +
 +    /**
 +     * Simple lock which is reentrant for transactions, but auto-commit does not
 +     * need to support reentrancy. Read lock requests in transactions can starve
 +     * write lock requests, but auto-commit cannot cause starvation. In practice
 +     * starvation is not possible since transactions always lock for upgrade.
 +     */
 +    final UpgradableLock<Object> mLock = new UpgradableLock<Object>() {
 +        @Override
 +        protected boolean isReadLockHeld(Object locker) {
 +            return locker instanceof MapTransaction;
 +        }
 +    };
 +
 +    MapStorage(MapRepository repo, Class<S> type, int lockTimeout, TimeUnit lockTimeoutUnit)
 +        throws SupportException
 +    {
 +        mRepo = repo;
 +        mInfo = StorableIntrospector.examine(type);
 +        mTriggers = new TriggerManager<S>();
 +
 +        EnumSet<MasterFeature> features;
 +        if (repo.isMaster()) {
 +            features = EnumSet.of(MasterFeature.INSERT_CHECK_REQUIRED,
 +                                  MasterFeature.VERSIONING,
 +                                  MasterFeature.INSERT_SEQUENCES);
 +        } else {
 +            features = EnumSet.of(MasterFeature.INSERT_CHECK_REQUIRED);
 +        }
 +
 +        Class<? extends S> delegateStorableClass =
 +            DelegateStorableGenerator.getDelegateClass(type, features);
 +
 +        mInstanceFactory = QuickConstructorGenerator
 +            .getInstance(delegateStorableClass, InstanceFactory.class);
 +
 +        mPrimaryKeyIndex =
 +            new StorableIndex<S>(mInfo.getPrimaryKey(), Direction.ASCENDING).clustered(true);
 +
 +        mQueryEngine = new QueryEngine<S>(type, repo);
 +
 +        mLockTimeout = lockTimeout;
 +        mLockTimeoutUnit = lockTimeoutUnit;
 +
 +        mMap = new ConcurrentSkipListMap<Key<S>, S>();
 +        List<OrderedProperty<S>> propList = createPkPropList();
 +        mFullComparator = SortedCursor.createComparator(propList);
 +        mSearchComparators = new Comparator[propList.size() + 1];
 +        mSearchComparators[propList.size()] = mFullComparator;
 +
 +        mKeyAssigner = Key.getAssigner(type);
 +
 +        try {
 +            if (LobEngine.hasLobs(type)) {
 +                Trigger<S> lobTrigger = repo.getLobEngine()
 +                    .getSupportTrigger(type, DEFAULT_LOB_BLOCK_SIZE);
 +                addTrigger(lobTrigger);
 +            }
 +
 +            // Don't install automatic triggers until we're completely ready.
 +            mTriggers.addTriggers(type, repo.mTriggerFactories);
 +        } catch (SupportException e) {
 +            throw e;
 +        } catch (RepositoryException e) {
 +            throw new SupportException(e);
 +        }
 +    }
 +
 +    public Class<S> getStorableType() {
 +        return mInfo.getStorableType();
 +    }
 +
 +    public S prepare() {
 +        return (S) mInstanceFactory.instantiate(this);
 +    }
 +
 +    public Query<S> query() throws FetchException {
 +        return mQueryEngine.query();
 +    }
 +
 +    public Query<S> query(String filter) throws FetchException {
 +        return mQueryEngine.query(filter);
 +    }
 +
 +    public Query<S> query(Filter<S> filter) throws FetchException {
 +        return mQueryEngine.query(filter);
 +    }
 +
 +    public void truncate() throws PersistException {
 +        try {
 +            Object locker = mRepo.localTransactionScope().getTxn();
 +            if (locker == null) {
 +                locker = Thread.currentThread();
 +            }
 +            doLockForWrite(locker);
 +            try {
 +                mMap.clear();
 +            } finally {
 +                mLock.unlockFromWrite(locker);
 +            }
 +        } catch (PersistException e) {
 +            throw e;
 +        } catch (Exception e) {
 +            throw new PersistException(e);
 +        }
 +    }
 +
 +    public boolean addTrigger(Trigger<? super S> trigger) {
 +        return mTriggers.addTrigger(trigger);
 +    }
 +
 +    public boolean removeTrigger(Trigger<? super S> trigger) {
 +        return mTriggers.removeTrigger(trigger);
 +    }
 +
 +    public IndexInfo[] getIndexInfo() {
 +        StorableIndex<S> pkIndex = mPrimaryKeyIndex;
 +
 +        if (pkIndex == null) {
 +            return new IndexInfo[0];
 +        }
 +
 +        int i = pkIndex.getPropertyCount();
 +        String[] propertyNames = new String[i];
 +        Direction[] directions = new Direction[i];
 +        while (--i >= 0) {
 +            propertyNames[i] = pkIndex.getProperty(i).getName();
 +            directions[i] = pkIndex.getPropertyDirection(i);
 +        }
 +
 +        return new IndexInfo[] {
 +            new IndexInfoImpl(getStorableType().getName(), true, true, propertyNames, directions)
 +        };
 +    }
 +
 +    public boolean doTryLoad(S storable) throws FetchException {
 +        try {
 +            TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
 +            MapTransaction txn = scope.getTxn();
 +            if (txn == null) {
 +                doLockForRead(scope);
 +                try {
 +                    return doTryLoadNoLock(storable);
 +                } finally {
 +                    mLock.unlockFromRead(scope);
 +                }
 +            } else {
 +                // Since lock is so coarse, all reads in transaction scope are
 +                // upgrade to avoid deadlocks.
 +                final boolean isForUpdate = scope.isForUpdate();
 +                txn.lockForUpgrade(mLock, isForUpdate);
 +                try {
 +                    return doTryLoadNoLock(storable);
 +                } finally {
 +                    txn.unlockFromUpgrade(mLock, isForUpdate);
 +                }
 +            }
 +        } catch (FetchException e) {
 +            throw e;
 +        } catch (Exception e) {
 +            throw new FetchException(e);
 +        }
 +    }
 +
 +    // Caller must hold lock.
 +    boolean doTryLoadNoLock(S storable) {
 +        S existing = mMap.get(new Key<S>(storable, mFullComparator));
 +        if (existing == null) {
 +            return false;
 +        } else {
 +            storable.markAllPropertiesDirty();
 +            existing.copyAllProperties(storable);
 +            storable.markAllPropertiesClean();
 +            return true;
 +        }
 +    }
 +
 +    public boolean doTryInsert(S storable) throws PersistException {
 +        try {
 +            TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
 +            MapTransaction txn = scope.getTxn();
 +            if (txn == null) {
 +                // No need to acquire full write lock since map is concurrent
 +                // and existing storable (if any) is not being
 +                // modified. Upgrade lock is required because a concurrent
 +                // transaction might be in progress, and so insert should wait.
 +                doLockForUpgrade(scope);
 +                try {
 +                    return doTryInsertNoLock(storable);
 +                } finally {
 +                    mLock.unlockFromUpgrade(scope);
 +                }
 +            } else {
 +                txn.lockForWrite(mLock);
 +                if (doTryInsertNoLock(storable)) {
 +                    txn.inserted(this, storable);
 +                    return true;
 +                } else {
 +                    return false;
 +                }
 +            }
 +        } catch (PersistException e) {
 +            throw e;
 +        } catch (FetchException e) {
 +            throw e.toPersistException();
 +        } catch (Exception e) {
 +            throw new PersistException(e);
 +        }
 +    }
 +
 +    // Caller must hold upgrade or write lock.
 +    private boolean doTryInsertNoLock(S storable) {
 +        Key<S> key = new Key<S>(storable, mFullComparator);
 +        S existing = mMap.get(key);
 +        if (existing != null) {
 +            return false;
 +        }
 +        // Create a fresh copy to ensure that custom fields are not saved.
 +        S copy = (S) storable.prepare();
 +        storable.copyAllProperties(copy);
 +        copy.markAllPropertiesClean();
 +        mMap.put(key, copy);
 +        storable.markAllPropertiesClean();
 +        return true;
 +    }
 +
 +    public boolean doTryUpdate(S storable) throws PersistException {
 +        try {
 +            TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
 +            MapTransaction txn = scope.getTxn();
 +            if (txn == null) {
 +                // Full write lock is required since existing storable is being
 +                // modified. Readers cannot be allowed to see modifications
 +                // until they are complete. In addtion, a concurrent
 +                // transaction might be in progress, and so update should wait.
 +                doLockForWrite(scope);
 +                try {
 +                    return doTryUpdateNoLock(storable);
 +                } finally {
 +                    mLock.unlockFromWrite(scope);
 +                }
 +            } else {
 +                txn.lockForWrite(mLock);
 +                S existing = mMap.get(new Key<S>(storable, mFullComparator));
 +                if (existing == null) {
 +                    return false;
 +                } else {
 +                    // Copy existing object to undo log.
 +                    txn.updated(this, (S) existing.copy());
 +
 +                    // Copy altered values to existing object.
 +                    existing.markAllPropertiesDirty();
 +                    storable.copyDirtyProperties(existing);
 +                    existing.markAllPropertiesClean();
 +
 +                    // Copy all values to user object, to simulate a reload.
 +                    storable.markAllPropertiesDirty();
 +                    existing.copyAllProperties(storable);
 +                    storable.markAllPropertiesClean();
 +
 +                    return true;
 +                }
 +            }
 +        } catch (PersistException e) {
 +            throw e;
 +        } catch (Exception e) {
 +            throw new PersistException(e);
 +        }
 +    }
 +
 +    // Caller must hold write lock.
 +    private boolean doTryUpdateNoLock(S storable) {
 +        S existing = mMap.get(new Key<S>(storable, mFullComparator));
 +        if (existing == null) {
 +            return false;
 +        } else {
 +            // Copy altered values to existing object.
 +            existing.markAllPropertiesDirty();
 +            storable.copyDirtyProperties(existing);
 +            existing.markAllPropertiesClean();
 +
 +            // Copy all values to user object, to simulate a reload.
 +            storable.markAllPropertiesDirty();
 +            existing.copyAllProperties(storable);
 +            storable.markAllPropertiesClean();
 +
 +            return true;
 +        }
 +    }
 +
 +    public boolean doTryDelete(S storable) throws PersistException {
 +        try {
 +            TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
 +            MapTransaction txn = scope.getTxn();
 +            if (txn == null) {
 +                // No need to acquire full write lock since map is concurrent
 +                // and existing storable (if any) is not being
 +                // modified. Upgrade lock is required because a concurrent
 +                // transaction might be in progress, and so delete should wait.
 +                doLockForUpgrade(scope);
 +                try {
 +                    return doTryDeleteNoLock(storable);
 +                } finally {
 +                    mLock.unlockFromUpgrade(scope);
 +                }
 +            } else {
 +                txn.lockForWrite(mLock);
 +                S existing = mMap.remove(new Key<S>(storable, mFullComparator));
 +                if (existing == null) {
 +                    return false;
 +                } else {
 +                    txn.deleted(this, existing);
 +                    return true;
 +                }
 +            }
 +        } catch (PersistException e) {
 +            throw e;
 +        } catch (FetchException e) {
 +            throw e.toPersistException();
 +        } catch (Exception e) {
 +            throw new PersistException(e);
 +        }
 +    }
 +
 +    // Caller must hold upgrade or write lock.
 +    private boolean doTryDeleteNoLock(S storable) {
 +        return mMap.remove(new Key<S>(storable, mFullComparator)) != null;
 +    }
 +
 +    // Called by MapTransaction, which implicitly holds lock.
 +    void mapPut(S storable) {
 +        mMap.put(new Key<S>(storable, mFullComparator), storable);
 +    }
 +
 +    // Called by MapTransaction, which implicitly holds lock.
 +    void mapRemove(S storable) {
 +        mMap.remove(new Key<S>(storable, mFullComparator));
 +    }
 +
 +    private void doLockForRead(Object locker) throws FetchException {
 +        try {
 +            if (!mLock.tryLockForRead(locker, mLockTimeout, mLockTimeoutUnit)) {
 +                throw new FetchTimeoutException("" + mLockTimeout + ' ' +
 +                                                mLockTimeoutUnit.toString().toLowerCase());
 +            }
 +        } catch (InterruptedException e) {
 +            throw new FetchInterruptedException(e);
 +        }
 +    }
 +
 +    private void doLockForUpgrade(Object locker) throws FetchException {
 +        try {
 +            if (!mLock.tryLockForUpgrade(locker, mLockTimeout, mLockTimeoutUnit)) {
 +                throw new FetchTimeoutException("" + mLockTimeout + ' ' +
 +                                                mLockTimeoutUnit.toString().toLowerCase());
 +            }
 +        } catch (InterruptedException e) {
 +            throw new FetchInterruptedException(e);
 +        }
 +    }
 +
 +    private void doLockForWrite(Object locker) throws PersistException {
 +        try {
 +            if (!mLock.tryLockForWrite(locker, mLockTimeout, mLockTimeoutUnit)) {
 +                throw new PersistTimeoutException("" + mLockTimeout + ' ' +
 +                                                  mLockTimeoutUnit.toString().toLowerCase());
 +            }
 +        } catch (InterruptedException e) {
 +            throw new PersistInterruptedException(e);
 +        }
 +    }
 +
 +    public Repository getRootRepository() {
 +        return mRepo.getRootRepository();
 +    }
 +
 +    public boolean isPropertySupported(String propertyName) {
 +        return mInfo.getAllProperties().containsKey(propertyName);
 +    }
 +
 +    public Trigger<? super S> getInsertTrigger() {
 +        return mTriggers.getInsertTrigger();
 +    }
 +
 +    public Trigger<? super S> getUpdateTrigger() {
 +        return mTriggers.getUpdateTrigger();
 +    }
 +
 +    public Trigger<? super S> getDeleteTrigger() {
 +        return mTriggers.getDeleteTrigger();
 +    }
 +
 +    public Trigger<? super S> getLoadTrigger() {
 +        return mTriggers.getLoadTrigger();
 +    }
 +
 +    public void locallyDisableLoadTrigger() {
 +        mTriggers.locallyDisableLoad();
 +    }
 +
 +    public void locallyEnableLoadTrigger() {
 +        mTriggers.locallyEnableLoad();
 +    }
 +
 +    public SequenceValueProducer getSequenceValueProducer(String name) throws PersistException {
 +        try {
 +            return mRepo.getSequenceValueProducer(name);
 +        } catch (RepositoryException e) {
 +            throw e.toPersistException();
 +        }
 +    }
 +
 +    public QueryExecutorFactory<S> getQueryExecutorFactory() {
 +        return mQueryEngine;
 +    }
 +
 +    public Collection<StorableIndex<S>> getAllIndexes() {
 +        return Collections.singletonList(mPrimaryKeyIndex);
 +    }
 +
 +    public Storage<S> storageDelegate(StorableIndex<S> index) {
 +        // We're the grunt and don't delegate.
 +        return null;
 +    }
 +
 +    public long countAll() throws FetchException {
 +        try {
 +            Object locker = mRepo.localTransactionScope().getTxn();
 +            if (locker == null) {
 +                locker = Thread.currentThread();
 +            }
 +            doLockForRead(locker);
 +            try {
 +                return mMap.size();
 +            } finally {
 +                mLock.unlockFromRead(locker);
 +            }
 +        } catch (FetchException e) {
 +            throw e;
 +        } catch (Exception e) {
 +            throw new FetchException(e);
 +        }
 +    }
 +
 +    public Cursor<S> fetchAll() throws FetchException {
 +        try {
 +            return new MapCursor<S>(this, mRepo.localTransactionScope(), mMap.values());
 +        } catch (FetchException e) {
 +            throw e;
 +        } catch (Exception e) {
 +            throw new FetchException(e);
 +        }
 +    }
 +
 +    public Cursor<S> fetchOne(StorableIndex<S> index, Object[] identityValues)
 +        throws FetchException
 +    {
 +        try {
 +            S key = prepare();
 +            for (int i=0; i<identityValues.length; i++) {
 +                key.setPropertyValue(index.getProperty(i).getName(), identityValues[i]);
 +            }
 +
 +            TransactionScope<MapTransaction> scope = mRepo.localTransactionScope();
 +            MapTransaction txn = scope.getTxn();
 +            if (txn == null) {
 +                doLockForRead(scope);
 +                try {
 +                    S value = mMap.get(new Key<S>(key, mFullComparator));
 +                    if (value == null) {
 +                        return EmptyCursor.the();
 +                    } else {
 +                        return new SingletonCursor<S>(copyAndFireLoadTrigger(value));
 +                    }
 +                } finally {
 +                    mLock.unlockFromRead(scope);
 +                }
 +            } else {
 +                // Since lock is so coarse, all reads in transaction scope are
 +                // upgrade to avoid deadlocks.
 +                final boolean isForUpdate = scope.isForUpdate();
 +                txn.lockForUpgrade(mLock, isForUpdate);
 +                try {
 +                    S value = mMap.get(new Key<S>(key, mFullComparator));
 +                    if (value == null) {
 +                        return EmptyCursor.the();
 +                    } else {
 +                        return new SingletonCursor<S>(copyAndFireLoadTrigger(value));
 +                    }
 +                } finally {
 +                    txn.unlockFromUpgrade(mLock, isForUpdate);
 +                }
 +            }
 +        } catch (FetchException e) {
 +            throw e;
 +        } catch (Exception e) {
 +            throw new FetchException(e);
 +        }
 +    }
 +
 +    S copyAndFireLoadTrigger(S storable) throws FetchException {
 +        storable = (S) storable.copy();
 +        Trigger<? super S> trigger = getLoadTrigger();
 +        if (trigger != null) {
 +            trigger.afterLoad(storable);
 +            // In case trigger modified the properties, make sure they're still clean.
 +            storable.markAllPropertiesClean();
 +        }
 +        return storable;
 +    }
 +
 +    public Query<?> indexEntryQuery(StorableIndex<S> index) {
 +        return null;
 +    }
 +
 +    public Cursor<S> fetchFromIndexEntryQuery(StorableIndex<S> index, Query<?> indexEntryQuery) {
 +        return null;
 +    }
 +
 +    public Cursor<S> fetchSubset(StorableIndex<S> index,
 +                                 Object[] identityValues,
 +                                 BoundaryType rangeStartBoundary,
 +                                 Object rangeStartValue,
 +                                 BoundaryType rangeEndBoundary,
 +                                 Object rangeEndValue,
 +                                 boolean reverseRange,
 +                                 boolean reverseOrder)
 +        throws FetchException
 +    {
 +        if (identityValues == null) {
 +            identityValues = NO_VALUES;
 +        }
 +
 +        NavigableMap<Key<S>, S> map = mMap;
 +
 +        int tieBreaker = 1;
 +        if (reverseOrder) {
 +            map = map.descendingMap();
 +            reverseRange = !reverseRange;
 +            tieBreaker = -tieBreaker;
 +        }
 +
 +        if (reverseRange) {
 +            BoundaryType t1 = rangeStartBoundary;
 +            rangeStartBoundary = rangeEndBoundary;
 +            rangeEndBoundary = t1;
 +            Object t2 = rangeStartValue;
 +            rangeStartValue = rangeEndValue;
 +            rangeEndValue = t2;
 +        }
 +
 +        tail: {
 +            Key<S> startKey;
 +            switch (rangeStartBoundary) {
 +            case OPEN: default:
 +                if (identityValues.length == 0) {
 +                    break tail;
 +                } else {
 +                    // Tie breaker of -1 puts search key right before first actual
 +                    // match, thus forming an inclusive start match.
 +                    startKey = searchKey(-tieBreaker, identityValues);
 +                }
 +                break;
 +            case INCLUSIVE:
 +                // Tie breaker of -1 puts search key right before first actual
 +                // match, thus forming an inclusive start match.
 +                startKey = searchKey(-tieBreaker, identityValues, rangeStartValue);
 +                break;
 +            case EXCLUSIVE:
 +                // Tie breaker of +1 puts search key right after first actual
 +                // match, thus forming an exlusive start match.
 +                startKey = searchKey(tieBreaker, identityValues, rangeStartValue);
 +                break;
 +            }
 +
 +            map = map.tailMap(startKey, true);
 +        }
 +
 +        Cursor<S> cursor;
 +        try {
 +            cursor = new MapCursor<S>(this, mRepo.localTransactionScope(), map.values());
 +        } catch (FetchException e) {
 +            throw e;
 +        } catch (Exception e) {
 +            throw new FetchException(e);
 +        }
 +
 +        // Use filter to stop cursor at desired ending position.
 +
 +        // FIXME: Let query engine do this so that filter can be
 +        // cached. Somehow indicate this at a high level so that query plan
 +        // shows a filter.
 +        Filter<S> filter;
 +        FilterValues<S> filterValues;
 +
 +        if (rangeEndBoundary == BoundaryType.OPEN) {
 +            if (identityValues.length == 0) {
 +                filter = null;
 +                filterValues = null;
 +            } else {
 +                filter = Filter.getOpenFilter(getStorableType());
 +                for (int i=0; i<identityValues.length; i++) {
 +                    filter = filter.and(index.getProperty(i).getName(), RelOp.EQ);
 +                }
 +                filterValues = filter.initialFilterValues();
 +                for (int i=0; i<identityValues.length; i++) {
 +                    filterValues = filterValues.with(identityValues[i]);
 +                }
 +            }
 +        } else {
 +            filter = Filter.getOpenFilter(getStorableType());
 +            int i = 0;
 +            for (; i<identityValues.length; i++) {
 +                filter = filter.and(index.getProperty(i).getName(), RelOp.EQ);
 +            }
 +
 +            filter = filter.and(index.getProperty(i).getName(),
 +                                rangeEndBoundary == BoundaryType.INCLUSIVE ? RelOp.LE : RelOp.LT);
 +
 +            filterValues = filter.initialFilterValues();
 +
 +            for (i=0; i<identityValues.length; i++) {
 +                filterValues = filterValues.with(identityValues[i]);
 +            }
 +            
 +            filterValues = filterValues.with(rangeEndValue);
 +        }
 +
 +        if (filter != null) {
 +            cursor = FilteredCursor.applyFilter(filter, filterValues, cursor);
 +        }
 +
 +        return cursor;
 +    }
 +
 +    private List<OrderedProperty<S>> createPkPropList() {
 +        return new ArrayList<OrderedProperty<S>>(mInfo.getPrimaryKey().getProperties());
 +    }
 +
 +    private Key<S> searchKey(int tieBreaker, Object[] identityValues) {
 +        S storable = prepare();
 +        mKeyAssigner.setKeyValues(storable, identityValues);
 +        Comparator<S> c = getSearchComparator(identityValues.length);
 +        return new SearchKey<S>(tieBreaker, storable, c);
 +    }
 +
 +    private Key<S> searchKey(int tieBreaker, Object[] identityValues, Object rangeValue) {
 +        S storable = prepare();
 +        mKeyAssigner.setKeyValues(storable, identityValues, rangeValue);
 +        Comparator<S> c = getSearchComparator(identityValues.length + 1);
 +        return new SearchKey<S>(tieBreaker, storable, c);
 +    }
 +
 +    private Comparator<S> getSearchComparator(int propertyCount) {
 +        Comparator<S> comparator = mSearchComparators[propertyCount];
 +        if (comparator == null) {
 +            List<OrderedProperty<S>> propList = createPkPropList().subList(0, propertyCount);
 +            if (propList.size() > 0) {
 +                comparator = SortedCursor.createComparator(propList);
 +            } else {
 +                comparator = SortedCursor.createComparator(getStorableType());
 +            }
 +            mSearchComparators[propertyCount] = comparator;
 +        }
 +        return comparator;
 +    }
 +
 +    public SortBuffer<S> createSortBuffer() {
 +        return new ArraySortBuffer<S>();
 +    }
 +
 +    public static interface InstanceFactory {
 +        Storable instantiate(DelegateSupport support);
 +    }
 +
 +    private static class SearchKey<S extends Storable> extends Key<S> {
 +        private final int mTieBreaker;
 +
 +        SearchKey(int tieBreaker, S storable, Comparator<S> comparator) {
 +            super(storable, comparator);
 +            mTieBreaker = tieBreaker;
 +        }
 +
 +        @Override
 +        protected int tieBreaker() {
 +            return mTieBreaker;
 +        }
 +
 +        @Override
 +        public String toString() {
 +            return super.toString() + ", tieBreaker=" + mTieBreaker;
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapTransaction.java b/src/main/java/com/amazon/carbonado/repo/map/MapTransaction.java new file mode 100644 index 0000000..d2cfe87 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/MapTransaction.java @@ -0,0 +1,240 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.repo.map;
 +
 +import java.util.concurrent.TimeUnit;
 +
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +
 +import com.amazon.carbonado.FetchException;
 +import com.amazon.carbonado.FetchInterruptedException;
 +import com.amazon.carbonado.FetchTimeoutException;
 +import com.amazon.carbonado.IsolationLevel;
 +import com.amazon.carbonado.PersistException;
 +import com.amazon.carbonado.PersistInterruptedException;
 +import com.amazon.carbonado.PersistTimeoutException;
 +import com.amazon.carbonado.Storable;
 +import com.amazon.carbonado.Storage;
 +
 +/**
 + * 
 + *
 + * @author Brian S O'Neill
 + */
 +class MapTransaction {
 +    private final MapTransaction mParent;
 +    private final IsolationLevel mLevel;
 +    private final Object mLocker;
 +    private final int mLockTimeout;
 +    private final TimeUnit mLockTimeoutUnit;
 +
 +    private Set<UpgradableLock> mUpgradeLocks;
 +    private Set<UpgradableLock> mWriteLocks;
 +
 +    private List<Undoable> mUndoLog;
 +
 +    MapTransaction(MapTransaction parent, IsolationLevel level,
 +                   int lockTimeout, TimeUnit lockTimeoutUnit)
 +    {
 +        mParent = parent;
 +        mLevel = level;
 +        mLocker = parent == null ? this : parent.mLocker;
 +        mLockTimeout = lockTimeout;
 +        mLockTimeoutUnit = lockTimeoutUnit;
 +    }
 +
 +    void lockForUpgrade(UpgradableLock lock, boolean isForUpdate) throws FetchException {
 +        if (!isForUpdate && mLevel.isAtMost(IsolationLevel.READ_COMMITTED)) {
 +            doLockForUpgrade(lock);
 +        } else {
 +            Set<UpgradableLock> locks = mUpgradeLocks;
 +            if (locks == null) {
 +                mUpgradeLocks = locks = new HashSet<UpgradableLock>();
 +                doLockForUpgrade(lock);
 +                locks.add(lock);
 +            } else if (!locks.contains(lock)) {
 +                doLockForUpgrade(lock);
 +                locks.add(lock);
 +            }
 +        }
 +    }
 +
 +    private void doLockForUpgrade(UpgradableLock lock) throws FetchException {
 +        try {
 +            if (!lock.tryLockForUpgrade(mLocker, mLockTimeout, mLockTimeoutUnit)) {
 +                throw new FetchTimeoutException("" + mLockTimeout + ' ' +
 +                                                mLockTimeoutUnit.toString().toLowerCase());
 +            }
 +        } catch (InterruptedException e) {
 +            throw new FetchInterruptedException(e);
 +        }
 +    }
 +
 +    void unlockFromUpgrade(UpgradableLock lock, boolean isForUpdate) {
 +        if (!isForUpdate && mLevel.isAtMost(IsolationLevel.READ_COMMITTED)) {
 +            lock.unlockFromUpgrade(mLocker);
 +        }
 +    }
 +
 +    void lockForWrite(UpgradableLock lock) throws PersistException {
 +        Set<UpgradableLock> locks = mWriteLocks;
 +        if (locks == null) {
 +            mWriteLocks = locks = new HashSet<UpgradableLock>();
 +            doLockForWrite(lock);
 +            locks.add(lock);
 +        } else if (!locks.contains(lock)) {
 +            doLockForWrite(lock);
 +            locks.add(lock);
 +        }
 +    }
 +
 +    private void doLockForWrite(UpgradableLock lock) throws PersistException {
 +        try {
 +            if (!lock.tryLockForWrite(mLocker, mLockTimeout, mLockTimeoutUnit)) {
 +                throw new PersistTimeoutException("" + mLockTimeout + ' ' +
 +                                                  mLockTimeoutUnit.toString().toLowerCase());
 +            }
 +        } catch (InterruptedException e) {
 +            throw new PersistInterruptedException(e);
 +        }
 +    }
 +
 +    /**
 +     * Add to undo log.
 +     */
 +    <S extends Storable> void inserted(final MapStorage<S> storage, final S key) {
 +        addToUndoLog(new Undoable() {
 +            public void undo() {
 +                storage.mapRemove(key);
 +            }
 +        });
 +    }
 +
 +    /**
 +     * Add to undo log.
 +     */
 +    <S extends Storable> void updated(final MapStorage<S> storage, final S old) {
 +        addToUndoLog(new Undoable() {
 +            public void undo() {
 +                storage.mapPut(old);
 +            }
 +        });
 +    }
 +
 +    /**
 +     * Add to undo log.
 +     */
 +    <S extends Storable> void deleted(final MapStorage<S> storage, final S old) {
 +        addToUndoLog(new Undoable() {
 +            public void undo() {
 +                storage.mapPut(old);
 +            }
 +        });
 +    }
 +
 +    void commit() {
 +        MapTransaction parent = mParent;
 +
 +        if (parent == null) {
 +            releaseLocks();
 +            return;
 +        }
 +
 +        // Pass undo log to parent.
 +        if (parent.mUndoLog == null) {
 +            parent.mUndoLog = mUndoLog;
 +        } else if (mUndoLog != null) {
 +            parent.mUndoLog.addAll(mUndoLog);
 +        }
 +        mUndoLog = null;
 +
 +        // Pass write locks to parent or release if parent already has the lock.
 +        {
 +            Set<UpgradableLock> locks = mWriteLocks;
 +            if (locks != null) {
 +                Set<UpgradableLock> parentLocks = parent.mWriteLocks;
 +                if (parentLocks == null) {
 +                    parent.mWriteLocks = locks;
 +                } else {
 +                    for (UpgradableLock lock : locks) {
 +                        if (!parentLocks.add(lock)) {
 +                            lock.unlockFromWrite(mLocker);
 +                        }
 +                    }
 +                }
 +                mWriteLocks = null;
 +            }
 +        }
 +
 +        // Upgrade locks can simply be released.
 +        releaseUpgradeLocks();
 +    }
 +
 +    void abort() {
 +        List<Undoable> log = mUndoLog;
 +        if (log != null) {
 +            for (int i=log.size(); --i>=0; ) {
 +                log.get(i).undo();
 +            }
 +        }
 +        mUndoLog = null;
 +
 +        releaseLocks();
 +    }
 +
 +    private void addToUndoLog(Undoable entry) {
 +        List<Undoable> log = mUndoLog;
 +        if (log == null) {
 +            mUndoLog = log = new ArrayList<Undoable>();
 +        }
 +        log.add(entry);
 +    }
 +
 +    private void releaseLocks() {
 +        releaseWriteLocks();
 +        releaseUpgradeLocks();
 +    }
 +
 +    private void releaseWriteLocks() {
 +        Set<UpgradableLock> locks = mWriteLocks;
 +        if (locks != null) {
 +            for (UpgradableLock lock : locks) {
 +                lock.unlockFromWrite(mLocker);
 +            }
 +            mWriteLocks = null;
 +        }
 +    }
 +
 +    private void releaseUpgradeLocks() {
 +        Set<UpgradableLock> locks = mUpgradeLocks;
 +        if (locks != null) {
 +            for (UpgradableLock lock : locks) {
 +                lock.unlockFromUpgrade(mLocker);
 +            }
 +            mUpgradeLocks = null;
 +        }
 +    }
 +
 +    private static interface Undoable {
 +        void undo();
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java b/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java new file mode 100644 index 0000000..5598353 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/MapTransactionManager.java @@ -0,0 +1,98 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.repo.map;
 +
 +import java.util.concurrent.TimeUnit;
 +
 +import com.amazon.carbonado.IsolationLevel;
 +import com.amazon.carbonado.PersistException;
 +import com.amazon.carbonado.Transaction;
 +
 +import com.amazon.carbonado.spi.TransactionManager;
 +
 +/**
 + * 
 + *
 + * @author Brian S O'Neill
 + */
 +class MapTransactionManager extends TransactionManager<MapTransaction> {
 +    private final int mLockTimeout;
 +    private final TimeUnit mLockTimeoutUnit;
 +
 +    MapTransactionManager(int lockTimeout, TimeUnit lockTimeoutUnit) {
 +        mLockTimeout = lockTimeout;
 +        mLockTimeoutUnit = lockTimeoutUnit;
 +    }
 +
 +    protected IsolationLevel selectIsolationLevel(Transaction parent, IsolationLevel level) {
 +        if (level == null) {
 +            if (parent == null) {
 +                return IsolationLevel.READ_COMMITTED;
 +            }
 +            return parent.getIsolationLevel();
 +        }
 +
 +        switch (level) {
 +        case NONE:
 +            return IsolationLevel.NONE;
 +        case READ_UNCOMMITTED:
 +        case READ_COMMITTED:
 +            return IsolationLevel.READ_COMMITTED;
 +        case REPEATABLE_READ:
 +        case SERIALIZABLE:
 +            return IsolationLevel.SERIALIZABLE;
 +        default:
 +            // Not supported.
 +            return null;
 +        }
 +    }
 +
 +    protected boolean supportsForUpdate() {
 +        return true;
 +    }
 +
 +    protected MapTransaction createTxn(MapTransaction parent, IsolationLevel level)
 +        throws Exception
 +    {
 +        if (level == IsolationLevel.NONE) {
 +            return null;
 +        }
 +        return new MapTransaction(parent, level, mLockTimeout, mLockTimeoutUnit);
 +    }
 +
 +    @Override
 +    protected MapTransaction createTxn(MapTransaction parent, IsolationLevel level,
 +                                       int timeout, TimeUnit unit)
 +        throws Exception
 +    {
 +        if (level == IsolationLevel.NONE) {
 +            return null;
 +        }
 +        return new MapTransaction(parent, level, timeout, unit);
 +    }
 +
 +    protected boolean commitTxn(MapTransaction txn) throws PersistException {
 +        txn.commit();
 +        return false;
 +    }
 +
 +    protected void abortTxn(MapTransaction txn) throws PersistException {
 +        txn.abort();
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/repo/map/UpgradableLock.java b/src/main/java/com/amazon/carbonado/repo/map/UpgradableLock.java new file mode 100644 index 0000000..9862791 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/UpgradableLock.java @@ -0,0 +1,1161 @@ +/*
 + * Copyright 2007 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.amazon.carbonado.repo.map;
 +
 +import java.util.concurrent.TimeUnit;
 +
 +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 +
 +import java.util.concurrent.locks.LockSupport;
 +
 +/**
 + * Partially reentrant, upgradable read/write lock. Up to 1,073,741,824 read
 + * locks can be held. Upgraders and writers may re-enter the lock up to
 + * 2,147,483,648 times. Attempts by readers to re-enter the lock is not
 + * detected and is deadlock prone, unless locker already holds an upgrade or
 + * write lock. Subclasses can support full reentrancy by overriding the
 + * protected read lock adjust and hold check methods.
 + *
 + * <p>This lock implementation differs from the usual Java lock with respect to
 + * lock ownership. Locks are not owned by threads, but by arbitrary locker
 + * objects. A thread which attempts to acquire an upgrade or write lock twice
 + * with different locker objects will deadlock on the second attempt.
 + *
 + * <p>As is typical of read/write lock implementations, a read lock blocks
 + * waiting writers, but it doesn't block other readers. A write lock is
 + * exclusive and can be held by at most one locker. Attempting to acquire a
 + * write lock while a read lock is held by the same locker is inherently
 + * deadlock prone, and some read/write lock implementations will always
 + * deadlock.
 + *
 + * <p>An upgrade lock allows a read lock to be safely upgraded to a write
 + * lock. Instead of acquiring a read lock, an upgrade lock is acquired. This
 + * acts like a shared read lock in that readers are not blocked, but it also
 + * acts like an exclusive write lock -- writers and upgraders are blocked. With
 + * an upgrade lock held, the locker may acquire a write lock without deadlock.
 + *
 + * <pre>
 + * Locks held    Locks safely           Locks acquirable
 + * by owner      acquirable by owner    by other lockers
 + * --------------------------------------------------
 + *   - - -         R U W                  R U W
 + *   R - -         - - -                  R U -
 + *   R U -         - U -                  R - -
 + *   - U -         R U W                  R - -
 + *   - U W         R U W                  - - -
 + *   R U W         R U W                  - - -
 + *   R - W         R U W                  - - -
 + *   - - W         R U W                  - - -
 + * </pre>
 + *
 + * @author Brian S O'Neill
 + * @param <L> Locker type
 + */
 +class UpgradableLock<L> {
 +    // Design note: This class borrows heavily from AbstractQueuedSynchronizer.
 +    // Consult that class for understanding the locking mechanics.
 +
 +    private static enum Result {
 +        /** Lock acquisition failed */
 +        FAILED,
 +        /** Lock has just been acquired by locker and can be safely released later */
 +        ACQUIRED,
 +        /** Lock is already owned by locker and should not be released more than once */
 +        OWNED
 +    }
 +
 +    private static final AtomicReferenceFieldUpdater<UpgradableLock, Node> cRWHeadRef =
 +        AtomicReferenceFieldUpdater.newUpdater
 +        (UpgradableLock.class, Node.class, "mRWHead");
 +
 +    private static final AtomicReferenceFieldUpdater<UpgradableLock, Node> cRWTailRef =
 +        AtomicReferenceFieldUpdater.newUpdater
 +        (UpgradableLock.class, Node.class, "mRWTail");
 +
 +    private static final AtomicReferenceFieldUpdater<UpgradableLock, Node> cUHeadRef =
 +        AtomicReferenceFieldUpdater.newUpdater
 +        (UpgradableLock.class, Node.class, "mUHead");
 +
 +    private static final AtomicReferenceFieldUpdater<UpgradableLock, Node> cUTailRef =
 +        AtomicReferenceFieldUpdater.newUpdater
 +        (UpgradableLock.class, Node.class, "mUTail");
 +
 +    private static final AtomicIntegerFieldUpdater<UpgradableLock> cStateRef =
 +        AtomicIntegerFieldUpdater.newUpdater
 +        (UpgradableLock.class, "mState");
 +
 +    // State mask bits for held locks. Read lock count is stored in lower 30 bits of state.
 +    private static final int LOCK_STATE_UPGRADE = 0x40000000;
 +    // Write state must be this value in order for quick sign check to work.
 +    private static final int LOCK_STATE_WRITE = 0x80000000;
 +
 +    private static final int LOCK_STATE_MASK = LOCK_STATE_UPGRADE | LOCK_STATE_WRITE;
 +
 +    /**
 +     * The number of nanoseconds for which it is faster to spin
 +     * rather than to use timed park. A rough estimate suffices
 +     * to improve responsiveness with very short timeouts.
 +     */
 +    private static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
 +
 +    // Head of read-write queue.
 +    private transient volatile Node mRWHead;
 +
 +    // Tail of read-write queue.
 +    private transient volatile Node mRWTail;
 +
 +    // Head of write upgrade queue.
 +    private transient volatile Node mUHead;
 +
 +    // Tail of write upgrade queue.
 +    private transient volatile Node mUTail;
 +
 +    private transient volatile int mState;
 +
 +    // Owner holds an upgradable lock and possibly a write lock too.
 +    private transient L mOwner;
 +
 +    // Counts number of times that owner has entered an upgradable or write lock.
 +    private transient int mUpgradeCount;
 +    private transient int mWriteCount;
 +
 +    public UpgradableLock() {
 +    }
 +
 +    /**
 +     * Acquire a shared read lock, possibly blocking indefinitely.
 +     *
 +     * @param locker object which might be write or upgrade lock owner
 +     */
 +    public final void lockForRead(L locker) {
 +        if (!tryLockForRead(locker)) {
 +            lockForReadQueued(locker, addReadWaiter());
 +        }
 +    }
 +
 +    /**
 +     * Acquire a shared read lock, possibly blocking until interrupted.
 +     *
 +     * @param locker object which might be write or upgrade lock owner
 +     */
 +    public final void lockForReadInterruptibly(L locker) throws InterruptedException {
 +        if (Thread.interrupted()) {
 +            throw new InterruptedException();
 +        }
 +        if (!tryLockForRead(locker)) {
 +            lockForReadQueuedInterruptibly(locker, addReadWaiter());
 +        }
 +    }
 +
 +    /**
 +     * Attempt to immediately acquire a shared read lock.
 +     *
 +     * @param locker object which might be write or upgrade lock owner
 +     * @return true if acquired
 +     */
 +    public final boolean tryLockForRead(L locker) {
 +        int state = mState;
 +        if (state >= 0) { // no write lock is held
 +            if (isReadWriteFirst() || isReadLockHeld(locker)) {
 +                do {
 +                    if (incrementReadLocks(state)) {
 +                        adjustReadLockCount(locker, 1);
 +                        return true;
 +                    }
 +                    // keep looping on CAS failure if a reader or upgrader mucked with the state
 +                } while ((state = mState) >= 0);
 +            }
 +        } else if (mOwner == locker) {
 +            // keep looping on CAS failure if a reader or upgrader mucked with the state
 +            while (!incrementReadLocks(state)) {
 +                state = mState;
 +            }
 +            adjustReadLockCount(locker, 1);
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Attempt to acquire a shared read lock, waiting a maximum amount of
 +     * time.
 +     *
 +     * @param locker object which might be write or upgrade lock owner
 +     * @return true if acquired
 +     */
 +    public final boolean tryLockForRead(L locker, long timeout, TimeUnit unit)
 +        throws InterruptedException
 +    {
 +        if (Thread.interrupted()) {
 +            throw new InterruptedException();
 +        }
 +        if (!tryLockForRead(locker)) {
 +            return lockForReadQueuedInterruptibly(locker, addReadWaiter(), unit.toNanos(timeout));
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * Release a previously acquired read lock.
 +     */
 +    public final void unlockFromRead(L locker) {
 +        adjustReadLockCount(locker, -1);
 +
 +        int readLocks;
 +        while ((readLocks = decrementReadLocks(mState)) < 0) {}
 +
 +        if (readLocks == 0) {
 +            Node h = mRWHead;
 +            if (h != null && h.mWaitStatus != 0) {
 +                unparkReadWriteSuccessor(h);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Acquire an upgrade lock, possibly blocking indefinitely.
 +     *
 +     * @param locker object trying to become lock owner
 +     * @return true if acquired
 +     */
 +    public final boolean lockForUpgrade(L locker) {
 +        return lockForUpgrade_(locker) != Result.FAILED;
 +    }
 +
 +    /**
 +     * Acquire an upgrade lock, possibly blocking indefinitely.
 +     *
 +     * @param locker object trying to become lock owner
 +     * @return ACQUIRED or OWNED
 +     */
 +    private final Result lockForUpgrade_(L locker) {
 +        Result result;
 +        if ((result = tryLockForUpgrade_(locker)) == Result.FAILED) {
 +            result = lockForUpgradeQueued(locker, addUpgradeWaiter());
 +        }
 +        return result;
 +    }
 +
 +    /**
 +     * Acquire an upgrade lock, possibly blocking until interrupted.
 +     *
 +     * @param locker object trying to become lock owner
 +     * @return true if acquired
 +     */
 +    public final boolean lockForUpgradeInterruptibly(L locker) throws InterruptedException {
 +        return lockForUpgradeInterruptibly_(locker) != Result.FAILED;
 +    }
 +
 +    /**
 +     * Acquire an upgrade lock, possibly blocking until interrupted.
 +     *
 +     * @param locker object trying to become lock owner
 +     * @return ACQUIRED or OWNED
 +     */
 +    private final Result lockForUpgradeInterruptibly_(L locker) throws InterruptedException {
 +        if (Thread.interrupted()) {
 +            throw new InterruptedException();
 +        }
 +        Result result;
 +        if ((result = tryLockForUpgrade_(locker)) == Result.FAILED) {
 +            result = lockForUpgradeQueuedInterruptibly(locker, addUpgradeWaiter());
 +        }
 +        return result;
 +    }
 +
 +    /**
 +     * Attempt to immediately acquire an upgrade lock.
 +     *
 +     * @param locker object trying to become lock owner
 +     * @return true if acquired
 +     */
 +    public final boolean tryLockForUpgrade(L locker) {
 +        return tryLockForUpgrade_(locker) != Result.FAILED;
 +    }
 +
 +    /**
 +     * Attempt to immediately acquire an upgrade lock.
 +     *
 +     * @param locker object trying to become lock owner
 +     * @return FAILED, ACQUIRED or OWNED
 +     */
 +    private final Result tryLockForUpgrade_(L locker) {
 +        int state = mState;
 +        if ((state & LOCK_STATE_MASK) == 0) { // no write or upgrade lock is held
 +            if (isUpgradeFirst()) {
 +                do {
 +                    if (setUpgradeLock(state)) {
 +                        mOwner = locker;
 +                        incrementUpgradeCount();
 +                        return Result.ACQUIRED;
 +                    }
 +                    // keep looping on CAS failure if a reader mucked with the state
 +                } while (((state = mState) & LOCK_STATE_MASK) == 0);
 +            }
 +        } else if (mOwner == locker) {
 +            incrementUpgradeCount();
 +            return Result.OWNED;
 +        }
 +        return Result.FAILED;
 +    }
 +
 +    /**
 +     * Attempt to acquire an upgrade lock, waiting a maximum amount of time.
 +     *
 +     * @param locker object trying to become lock owner
 +     * @return true if acquired
 +     */
 +    public final boolean tryLockForUpgrade(L locker, long timeout, TimeUnit unit)
 +        throws InterruptedException
 +    {
 +        return tryLockForUpgrade_(locker, timeout, unit) != Result.FAILED;
 +    }
 +
 +    /**
 +     * Attempt to acquire an upgrade lock, waiting a maximum amount of time.
 +     *
 +     * @param locker object trying to become lock owner
 +     * @return FAILED, ACQUIRED or OWNED
 +     */
 +    private final Result tryLockForUpgrade_(L locker, long timeout, TimeUnit unit)
 +        throws InterruptedException
 +    {
 +        if (Thread.interrupted()) {
 +            throw new InterruptedException();
 +        }
 +        Result result;
 +        if ((result = tryLockForUpgrade_(locker)) == Result.FAILED) {
 +            result = lockForUpgradeQueuedInterruptibly(locker, addUpgradeWaiter(),
 +                                                       unit.toNanos(timeout));
 +        }
 +        return result;
 +    }
 +
 +    /**
 +     * Release a previously acquired upgrade lock.
 +     */
 +    public final void unlockFromUpgrade(L locker) {
 +        int upgradeCount = mUpgradeCount - 1;
 +        if (upgradeCount < 0) {
 +            throw new IllegalMonitorStateException("Too many upgrade locks released");
 +        }
 +        if (upgradeCount == 0 && mWriteCount > 0) {
 +            // Don't release last upgrade lock and switch write lock to
 +            // automatic upgrade mode.
 +            clearUpgradeLock(mState);
 +            return;
 +        }
 +        mUpgradeCount = upgradeCount;
 +        if (upgradeCount > 0) {
 +            return;
 +        }
 +
 +        mOwner = null;
 +
 +        // keep looping on CAS failure if reader mucked with state
 +        while (!clearUpgradeLock(mState)) {}
 +
 +        Node h = mUHead;
 +        if (h != null && h.mWaitStatus != 0) {
 +            unparkUpgradeSuccessor(h);
 +        }
 +    }
 +
 +    /**
 +     * Acquire an exclusive write lock, possibly blocking indefinitely.
 +     *
 +     * @param locker object trying to become lock owner
 +     */
 +    public final void lockForWrite(L locker) {
 +        Result writeResult;
 +        if (!tryLockForWrite(locker)) {
 +            Result upgradeResult = lockForUpgrade_(locker);
 +            if (!tryLockForWrite(locker)) {
 +                lockForWriteQueued(locker, addWriteWaiter());
 +            }
 +            if (upgradeResult == Result.ACQUIRED) {
 +                // clear upgrade state bit to indicate automatic upgrade
 +                while (!clearUpgradeLock(mState)) {}
 +            } else {
 +                // undo automatic upgrade count increment
 +                mUpgradeCount--;
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Acquire an exclusive write lock, possibly blocking until interrupted.
 +     *
 +     * @param locker object trying to become lock owner
 +     */
 +    public final void lockForWriteInterruptibly(L locker) throws InterruptedException {
 +        if (Thread.interrupted()) {
 +            throw new InterruptedException();
 +        }
 +        if (!tryLockForWrite(locker)) {
 +            Result upgradeResult = lockForUpgradeInterruptibly_(locker);
 +            if (!tryLockForWrite(locker)) {
 +                lockForWriteQueuedInterruptibly(locker, addWriteWaiter());
 +            }
 +            if (upgradeResult == Result.ACQUIRED) {
 +                // clear upgrade state bit to indicate automatic upgrade
 +                while (!clearUpgradeLock(mState)) {}
 +            } else {
 +                // undo automatic upgrade count increment
 +                mUpgradeCount--;
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Attempt to immediately acquire an exclusive lock.
 +     *
 +     * @param locker object trying to become lock owner
 +     * @return true if acquired
 +     */
 +    public final boolean tryLockForWrite(L locker) {
 +        int state = mState;
 +        if (state == 0) {
 +            // no locks are held
 +            if (isUpgradeOrReadWriteFirst() && setWriteLock(state)) {
 +                // keep upgrade state bit clear to indicate automatic upgrade
 +                mOwner = locker;
 +                incrementUpgradeCount();
 +                incrementWriteCount();
 +                return true;
 +            }
 +        } else if (state == LOCK_STATE_UPGRADE) {
 +            // only upgrade lock is held; upgrade to full write lock
 +            if (mOwner == locker && setWriteLock(state)) {
 +                incrementWriteCount();
 +                return true;
 +            }
 +        } else if (state < 0) {
 +            // write lock is held, and upgrade lock might be held too
 +            if (mOwner == locker) {
 +                incrementWriteCount();
 +                return true;
 +            }
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Attempt to acquire an exclusive lock, waiting a maximum amount of time.
 +     *
 +     * @param locker object trying to become lock owner
 +     * @return true if acquired
 +     */
 +    public final boolean tryLockForWrite(L locker, long timeout, TimeUnit unit)
 +        throws InterruptedException
 +    {
 +        if (Thread.interrupted()) {
 +            throw new InterruptedException();
 +        }
 +        if (!tryLockForWrite(locker)) {
 +            return lockForWriteQueuedInterruptibly(locker, addWriteWaiter(),
 +                                                   unit.toNanos(timeout));
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * Release a previously acquired write lock.
 +     */
 +    public final void unlockFromWrite(L locker) {
 +        int writeCount = mWriteCount - 1;
 +        if (writeCount < 0) {
 +            throw new IllegalMonitorStateException("Too many write locks released");
 +        }
 +        mWriteCount = writeCount;
 +        if (writeCount > 0) {
 +            return;
 +        }
 +
 +        // copy original state to check if upgrade lock was automatic
 +        final int state = mState;
 +
 +        // make sure upgrade lock is still held after releasing write lock
 +        mState = LOCK_STATE_UPGRADE;
 +
 +        Node h = mRWHead;
 +        if (h != null && h.mWaitStatus != 0) {
 +            unparkReadWriteSuccessor(h);
 +        }
 +
 +        if (state == LOCK_STATE_WRITE) {
 +            // upgrade owner was automatically set, so automatically clear it
 +            unlockFromUpgrade(locker);
 +        }
 +    }
 +
 +    public String toString() {
 +        int state = mState;
 +        int readLocks = state & ~LOCK_STATE_MASK;
 +        int upgradeLocks = mUpgradeCount;
 +        int writeLocks = mWriteCount;
 +
 +        return super.toString()
 +            + "[Read locks = " + readLocks
 +            + ", Upgrade locks = " + upgradeLocks
 +            + ", Write locks = " + writeLocks
 +            + ", Owner = " + mOwner
 +            + ']';
 +    }
 +
 +    /**
 +     * Add or subtract to the count of read locks held for the given
 +     * locker. Default implementation does nothing, and so read locks are not
 +     * reentrant.
 +     *
 +     * @throws IllegalMonitorStateException if count overflows or underflows
 +     */
 +    protected void adjustReadLockCount(L locker, int amount) {
 +    }
 +
 +    /**
 +     * Default implementation does nothing and always returns false, and so
 +     * read locks are not reentrant. Overridden implementation may choose to
 +     * always returns true, in which case read lock requests can starve upgrade
 +     * and write lock requests.
 +     */
 +    protected boolean isReadLockHeld(L locker) {
 +        return false;
 +    }
 +
 +    private Node enqForReadWrite(final Node node) {
 +        for (;;) {
 +            Node t = mRWTail;
 +            if (t == null) { // Must initialize
 +                Node h = new Node(); // Dummy header
 +                h.mNext = node;
 +                node.mPrev = h;
 +                if (cRWHeadRef.compareAndSet(this, null, h)) {
 +                    mRWTail = node;
 +                    return h;
 +                }
 +            } else {
 +                node.mPrev = t;
 +                if (cRWTailRef.compareAndSet(this, t, node)) {
 +                    t.mNext = node;
 +                    return t;
 +                }
 +            }
 +        }
 +    }
 +
 +    private Node enqForUpgrade(final Node node) {
 +        for (;;) {
 +            Node t = mUTail;
 +            if (t == null) { // Must initialize
 +                Node h = new Node(); // Dummy header
 +                h.mNext = node;
 +                node.mPrev = h;
 +                if (cUHeadRef.compareAndSet(this, null, h)) {
 +                    mUTail = node;
 +                    return h;
 +                }
 +            } else {
 +                node.mPrev = t;
 +                if (cUTailRef.compareAndSet(this, t, node)) {
 +                    t.mNext = node;
 +                    return t;
 +                }
 +            }
 +        }
 +    }
 +
 +    private Node addReadWaiter() {
 +        return addReadWriteWaiter(true);
 +    }
 +
 +    private Node addWriteWaiter() {
 +        return addReadWriteWaiter(false);
 +    }
 +
 +    private Node addReadWriteWaiter(boolean shared) {
 +        Node node = new Node(Thread.currentThread(), shared);
 +        // Try the fast path of enq; backup to full enq on failure
 +        Node pred = mRWTail;
 +        if (pred != null) {
 +            node.mPrev = pred;
 +            if (cRWTailRef.compareAndSet(this, pred, node)) {
 +                pred.mNext = node;
 +                return node;
 +            }
 +        }
 +        enqForReadWrite(node);
 +        return node;
 +    }
 +
 +    private Node addUpgradeWaiter() {
 +        Node node = new Node(Thread.currentThread(), false);
 +        // Try the fast path of enq; backup to full enq on failure
 +        Node pred = mUTail;
 +        if (pred != null) {
 +            node.mPrev = pred;
 +            if (cUTailRef.compareAndSet(this, pred, node)) {
 +                pred.mNext = node;
 +                return node;
 +            }
 +        }
 +        enqForUpgrade(node);
 +        return node;
 +    }
 +
 +    private void setReadWriteHead(Node node) {
 +        mRWHead = node;
 +        node.mThread = null;
 +        node.mPrev = null;
 +    }
 +
 +    private void setUpgradeHead(Node node) {
 +        mUHead = node;
 +        node.mThread = null;
 +        node.mPrev = null;
 +    }
 +
 +    private void unparkReadWriteSuccessor(Node node) {
 +        Node.cWaitStatusRef.compareAndSet(node, Node.SIGNAL, 0);
 +
 +        Node s = node.mNext;
 +        if (s == null || s.mWaitStatus > 0) {
 +            s = null;
 +            for (Node t = mRWTail; t != null && t != node; t = t.mPrev) {
 +                if (t.mWaitStatus <= 0) {
 +                    s = t;
 +                }
 +            }
 +        }
 +        if (s != null) {
 +            LockSupport.unpark(s.mThread);
 +        }
 +    }
 +
 +    private void unparkUpgradeSuccessor(Node node) {
 +        Node.cWaitStatusRef.compareAndSet(node, Node.SIGNAL, 0);
 +
 +        Node s = node.mNext;
 +        if (s == null || s.mWaitStatus > 0) {
 +            s = null;
 +            for (Node t = mUTail; t != null && t != node; t = t.mPrev) {
 +                if (t.mWaitStatus <= 0) {
 +                    s = t;
 +                }
 +            }
 +        }
 +        if (s != null) {
 +            LockSupport.unpark(s.mThread);
 +        }
 +    }
 +
 +    private void setReadWriteHeadAndPropagate(Node node) {
 +        setReadWriteHead(node);
 +        if (node.mWaitStatus != 0) {
 +            Node s = node.mNext;
 +            if (s == null || s.mShared) {
 +                unparkReadWriteSuccessor(node);
 +            }
 +        }
 +    }
 +
 +    private void cancelAcquireReadWrite(Node node) {
 +        if (node != null) {
 +            node.mThread = null;
 +            node.mWaitStatus = Node.CANCELLED;
 +            unparkReadWriteSuccessor(node);
 +        }
 +    }
 +
 +    private void cancelAcquireUpgrade(Node node) {
 +        if (node != null) {
 +            node.mThread = null;
 +            node.mWaitStatus = Node.CANCELLED;
 +            unparkUpgradeSuccessor(node);
 +        }
 +    }
 +
 +    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 +        int s = pred.mWaitStatus;
 +        if (s < 0) {
 +            return true;
 +        }
 +        if (s > 0) {
 +            node.mPrev = pred.mPrev;
 +        } else {
 +            Node.cWaitStatusRef.compareAndSet(pred, 0, Node.SIGNAL);
 +        }
 +        return false;
 +    }
 +
 +    private static void selfInterrupt() {
 +        Thread.currentThread().interrupt();
 +    }
 +
 +    private final boolean parkAndCheckInterrupt() {
 +        LockSupport.park(this);
 +        return Thread.interrupted();
 +    }
 +
 +    /**
 +     * @return ACQUIRED or OWNED
 +     */
 +    private final Result lockForUpgradeQueued(L locker, final Node node) {
 +        try {
 +            boolean interrupted = false;
 +            for (;;) {
 +                final Node p = node.predecessor();
 +                Result result;
 +                if (p == mUHead && (result = tryLockForUpgrade_(locker)) != Result.FAILED) {
 +                    setUpgradeHead(node);
 +                    p.mNext = null; // help GC
 +                    if (interrupted) {
 +                        selfInterrupt();
 +                    }
 +                    return result;
 +                }
 +                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
 +                    interrupted = true;
 +                }
 +            }
 +        } catch (RuntimeException e) {
 +            cancelAcquireUpgrade(node);
 +            throw e;
 +        }
 +    }
 +
 +    /**
 +     * @return ACQUIRED or OWNED
 +     */
 +    private final Result lockForUpgradeQueuedInterruptibly(L locker, final Node node)
 +        throws InterruptedException
 +    {
 +        try {
 +            for (;;) {
 +                final Node p = node.predecessor();
 +                Result result;
 +                if (p == mUHead && (result = tryLockForUpgrade_(locker)) != Result.FAILED) {
 +                    setUpgradeHead(node);
 +                    p.mNext = null; // help GC
 +                    return result;
 +                }
 +                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
 +                    break;
 +                }
 +            }
 +        } catch (RuntimeException e) {
 +            cancelAcquireUpgrade(node);
 +            throw e;
 +        }
 +        // Arrive here only if interrupted
 +        cancelAcquireUpgrade(node);
 +        throw new InterruptedException();
 +    }
 +
 +    /**
 +     * @return FAILED, ACQUIRED or OWNED
 +     */
 +    private final Result lockForUpgradeQueuedInterruptibly(L locker, final Node node,
 +                                                           long nanosTimeout)
 +        throws InterruptedException
 +    {
 +        long lastTime = System.nanoTime();
 +        try {
 +            for (;;) {
 +                final Node p = node.predecessor();
 +                Result result;
 +                if (p == mUHead && (result = tryLockForUpgrade_(locker)) != Result.FAILED) {
 +                    setUpgradeHead(node);
 +                    p.mNext = null; // help GC
 +                    return result;
 +                }
 +                if (nanosTimeout <= 0) {
 +                    cancelAcquireUpgrade(node);
 +                    return Result.FAILED;
 +                }
 +                if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD
 +                    && shouldParkAfterFailedAcquire(p, node))
 +                {
 +                    LockSupport.parkNanos(this, nanosTimeout);
 +                }
 +                long now = System.nanoTime();
 +                nanosTimeout -= now - lastTime;
 +                lastTime = now;
 +                if (Thread.interrupted()) {
 +                    break;
 +                }
 +            }
 +        } catch (RuntimeException e) {
 +            cancelAcquireUpgrade(node);
 +            throw e;
 +        }
 +        // Arrive here only if interrupted
 +        cancelAcquireUpgrade(node);
 +        throw new InterruptedException();
 +    }
 +
 +    private final void lockForReadQueued(L locker, final Node node) {
 +        try {
 +            boolean interrupted = false;
 +            for (;;) {
 +                final Node p = node.predecessor();
 +                if (p == mRWHead && tryLockForRead(locker)) {
 +                    setReadWriteHeadAndPropagate(node);
 +                    p.mNext = null; // help GC
 +                    if (interrupted) {
 +                        selfInterrupt();
 +                    }
 +                    return;
 +                }
 +                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
 +                    interrupted = true;
 +                }
 +            }
 +        } catch (RuntimeException e) {
 +            cancelAcquireReadWrite(node);
 +            throw e;
 +        }
 +    }
 +
 +    private final void lockForReadQueuedInterruptibly(L locker, final Node node)
 +        throws InterruptedException
 +    {
 +        try {
 +            for (;;) {
 +                final Node p = node.predecessor();
 +                if (p == mRWHead && tryLockForRead(locker)) {
 +                    setReadWriteHeadAndPropagate(node);
 +                    p.mNext = null; // help GC
 +                    return;
 +                }
 +                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
 +                    break;
 +                }
 +            }
 +        } catch (RuntimeException e) {
 +            cancelAcquireReadWrite(node);
 +            throw e;
 +        }
 +        // Arrive here only if interrupted
 +        cancelAcquireReadWrite(node);
 +        throw new InterruptedException();
 +    }
 +
 +    /**
 +     * @return true if acquired
 +     */
 +    private final boolean lockForReadQueuedInterruptibly(L locker, final Node node,
 +                                                         long nanosTimeout)
 +        throws InterruptedException
 +    {
 +        long lastTime = System.nanoTime();
 +        try {
 +            for (;;) {
 +                final Node p = node.predecessor();
 +                if (p == mRWHead && tryLockForRead(locker)) {
 +                    setReadWriteHeadAndPropagate(node);
 +                    p.mNext = null; // help GC
 +                    return true;
 +                }
 +                if (nanosTimeout <= 0) {
 +                    cancelAcquireReadWrite(node);
 +                    return false;
 +                }
 +                if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD
 +                    && shouldParkAfterFailedAcquire(p, node))
 +                {
 +                    LockSupport.parkNanos(this, nanosTimeout);
 +                }
 +                long now = System.nanoTime();
 +                nanosTimeout -= now - lastTime;
 +                lastTime = now;
 +                if (Thread.interrupted()) {
 +                    break;
 +                }
 +            }
 +        } catch (RuntimeException e) {
 +            cancelAcquireReadWrite(node);
 +            throw e;
 +        }
 +        // Arrive here only if interrupted
 +        cancelAcquireReadWrite(node);
 +        throw new InterruptedException();
 +    }
 +
 +    private final void lockForWriteQueued(L locker, final Node node) {
 +        try {
 +            boolean interrupted = false;
 +            for (;;) {
 +                final Node p = node.predecessor();
 +                if (p == mRWHead && tryLockForWrite(locker)) {
 +                    setReadWriteHead(node);
 +                    p.mNext = null; // help GC
 +                    if (interrupted) {
 +                        selfInterrupt();
 +                    }
 +                    return;
 +                }
 +                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
 +                    interrupted = true;
 +                }
 +            }
 +        } catch (RuntimeException e) {
 +            cancelAcquireReadWrite(node);
 +            throw e;
 +        }
 +    }
 +
 +    private final void lockForWriteQueuedInterruptibly(L locker, final Node node)
 +        throws InterruptedException
 +    {
 +        try {
 +            for (;;) {
 +                final Node p = node.predecessor();
 +                if (p == mRWHead && tryLockForWrite(locker)) {
 +                    setReadWriteHead(node);
 +                    p.mNext = null; // help GC
 +                    return;
 +                }
 +                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
 +                    break;
 +                }
 +            }
 +        } catch (RuntimeException e) {
 +            cancelAcquireReadWrite(node);
 +            throw e;
 +        }
 +        // Arrive here only if interrupted
 +        cancelAcquireReadWrite(node);
 +        throw new InterruptedException();
 +    }
 +
 +    /**
 +     * @return true if acquired
 +     */
 +    private final boolean lockForWriteQueuedInterruptibly(L locker, final Node node,
 +                                                          long nanosTimeout)
 +        throws InterruptedException
 +    {
 +        long lastTime = System.nanoTime();
 +        try {
 +            for (;;) {
 +                final Node p = node.predecessor();
 +                if (p == mRWHead && tryLockForWrite(locker)) {
 +                    setReadWriteHead(node);
 +                    p.mNext = null; // help GC
 +                    return true;
 +                }
 +                if (nanosTimeout <= 0) {
 +                    cancelAcquireReadWrite(node);
 +                    return false;
 +                }
 +                if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD
 +                    && shouldParkAfterFailedAcquire(p, node))
 +                {
 +                    LockSupport.parkNanos(this, nanosTimeout);
 +                }
 +                long now = System.nanoTime();
 +                nanosTimeout -= now - lastTime;
 +                lastTime = now;
 +                if (Thread.interrupted()) {
 +                    break;
 +                }
 +            }
 +        } catch (RuntimeException e) {
 +            cancelAcquireReadWrite(node);
 +            throw e;
 +        }
 +        // Arrive here only if interrupted
 +        cancelAcquireReadWrite(node);
 +        throw new InterruptedException();
 +    }
 +
 +    private final boolean isReadWriteFirst() {
 +        Node h;
 +        if ((h = mRWHead) == null) {
 +            return true;
 +        }
 +        Thread current = Thread.currentThread();
 +        Node s;
 +        return ((s = h.mNext) != null && s.mThread == current) || fullIsReadWriteFirst(current);
 +    }
 +
 +    private final boolean fullIsReadWriteFirst(Thread current) {
 +        Node h, s;
 +        Thread firstThread = null;
 +        if (((h = mRWHead) != null && (s = h.mNext) != null &&
 +             s.mPrev == mRWHead && (firstThread = s.mThread) != null))
 +        {
 +            return firstThread == current;
 +        }
 +        Node t = mRWTail;
 +        while (t != null && t != mRWHead) {
 +            Thread tt = t.mThread;
 +            if (tt != null) {
 +                firstThread = tt;
 +            }
 +            t = t.mPrev;
 +        }
 +        return firstThread == current || firstThread == null;
 +    }
 +
 +    private final boolean isUpgradeFirst() {
 +        Node h;
 +        if ((h = mUHead) == null) {
 +            return true;
 +        }
 +        Thread current = Thread.currentThread();
 +        Node s;
 +        return ((s = h.mNext) != null && s.mThread == current) || fullIsUpgradeFirst(current);
 +    }
 +
 +    private final boolean fullIsUpgradeFirst(Thread current) {
 +        Node h, s;
 +        Thread firstThread = null;
 +        if (((h = mUHead) != null && (s = h.mNext) != null &&
 +             s.mPrev == mUHead && (firstThread = s.mThread) != null))
 +        {
 +            return firstThread == current;
 +        }
 +        Node t = mUTail;
 +        while (t != null && t != mUHead) {
 +            Thread tt = t.mThread;
 +            if (tt != null) {
 +                firstThread = tt;
 +            }
 +            t = t.mPrev;
 +        }
 +        return firstThread == current || firstThread == null;
 +    }
 +
 +    private final boolean isUpgradeOrReadWriteFirst() {
 +        Node uh, rwh;
 +        if ((uh = mUHead) == null || (rwh = mRWHead) == null) {
 +            return true;
 +        }
 +        Thread current = Thread.currentThread();
 +        Node us, rws;
 +        return ((us = uh.mNext) != null && us.mThread == current)
 +            || ((rws = rwh.mNext) != null && rws.mThread == current)
 +            || fullIsUpgradeFirst(current)
 +            || fullIsReadWriteFirst(current);
 +    }
 +
 +    /**
 +     * @return false if state changed
 +     */
 +    private boolean incrementReadLocks(int state) {
 +        int readLocks = (state & ~LOCK_STATE_MASK) + 1;
 +        if (readLocks == LOCK_STATE_MASK) {
 +            throw new IllegalMonitorStateException("Maximum read lock count exceeded");
 +        }
 +        return cStateRef.compareAndSet(this, state, state & LOCK_STATE_MASK | readLocks);
 +    }
 +
 +    /**
 +     * @return number of remaining read locks or negative if concurrent
 +     * modification prevented operation
 +     */
 +    private int decrementReadLocks(int state) {
 +        int readLocks = (state & ~LOCK_STATE_MASK) - 1;
 +        if (readLocks < 0) {
 +            throw new IllegalMonitorStateException("Too many read locks released");
 +        }
 +        if (cStateRef.compareAndSet(this, state, state & LOCK_STATE_MASK | readLocks)) {
 +            return readLocks;
 +        }
 +        return -1;
 +    }
 +
 +    /**
 +     * @return false if concurrent modification prevented operation
 +     */
 +    private boolean setUpgradeLock(int state) {
 +        return cStateRef.compareAndSet(this, state, state | LOCK_STATE_UPGRADE);
 +    }
 +
 +    /**
 +     * @return false if concurrent modification prevented operation
 +     */
 +    private boolean clearUpgradeLock(int state) {
 +        return cStateRef.compareAndSet(this, state, state & ~LOCK_STATE_UPGRADE);
 +    }
 +
 +    private void incrementUpgradeCount() {
 +        int upgradeCount = mUpgradeCount + 1;
 +        if (upgradeCount < 0) {
 +            throw new IllegalMonitorStateException("Maximum upgrade lock count exceeded");
 +        }
 +        mUpgradeCount = upgradeCount;
 +    }
 +
 +    /**
 +     * @return false if concurrent modification prevented operation
 +     */
 +    private boolean setWriteLock(int state) {
 +        return cStateRef.compareAndSet(this, state, state | LOCK_STATE_WRITE);
 +    }
 +
 +    private void incrementWriteCount() {
 +        int writeCount = mWriteCount + 1;
 +        if (writeCount < 0) {
 +            throw new IllegalMonitorStateException("Maximum write lock count exceeded");
 +        }
 +        mWriteCount = writeCount;
 +    }
 +
 +    /**
 +     * Node class ripped off from AbstractQueuedSynchronizer and modified
 +     * slightly. Read the comments in that class for better understanding.
 +     */
 +    static final class Node {
 +        static final AtomicIntegerFieldUpdater<Node> cWaitStatusRef =
 +            AtomicIntegerFieldUpdater.newUpdater(Node.class, "mWaitStatus");
 +
 +        static final int CANCELLED =  1;
 +        static final int SIGNAL    = -1;
 +
 +        volatile int mWaitStatus;
 +        volatile Node mPrev;
 +        volatile Node mNext;
 +        volatile Thread mThread;
 +
 +        final boolean mShared;
 +
 +        // Used to establish initial head
 +        Node() {
 +            mShared = false;
 +        }
 +
 +        Node(Thread thread, boolean shared) {
 +            mThread = thread;
 +            mShared = shared;
 +        }
 +
 +        final Node predecessor() throws NullPointerException {
 +            Node p = mPrev;
 +            if (p == null) {
 +                throw new NullPointerException();
 +            } else {
 +                return p;
 +            }
 +        }
 +    }
 +}
 diff --git a/src/main/java/com/amazon/carbonado/repo/map/package-info.java b/src/main/java/com/amazon/carbonado/repo/map/package-info.java new file mode 100644 index 0000000..b386251 --- /dev/null +++ b/src/main/java/com/amazon/carbonado/repo/map/package-info.java @@ -0,0 +1,24 @@ +/*
 + * Copyright 2008 Amazon Technologies, Inc. or its affiliates.
 + * Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
 + * of Amazon Technologies, Inc. or its affiliates.  All rights reserved.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +/**
 + * Volatile repository implementation backed by a concurrent map.
 + *
 + * @see com.amazon.carbonado.repo.map.MapRepositoryBuilder
 + */
 +package com.amazon.carbonado.repo.map;
 | 
