/*
* Copyright 2007-2012 Amazon Technologies, Inc. or its affiliates.
* Amazon, Amazon.com and Carbonado are trademarks or registered trademarks
* of Amazon Technologies, Inc. or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazon.carbonado.repo.map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport;
/**
* Partially reentrant, upgradable read/write lock. Up to 1,073,741,824 read
* locks can be held. Upgraders and writers may re-enter the lock up to
* 2,147,483,648 times. Attempts by readers to re-enter the lock is not
* detected and is deadlock prone, unless locker already holds an upgrade or
* write lock. Subclasses can support full reentrancy by overriding the
* protected read lock adjust and hold check methods.
*
*
This lock implementation differs from the usual Java lock with respect to
* lock ownership. Locks are not owned by threads, but by arbitrary locker
* objects. A thread which attempts to acquire an upgrade or write lock twice
* with different locker objects will deadlock on the second attempt.
*
*
As is typical of read/write lock implementations, a read lock blocks
* waiting writers, but it doesn't block other readers. A write lock is
* exclusive and can be held by at most one locker. Attempting to acquire a
* write lock while a read lock is held by the same locker is inherently
* deadlock prone, and some read/write lock implementations will always
* deadlock.
*
*
An upgrade lock allows a read lock to be safely upgraded to a write
* lock. Instead of acquiring a read lock, an upgrade lock is acquired. This
* acts like a shared read lock in that readers are not blocked, but it also
* acts like an exclusive write lock -- writers and upgraders are blocked. With
* an upgrade lock held, the locker may acquire a write lock without deadlock.
*
*
* Locks held Locks safely Locks acquirable
* by owner acquirable by owner by other lockers
* --------------------------------------------------
* - - - R U W R U W
* R - - - - - R U -
* R U - - U - R - -
* - U - R U W R - -
* - U W R U W - - -
* R U W R U W - - -
* R - W R U W - - -
* - - W R U W - - -
*
*
* @author Brian S O'Neill
* @param Locker type
*/
class UpgradableLock {
// Design note: This class borrows heavily from AbstractQueuedSynchronizer.
// Consult that class for understanding the locking mechanics.
private static enum Result {
/** Lock acquisition failed */
FAILED,
/** Lock has just been acquired by locker and can be safely released later */
ACQUIRED,
/** Lock is already owned by locker and should not be released more than once */
OWNED
}
private static final AtomicReferenceFieldUpdater cRWHeadRef =
AtomicReferenceFieldUpdater.newUpdater
(UpgradableLock.class, Node.class, "mRWHead");
private static final AtomicReferenceFieldUpdater cRWTailRef =
AtomicReferenceFieldUpdater.newUpdater
(UpgradableLock.class, Node.class, "mRWTail");
private static final AtomicReferenceFieldUpdater cUHeadRef =
AtomicReferenceFieldUpdater.newUpdater
(UpgradableLock.class, Node.class, "mUHead");
private static final AtomicReferenceFieldUpdater cUTailRef =
AtomicReferenceFieldUpdater.newUpdater
(UpgradableLock.class, Node.class, "mUTail");
private static final AtomicIntegerFieldUpdater cStateRef =
AtomicIntegerFieldUpdater.newUpdater
(UpgradableLock.class, "mState");
// State mask bits for held locks. Read lock count is stored in lower 30 bits of state.
private static final int LOCK_STATE_UPGRADE = 0x40000000;
// Write state must be this value in order for quick sign check to work.
private static final int LOCK_STATE_WRITE = 0x80000000;
private static final int LOCK_STATE_MASK = LOCK_STATE_UPGRADE | LOCK_STATE_WRITE;
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
private static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
// Head of read-write queue.
private transient volatile Node mRWHead;
// Tail of read-write queue.
private transient volatile Node mRWTail;
// Head of write upgrade queue.
private transient volatile Node mUHead;
// Tail of write upgrade queue.
private transient volatile Node mUTail;
private transient volatile int mState;
// Owner holds an upgradable lock and possibly a write lock too.
private transient L mOwner;
// Counts number of times that owner has entered an upgradable or write lock.
private transient int mUpgradeCount;
private transient int mWriteCount;
public UpgradableLock() {
}
/**
* Acquire a shared read lock, possibly blocking indefinitely.
*
* @param locker object which might be write or upgrade lock owner
*/
public final void lockForRead(L locker) {
if (!tryLockForRead(locker)) {
lockForReadQueued(locker, addReadWaiter());
}
}
/**
* Acquire a shared read lock, possibly blocking until interrupted.
*
* @param locker object which might be write or upgrade lock owner
*/
public final void lockForReadInterruptibly(L locker) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (!tryLockForRead(locker)) {
lockForReadQueuedInterruptibly(locker, addReadWaiter());
}
}
/**
* Attempt to immediately acquire a shared read lock.
*
* @param locker object which might be write or upgrade lock owner
* @return true if acquired
*/
public final boolean tryLockForRead(L locker) {
int state = mState;
if (state >= 0) { // no write lock is held
if (isReadWriteFirst() || isReadLockHeld(locker)) {
do {
if (incrementReadLocks(state)) {
adjustReadLockCount(locker, 1);
return true;
}
// keep looping on CAS failure if a reader or upgrader mucked with the state
} while ((state = mState) >= 0);
}
} else if (mOwner == locker) {
// keep looping on CAS failure if a reader or upgrader mucked with the state
while (!incrementReadLocks(state)) {
state = mState;
}
adjustReadLockCount(locker, 1);
return true;
}
return false;
}
/**
* Attempt to acquire a shared read lock, waiting a maximum amount of
* time.
*
* @param locker object which might be write or upgrade lock owner
* @return true if acquired
*/
public final boolean tryLockForRead(L locker, long timeout, TimeUnit unit)
throws InterruptedException
{
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (!tryLockForRead(locker)) {
return lockForReadQueuedInterruptibly(locker, addReadWaiter(), unit.toNanos(timeout));
}
return true;
}
/**
* Release a previously acquired read lock.
*/
public final void unlockFromRead(L locker) {
adjustReadLockCount(locker, -1);
int readLocks;
while ((readLocks = decrementReadLocks(mState)) < 0) {}
if (readLocks == 0) {
Node h = mRWHead;
if (h != null && h.mWaitStatus != 0) {
unparkReadWriteSuccessor(h);
}
}
}
/**
* Acquire an upgrade lock, possibly blocking indefinitely.
*
* @param locker object trying to become lock owner
* @return true if acquired
*/
public final boolean lockForUpgrade(L locker) {
return lockForUpgrade_(locker) != Result.FAILED;
}
/**
* Acquire an upgrade lock, possibly blocking indefinitely.
*
* @param locker object trying to become lock owner
* @return ACQUIRED or OWNED
*/
private final Result lockForUpgrade_(L locker) {
Result result;
if ((result = tryLockForUpgrade_(locker)) == Result.FAILED) {
result = lockForUpgradeQueued(locker, addUpgradeWaiter());
}
return result;
}
/**
* Acquire an upgrade lock, possibly blocking until interrupted.
*
* @param locker object trying to become lock owner
* @return true if acquired
*/
public final boolean lockForUpgradeInterruptibly(L locker) throws InterruptedException {
return lockForUpgradeInterruptibly_(locker) != Result.FAILED;
}
/**
* Acquire an upgrade lock, possibly blocking until interrupted.
*
* @param locker object trying to become lock owner
* @return ACQUIRED or OWNED
*/
private final Result lockForUpgradeInterruptibly_(L locker) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
Result result;
if ((result = tryLockForUpgrade_(locker)) == Result.FAILED) {
result = lockForUpgradeQueuedInterruptibly(locker, addUpgradeWaiter());
}
return result;
}
/**
* Attempt to immediately acquire an upgrade lock.
*
* @param locker object trying to become lock owner
* @return true if acquired
*/
public final boolean tryLockForUpgrade(L locker) {
return tryLockForUpgrade_(locker) != Result.FAILED;
}
/**
* Attempt to immediately acquire an upgrade lock.
*
* @param locker object trying to become lock owner
* @return FAILED, ACQUIRED or OWNED
*/
private final Result tryLockForUpgrade_(L locker) {
int state = mState;
if ((state & LOCK_STATE_MASK) == 0) { // no write or upgrade lock is held
if (isUpgradeFirst()) {
do {
if (setUpgradeLock(state)) {
mOwner = locker;
incrementUpgradeCount();
return Result.ACQUIRED;
}
// keep looping on CAS failure if a reader mucked with the state
} while (((state = mState) & LOCK_STATE_MASK) == 0);
}
} else if (mOwner == locker) {
incrementUpgradeCount();
return Result.OWNED;
}
return Result.FAILED;
}
/**
* Attempt to acquire an upgrade lock, waiting a maximum amount of time.
*
* @param locker object trying to become lock owner
* @return true if acquired
*/
public final boolean tryLockForUpgrade(L locker, long timeout, TimeUnit unit)
throws InterruptedException
{
return tryLockForUpgrade_(locker, timeout, unit) != Result.FAILED;
}
/**
* Attempt to acquire an upgrade lock, waiting a maximum amount of time.
*
* @param locker object trying to become lock owner
* @return FAILED, ACQUIRED or OWNED
*/
private final Result tryLockForUpgrade_(L locker, long timeout, TimeUnit unit)
throws InterruptedException
{
if (Thread.interrupted()) {
throw new InterruptedException();
}
Result result;
if ((result = tryLockForUpgrade_(locker)) == Result.FAILED) {
result = lockForUpgradeQueuedInterruptibly(locker, addUpgradeWaiter(),
unit.toNanos(timeout));
}
return result;
}
/**
* Release a previously acquired upgrade lock.
*/
public final void unlockFromUpgrade(L locker) {
int upgradeCount = mUpgradeCount - 1;
if (upgradeCount < 0) {
throw new IllegalMonitorStateException("Too many upgrade locks released");
}
if (upgradeCount == 0 && mWriteCount > 0) {
// Don't release last upgrade lock and switch write lock to
// automatic upgrade mode.
clearUpgradeLock(mState);
return;
}
mUpgradeCount = upgradeCount;
if (upgradeCount > 0) {
return;
}
mOwner = null;
// keep looping on CAS failure if reader mucked with state
while (!clearUpgradeLock(mState)) {}
Node h = mUHead;
if (h != null && h.mWaitStatus != 0) {
unparkUpgradeSuccessor(h);
}
}
/**
* Acquire an exclusive write lock, possibly blocking indefinitely.
*
* @param locker object trying to become lock owner
*/
public final void lockForWrite(L locker) {
if (!tryLockForWrite(locker)) {
Result upgradeResult = lockForUpgrade_(locker);
if (!tryLockForWrite(locker)) {
lockForWriteQueued(locker, addWriteWaiter());
}
if (upgradeResult == Result.ACQUIRED) {
// clear upgrade state bit to indicate automatic upgrade
while (!clearUpgradeLock(mState)) {}
} else {
// undo automatic upgrade count increment
mUpgradeCount--;
}
}
}
/**
* Acquire an exclusive write lock, possibly blocking until interrupted.
*
* @param locker object trying to become lock owner
*/
public final void lockForWriteInterruptibly(L locker) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (!tryLockForWrite(locker)) {
Result upgradeResult = lockForUpgradeInterruptibly_(locker);
if (!tryLockForWrite(locker)) {
lockForWriteQueuedInterruptibly(locker, addWriteWaiter());
}
if (upgradeResult == Result.ACQUIRED) {
// clear upgrade state bit to indicate automatic upgrade
while (!clearUpgradeLock(mState)) {}
} else {
// undo automatic upgrade count increment
mUpgradeCount--;
}
}
}
/**
* Attempt to immediately acquire an exclusive lock.
*
* @param locker object trying to become lock owner
* @return true if acquired
*/
public final boolean tryLockForWrite(L locker) {
int state = mState;
if (state == 0) {
// no locks are held
if (isUpgradeOrReadWriteFirst() && setWriteLock(state)) {
// keep upgrade state bit clear to indicate automatic upgrade
mOwner = locker;
incrementUpgradeCount();
incrementWriteCount();
return true;
}
} else if (state == LOCK_STATE_UPGRADE) {
// only upgrade lock is held; upgrade to full write lock
if (mOwner == locker && setWriteLock(state)) {
incrementWriteCount();
return true;
}
} else if (state < 0) {
// write lock is held, and upgrade lock might be held too
if (mOwner == locker) {
incrementWriteCount();
return true;
}
}
return false;
}
/**
* Attempt to acquire an exclusive lock, waiting a maximum amount of time.
*
* @param locker object trying to become lock owner
* @return true if acquired
*/
public final boolean tryLockForWrite(L locker, long timeout, TimeUnit unit)
throws InterruptedException
{
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (!tryLockForWrite(locker)) {
long start = System.nanoTime();
Result upgradeResult = tryLockForUpgrade_(locker, timeout, unit);
if (upgradeResult == Result.FAILED) {
return false;
}
if (!tryLockForWrite(locker)) {
unlockFromUpgrade(locker);
if ((timeout = unit.toNanos(timeout) - (System.nanoTime() - start)) <= 0) {
return false;
}
if (!lockForWriteQueuedInterruptibly(locker, addWriteWaiter(), timeout)) {
return false;
}
}
if (upgradeResult == Result.ACQUIRED) {
// clear upgrade state bit to indicate automatic upgrade
while (!clearUpgradeLock(mState)) {}
} else {
// undo automatic upgrade count increment
mUpgradeCount--;
}
}
return true;
}
/**
* Release a previously acquired write lock.
*/
public final void unlockFromWrite(L locker) {
int writeCount = mWriteCount - 1;
if (writeCount < 0) {
throw new IllegalMonitorStateException("Too many write locks released");
}
mWriteCount = writeCount;
if (writeCount > 0) {
return;
}
// copy original state to check if upgrade lock was automatic
final int state = mState;
// make sure upgrade lock is still held after releasing write lock
mState = LOCK_STATE_UPGRADE;
Node h = mRWHead;
if (h != null && h.mWaitStatus != 0) {
unparkReadWriteSuccessor(h);
}
if (state == LOCK_STATE_WRITE) {
// upgrade owner was automatically set, so automatically clear it
unlockFromUpgrade(locker);
}
}
@Override
public String toString() {
int state = mState;
int readLocks = state & ~LOCK_STATE_MASK;
int upgradeLocks = mUpgradeCount;
int writeLocks = mWriteCount;
return super.toString()
+ "[Read locks = " + readLocks
+ ", Upgrade locks = " + upgradeLocks
+ ", Write locks = " + writeLocks
+ ", Owner = " + mOwner
+ ']';
}
/**
* Add or subtract to the count of read locks held for the given
* locker. Default implementation does nothing, and so read locks are not
* reentrant.
*
* @throws IllegalMonitorStateException if count overflows or underflows
*/
protected void adjustReadLockCount(L locker, int amount) {
}
/**
* Default implementation does nothing and always returns false, and so
* read locks are not reentrant. Overridden implementation may choose to
* always returns true, in which case read lock requests can starve upgrade
* and write lock requests.
*/
protected boolean isReadLockHeld(L locker) {
return false;
}
private Node enqForReadWrite(final Node node) {
for (;;) {
Node t = mRWTail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.mNext = node;
node.mPrev = h;
if (cRWHeadRef.compareAndSet(this, null, h)) {
mRWTail = node;
return h;
}
} else {
node.mPrev = t;
if (cRWTailRef.compareAndSet(this, t, node)) {
t.mNext = node;
return t;
}
}
}
}
private Node enqForUpgrade(final Node node) {
for (;;) {
Node t = mUTail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.mNext = node;
node.mPrev = h;
if (cUHeadRef.compareAndSet(this, null, h)) {
mUTail = node;
return h;
}
} else {
node.mPrev = t;
if (cUTailRef.compareAndSet(this, t, node)) {
t.mNext = node;
return t;
}
}
}
}
private Node addReadWaiter() {
return addReadWriteWaiter(true);
}
private Node addWriteWaiter() {
return addReadWriteWaiter(false);
}
private Node addReadWriteWaiter(boolean shared) {
Node node = new Node(Thread.currentThread(), shared);
// Try the fast path of enq; backup to full enq on failure
Node pred = mRWTail;
if (pred != null) {
node.mPrev = pred;
if (cRWTailRef.compareAndSet(this, pred, node)) {
pred.mNext = node;
return node;
}
}
enqForReadWrite(node);
return node;
}
private Node addUpgradeWaiter() {
Node node = new Node(Thread.currentThread(), false);
// Try the fast path of enq; backup to full enq on failure
Node pred = mUTail;
if (pred != null) {
node.mPrev = pred;
if (cUTailRef.compareAndSet(this, pred, node)) {
pred.mNext = node;
return node;
}
}
enqForUpgrade(node);
return node;
}
private void setReadWriteHead(Node node) {
mRWHead = node;
node.mThread = null;
node.mPrev = null;
}
private void setUpgradeHead(Node node) {
mUHead = node;
node.mThread = null;
node.mPrev = null;
}
private void unparkReadWriteSuccessor(Node node) {
Node.cWaitStatusRef.compareAndSet(node, Node.SIGNAL, 0);
Node s = node.mNext;
if (s == null || s.mWaitStatus > 0) {
s = null;
for (Node t = mRWTail; t != null && t != node; t = t.mPrev) {
if (t.mWaitStatus <= 0) {
s = t;
}
}
}
if (s != null) {
LockSupport.unpark(s.mThread);
}
}
private void unparkUpgradeSuccessor(Node node) {
Node.cWaitStatusRef.compareAndSet(node, Node.SIGNAL, 0);
Node s = node.mNext;
if (s == null || s.mWaitStatus > 0) {
s = null;
for (Node t = mUTail; t != null && t != node; t = t.mPrev) {
if (t.mWaitStatus <= 0) {
s = t;
}
}
}
if (s != null) {
LockSupport.unpark(s.mThread);
}
}
private void setReadWriteHeadAndPropagate(Node node) {
setReadWriteHead(node);
if (node.mWaitStatus != 0) {
Node s = node.mNext;
if (s == null || s.mShared) {
unparkReadWriteSuccessor(node);
}
}
}
private void cancelAcquireReadWrite(Node node) {
if (node != null) {
node.mThread = null;
node.mWaitStatus = Node.CANCELLED;
unparkReadWriteSuccessor(node);
}
}
private void cancelAcquireUpgrade(Node node) {
if (node != null) {
node.mThread = null;
node.mWaitStatus = Node.CANCELLED;
unparkUpgradeSuccessor(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int s = pred.mWaitStatus;
if (s < 0) {
return true;
}
if (s > 0) {
node.mPrev = pred.mPrev;
} else {
Node.cWaitStatusRef.compareAndSet(pred, 0, Node.SIGNAL);
}
return false;
}
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
/**
* @return ACQUIRED or OWNED
*/
private final Result lockForUpgradeQueued(L locker, final Node node) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
Result result;
if (p == mUHead && (result = tryLockForUpgrade_(locker)) != Result.FAILED) {
setUpgradeHead(node);
p.mNext = null; // help GC
if (interrupted) {
selfInterrupt();
}
return result;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} catch (RuntimeException e) {
cancelAcquireUpgrade(node);
throw e;
}
}
/**
* @return ACQUIRED or OWNED
*/
private final Result lockForUpgradeQueuedInterruptibly(L locker, final Node node)
throws InterruptedException
{
try {
for (;;) {
final Node p = node.predecessor();
Result result;
if (p == mUHead && (result = tryLockForUpgrade_(locker)) != Result.FAILED) {
setUpgradeHead(node);
p.mNext = null; // help GC
return result;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
break;
}
}
} catch (RuntimeException e) {
cancelAcquireUpgrade(node);
throw e;
}
// Arrive here only if interrupted
cancelAcquireUpgrade(node);
throw new InterruptedException();
}
/**
* @return FAILED, ACQUIRED or OWNED
*/
private final Result lockForUpgradeQueuedInterruptibly(L locker, final Node node,
long nanosTimeout)
throws InterruptedException
{
long lastTime = System.nanoTime();
try {
for (;;) {
final Node p = node.predecessor();
Result result;
if (p == mUHead && (result = tryLockForUpgrade_(locker)) != Result.FAILED) {
setUpgradeHead(node);
p.mNext = null; // help GC
return result;
}
if (nanosTimeout <= 0) {
cancelAcquireUpgrade(node);
return Result.FAILED;
}
if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD
&& shouldParkAfterFailedAcquire(p, node))
{
LockSupport.parkNanos(this, nanosTimeout);
}
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted()) {
break;
}
}
} catch (RuntimeException e) {
cancelAcquireUpgrade(node);
throw e;
}
// Arrive here only if interrupted
cancelAcquireUpgrade(node);
throw new InterruptedException();
}
private final void lockForReadQueued(L locker, final Node node) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == mRWHead && tryLockForRead(locker)) {
setReadWriteHeadAndPropagate(node);
p.mNext = null; // help GC
if (interrupted) {
selfInterrupt();
}
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} catch (RuntimeException e) {
cancelAcquireReadWrite(node);
throw e;
}
}
private final void lockForReadQueuedInterruptibly(L locker, final Node node)
throws InterruptedException
{
try {
for (;;) {
final Node p = node.predecessor();
if (p == mRWHead && tryLockForRead(locker)) {
setReadWriteHeadAndPropagate(node);
p.mNext = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
break;
}
}
} catch (RuntimeException e) {
cancelAcquireReadWrite(node);
throw e;
}
// Arrive here only if interrupted
cancelAcquireReadWrite(node);
throw new InterruptedException();
}
/**
* @return true if acquired
*/
private final boolean lockForReadQueuedInterruptibly(L locker, final Node node,
long nanosTimeout)
throws InterruptedException
{
long lastTime = System.nanoTime();
try {
for (;;) {
final Node p = node.predecessor();
if (p == mRWHead && tryLockForRead(locker)) {
setReadWriteHeadAndPropagate(node);
p.mNext = null; // help GC
return true;
}
if (nanosTimeout <= 0) {
cancelAcquireReadWrite(node);
return false;
}
if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD
&& shouldParkAfterFailedAcquire(p, node))
{
LockSupport.parkNanos(this, nanosTimeout);
}
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted()) {
break;
}
}
} catch (RuntimeException e) {
cancelAcquireReadWrite(node);
throw e;
}
// Arrive here only if interrupted
cancelAcquireReadWrite(node);
throw new InterruptedException();
}
private final void lockForWriteQueued(L locker, final Node node) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == mRWHead && tryLockForWrite(locker)) {
setReadWriteHead(node);
p.mNext = null; // help GC
if (interrupted) {
selfInterrupt();
}
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} catch (RuntimeException e) {
cancelAcquireReadWrite(node);
throw e;
}
}
private final void lockForWriteQueuedInterruptibly(L locker, final Node node)
throws InterruptedException
{
try {
for (;;) {
final Node p = node.predecessor();
if (p == mRWHead && tryLockForWrite(locker)) {
setReadWriteHead(node);
p.mNext = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
break;
}
}
} catch (RuntimeException e) {
cancelAcquireReadWrite(node);
throw e;
}
// Arrive here only if interrupted
cancelAcquireReadWrite(node);
throw new InterruptedException();
}
/**
* @return true if acquired
*/
private final boolean lockForWriteQueuedInterruptibly(L locker, final Node node,
long nanosTimeout)
throws InterruptedException
{
long lastTime = System.nanoTime();
try {
for (;;) {
final Node p = node.predecessor();
if (p == mRWHead && tryLockForWrite(locker)) {
setReadWriteHead(node);
p.mNext = null; // help GC
return true;
}
if (nanosTimeout <= 0) {
cancelAcquireReadWrite(node);
return false;
}
if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD
&& shouldParkAfterFailedAcquire(p, node))
{
LockSupport.parkNanos(this, nanosTimeout);
}
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted()) {
break;
}
}
} catch (RuntimeException e) {
cancelAcquireReadWrite(node);
throw e;
}
// Arrive here only if interrupted
cancelAcquireReadWrite(node);
throw new InterruptedException();
}
private final boolean isReadWriteFirst() {
Node h;
if ((h = mRWHead) == null) {
return true;
}
Thread current = Thread.currentThread();
Node s;
return ((s = h.mNext) != null && s.mThread == current) || fullIsReadWriteFirst(current);
}
private final boolean fullIsReadWriteFirst(Thread current) {
Node h, s;
Thread firstThread = null;
if (((h = mRWHead) != null && (s = h.mNext) != null &&
s.mPrev == mRWHead && (firstThread = s.mThread) != null))
{
return firstThread == current;
}
Node t = mRWTail;
while (t != null && t != mRWHead) {
Thread tt = t.mThread;
if (tt != null) {
firstThread = tt;
}
t = t.mPrev;
}
return firstThread == current || firstThread == null;
}
private final boolean isUpgradeFirst() {
Node h;
if ((h = mUHead) == null) {
return true;
}
Thread current = Thread.currentThread();
Node s;
return ((s = h.mNext) != null && s.mThread == current) || fullIsUpgradeFirst(current);
}
private final boolean fullIsUpgradeFirst(Thread current) {
Node h, s;
Thread firstThread = null;
if (((h = mUHead) != null && (s = h.mNext) != null &&
s.mPrev == mUHead && (firstThread = s.mThread) != null))
{
return firstThread == current;
}
Node t = mUTail;
while (t != null && t != mUHead) {
Thread tt = t.mThread;
if (tt != null) {
firstThread = tt;
}
t = t.mPrev;
}
return firstThread == current || firstThread == null;
}
private final boolean isUpgradeOrReadWriteFirst() {
Node uh, rwh;
if ((uh = mUHead) == null || (rwh = mRWHead) == null) {
return true;
}
Thread current = Thread.currentThread();
Node us, rws;
return ((us = uh.mNext) != null && us.mThread == current)
|| ((rws = rwh.mNext) != null && rws.mThread == current)
|| fullIsUpgradeFirst(current)
|| fullIsReadWriteFirst(current);
}
/**
* @return false if state changed
*/
private boolean incrementReadLocks(int state) {
int readLocks = (state & ~LOCK_STATE_MASK) + 1;
if (readLocks == LOCK_STATE_MASK) {
throw new IllegalMonitorStateException("Maximum read lock count exceeded");
}
return cStateRef.compareAndSet(this, state, state & LOCK_STATE_MASK | readLocks);
}
/**
* @return number of remaining read locks or negative if concurrent
* modification prevented operation
*/
private int decrementReadLocks(int state) {
int readLocks = (state & ~LOCK_STATE_MASK) - 1;
if (readLocks < 0) {
throw new IllegalMonitorStateException("Too many read locks released");
}
if (cStateRef.compareAndSet(this, state, state & LOCK_STATE_MASK | readLocks)) {
return readLocks;
}
return -1;
}
/**
* @return false if concurrent modification prevented operation
*/
private boolean setUpgradeLock(int state) {
return cStateRef.compareAndSet(this, state, state | LOCK_STATE_UPGRADE);
}
/**
* @return false if concurrent modification prevented operation
*/
private boolean clearUpgradeLock(int state) {
return cStateRef.compareAndSet(this, state, state & ~LOCK_STATE_UPGRADE);
}
private void incrementUpgradeCount() {
int upgradeCount = mUpgradeCount + 1;
if (upgradeCount < 0) {
throw new IllegalMonitorStateException("Maximum upgrade lock count exceeded");
}
mUpgradeCount = upgradeCount;
}
/**
* @return false if concurrent modification prevented operation
*/
private boolean setWriteLock(int state) {
return cStateRef.compareAndSet(this, state, state | LOCK_STATE_WRITE);
}
private void incrementWriteCount() {
int writeCount = mWriteCount + 1;
if (writeCount < 0) {
throw new IllegalMonitorStateException("Maximum write lock count exceeded");
}
mWriteCount = writeCount;
}
/**
* Used by unit tests.
*/
boolean noLocksHeld() {
return mState == 0 && mOwner == null && mUpgradeCount == 0 && mWriteCount == 0;
}
/**
* Node class ripped off from AbstractQueuedSynchronizer and modified
* slightly. Read the comments in that class for better understanding.
*/
static final class Node {
static final AtomicIntegerFieldUpdater cWaitStatusRef =
AtomicIntegerFieldUpdater.newUpdater(Node.class, "mWaitStatus");
static final int CANCELLED = 1;
static final int SIGNAL = -1;
volatile int mWaitStatus;
volatile Node mPrev;
volatile Node mNext;
volatile Thread mThread;
final boolean mShared;
// Used to establish initial head
Node() {
mShared = false;
}
Node(Thread thread, boolean shared) {
mThread = thread;
mShared = shared;
}
final Node predecessor() throws NullPointerException {
Node p = mPrev;
if (p == null) {
throw new NullPointerException();
} else {
return p;
}
}
}
}