首页 > 首页 > java多线程之AQS
2019
03-23

java多线程之AQS

简介
AbstractQueuedSynchronizer又是java用于实现在显示锁的抽象队列同步器,又简称AQS,java并发包下的锁()都是基于AQS队列实现的。
其中在AQS中主要有三个关键字段:
volatile Node head;
volatile Node tail;
volatile int state;
state用来标记是否获取到锁状态,state=0标识当前没有锁,state=1标记已经获取到锁,未获取则把线程构造成一个Node节点插入队列尾,并阻塞等待锁释放。当释放后会通过head节点后的来获取锁。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
当前持有锁的线程节点
*/
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
需要排队等待锁的线程节点
*/
private transient volatile Node tail;

/**
* The synchronization state.
同步状态,即当前锁的状态,0:未锁住,1:锁住
*/
private volatile int state;

CLH Locks
CLH是一种基于队列的无锁算法

Node节点
当线程获取锁失败后,会构造成一个Node节点,插入等待队列中,等待再次获取锁。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
// 共享锁模式
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
// 独占锁模式
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
// 取消状态
static final int CANCELLED = 1;
/** waitStatus value to indicate successor’s thread needs unparking */
// 需要前个节点唤醒阻塞状态
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
// 等待其它条件变量,用于Condition对象的排队
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn’t need to
* signal. So, most code doesn’t need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;

/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
//前驱节点
volatile Node prev;

/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev’s from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
// 后驱节点
volatile Node next;

/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
// 当前线程,用来做重入锁时判断是否是同一个
volatile Thread thread;

/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
// 下一个监视器,用于condition对象的排队
Node nextWaiter;

/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
阻塞链表
通过分析AQS和Node节点可以得出,head节点就是当前持有锁的对象,之后的属于阻塞等待获取锁的对象。
图片: https://uploader.shimo.im/f/RZBMGiFUKsYi4meT.png
AQS源码分析
由于AQS是一个抽象实现只提供了具体的队列和状态,我们可通过ReentrantLock实现来分析AQS获取锁状态释放锁状态
ReentrantLock(可重入锁,独占锁)
特性:可重入锁、公平(非公平锁)、独占锁

ReentrantLock存在抽象成员Sync,实现于AQS队列,其中主要实现了非分平锁获取同步状态的方法,并抽象了lock()方法,实现代码如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
// 抽象方法用于重入锁实现公平/非分平锁
abstract void lock();

/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
// tryLock()方法非分平方式获取锁,没有获取成功直接返回
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程对象
final Thread current = Thread.currentThread();
// 获取当前同步状态
int c = getState();
// 如果当前没有锁
if (c == 0) {
// 则尝试通过CAS方式获取,成功则返回
if (compareAndSetState(0, acquires)) {
// 设置当前独占锁为当前线程,并返回
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前获取锁的线程是当前对象,则重入将状态加1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error(“Maximum lock count exceeded”);
setState(nextc);
return true;
}
return false;
}

// 释放锁资源
protected final boolean tryRelease(int releases) {
//当前同步状态减少
int c = getState() – releases;
// 判断是否是当前线程,不是则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果锁资源已释放完,则设置当前独占锁线程为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 判断当前线程是否是锁持有者
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don’t need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 创建一个条件
final ConditionObject newCondition() {
return new ConditionObject();
}

// Methods relayed from outer class
// 获取当前持有锁的线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 获取当前持有锁的线程个数
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// 判断是否锁住
final boolean isLocked() {
return getState() != 0;
}

/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

从上述代码分析可以得出:
1、获取同步状态(获取锁)是通过CAS无锁算法,设置同步状态state来确认是否获取锁。
2、可重入锁指的是当前持有锁后,通过累加state状态值来确认是否释放,所以调用lock()方法后必须要调用unlock()方法,释放锁.

在ReentrantLock中存在两种锁,公平锁与非公平锁。默认是非分平锁,可通过构造参数来进行指定。
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
常用方法
图片: https://uploader.shimo.im/f/9qxPHcbixYwoEfb0.png
获取同步状态(获取锁)
lock()方法实现
公平锁实现获取锁状态
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
//调用AQS方法获取锁
acquire(1);
}

