发布于2021-05-29 20:31 阅读(1276) 评论(0) 点赞(21) 收藏(3)
源码解读java 并发包 AQS(阳哥讲解整理)通过ReentrantLock对AQS的独占模式非公平锁进行了讲解,本篇文章,我们将已Semaphore讲解下共享模式下的AQS实现
Semaphore就是信号量,我们可以用一个桥梁上可通行的最大车数来表示,入某大桥一次只能承载2辆汽车同时通行,当桥梁上的车小于2时,其他车辆才可以进入
我们来模拟一个案例,5个线程A,B,C,D,E去执行任务,定义一个Semaphore,通道为2,5个线程开始只能两个抢到
Semaphore semaphore = new Semaphore(2);
new Thread(()->{
try {
semaphore.acquire();
System.out.println("A thread come in ---------");
TimeUnit.MINUTES.sleep(5);
semaphore.release();
}catch (Exception e){
}
},"A").start();
new Thread(()->{
try {
semaphore.acquire();
System.out.println("B thread come in ---------");
TimeUnit.MINUTES.sleep(5);
semaphore.release();
}catch (Exception e){
}
},"B").start();
new Thread(()->{
try {
semaphore.acquire();
System.out.println("C thread come in ---------");
TimeUnit.MINUTES.sleep(5);
semaphore.release();
}catch (Exception e){
}
},"C").start();
等等。。。。。。
A,B进入acquire()方法
Semaphore .java
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//获取锁
doAcquireSharedInterruptibly(arg);
}
先进入tryAcquireShared(arg)方法尝试获取资源转到Semaphore 中的实现方法
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();//available =2
int remaining = available - acquires;//remaining=2-1
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
A进入获取通道为2,remaining为1返回,A线程可执行 返回false
B进入获取通道为1,remaining为0返回,B线程可执行 返回false
C进入获取通道为0,remaining为-1返回,C线程不可执行 返回true
同理DEF获取通道为0,remaining为-1返回,都不可执行 返回true
进入
doAcquireSharedInterruptibly(arg);
AbstractQueuedSynchronizer.java
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//添加一个共享节点到队列尾部
boolean failed = true;//等待过程中是否被中断过的标志
try {
for (;;) {
final Node p = node.predecessor();//前驱
if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
int r = tryAcquireShared(arg);//再次尝试获取资源
if (r >= 0) {
setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
addWaiter(Node.SHARED);//添加一个共享节点到队列尾部在前一篇文章已经讲解,过程一直,不在赘述,前驱节点为head,再次尝试获取资源,明显失败,r<0走下面的if判断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
shouldParkAfterFailedAcquire(p, node)同理也讲过,循环设置哨兵head节点waitStatus为SIGNEL(-1)返回true
进入parkAndCheckInterrupt()方法,挂机CDEF线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
此时A,B线程进入执行 release();
AQS.java
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
进入判断 tryReleaseShared(arg)方法
Semaphore .java
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
A 获取状态current =0,nextc =0+1=1,设置状态为1,返回true
同理(B 获取状态current =1,nextc =1+1=2,设置状态为2,返回true)
此时进入doReleaseShared();方法
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;//哨兵节点状态为-1
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//哨兵节点状态设置为0
continue; // loop to recheck cases
unparkSuccessor(h);//唤醒后继
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
把哨兵节点状态设置为0,唤醒之前的DEF线程
D线程被唤醒继续执行方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//添加一个共享节点到队列尾部
boolean failed = true;//等待过程中是否被中断过的标志
try {
for (;;) {
final Node p = node.predecessor();//前驱
if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
int r = tryAcquireShared(arg);//再次尝试获取资源
if (r >= 0) {
setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
得到前驱为head,再次尝试获取资源成功,r>0,执行
setHeadAndPropagate(node, r);方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 拿到head哨兵节点
setHead(node);//把哨兵节点置为刚才的A节点
//propagate>0
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;//找到后继几点
if (s == null || s.isShared())//不为空或者是共享节点
doReleaseShared();//获取通道唤醒
}
}
此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,共享模式
移除原来的哨兵节点,设置当前的节点为哨兵节点,propagate为1继续执行获取获取通道判断是否可以继续唤醒,不断唤醒所有等待中的共享节点线程
至此Semaphore 源码分析完毕
Semaphore 使用的AQS的共享模式实现主要就是继承实现
tryAcquireShared(int),tryReleaseShared(int)这两方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//获取锁
doAcquireSharedInterruptibly(arg);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
原文链接:https://blog.csdn.net/qq_25210899/article/details/117333640
作者:我是小豆丁
链接:http://www.javaheidong.com/blog/article/207234/64edf9c1561354791241/
来源:java黑洞网
任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 java黑洞网 All Rights Reserved 版权所有,并保留所有权利。京ICP备18063182号-2
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!