欢迎来到天天文库
浏览记录
ID:11868338
大小:47.50 KB
页数:6页
时间:2018-07-14
《非阻塞同步算法boundlesscyclicbarrier实现》由会员上传分享,免费在线阅读,更多相关内容在行业资料-天天文库。
1、非阻塞同步算法BoundlessCyclicBarrier实现前言本文不需要太多的准备知识,但技巧性更强一些。因为分析、设计的过程比较复杂繁琐,也限于篇幅,所以,主要展示如何解决这些需求,和讲解代码。另外,所讲的内容也是后一篇实战中需要用到的一个工具类。需求介绍我需要编写一个同步工具,它需要提供这样几个方法:await、pass、cancel。某个线程调用await时,会被阻塞;当调用pass方法时,之前因为await而阻塞的线程将全部被解除阻塞,之后调用await的线程继续被阻塞,直到下一次调用pass。该工具同时还维护一个版本号,await方法
2、可以带一个目标版本号,如果当前的版本号比目标版本号新或相同,则直接通过,否则,阻塞本线程,直到到达或超过目标版本。调用pass的时候,更新版本号。如果停止了版本更新,可使用cancel方法来解除所有因await而阻塞的线程,包括指定版本号的。此方法用于避免无谓地等待。若await发生在cancel之后,则仍将被阻塞。因为CountDownLatch不允许重复使用,CyclicBarrier只支持固定个数的线程,并且都没有维护一个版本号,所以没有已有的类能实现上面的需求,需要自己实现。问题分析简单分析可知,应该维护一个队列,来保存当前被阻塞的线程,用
3、于在pass时对它们一一解除阻塞,pass时应该使用一个新的队列,否则不方便正确处理pass前和pass后调用await的线程。至此,问题的关键就明了了:如何将队列的替换和版本号的更新这两个操作做成原子的。解决方案以前在《JAVA并发编程实践》曾看到过这样一个小技巧,如果要原子地更新两个变量,那么可以创建一个新的类将它们封装起来,将这两个变量当定义成类成员变量,更新时,用CAS更新这个类的引用即可。因为较为复杂,下面先给出完整的代码,再讲解其中的关键。注意:上面所说pass,在代码中的具体实现为nextCycle,有两个版本,一个自动维护版本号,一
4、个由调用者维护版本号。/***@authortrytocatch@163.com*@time2013-1-31*/publicclassBoundlessCyclicBarrier{protectedfinalAtomicReferencewaitQueueRef;publicBoundlessCyclicBarrier(){this(0);}publicBoundlessCyclicBarrier(intstartVersion){waitQueueRef=newAtomicReference5、>(newVersionQueue(startVersion));}publicfinalvoidawaitWithAssignedVersion(intmyVersion)throwsInterruptedException{awaitImpl(true,myVersion,0);}/****@parammyVersion*@paramnanosTimeout*@returniftimeout,orbecanceledanddoesn'treachmyVersion,returnsfalse*@throwsInterruptedException6、*/publicfinalbooleanawaitWithAssignedVersion(intmyVersion,longnanosTimeout)throwsInterruptedException{returnawaitImpl(true,myVersion,nanosTimeout);}publicfinalvoidawait()throwsInterruptedException{awaitImpl(false,0,0);}/****@paramnanosTimeout*@returnifandonlyiftimeout,returnsf7、alse*@throwsInterruptedException*/publicfinalbooleanawait(longnanosTimeout)throwsInterruptedException{returnawaitImpl(false,0,nanosTimeout);}/***passandversion++(somethreadsmaynotbeunparkedwhenawaitImplisinprocess,butit'sOKinthisBarrier)*@returnoldqueueversion*/publicintnextCy8、cle(){VersionQueueoldQueue=waitQueueRef.get();VersionQueuenew
5、>(newVersionQueue(startVersion));}publicfinalvoidawaitWithAssignedVersion(intmyVersion)throwsInterruptedException{awaitImpl(true,myVersion,0);}/****@parammyVersion*@paramnanosTimeout*@returniftimeout,orbecanceledanddoesn'treachmyVersion,returnsfalse*@throwsInterruptedException
6、*/publicfinalbooleanawaitWithAssignedVersion(intmyVersion,longnanosTimeout)throwsInterruptedException{returnawaitImpl(true,myVersion,nanosTimeout);}publicfinalvoidawait()throwsInterruptedException{awaitImpl(false,0,0);}/****@paramnanosTimeout*@returnifandonlyiftimeout,returnsf
7、alse*@throwsInterruptedException*/publicfinalbooleanawait(longnanosTimeout)throwsInterruptedException{returnawaitImpl(false,0,nanosTimeout);}/***passandversion++(somethreadsmaynotbeunparkedwhenawaitImplisinprocess,butit'sOKinthisBarrier)*@returnoldqueueversion*/publicintnextCy
8、cle(){VersionQueueoldQueue=waitQueueRef.get();VersionQueuenew
此文档下载收益归作者所有