资源描述:
《futuretask源码解析-java开发java经验技巧》由会员上传分享,免费在线阅读,更多相关内容在工程资料-天天文库。
FuturcTask源码解析-Java开发Java经验技巧FutureTask源码解析原文出处:Jd刘钳洋站在使用者的角度,future是一个经常在多线程环境下使用的Runnable,使用它的好处有两个:1.线程执行结果带有返I叫值2.提供了一个线程超时的功能,超过超时时间抛出界常后返回。那,怎么实现future这种超时控制呢?来看看代码:publicclassFutureTaskimplementsRunnableFuture{/**SynchronizationcontrolforFuturcTask*/privatefinalSyncsync;FutureTask的实现只是依赖了一个内部类Sync实现的,Sync是AQS(AbstractQueuedSynchronizer)的子类,这个类承担了所有future的功能,AbstractQueuedSynchronizer的作者是大名鼎鼎的并发编程大师DougLea,它的作用远远不止实现一个Future这么简单,后面在说。下面,我们从一个future提交到线程池开始,直到future超时或者执行结束来看看future都做了些什么。怎么做的。首先,向线程池ThreadPoolExecutor提交一个future:future二exec,submit(newWebDivideFuture(cookieUti1s,jediusUti1,request,sclcctFactory,result,tcstlnfos));ThreadPoolExecutor将提交的任务用FutureTask包装一下:publicFuturesubmit(Callabletask){if(task二二null)thrownewNullPointerException();RunnableFutureftask二newTaskFor(task);execute(ftask);returnftask;}protectedRunnableFuturenewTaskFor(Callablecallable){returnnewFuturcTask(callable);然后尝试将包装后的Future用Thread类包装下后启动,红色标记的地方表示,当当前线程池的大小小于corePoolSize时,将任务捉交,否则将该任务加入到workQueue中去,如果workQueue装满了,则尝试在线程数小于MaxPoolSize的条件下提交该任务。 顺便说明下,我们使用线程池时,常常看到有关有界队列,无界队列作为工作队列的字眼:使用无界队列时,线程池的大小永远不大于corePoolSize,使用有界队列时的maxPoolSize才有效,原因就在这里,如果是无界队列,红框中的add永远为true下方的addlfUnderMaximumPoolSize怎么也走不到了,也就不会有线程数量大于MaxPoolSize的情况。言归正传,看看add!FUnderCorePoolSize中做了什么事:new了一个Thread,将我们提交的任务包装下后就直接启动了privateboolcanaddlfUndcrCorcPoolSizc(RunnablefirstTask){Threadt=null;finalReentrantLockmainLock=this.mainLock;mainLock,lock();try{if(poolsizcimplementsRunndbleFuture〈V>{/**SynchronizationcontrolforFutureTask*/publicinterfaceRunnableFutureextendsRunnable,Future{???/**????*SetsthisFuturetotheresultofitscomputation????*unlessithasbeencancelled.????*/???voidrun();FutureTask的run()方法i|【是这么写:publicvoidrun(){sync,inncrRun();innerRun方法先使用原子方式更改了一下自己的一个标志位state(用于标示任务的执行情况) 然后红色框的方法实现回调函数call的调用,并且将返回值作为参数传递下去,放置在一个叫做result的泛型变量中,然后future只管等待一段时间后去拿result这个变量的值就可以了。??至于怎么实现的“等待一段时间再去拿”后面马上说明。 innerSet在经过一系列的状态判断后,最终将V这个call方法返回的值赋值给了result说到这里,我们知道,future是通过将call方法的返回值放在一个叫做result的变量中,经过一段时间的等待后再去拿出来返回就可以了。怎么实现这个“等一段时间”呢?要从Sync的父类AbstractQueuedSynchronizer这个类说起:我们知道AbstractQueuedSynchronizer后者的中文名字叫做同步器,顾名思义,是用来控制资源占用的一种方式。对于FutureTask来说,“资源”就是result,线程执行的结果。思路就是通过控制对result这个资源的访问来决定是否需要马上去取得result这个结果,当超时时间未到,或者线程未执行结束时,是不能去取result的。当线程正常执行结束后,一系列的标志位会被修改,并告诉等待future执行结果的各个线程,可以来获取resultTo 这里会涉及到独占锁和共享锁的概念。 独占锁:同一时间只冇一个线程获取锁。再冇线程尝试加锁,将失败。典型例子reentrantLock共享锁:同一时间可以有多个线程获取锁。典型例子,木例中的FutureTask为什么说他们?因为Sync本质上就是想完成一个共享锁的功能,所以Sync继承了AbstractQucucdSynchronizcr所以Sync的方法使用的是AbstractQueuedSynchronizer的共享锁的API首先,我们明白,future结束有两种状态:1.线程正常执行完毕,通知等待结果的主线程对应于future.get()方法。2.线程还未执行完毕,等待结果的主线程已经等不到了(超时),抛出一个TimeOutException后不再等待。对应于future.get(longtimeout,TimeUnitunit)卜•面我们依次看看对于这两种状态,我们是怎么处理的:从上图屮可以得知,线程在执行完毕后会将执行的结果放到result屮,红色框中同时提到了releaseShared方法,我们从这里进入AbstractQueuedSynchronizer 当result已经被赋值,或者FutureTask为cancel状态时,FutureTask会尝试去释放共享锁(可以同吋有多个线程调用future.get()方法,也就是会有多个线程在等待future执行结果,而furue在执行完毕后会依次唤醒各个线程)如杲尝试成功,则开始真止的释放锁,这里是AbstractQueuedSynchronizer比较精妙的地方,“尝试”动作都定义为抽象方法,交个各个子类去定义“尝试成功的含义”而真正的释放则自己实现,这种复杂规则交个子类,流程交给自己的思路很值得借鉴。publicfinalbooleanreleaseShared(intarg){if(tryRelcascSharcd(arg)){doReleaseShared();returntrue;returnfalse;再看FutureTask的“尝试释放”的规则: 没啥好说,怎么尝试都成功*TmplernentsAQSbaserleasetoalwayssigneilaftersetting*finaldonestatusbynullingrunner*/protectedbooleantryReleaseShared(intignore){runner二null;returntrue;}接着AbstractQueuedSynchronizer开始了真正的释放唤醒I.作:privatevoiddoReleaseShared(){/**Ensurethatareleasepropagates,evenifthereareother*in-progressacquires/releases.Thisproceedsintheusual*wayoftryingtounparkSuccessorofheadifitneeds*signal.Butifitdoesnot,statusissettoPROPAGATEto*ensurethatuponrelease,propagationcontinues.*Additionally,wemustloopincaseanewnodeisadded*whilewearedoingthis.Also,uniikeotherusesof*unparkSuccessor,weneedtoknowifCAStoresetstatus*fails,ifsorechecking.*/for(;;){Nodeh=head;//把头元素取出来,保持头元素的引用,防止head被更改if(h!=null&&h!=tail){intws二h.waitstatus;if(ws=Node.SIGNAL){//如果状态位为:需耍一个信号去唤醒注释原话:/**wciitStatusvaluetoindiceitesuccessor'sthreadneedsunparking*/if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))//修改状态位continue;//looptorecheckcasesunparkSuccessor(h);//如果修改成功,则通过头元素找到一个线程,并且唤醒它(唤醒动作是通过JNI方法去调用的)}elseif(ws二二0&&!compareAndSetWaitStatus(h,0,Node.PROPAGATE))continue;//looponfailedCASif(h二二head)//loopifheadchangedbreak; 循环遍历后,知道己经没有结点需要唤醒则返回,依次return后,future的run方法执行完毕。以上是针对future线程的,我们知道,FutureTask已经将执行结果放在了result中,并且按等的先后顺序依唤醒了等待队列上的线程。那,猜测future,get方法就不难了,对于带超时的get方法:最大的可能性就是不断的检查future的一个状态位,看它是否执行完毕,执行完则获取结果返回,否则,再阻塞自己一段时间。对于不待超时的,就上来就先尝试获取结果,拿不到就阻塞自己,直到上述的innerSet方法唤醒它。究竟是不是这样呢?一起來看看:/***©throwsCancellationException{©inheritDoc}*/publicVget()throwsTnterruptedException,ExecutionException{returnsync,irmerGet();}/***@throwsCancellationException*/publicVget(longtimeout,TimeUnitunit)throwsIntcrruptcdExccption,ExecutionException,TimeoutException{returnsync.innerGet(unit.toNanos(timeout));}innerGet()throwsIntcrruptcdExccption,ExccutionException{acquireSharedInterruptibly(0);if(getStateO二二CANCELLED)thrownewCancellationException();if(exception!二null)thrownewExccutionExccption(exception);returnresult;innerGet(longnanosTimeout)throwsInterruptedException,ExecutionException,TimeoutException{if(!tryAcquireSharedNanos(0,nemosTimeout))thrownewTimeoutException();if(getStateO==CANCELLED)thrownewCancellationExceptionO; if(exception!二null)thrownewExecutionException(exception);returnresult;}因为innerGet(longnanosTimeout)和innerGet()流程大致相同,所以我们重点讲解innerGet(longnanosTimeout),在唯个有区别的地方说明下即可。如下图所示,对于innerGet(longnanosTimeout)方法,FutureTask釆用的方法是直接加锁或者每隔一段时间尝试加锁,如果成功,则返回true,则如上图所示,直接返回result,主线程拿到执行结果。否则,抛出超时异常。publicfinalbooleantryAcquireSharedNanos(intarg,longnanosTimeout)throwsTnterruptedException{if(Thread,interruptedO)thrownewInterruptedExccption();returntr)^AcquireShared(arg)>=0||doAcquireSharedNanos(arg,nanosTimeout);}对于tryAcquircSharcd方法,比较简单,直接看future是否执行完毕/***ImplementsAQSbaseacquiretosucceedifranorcancelled*/protectedinttryAcquireShared(intignore){returninnerTsDone()?1:~1;}如果没有结束,则进入doAcquireSharedNanos方法:privatebooleandoAcquireSharedNanos(intarg,longnanosTimeout)throwsInterruptedException{longlastTimc二System.nemoTimcO;finalNodenode=addWaiter(Node.SHARED);//在队列尾部增加一个结点,我的理解是,用来标明这个队列是共享者队列还是独占队列try{for(;;){finalNodep=node,predecessor();//拿出刚才新增结点的前一个结点:实际冇效的队尾结点。if(p==head){int=tryAcquireShared(arg);//尝试获取锁。if(r>二0){//sctllcadAndPropagatc(node,r);//返回值大于1 对于FutureTask代表任务已经被cancel了,则更改队列头部结点。p.next=null;//helpGC将p结点脱离队列,帮助GCreturntrue;//返回trueJs上述中可以知道当前线成会抛出超时异常确定卜•会不会唤醒其他节点?}}if(nanosTimeout<=0){//如果设置的超时时间小于等于0则取消获取锁cancelAcquire(node);rcturnfalse;}if(nanosTimeout>spinForTimeoutThreshold&&//等待的吋间必须大于一个自旋锁的周期吋间shouldParkAfterFai1edAcquire(p,node))//遍历队列,找到需要沉睡的第一个节点LockSupport・park'anos(this,nanosTimeout);//调用JNI方法,沉睡当前线程longnow二System.nanoTime();nanosTimeout-二now-lastTime;//更新等待吋间循环遍历lastTime=now;if(Thread,interrupted())break;}}catch(RuntimcExccptionex){cancelAcquire(node);throwex;}//ArrivehereonlyifinterruptedcamcelAcquire(node);这样通过AQS的协作,所有调用future,get(longtimeout,TimeUnitunit)的线程都会按顺序等待,直到线成执行完被唤醍或者超时时间到主动抛出界常。总结至此为止FutureTask的解析已经基木结束了,可以看到。它依靠AQS的共享锁实现了对线程执行结果的访问控制。和我们通常意义上的访问控制(并发访问某个资源,获取失败时,沉睡自己等待唤醒或者超时后返冋)基本是一致的,不外乎维护了一个等待资源的列表。将等待资源的线程通过链表的方式串了起来。当然AQS的功能远不仅如此,它述捉供了一套独山锁的API,帮助使用者实现独占锁的功能。最常用的Reentrantlock就是使用这套API做的。 有机会的话再和人家分享下它的实现。