看了都说好:深入理解 Java 锁与线程阻塞,才是你的最佳选择
来自:Pqpo's?Notes?,链接:https://pqpo.me

责编:乐乐?|?封面来自网络
00 前言?
相信大家对线程锁和线程阻塞都很了解,无非就是?synchronized,?wait/notify?等,?但是你有仔细想过?Java?虚拟机是如何实现锁和阻塞的呢?它们之间又有哪些联系呢?如果感兴趣的话请接着往下看。
00 正文?
为保障多线程下处理共享数据的安全性,Java?语言给我们提供了线程锁,保证同一时刻只有一个线程能处理共享数据。当一个锁被某个线程持有的时候,另一个线程尝试去获取这个锁将产生线程阻塞,直到持有锁的线程释放了该锁。
除了抢占锁的时候会出现线程阻塞,另外还有一些方法也会产生线程阻塞,比如:?Object.wait(),?Thread.sleep(),?ArrayBlockingQueue.put()?等等,他们都有一个共同特点:不消耗?CPU?时间片。另外值得指出的是?Object.wait()?会释放持有的锁,而?Thread.sleep()?不会,相信这点大家都清楚。?当然?while(true){?}?也能产生阻塞线程的效果,自旋锁就是使用循环,配合?CAS?(compareAndSet)?实现的,这个不在我们的讨论之列。
相信大家对线程锁都很熟悉,目前有两种方法,准确来说是三种,synchronized?方法,synchronized?区块,ReentrantLock。先说?synchronized,代码如下:
public?class?Lock?{
????public?static?void?synchronized?print()?{
????????System.out.println("method?synchronized");
????}
????public?static?void?print2()?{
????????synchronized(Lock.class)?{
????????????System.out.println("synchronized");
????????}
????}
????public?static?void?main(String[]?args)?{
????????Lock.print();
????????Lock.print2();
????}
}
编译后通过如下命令查看其字节码:
javap?-c?-v?Lock
其中节选方法一(Lock.print)的字节码如下:
public?static?synchronized?void?print();
????descriptor:?()V
????flags:?ACC_PUBLIC,?ACC_STATIC,?ACC_SYNCHRONIZED
????Code:
??????stack=2,?locals=0,?args_size=0
?????????0:?getstatic?????#2????//?Field?java/lang/System.out:Ljava/io/PrintStream;
?????????3:?ldc???????????#3????//?String?method?synchronized
?????????5:?invokevirtual?#4????//?Method?java/io/PrintStream.println:(Ljava/lang/String;)V
?????????8:?return
}
可以看到方法表的访问标志位?(flags)?中多了个?ACC_SYNCHRONIZED,然后看字节码指令区域?(Code)?,和普通方法没任何差别,?猜测?Java?虚拟机通过检查方法表中是否存在标志位?ACC_SYNCHRONIZED?来决定是否需要获取锁,至于获取锁的原理后文会提到。
然后看第二个使用?synchronized?区块的方法(Lock.print2)字节码:
public?static?void?print2();
????descriptor:?()V
????flags:?ACC_PUBLIC,?ACC_STATIC
????Code:
??????stack=2,?locals=2,?args_size=0
?????????0:?ldc?#5????//?将锁对象?Lock.class?入栈
?????????2:?dup?//?复制一份,此时栈中有两个?Lock.class?
?????????3:?astore_0?//?出栈一个?Lock.class?对象保存到局部变量表?Slot?1?中
?????????4:?monitorenter?//?以栈顶元素?Lock.class?作为锁,开始同步
?????????5:?getstatic?#2????//?5-10?调用?System.out.println("synchronized");
?????????8:?ldc?#6
????????10:?invokevirtual?#4
????????13:?aload_0?//?将局部变量表?Slot?1?中的数据入栈,即?Lock.class
????????14:?monitorexit?//?使用栈顶数据退出同步
????????15:?goto?23?//?方法结束,跳转到?23?返回
????????18:?astore_1?//?从这里开始是异常路径,将异常信息保存至局部变量表?Slot?2?中,查看异常表
????????19:?aload_0?//?将局部变量表?Slot?1?中的?Lock.class?入栈
????????20:?monitorexit?//?使用栈顶数据退出同步
????????21:?aload_1?//?将局部变量表?Slot?2?中的异常信息入栈
????????22:?athrow?//?把异常对象重新抛出给方法的调用者
????????23:?return?//?方法正常返回
??????Exception?table:?//?异常表
?????????from????to??target?type
?????????????5????15????18???any?//?5-15?出现任何(any)异常跳转到?18?
????????????18????21????18???any?//?18-21?出现任何(any)异常跳转到?18?
synchronized?区块的字节码相比较?synchronized?方法复杂了许多。每一行字节码的含义我都作了详细注释,可以看到此时是通过字节码指令?monitorenter,monitorexit?来进入和退出同步的。特别值得注意的是,我们并没有写?try.catch?捕获异常,但是字节码指令中存在异常处理的代码,其实为了保证在方法异常完成时?monitorenter?和?monitorexit?指令依然可以正确配对执行,编译器会自动产生一个异常处理器,这个异常处理器声明可处理所有的异常,它的目的就是用来执行?monitorexit?指令。这个机制确保在?synchronized?区块中产生任何异常都可以正常退出同步,释放锁资源。
不管是检查标志位中的?ACC_SYNCHRONIZED,还是字节码指令?monitorenter,monitorexit,锁机制的实现最终肯定存在于?JVM?中,后面我们会再提到这点。
接下来继续看?ReentrantLock?的实现,鉴于篇幅有限,ReentrantLock?的原理不会讲的很详细,感兴趣的可以自行研究。ReentrantLock?是基于并发基础组件?AbstractQueuedSynchronizer?实现的,内部有一个?int?类型的?state?变量来控制同步状态,为?0?时表示无线程占用锁资源,等于?1?时表示则说明有线程占用,由于?ReentrantLock?是可重入锁,state?也可能大于?1?表示该线程有多次获取锁。AQS?内部还有一个由内部类?Node?构成的队列用来完成线程获取锁的排队。本文只是简单的介绍一下?lock?和?unLock?方法。
下面先看?ReentrantLock.lock?方法:
//?ReentrantLock.java
public?void?lock()?{
????this.sync.lock();
}
//?ReentrantLock.NonfairSync.class
final?void?lock()?{
????//?使用?cas?设置?state,如果设置成功表示当前无其他线程竞争锁,优先获取锁资源
????if?(this.compareAndSetState(0,?1))?{
????????//?保存当前线程由于后续重入锁的判断
????????this.setExclusiveOwnerThread(Thread.currentThread());
????}?else?{
????????this.acquire(1);
????}
}
//?AbstractQueuedSynchronizer.java
?public?final?void?acquire(int?arg)?{
????if?(!tryAcquire(arg)?&&?acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
????????selfInterrupt();?//?如果阻塞被中断,重新设置中断通知调用者
}
//?判断是否是重入
protected?final?boolean?tryAcquire(int?var1)?{
????return?this.nonfairTryAcquire(var1);
}
//?处理等待队列
final?boolean?acquireQueued(final?Node?node,?int?arg)?{
????boolean?failed?=?true;
????try?{
????????boolean?interrupted?=?false;
????????for?(;;)?{
????????????final?Node?p?=?node.predecessor();
????????????if?(p?==?head?&&?tryAcquire(arg))?{
????????????????setHead(node);
????????????????p.next?=?null;?//?help?GC
????????????????failed?=?false;
????????????????return?interrupted;
????????????}
????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&
????????????????parkAndCheckInterrupt())
????????????????interrupted?=?true;
????????}
????}?finally?{
????????if?(failed)
????????????cancelAcquire(node);
????}
}
private?final?boolean?parkAndCheckInterrupt()?{
????LockSupport.park(this);?//?阻塞线程
????return?Thread.interrupted();
}
对于锁竞争的情况,最终会调用?LockSupport.park(this)?阻塞当前线程,同样的?ReentrantLock.unlock?方法会调用?LockSupport.unpark(thread)?来恢复阻塞的线程。继续看?LockSupport?的实现:
public?static?void?unpark(Thread?thread)?{
????if?(var0?!=?null)?{
????????UNSAFE.unpark(thread);
????}
}
public?static?void?park(Object?obj)?{
????Thread?thread?=?Thread.currentThread();
????setBlocker(thread,?obj);
????UNSAFE.park(false,?0L);
????setBlocker(thread,?(Object)null);
}
LockSupport?内部调用了?UnSafe?类的?park?和?unpark,?是?native?代码,该类由虚拟机实现,以?Hotspot?虚拟机为例,查看?park?方法:
//?unsafe.cpp
UNSAFE_ENTRY(void,?Unsafe_Park(JNIEnv?*env,?jobject?unsafe,?jboolean?isAbsolute,?jlong?time))
??UnsafeWrapper("Unsafe_Park");
#ifndef?USDT2
??HS_DTRACE_PROBE3(hotspot,?thread__park__begin,?thread->parker(),?(int)?isAbsolute,?time);
#else?/*?USDT2?*/
???HOTSPOT_THREAD_PARK_BEGIN(
?????????????????????????????(uintptr_t)?thread->parker(),?(int)?isAbsolute,?time);
#endif?/*?USDT2?*/
??JavaThreadParkedState?jtps(thread,?time?!=?0);
??thread->parker()->park(isAbsolute?!=?0,?time);
#ifndef?USDT2
??HS_DTRACE_PROBE1(hotspot,?thread__park__end,?thread->parker());
#else?/*?USDT2?*/
??HOTSPOT_THREAD_PARK_END(
??????????????????????????(uintptr_t)?thread->parker());
#endif?/*?USDT2?*/
UNSAFE_END
调用了:?thread->parker()->park(isAbsolute?!=?0,?time);?我们可以猜测是这句代码阻塞了当前线程。HotSpot?虚拟机里的?Thread?类对应着一个?OS?的?Thread,JavaThread?类继承于?Thread,JavaThread?实例对应着一个?Java?层的?Thread。
简而言之,Java?层的?Thread?对应着一个?OS?的?Thread。使用如下代码创建线程:
//linux_os.cpp
pthread_t?tid;
int?ret?=?pthread_create(&tid,?&attr,?(void*?(*)(void*))?thread_native_entry,?thread);
回到?Thread?类中的?Park,我们查看?HotSpot?的?thread.hpp,?找到了如下三个?Park:
public:
??ParkEvent?*?_ParkEvent?;????//?for?synchronized()
??ParkEvent?*?_SleepEvent?;???//?for?Thread.sleep
//?JSR166?per-thread?parker
private:
??Parker*????_parker;
从注释上可以看出分别是用于?synchronized?的阻塞,Thread.sleep?的阻塞还有用于?UnSafe?的线程阻塞,继续查看?park.hpp?节选:
//?A?word?of?caution:?The?JVM?uses?2?very?similar?constructs:
//?1.?ParkEvent?are?used?for?Java-level?"monitor"?synchronization.
//?2.?Parkers?are?used?by?JSR166-JUC?park-unpark.
class?Parker?:?public?os::PlatformParker?{
//?略
}
class?ParkEvent?:?public?os::PlatformEvent?{
//?略
}
注释上更近一步解释了两种?Parker?的区别,他们的实现非常相似,那为什么会存在两个呢?网络上有解释说是只是没重构而已。下面只看?Parker?的实现,发现?park.cpp?中并没有实现?park?方法,猜测应该是父类中实现了,因为这是和系统相关的操作,以?Linux?系统为例,查看?linux_os.cpp?找到了?park?的实现,截取了主要部分:
void?Parker::park(bool?isAbsolute,?jlong?time)?{
??//?省略了前置判断
??//?获取锁
??if?(Thread::is_interrupted(thread,?false)?||?pthread_mutex_trylock(_mutex)?!=?0)?{
????return;
??}
??if?(time?==?0)?{
????_cur_index?=?REL_INDEX;?//?arbitrary?choice?when?not?timed
????//?调用?pthread_cond_wait?阻塞线程
????status?=?pthread_cond_wait?(&_cond[_cur_index],?_mutex)?;
??}?else?{
????_cur_index?=?isAbsolute???ABS_INDEX?:?REL_INDEX;
????status?=?os::Linux::safe_cond_timedwait?(&_cond[_cur_index],?_mutex,?&absTime)?;
????if?(status?!=?0?&&?WorkAroundNPTLTimedWaitHang)?{
??????pthread_cond_destroy?(&_cond[_cur_index])?;
??????pthread_cond_init????(&_cond[_cur_index],?isAbsolute???NULL?:?os::Linux::condAttr());
????}
??}
??_cur_index?=?-1;
??//?已从?block?中恢复,释放锁
??_counter?=?0?;
??status?=?pthread_mutex_unlock(_mutex)?;
??//?略
}
总共分三步走,先获取锁,再调用?pthread_cond_wait?阻塞线程,最后阻塞恢复了之后释放锁,是不是和我们使用?Object.wait?十分类似,事实上?Object.wait?底层也是这种方式实现的。为了更清楚的了解底层的实现,写了一段?c?代码看一下线程的创建和锁的使用:
int?counter?=?0;
//?互斥锁对象
pthread_mutex_t?mutex?=?PTHREAD_MUTEX_INITIALIZER;
void*?add()?{
??for(int?i?=?0;i?< ?2;++i)?{
????//?获取锁
????pthread_mutex_lock(?&mutex?);
????++counter;
????sleep(1);
????//?释放锁
????pthread_mutex_unlock(?&mutex?);
????printf("counter?=?%dn",?counter);
??}
??pthread_exit(NULL);
}
int?main()?{
??pthread_t?thread_1,?thread_2;
??//?创建线程
??pthread_create(&thread_1,?NULL,?add,?NULL);
??pthread_create(&thread_2,?NULL,?add,?NULL);
??pthread_join(thread_1,?NULL);
??pthread_join(thread_2,?NULL);
??return?0;
}
使用?pthread_create?创建线程,使用?pthread_mutex_lock?获取锁,使用?pthread_mutex_unlock?释放锁。那既然?pthread_mutex_lock?和?pthread_mutex_unlock?就能实现锁了,那为什么锁实现的时候还要使用?pthread_cond_wait?来阻塞线程呢?回过头看?PlatformParker?:
//os_linux.hpp
class?PlatformParker?{
?pthread_mutex_t?_mutex[1];
?//一个是给park用,?另一个是给parkUntil用
?pthread_cond_t??_cond[2];?//?one?for?relative?times?and?one?for?abs.
?//略...
};
每个?JavaThread?实例都有自己的?mutex,在上述自己写的例子中是多个线程竞争同一个?mutex,阻塞线程队列管理的逻辑直接由?mutex?实现,而此处的?mutex?线程私有,不存在直接竞争关系,事实上,JVM?为了提升平台通用性(?),只提供了线程阻塞和恢复操作,阻塞线程队列的管理工作交给了?Java?层,也就是前面提到的?AQS。对于?Java?层来说?JVM?只需要提供?「阻塞」?和?「唤醒」?的操作即可。
在?Java?中讲解?Object.wait,?Object.notify?的时候通常会用生产者-消费者作为例子,这里我也简单的写了一个?c?的例子,让大家了解底层线程阻塞的原理:
#define?TRUE?1
#define?FALSE?0
#define?BUFFER_SIZE?10
pthread_cond_t?msg_cond?=?PTHREAD_COND_INITIALIZER;
pthread_mutex_t?mutex?=?PTHREAD_MUTEX_INITIALIZER;
char*?msgBuffer[BUFFER_SIZE]?=?{0};
int?bufferIndex?=?-1;
int?counter?=?0;
void*?readMsg()?{
????while?(TRUE)?{
????????//?获取锁
????????pthread_mutex_lock(?&mutex?);
????????if?(bufferIndex?< ?0)?{
????????????printf("wait?for?messagen");
????????????//?消息队列如果为空则阻塞等待
????????????pthread_cond_wait(?&msg_cond,?&mutex);
????????}
????????for(;?bufferIndex?>=?0;?--bufferIndex){
????????????char*?msg?=?msgBuffer[bufferIndex];
????????????msgBuffer[bufferIndex]?=?0;
????????????printf("read?message?=?%s,?%dn",?msg,?counter++);
????????????//?通知生产者线程
????????????pthread_cond_signal(&msg_cond);
????????}
????????sleep(1);
????????//?释放锁
????????pthread_mutex_unlock(?&mutex?);
????}
????return?0;
}
void*?writeMsg()?{
????//?获取锁
????pthread_mutex_lock(?&mutex?);
????if?(bufferIndex?< ?BUFFER_SIZE?-?1)?{
????????char*?msg?=?"haha!";
????????msgBuffer[++bufferIndex]?=?msg;
????????//?通知消费者线程
????????pthread_cond_signal(&msg_cond);?//?notify();
????????//?pthread_cond_broadcast(&msg_cond);?//?notifyAll();
????}?else?{
????????printf("message?buffer?is?full!n");
????????//?缓冲队列已满阻塞等待
????????pthread_cond_wait(?&msg_cond,?&mutex);
????}
????//?释放锁
????pthread_mutex_unlock(?&mutex?);
????return?0;
}
int?main(int?argc,?char?const?*argv[])?{
????pthread_t?thread_r;
????//?创建后台消费者线程
????pthread_create(&thread_r,?NULL,?readMsg,?NULL);
????for(int?i?=?0;?i?< ?50;?i++){
????????printf("send?message?%d?n",?i);
????????//?生产消息
????????writeMsg();
????}
????pthread_join(thread_r,?NULL);
????return?0;
}
其中消费者线程是一个循环,在循环中先获取锁,然后判断队列是否为空,如果为空则调用?pthread_cond_wait?阻塞线程,这个阻塞操作会自动释放持有的锁并出让?cpu?时间片,恢复的时候自动获取锁,消费完队列之后会调用?pthread_cond_signal?通知生产者线程,另外还有一个通知所有线程恢复的?pthread_cond_broadcast,与?notifyAll?类似。
最后再简单谈一下阻塞中断,Java?层?Thread?中有个?interrupt?方法,它的作用是在线程收到阻塞的时候抛出一个中断信号,这样线程就会退出阻塞状态,但是并不是我们遇到的所有阻塞都会中断,要看是否会响应中断信号,Object.wait,?Thread.join,Thread.sleep,ReentrantLock.lockInterruptibly?这些会抛出受检异常?InterruptedException?的都会被中断。synchronized,ReentrantLock.lock?的锁竞争阻塞是不会被中断的,interrupt?并不会强制终止线程,而是会将线程设置成?interrupted?状态,我们可以通过判断?isInterrupted?或?interrupted?来获取中断状态,区别在于后者会重置中断状态为?false。看一下底层线程中断的代码:
//?os_linux.cpp
void?os::interrupt(Thread*?thread)?{
??OSThread*?osthread?=?thread->osthread();
??if?(!osthread->interrupted())?{
????osthread->set_interrupted(true);
????OrderAccess::fence();
????ParkEvent?*?const?slp?=?thread->_SleepEvent?;
????if?(slp?!=?NULL)?slp->unpark()?;
??}
??//?For?JSR166.?Unpark?even?if?interrupt?status?already?was?set
??if?(thread->is_Java_thread())
????((JavaThread*)thread)->parker()->unpark();
??ParkEvent?*?ev?=?thread->_ParkEvent?;
??if?(ev?!=?NULL)?ev->unpark()?;
}
可以看到,线程中断也是由?unpark?实现的,?即恢复了阻塞的线程。并且对之前提到的三个?Parker?(_ParkEvent,_SleepEvent,_parker)?都进行了?unpark。
说到这里相信大家对?Java?线程锁与线程阻塞有个大体的了解了吧,由于本人水平实在有限,有些地方讲的不好或者有错误的地方请多包涵,如果发现任何问题,请提出讨论,我会及时修改。
关注公众号:拾黑(shiheibook)了解更多
[广告]赞助链接:
四季很好,只要有你,文娱排行榜:https://www.yaopaiming.com/
让资讯触达的更精准有趣:https://www.0xu.cn/
关注网络尖刀微信公众号随时掌握互联网精彩
- 1 习近平听取岑浩辉述职报告 7904562
- 2 哈尔滨大雪人原来是挖出来的 7807939
- 3 央视曝光走私孕妇血样黑色产业链 7713315
- 4 2025年度文化记忆 重温感动瞬间 7616213
- 5 女子毛衣粘走3000元翡翠耳环主动归还 7523920
- 6 中央财办:扩大内需是明年首位任务 7428483
- 7 女子罕见被控迷信罪 “供奉”2.1亿 7332884
- 8 52岁民警处置高速事故被撞身亡 7236785
- 9 小区收益分红17万 264户业主领现金 7138493
- 10 用漫画方式了解海南自贸港封关 7041428

程序员之家
