欢迎来到天天文库
浏览记录
ID:37896260
大小:195.50 KB
页数:6页
时间:2019-06-02
《Spark发射任务源代码分析》由会员上传分享,免费在线阅读,更多相关内容在教育资源-天天文库。
1、其中TaskSetManager类的resourceOffer()方法调用流程CoarseGrainedSchedulerBackend的overridedefreceiveAndReply(context:RpcCallContext):PartialFunction[Any,Unit]=方法,注册executor,包括executorId,hostPort和cores.形成一个WorkOffer的列表,并发射任务valworkOffers=activeExecutors.map{case(id,executorData)=
2、>newWorkerOffer(id,executorData.executorHost,executorData.freeCores)}.toSeqlaunchTasks(scheduler.resourceOffers(workOffers))//LaunchtasksreturnedbyasetofresourceoffersprivatedeflaunchTasks(tasks:Seq[Seq[TaskDescription]]){for(task<-tasks.flatten){else{valexecutorDat
3、a=executorDataMap(task.executorId)executorData.freeCores-=scheduler.CPUS_PER_TASKexecutorData.executorEndpoint.send(LaunchTask(newSerializableBuffer(serializedTask)))}lancheTasks()方法的参数是TaskDescription的seq类型,对于其中的每个task,序列化,executorData.freeCores -=然后发射CoarseGrained
4、SchedulerBackend的makeOffers()调用TaskSchedulerImpl的resourceOffers()//MakefakeresourceoffersonallexecutorsprivatedefmakeOffers(){//FilteroutexecutorsunderkillingvalactiveExecutors=executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))valworkOffers=activeExec
5、utors.map{case(id,executorData)=>newWorkerOffer(id,executorData.executorHost,executorData.freeCores)}.toSeqlaunchTasks(scheduler.resourceOffers(workOffers))}先对所有的WordOffer进行randomShuffle打乱顺序,并对TaskSet进行排序//TakeeachTaskSetinourschedulingorder,andthenofferiteachnodein
6、increasingorder//oflocalitylevelssothatitgetsachancetolaunchlocaltasksonallofthem.//NOTE:thepreferredLocalityorder:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,RACK_LOCAL,ANY然后不断调用自己的resourceOfferSingleTaskSet()方法,直到taskSet发射成功。/***Calledbyclustermanagertoofferresourcesonslaves
7、.Werespondbyaskingouractivetask*setsfortasksinorderofpriority.Wefilleachnodewithtasksinaround-robinmannerso*thattasksarebalancedacrossthecluster.*/defresourceOffers(offers:Seq[WorkerOffer]):Seq[Seq[TaskDescription]]=synchronized{//Markeachslaveasaliveandrememberitsh
8、ostname//AlsotrackifnewexecutorisaddedvarnewExecAvail=falsefor(o<-offers){executorIdToHost(o.executorId)=o.hostactiveExecutorIds+=o.execut
此文档下载收益归作者所有