Spark发射任务源代码分析

Spark发射任务源代码分析

ID:37896260

大小:195.50 KB

页数:6页

时间:2019-06-02

Spark发射任务源代码分析_第1页
Spark发射任务源代码分析_第2页
Spark发射任务源代码分析_第3页
Spark发射任务源代码分析_第4页
Spark发射任务源代码分析_第5页
资源描述:

《Spark发射任务源代码分析》由会员上传分享,免费在线阅读,更多相关内容在教育资源-天天文库

1、其中TaskSetManager类的resourceOffer()方法调用流程CoarseGrainedSchedulerBackend的overridedefreceiveAndReply(context:RpcCallContext):PartialFunction[Any,Unit]=方法,注册executor,包括executorId,hostPort和cores.形成一个WorkOffer的列表,并发射任务valworkOffers=activeExecutors.map{case(id,executorData)=

2、>newWorkerOffer(id,executorData.executorHost,executorData.freeCores)}.toSeqlaunchTasks(scheduler.resourceOffers(workOffers))//LaunchtasksreturnedbyasetofresourceoffersprivatedeflaunchTasks(tasks:Seq[Seq[TaskDescription]]){for(task<-tasks.flatten){else{valexecutorDat

3、a=executorDataMap(task.executorId)executorData.freeCores-=scheduler.CPUS_PER_TASKexecutorData.executorEndpoint.send(LaunchTask(newSerializableBuffer(serializedTask)))}lancheTasks()方法的参数是TaskDescription的seq类型,对于其中的每个task,序列化,executorData.freeCores -=然后发射CoarseGrained

4、SchedulerBackend的makeOffers()调用TaskSchedulerImpl的resourceOffers()//MakefakeresourceoffersonallexecutorsprivatedefmakeOffers(){//FilteroutexecutorsunderkillingvalactiveExecutors=executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))valworkOffers=activeExec

5、utors.map{case(id,executorData)=>newWorkerOffer(id,executorData.executorHost,executorData.freeCores)}.toSeqlaunchTasks(scheduler.resourceOffers(workOffers))}先对所有的WordOffer进行randomShuffle打乱顺序,并对TaskSet进行排序//TakeeachTaskSetinourschedulingorder,andthenofferiteachnodein

6、increasingorder//oflocalitylevelssothatitgetsachancetolaunchlocaltasksonallofthem.//NOTE:thepreferredLocalityorder:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,RACK_LOCAL,ANY然后不断调用自己的resourceOfferSingleTaskSet()方法,直到taskSet发射成功。/***Calledbyclustermanagertoofferresourcesonslaves

7、.Werespondbyaskingouractivetask*setsfortasksinorderofpriority.Wefilleachnodewithtasksinaround-robinmannerso*thattasksarebalancedacrossthecluster.*/defresourceOffers(offers:Seq[WorkerOffer]):Seq[Seq[TaskDescription]]=synchronized{//Markeachslaveasaliveandrememberitsh

8、ostname//AlsotrackifnewexecutorisaddedvarnewExecAvail=falsefor(o<-offers){executorIdToHost(o.executorId)=o.hostactiveExecutorIds+=o.execut

当前文档最多预览五页,下载文档查看全文

此文档下载收益归作者所有

当前文档最多预览五页,下载文档查看全文
温馨提示:
1. 部分包含数学公式或PPT动画的文件,查看预览时可能会显示错乱或异常,文件下载后无此问题,请放心下载。
2. 本文档由用户上传,版权归属用户,天天文库负责整理代发布。如果您对本文档版权有争议请及时联系客服。
3. 下载前请仔细阅读文档内容,确认文档内容符合您的需求后进行下载,若出现内容与标题不符可向本站投诉处理。
4. 下载文档时可能由于网络波动等原因无法下载或下载错误,付费完成后未能成功下载的用户请联系客服处理。