/**
* Fair version of tryAcquire. Don’t grant access unless
* recursive call or no waiters or is first.
*/
// 调用AQS acquire()获取锁会回调用tryAcquire()来获取
protected final boolean tryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
int c = getState();
// 未加锁
if (c == 0) {
// 验证队列是否有前驱节点,如果没有前驱节点则尝试获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 获取锁成功,设置当前线程为独占锁
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程已独占则进行重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error(“Maximum lock count exceeded”);
// 设置锁值
setState(nextc);
return true;
}
return false;
}
}

公平锁获取方法说明:
获取当前线程,验证当前同步状态,state是否为0,为0则进行加锁操作,由于是公平锁先判断队列中是否有前驱节点,有则说明有线程等待获取锁,当前线程不进行抢占,并返回false,如果没有则使用CAS方法进行加锁操作(即设置state为1,表示获取锁成功),并把自己设置为当前锁占用线程。否则进行判断当前持有锁线程是否是当前线程,如果是则进行重入操作,state+1并设置值,否则进行排队处理,等待下次获取锁。

AQS类hasQueuedPredecessors()方法说明:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 头节点不等于尾节点,并且头节点的下个节点为空或者不等于当前线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
h != t && ((s = h.next) == null当前条件成立,必须是获取锁失败,增加到队列时先创建head节点再指向tail节点,但当创建完还没有指向tail节点时,当前线程去获取锁,由于公平锁的原则是由头节点的下个节点获取锁,所以当前线程去获取时发现已经有线程先比他进入获取锁,则不进行获取锁操作。
s.thread != Thread.currentThread())不相等说明下个获取锁的不是自己,根据公平锁的原则则不能获取锁

AQS类acquire()方法说明:
public final void acquire(int arg) {
//调用实现类获取锁的方法,如果未成功则将当前线程构造成节点,放到请求队列中
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

addWaiter()方法构造等待节点:
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 根据当前线程创建节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 判断尾结点是否为空
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果快速入队失败,则进行正常入队
enq(node);
return node;
}

acquireQueued()方法将节点加入请求队列:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 是否中断
boolean interrupted = false;
for (;;) {
// 判断前驱节点是否是头节点
final Node p = node.predecessor();
// 如果是头节点,再次请求获取锁
if (p == head && tryAcquire(arg)) {
// 获取成功设置当前节点为头节点
setHead(node);
p.next = null; // help GC
// 不取消节点标记
failed = false;
return interrupted;
}
// 判断是否应该再请求错误之后将线程阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 线程异常或者中断
if (failed)
// 取消请求
cancelAcquire(node);
}

shouldParkAfterFailedAcquire()验证方法:
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node’s predecessor holding status 前驱节点当前持有的状态
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点状态
int ws = pred.waitStatus;
// 需要等待前个节点唤醒,则需要阻塞当前线程
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 如果前驱节点是取消的,再次向前驱的前驱进行查找,直到waitStatus<0,并设置当前节点为前驱的下个节点,再次进行请求
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don’t park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 如果前驱节点正常,则设置为SIGNAL,完成后通知自己
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
阻塞当前线程,并标记为中断
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

中断或者退出异常处理:
**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn’t exist
if (node == null)
return;

node.thread = null;

// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred’s next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

非公平锁实现获取锁状态
非分平锁与公平锁区别不太大,唯一的区别是非分平锁在获取锁时会先尝试直接获取锁,公平锁会先判断是否有前驱节点再获取锁。

释放锁
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
// 释放锁
if (tryRelease(arg)) {
Node h = head;
// 头节点不为空并且状态不为0
if (h != null && h.waitStatus != 0)
// 唤醒后驱节点
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease()方法:
protected final boolean tryRelease(int releases) {
// 同步状态减少
int c = getState() – releases;
// 如果当前线程不是持有锁状态抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// state==0则释放锁,不为0则还持有锁,持续释放完
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

unpark()唤醒后续节点竟争锁:
/**
* Wakes up node’s successor, if one exists.
*
* @param node the node
唤醒后驱节点
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//循环查找下个节点,如果节点为空或者节点是已取消状态,则继续查找
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后续节点
if (s != null)
LockSupport.unpark(s.thread);
}

ConditionObject
在线程间相互协作可以通过Object类的wait()方法和notify()方法,进行线程等待和唤醒。在java并发包中存在Condition协作。
conditionObject对象内置一个条件队列,通过与AQS的同步队列合作,完成线程的协作。
await()方法说明:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

最后编辑:
作者:noname
noname
这个作者貌似有点懒,什么都没有留下。

留下一个回复

你的email不会被公开。