概述 之前的文章《Java线程同步策略》对Java的并发策略有了一个比较详尽的介绍。其中讲到ReentrantLock
时提及到了AbstractQueuedSynchronizer
(下文简称AQS)这个类。 从字面含义去揣度,这就是一个抽象的队列同步器 。所谓抽象,想必有许多方法(或者说模板)还需要开发者具体实现;所谓队列,意味着其中维护一套遵循FIFO原则的存储结构;所谓同步,说明其中蕴含某种系统级别的同步机制,为线程安全设计而生的。 《Java并发编程艺术》一书是这么描述的:
队列同步器AbstractQueuedSynchronizer
,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。并发包作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。
以下的介绍也是从这三个维度(抽象、队列、同步)具体展开来说。
抽象 上面说的,AQS规定了一套模板,也规定了一套可重写的方法,这就是我们所说的抽象。
To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState(), setState(int) and/or compareAndSetState(int, int)(用这个类作为同步器的核心基础类,需要重新定义如下方法来使用,通过调用getState()
、setState(int)
和compareAndSetState(int,int)
这几个方法,获取并改变同步状态)
这一解释已经很能够说明白这个类的使用方式了,我们来阅读一下API文档,了解一下这一套可重写的方法。后面会举例子具体说明如何使用。
方法
解释
tryAcquire()
独占式获取同步状态,需先询问是否允许被获取,允许则利用CAS改状态
tryRelease()
独占式释放同步状态。
tryAcquireShared()
共享式获取同步状态。
tryReleaseShared()
共享式释放同步状态。
isHeldExclusively()
查询当前同步器是否在独占模式下被占用。
PS: 这套方法如果不实现,则抛出UnsupportedOperationException
异常
队列 AQS另外也提供了一套模板方法,这里就牵扯到我们所说的队列。 这里列举一些重要的模板方法。
方法
解释
void acquire(int arg)
独占式获取同步,如果获取成功,则方法返回,否则将会进入同步队列等待,该方法会调用重写的tryAcquire(int arg)
void acquireShared(int arg)
共享式获取同步状态,会调用重写的tryAcquireShared(int arg)。
void acquireInterruptibly(int arg)
独占式获取同步,可响应中断,即中断后抛出InterruptedException
。
void acquireSharedInterruptibly(int arg)
共享可中断式获取同步状态
boolean tryAcquiredNanos(int arg,long nanos)
独占式获取同步,并设置了超时
boolean tryAcquiredSharedNanos(int arg,long nanos)
共享式获取同步,并设置了超时
boolean release(int arg)
独占式释放同步
boolean releaseShared(int arg)
共享式释放同步
Conllection< Thread > getQueuedThreads()
获取等待在同步队列上的线程集合
从API文档里不难看出,获取和释放同步支持两种方式,共享式和独占式。而无论独占还是共享,当该线程无法获取资源的时候,则会将该线程作为FIFO中的一个节点(node),插入到队列的尾部,另一边则不停轮询请求,判断该线程的先驱节点是否为头节点(head)以及是否有空闲资源能被获取,从而进一步获取所谓的锁。 以独占式获取锁资源为例,代码很清晰的表达了这一切:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire (int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter (Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null ) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued (final Node node, int arg) {
boolean failed = true ;
try {
boolean interrupted = false ;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null ;
failed = false ;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true ;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
同步 那么AQS如何保证线程安全的呢?
其实上边也说过,AQS是通过调用getState()
、setState(int)
和compareAndSetState(int,int)
这几个方法,获取并改变同步状态的。以独占式为例,我们可以将state设置为1,表示资源有且仅有一个,当有一个线程占用锁资源后,state则设置为0,此时,其余线程将因为无法获取该资源而进入队列中等待。
那么在多线程环境下,获取并改变state的值是必须要有线程安全的手段的。在CAS指令流行之前,我们很容易就想到了用synchronized
关键字给这一操作上锁。然而synchronized
是重量级操作,在频繁的线程上下文切换的场景下,synchronized
并不可取。而CAS在此时的作用显得尤为耀眼。
关于CAS,在《Java线程同步策略》一文中已经阐述,这里不做多余介绍。我们来看看compareAndSetState(int,int)
的源码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import sun.misc.Unsafe;
* Setup to support compareAndSet. We need to natively implement
* this here: For the sake of permitting future enhancements, we
* cannot explicitly subclass AtomicInteger, which would be
* efficient and useful otherwise. So, as the lesser of evils, we
* natively implement using hotspot intrinsics API. And while we
* are at it, we do the same for other CASable fields (which could
* otherwise be done with atomic field updaters).
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a <tt>volatile</tt> read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return true if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState (int expect, int update) {
return unsafe.compareAndSwapInt(this , stateOffset, expect, update);
}
一个例子 我们从三个维度(抽象、队列、同步)来阐述AQS的实现原理以及基于模板自身的扩展,下面就以一个独占式的Mutex
为例来进一步加深对AQS的理解。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package org.leon.concurent.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
* 独占锁Mutex是一个自定义的同步组件,在同一时刻只允许一个线程占有锁<br/>
*
* Created by LeonWong on 16/4/26.
*/
public class Mutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
protected boolean isHeldExclusively () {
return getState() == 1 ;
}
* 当状态为0的时候获取锁<br/>
* 获取锁的过程是首先将同步状态设置为已同步,然后设置持有锁的线程为自己本身
*/
public boolean tryAcquire (int acquires) {
if (compareAndSetState(0 , 1 )) {
setExclusiveOwnerThread(Thread.currentThread());
return true ;
}
return false ;
}
protected boolean tryRelease (int releases) {
if (getState() == 0 )
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null );
setState(0 );
return true ;
}
Condition newCondition () {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
* 阻塞方法 只有拿到了同步锁方法才会返回
*/
@Override
public void lock () {
System.out.println("正尝试获取锁,锁是否被独占?" + sync.isHeldExclusively());
sync.acquire(1 );
System.out.println("获取锁成功" );
}
@Override
public void lockInterruptibly () throws InterruptedException {
sync.acquireInterruptibly(1 );
}
@Override
public boolean tryLock () {
return sync.tryAcquire(1 );
}
@Override
public boolean tryLock (long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1 , unit.toNanos(time));
}
@Override
public void unlock () {
sync.release(1 );
}
@Override
public Condition newCondition () {
return sync.newCondition();
}
public boolean isLocked () {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads () {
return sync.hasQueuedThreads();
}
}
测试用例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package org.leon.concurent.lock;
import org.leon.concurent.SleepUtils;
* Created by LeonWong on 16/4/26.
*/
public class MutexTest {
public static final Mutex lock = new Mutex();
public static void main (String args[]) {
for (int i = 0 ; i < 10 ; i++) {
new Thread(new Runner(), "Runner" + i).start();
}
}
static class Runner implements Runnable {
@Override
public void run () {
lock.lock();
System.out.println("简易测试锁程序 开始" );
SleepUtils.sleepForSecond(5 );
System.out.println("简易测试锁程序 结束" );
lock.unlock();
}
}
}
SleepUtil工具类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package org.leon.concurent;
import java.util.concurrent.TimeUnit;
* Created by LeonWong on 16/4/21.
*/
public class SleepUtils {
public static void sleepForSecond (long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void sleepForMillsSecond (long millis) {
try {
TimeUnit.MILLISECONDS.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
总结 在大多数场景下,谈论Java并发编程的时候,AQS的实现思路可以称之为经典,甚至Java许多并发组件都是基于AQS进行开发的,例如ReentrantLock、CountDownLatch、FutureTask、Semaphore等等。可以说理解了AQS,就是理解了Java并发的核心套路。