UNIVERSITY OF CALIFORNIA, SAN DIEGO

Architectures for Stateful Data-intensive Analytics

A dissertation submitted in partial satisfaction of the requirements for the degree Doctor of Philosophy in Computer Science

by

Dionysios Logothetis

Committee in charge:

Kenneth Yocum, Chair
Rene Cruz
Alin Deutsch
Massimo Franceschetti
Alex Snoeren
Geoffrey M. Voelker

2011

Copyright Dionysios Logothetis, 2011. All rights reserved.

The dissertation of Dionysios Logothetis is approved, and it is acceptable in quality and form for publication on microfilm and electronically:

Chair

University of California, San Diego
2011

TABLE OF CONTENTS

Signature Page
Table of Contents
List of Figures
List of Tables
Acknowledgements
Vita
Abstract of the Dissertation

Chapter 1  Introduction
  1.1  The "big data" promise
  1.2  Data-intensive computing
  1.3  Stateful data-intensive computing
    1.3.1  An example: web crawl queue
    1.3.2  Challenges in managing state
  1.4  Architectures for stateful analytics
    1.4.1  Continuous ETL analytics
    1.4.2  Stateful bulk data processing
  1.5  Contributions
  1.6  Outline

Chapter 2  Background and related work
  2.1  Groupwise processing
  2.2  The MapReduce programming model
  2.3  MapReduce architecture
    2.3.1  Program execution
    2.3.2  Fault tolerance
  2.4  Stateful analytics on unstructured data
    2.4.1  Automatic incrementalization
    2.4.2  Iterative analytics
    2.4.3  Models for incremental analytics

Chapter 3  Stateful online analytics
  3.1  Design
    3.1.1  Continuous MapReduce
    3.1.2  Program execution
    3.1.3  Using in-network aggregation trees
    3.1.4  Efficient window processing with panes
  3.2  Fidelity-latency tradeoffs
    3.2.1  Measuring data fidelity
    3.2.2  Using C2 in applications
    3.2.3  Result eviction: trading fidelity for availability
  3.3  Related work
    3.3.1  "Online" bulk processing
    3.3.2  Log collection systems
    3.3.3  Load shedding in data stream processors
    3.3.4  Distributed aggregation
  3.4  Acknowledgments

Chapter 4  An architecture for in-situ processing
  4.1  Implementation
    4.1.1  Building an in-situ MapReduce query
    4.1.2  Map and reduce operators
    4.1.3  Load cancellation and shedding
    4.1.4  Pane flow control
    4.1.5  MapReduce with gap recovery
  4.2  Evaluation
    4.2.1  Scalability
    4.2.2  Load shedding
    4.2.3  Failure eviction
    4.2.4  Using C2
    4.2.5  In-situ performance
  4.3  Acknowledgments

Chapter 5  Stateful bulk processing
  5.1  A basic translate operator
  5.2  Continuous bulk processing
  5.3  Support for graph algorithms
  5.4  Summary of CBP model
  5.5  Applications
    5.5.1  Mining evolving graphs
    5.5.2  Clustering coefficients
    5.5.3  Incremental PageRank
  5.6  Related work
  5.7  Acknowledgments

Chapter 6  CBP design and implementation
  6.1  Controlling stage inputs and execution
  6.2  Scheduling with bottleneck detection
  6.3  Failure recovery
  6.4  CBP on top of Map-Reduce
    6.4.1  Incremental crawl queue example
    6.4.2  Increment management
  6.5  Direct CBP
    6.5.1  Incremental shuffling for loopback flows
    6.5.2  Random access with BIPtables
    6.5.3  Multicast and broadcast routing
    6.5.4  Flow separation in Map-Reduce
  6.6  Evaluation
    6.6.1  Incremental crawl queue
    6.6.2  BIPtable microbenchmarks
    6.6.3  Clustering coefficients
    6.6.4  PageRank
  6.7  Acknowledgements

Chapter 7  Conclusion

Bibliography
LIST OF FIGURES

Figure 1.1: An illustration of the evolution from traditional data processing to "big-data" analytics, including the evolution in the relational data management technology.
Figure 1.2: Groupwise processing requires users to specify: (i) how to group input records, and (ii) the operation to perform on each group. To group input records, users specify a function that extracts a grouping key for every record. To specify the operation, users implement a generic operator that receives as input the grouping key and all the input records that share the same key.
Figure 1.3: Groupwise processing is a core abstraction in DISC systems. Figure (a) shows the semantics of a groupwise count operation. Here input URLs are grouped according to the domain they belong to and the operation on each group is a count. Figure (b) illustrates the physical execution. Grouping allows independent operations to execute in parallel.
Figure 1.4: A dataflow for computing a web crawl queue. Stateful stages are labeled with an S.
Figure 1.5: An incremental approach to update the URL counts in the crawl queue dataflow. This approach re-uses previously computed URL counts.
Figure 1.6: Stateful groupwise processing extends the groupwise processing construct by integrating state in the model. A user-specified operation now has access to state that can be used to save and re-use computations.
Figure 2.1: A MapReduce computation is expressed using two main functions. The Map is used to extract information from raw data and determine the partitioning of data into groups. The Reduce function is used to aggregate data in the same group. The Combine function is used as an optimization to reduce the size of the Map output.
Figure 2.2: A MapReduce example program that counts word occurrences in text. The Map function receives as input the document text and extracts words from it. For every word, it emits an intermediate key-value pair, with the key set equal to the word and the value set to "1", indicating a single occurrence. For every intermediate key, the Reduce function counts the number of occurrences of the corresponding word.
Figure 2.3: The execution of a MapReduce program. Data flow between Map and Reduce tasks. Map tasks process input splits in parallel and output intermediate key-value pairs. Intermediate key-value pairs from each Map output are partitioned across multiple Reduce tasks according to their key. Reduce tasks are also executed in parallel.
Figure 3.1: The in-situ MapReduce architecture avoids the cost and latency of the store-first-query-later design by moving processing onto the data sources.
Figure 3.2: This illustrates the physical instantiation of one iMR MapReduce partition as a multi-level aggregation tree.
Figure 3.3: iMR nodes process local log files to produce sub-windows or panes. The system assumes log records have a logical timestamp and arrive in order.
Figure 3.4: iMR aggregates individual panes Pi in the network. To produce a result, the root may either combine the constituent panes or update the prior window by removing an expired pane and adding the most recent.
Figure 3.5: iMR extends the traditional MapReduce interface with an uncombine function that allows the specification of differential functions. The uncombine function subtracts old data and the combine function adds new data to produce the final result.
Figure 3.6: C2 describes the set of panes each node contributes to the window. Here we show two different ways in which C2 represents 50% of the total data: all the nodes process half their data or half the nodes process all their data.
Figure 4.1: Each iMR job consists of a Mortar query for the map and a query for the reduce. Here there are two MapReduce partitions (r=2), which result in two aggregation trees. A word count example illustrates partitioning map output across multiple reduce operators.
Figure 4.2: Data processing throughput as the number of workers and roots increases. When the root of the query becomes the bottleneck, iMR scales by partitioning data across more roots.
Figure 4.3: Impact of load shedding on fidelity and latency for a word count job under maximum latency requirement and varying worker load.
Figure 4.4: Application goodput as the percentage of failed workers increases. Failure eviction delivers panes earlier, improving goodput by up to 64%.
Figure 4.5: The performance of a count statistic on data uniformly distributed across the log server pool. The relative count error drops linearly as we include more data. Because of the uniform data distribution, both the count and the frequency do not depend on the C2 specification.
Figure 4.6: The performance of a count statistic on data skewed across the log server pool. Because of the spatial skew, enforcing either random pane selection or spatial completeness allows the system to better approximate count frequencies than temporal completeness, and lower result latency.
Figure 4.7: Estimating user session count using iMR and different C2 policies. We preserve the original data distribution, where clicks from the same user may exist on different servers. Random pane selection and temporal completeness provide higher data fidelity and sample more user IDs than when enforcing spatial completeness.
Figure 4.8: Estimating user session count using iMR and different C2 policies. Here we distribute data so that clicks from the same user exist on a single server. Temporal completeness returns sessions that are accurate, but samples the smallest percentage of user IDs. Instead, random sampling can sample a larger space of user IDs.
Figure 4.9: (a) Results from the Kolmogorov-Smirnov test illustrate the impact of reduced data fidelity on the histograms reported for each HDFS server. (b) For HDFS anomaly detection, random and spatial completeness C2 improve latency by at least 30%.
Figure 4.10: Fidelity and Hadoop performance as a function of the iMR process niceness. The higher the niceness, the less CPU is allocated to iMR. Hadoop is always given the highest priority, nice=0.
Figure 5.1: The progression from a stateless groupwise processing primitive to stateful translation, T(·), with multiple inputs/outputs, grouped state, and inner groupings.
Figure 5.2: Translator pseudocode that counts observed URLs. The translator reads and updates the saved count.
Figure 5.3: A stage implementing symmetric set difference of URLs from two input crawls, A and B.
Figure 5.4: Users specify per-input flow RouteBy functions to extract keys for grouping. Special keys enable the broadcast and multicast of records to groups. Here we show that multicast address mcX is bound to keys k1 and k3.
Figure 5.5: Incremental clustering coefficient dataflow. Each node maintains as state its adjacency list and its "friends-of-friends" list.
Figure 5.6: The clustering coefficients translator adds new edges (2-3), sends neighbors updates (4-6), and processes those updates (7-10).
Figure 5.7: Incremental PageRank dataflow. The loopback flows are used to propagate messages between nodes in the graph.
Figure 5.8: Pseudocode for incremental PageRank. The translator acts as an event handler, using the presence of records on each loopback flow as an indication to run a particular phase of the algorithm.
Figure 6.1: The Map-Reduce jobs that emulate the CBP incremental crawl queue dataflow.
Figure 6.2: Cumulative execution time with 30GB and 7.5GB increments. The smaller the increments, the greater the gain from avoiding state re-shuffling.
Figure 6.3: The performance of the incremental versus landmark crawl queue. The direct CBP implementation provides nearly constant runtime.
Figure 6.4: Running time using indexed state files. BIPTable outperforms sequential access even if accessing more than 60% of state.
Figure 6.5: Incremental clustering coefficient on Facebook data. The multicast optimization improves running time by 45% and reduces data shuffled by 84% over the experiment's lifetime.
Figure 6.6: Incremental PageRank. (a) Cumulative running time of our incremental PageRank translator adding 2800 edges to a 7 million node graph. (b) Cumulative data moved during incremental PageRank.
LIST OF TABLES

Table 5.1: Five functions control stage processing. Default functions exist for each except for translation.

ACKNOWLEDGEMENTS

First and foremost, I would like to thank my advisor, Dr. Kenneth Yocum, for his guidance during my graduate studies. I am grateful for his support over these years and for always trying to teach me how to become a better researcher. I have learned a lot from him.

I would like to thank all the members of my thesis committee for accepting to evaluate this work. I want to specially thank Dr. Alin Deutsch, Dr. Geoff Voelker, and Dr. Alex Snoeren for their advice and valuable feedback on many aspects of my research. I would also like to acknowledge Chris Olston and Ben Reed for their contribution to a large body of this work. Finally, I would like to thank my colleagues Chris Trezzo and Kevin Webb for their collaboration. It was great fun working with them.

Chapters 3 and 4, in part, are reprints of the material published in the Proceedings of the USENIX Annual Technical Conference 2011. Logothetis, Dionysios; Trezzo, Chris; Webb, Kevin C.; Yocum, Ken. The dissertation author was the primary investigator and author of this paper.

Chapters 5 and 6, in part, are reprints of the material published in the Proceedings of the ACM Symposium on Cloud Computing 2010. Logothetis, Dionysios; Olston, Christopher; Reed, Benjamin; Webb, Kevin C.; Yocum, Ken. The dissertation author was the primary investigator and author of this paper.

VITA

2004  Diploma in Computer Science and Engineering, National Technical University of Athens, Greece
2007  Master of Science in Computer Science, University of California, San Diego
2011  Doctor of Philosophy in Computer Science, University of California, San Diego

PUBLICATIONS

Dionysios Logothetis, Chris Trezzo, Kevin Webb, Kenneth Yocum, "In-situ MapReduce for Log Processing", USENIX Annual Technical Conference, Portland, OR, June 2011.

Dionysios Logothetis, Christopher Olston, Benjamin Reed, Kevin Webb, Kenneth Yocum, "Stateful Bulk Processing for Incremental Analytics", 1st ACM Symposium on Cloud Computing, Indianapolis, IN, June 2010.

Dionysios Logothetis, Kenneth Yocum, "Data Indexing for Stateful, Large-scale Data Processing", 5th International Workshop on Networking Meets Databases, Big Sky, MT, October 2009.

Dionysios Logothetis, Kenneth Yocum, "Ad-hoc Data Processing in the Cloud", 34th International Conference on Very Large Databases (demo), Auckland, New Zealand, August 2008.

Emiran Curtmola, Alin Deutsch, Dionysios Logothetis, K. K. Ramakrishnan, Divesh Srivastava and Kenneth Yocum, "XTreeNet: Democratic Community Search", 34th International Conference on Very Large Databases (demo), Auckland, New Zealand, August 2008.

Dionysios Logothetis, Kenneth Yocum, "Wide-Scale Data Stream Processing", USENIX Annual Technical Conference, Boston, MA, June 2008.

Yang-Suk Kee, Dionysios Logothetis, Richard Huang, Henri Casanova, Andrew Chien, "Efficient Resource Description and High Quality Selection for Virtual Grids", 5th International Symposium on Cluster Computing and the Grid, Cardiff, UK, May 2005.
ABSTRACT OF THE DISSERTATION

Architectures for Stateful Data-intensive Analytics

by

Dionysios Logothetis

Doctor of Philosophy in Computer Science

University of California, San Diego, 2011

Kenneth Yocum, Chair

The ability to do rich analytics on massive sets of unstructured data drives the operation of many organizations today and has given rise to a new class of data-intensive computing systems. Many of these analytics are update-driven: they must constantly integrate new data in the analysis, and a fundamental requirement for efficiency is the ability to maintain state. However, current data-intensive computing systems do not directly support stateful analytics, making programming harder and resulting in inefficient processing.

This dissertation proposes that state become a first-class abstraction in data-intensive computing. It introduces stateful groupwise processing, a programming abstraction that integrates data-parallelism and state, allowing sophisticated, easily parallelizable stateful analytics. The explicit modeling of state abstracts the details of state management, making programming easier, and allows the runtime system to optimize state management. This work investigates the use of stateful groupwise processing in two distinct phases in the data management lifecycle: (i) the extraction of data from its sources and online analysis, and (ii) its storage and follow-on analysis. We propose two complementary architectures that manage data in these two phases.

This work proposes In-situ MapReduce (iMR), a model and architecture for efficient online analytics. The iMR model combines stateful groupwise processing with windowed processing for analyzing streams of unstructured data. To allow timely analytics, the iMR model supports reduced data fidelity through partial data processing and introduces a novel metric for the systematic characterization of partial data. For efficiency, the iMR architecture moves the data analysis from dedicated compute clusters onto the sources themselves, avoiding costly data migrations.

Once data are extracted and stored, a fundamental challenge is how to write rich analytics to gain deeper insights from bulk data. This work introduces Continuous Bulk Processing (CBP), a model and architecture for sophisticated dataflows on bulk data. CBP uses stateful groupwise processing as the building block for expressing analytics, lending itself to incremental and iterative analytics. Further, CBP provides primitives for dataflow control that simplify the composition of sophisticated analytics. Leveraging the explicit modeling of state, CBP executes these dataflows in a scalable, efficient, and fault-tolerant manner.
Chapter 1

Introduction

1.1 The "big data" promise

Data is emerging as a new science, a result of the unprecedented increase in the amount of digital information produced today and the realization of innovative ways to extract value from it. Technological advances have led to an abundance of digital information sources that constantly generate data: Internet users producing valuable content (browsing history, e-mail exchanges, online purchases, social network interactions, etc.), ubiquitous digital devices generating information (mobile phones, cameras, RFID sensors, etc.), and advanced scientific instruments producing hundreds of terabytes of data [48]. The information hidden in this vast amount of data has the potential to transform human endeavors, like science, business, and finance, in ways that were previously unimagined. For instance, public health organizations now monitor trends in search engine queries to detect flu epidemics [40]. Social networks analyze user interactions to do targeted advertising [27] and retailers track sales data to understand consumer behavior [63], recommend products, and increase profit. Banks analyze financial and personal information to detect fraud [24], while stock traders base their decision making on the analysis of real-time financial data [78].

Realizing this potential with traditional data management approaches has been challenging. For decades, Database Management Systems (DBMSs) have been the dominant approach for data storage and analysis. Databases were designed to store data using a well-defined structure, a schema, carefully determined in advance. DBMSs model data as relations [30], essentially tables, and typical uses of a DBMS include the transactional access of these data to ensure data integrity. For instance, banks use DBMSs to store tables of bank accounts and guarantee account information integrity (e.g., during concurrent account withdrawals). Relational algebra [30] and the SQL language have been the main tools for manipulating this type of data. The strength of SQL is mainly in updating or retrieving data from tables and producing simple reports through data summaries. In fact, years of work on parallel databases and relational query optimization have resulted in DBMSs that can store and analyze large amounts of structured data efficiently.

However, meeting the promise of big data depends on a fundamentally different kind of analysis. First, there is a growing demand for complex analytics on unstructured data, such as text (e.g., web pages, e-mails, server logs), video (e.g., surveillance videos, YouTube uploads) and images (e.g., photo albums, MRI scans, satellite images). Such data are often difficult to represent as relational tables, the core abstraction in DBMSs. Furthermore, transactional access is often unnecessary since these data are archived and analyzed in bulk, and these analytics may not be expressible with relational algebra [72, 35]. For instance, several graph analyses found in social networks or web mining applications are hard and sometimes impossible to express using relational algebra [80]. Other such applications include machine learning [29, 64] and natural language processing [20] algorithms. A constrained programming model may hinder users from unlocking the potential value of their data through sophisticated analysis [72, 71].

Second, there has been an enormous increase in the scale of the data, with scientists estimating that digital data are growing at least as fast as Moore's Law [12]. Studies have shown that this unstructured data is accumulating in data centers at three times the rate of traditional transaction-based data [65]. For instance, Facebook reported in 2009 that it collected data at a rate of 15TB per day [59], a number that within less than two years has increased to 130TB per day [49], while YouTube integrates 24 hours of new video a minute [4]. Traditional DBMSs often cannot manage data at this scale, or in some cases cannot do so in a cost-efficient way. Figure 1.1 shows this evolution in data analysis, from simple reports on small-scale data to what we call today "big data analytics": rich analytics on very large unstructured data.
Figure 1.1: An illustration of the evolution from traditional data processing to "big-data" analytics, including the evolution in the relational data management technology.

1.2 Data-intensive computing

The emergence of big data analytics has given rise to a new class of data-intensive scalable computing (DISC) [21] systems that address the above challenges. Systems like MapReduce [34], Hadoop [8] and Dryad [46] are representative of this class. These systems have turned a challenge, the scale of the data, into an advantage by enabling users to obtain deep insights from large data. They are used for a wide variety of data analytics by organizations ranging from educational institutions and small enterprises to large Internet firms that manage petabytes of data [3, 34, 46].

These systems assume no structure in the data and support sophisticated analytics. Unlike the declarative model of the SQL language, in these models users express the application logic using general-purpose languages, like Java, that allow arbitrary data structures and computations. This model is more powerful and intuitive for a variety of applications. It has been used to implement analytics like machine learning [5, 64], natural language translation [20, 58] and graph mining [31, 51].

At the core of these models is groupwise processing, a programming abstraction that makes it easy for users to express the data-parallelism inherent in many analytics. Many analyses can be expressed as the same computation on multiple independent groups of data. Groupwise processing requires users to specify only (i) how to partition the data into groups, and (ii) the computation to perform on each group. Users specify these operations through the two functions illustrated in Figure 1.2. For instance, search engines extract URL links from web pages, group them according to their URL domain, and count the URLs in each group to measure the popularity of different URL domains. Figure 1.3(a) illustrates this groupwise count operation.

group(r_in) → key
process(key, {r_in}) → r_out

Figure 1.2: Groupwise processing requires users to specify: (i) how to group input records, and (ii) the operation to perform on each group. To group input records, users specify a function that extracts a grouping key for every record. To specify the operation, users implement a generic operator that receives as input the grouping key and all the input records that share the same key.

This abstraction allows these DISC systems to analyze massive amounts of data by distributing the computation across large compute clusters. For instance, Figure 1.3(b) shows how groupwise processing divides the count operation into independent operations that can execute in parallel on separate CPUs. As opposed to using reliable, high-performance hardware, these systems scale by using pools of commodity machines in a fault-tolerant manner. They employ a simple fault tolerance model that can restart individual computations when machines fail. This allows DISC systems to scale to virtually any amount of data simply by employing more machines.

Figure 1.3: Groupwise processing is a core abstraction in DISC systems. (a) Logical representation of a groupwise count: input URLs are grouped according to the domain they belong to and the operation on each group is a count. (b) Parallel evaluation of the groupwise count on multiple CPUs: grouping allows independent operations to execute in parallel.
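To make the two functions of Figure 1.2 concrete, the following sketch gives one possible Java rendering of the URL-domain count of Figure 1.3. It is an illustration of the abstraction only; the class and method names are illustrative and are not part of any DISC system's API.

    import java.net.URI;
    import java.util.*;

    // Minimal sketch of the groupwise count in Figures 1.2 and 1.3:
    // URLs are grouped by their domain and each group is reduced to a count.
    public class DomainCount {

        // group(r_in) -> key: extract a grouping key (here, the URL's domain) from every record.
        static String group(String url) {
            return URI.create(url).getHost();
        }

        // process(key, {r_in}) -> r_out: the generic operation applied to each group.
        static String process(String domain, List<String> urls) {
            return domain + "," + urls.size();
        }

        public static void main(String[] args) {
            List<String> input = Arrays.asList(
                    "http://cnn.com", "http://cnn.com/index.htm",
                    "http://fox.com", "http://nbc.com", "http://nbc.com/index.htm");

            // The runtime, not the user, performs the grouping; each group could be
            // processed on a different CPU, as in Figure 1.3(b).
            Map<String, List<String>> groups = new HashMap<>();
            for (String url : input) {
                groups.computeIfAbsent(group(url), k -> new ArrayList<>()).add(url);
            }
            groups.forEach((domain, urls) -> System.out.println(process(domain, urls)));
        }
    }

Running this sketch prints one record per group, e.g. "cnn.com,2", mirroring the output shown in Figure 1.3(a).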
1.3 Stateful data-intensive computing

While DISC systems can scale to large data, it is becoming apparent that scalability alone is not sufficient for certain applications. Many applications must now manage large data by following a fundamentally different programming approach. As the rightmost part of Figure 1.1 shows, several applications no longer view data analytics as a "one-shot" process, but rather as a process that must constantly update the analysis. For instance, search engines must crawl new web pages and update their indices to deliver up-to-date results, while iterative graph mining analytics, like PageRank [62], must repeatedly refine the result of the analysis across iterations.

However, current DISC systems are not designed for update-driven analytics, often resulting in inefficient data processing. For instance, these DISC systems can re-generate a search index from the entire web corpus simply by adding compute power when more pages are crawled. Processing petabytes of data only to update a small part of the index each time is inefficient [66]. This approach discards prior computation, wasting CPU cycles and network resources, and increases energy consumption and monetary cost.

Instead, to run these analytics efficiently, users have to program these applications in a different manner. Critical to the efficiency of these analytics is the ability to maintain state, derived data that persist and are re-used across executions of the analysis. Stateful computations arise in at least three kinds of analytics: incremental, continuous, and iterative. For instance, analytics that are amenable to incremental computation re-use previous computations (e.g., the previously computed search index) to update the analysis instead of recomputing from scratch when new data arrive. Iterative analytics refine the result of the analysis by repeatedly combining the same input data with the results of prior iterations. Examples of such iterative applications include PageRank [62], data clustering [37], social network analytics [77], and machine learning algorithms [5].

1.3.1 An example: web crawl queue

To illustrate the importance of state, we use a toy web data-mining application. In general, such analyses consist of multiple processing stages that comprise a larger dataflow. In particular, we use a crawl queue, a common and important dataflow in search engines that determines the crawl frontier, the pages to crawl. At a high level, the dataflow parses crawled web pages to extract URL links and counts the frequency of these extracted URLs. Subsequently, it compares the extracted URLs against the already crawled pages and outputs uncrawled pages with the highest score (URL frequency). Figure 1.4 shows the different stages of this dataflow.

Figure 1.4: A dataflow for computing a web crawl queue. Stateful stages are labeled with an S.
Since the web changes over time (e.g., users upload new pages), a search engine must continuously run this dataflow. This means, for instance, updating all the counts computed by the count links stage. One approach is to repeatedly use the same dataflow every time new pages are crawled, re-processing all web pages from scratch. However, this approach is inefficient since a large fraction of the analysis may not be affected by newly crawled pages. In the count links stage, only a few counts may be updated.

Instead, a user may program such an application to re-use prior results by maintaining previously computed URL counts as state. Figure 1.5 shows this approach. This way, when new web pages become available for processing, we can simply increment the counts of the affected URLs instead of re-computing all counts from scratch. Such a stateful approach produces the same result, reduces the processing time, and uses fewer compute resources.

Figure 1.5: An incremental approach to update the URL counts in the crawl queue dataflow. This approach re-uses previously computed URL counts.

1.3.2 Challenges in managing state

Stateful analytics present new data management challenges. Current DISC systems were designed for "one-shot" processing, as they did not consider update-driven computations. Attempting to retrofit stateful programming in these systems has the following implications.

• State management: To turn a stateless application into a stateful one, users often modify their analysis to manually manage state by storing it to external storage systems, like distributed file systems or databases. This makes programming more error-prone and less portable, since the details of state management are not abstracted from the user. The user has to take into consideration the specifics of the storage system, which may vary among computing environments. For instance, porting an application to a different environment depends on the availability of the same storage systems.

• Fault tolerance: While DISC systems are designed to handle failures transparently, storing state to a storage system external to the processing system may require extra effort from the user to handle storage failures. This adds programming complexity and distracts users from specifying the data processing logic.

• Dataflow orchestration: To develop continuous dataflows, applications must manually orchestrate these potentially complex multi-step computations. For instance, as new data arrive, applications must decide when to execute each stage, how much data to process, and synchronize execution across stages. Implementing ad-hoc synchronization logic on large dataflows can be an error-prone task. Instead, this task should be simplified and automated through programmatic control.

• Efficiency: Retrofitting stateful programming techniques in systems that are not designed for this purpose results in inefficient processing. As pointed out in the example in Section 1.3.1, often only a small fraction of the state needs to be updated when new data arrive. However, existing DISC systems treat state just like any other data input. As they repeatedly execute the analysis, they must re-read and re-process the entire state. This wastes resources and results in processing time that is proportional to the size of the constantly growing state, not the amount of changes to the state. Instead, to allow efficient processing a system should directly support stateful analytics.
1.4 Architectures for stateful analytics

This dissertation addresses the above challenges in large-scale stateful analytics. This work aims to build systems for stateful analytics in different scenarios that process unstructured data. The goal of this dissertation is to design (i) programming models that allow users to easily write sophisticated stateful analytics and (ii) scalable, efficient and fault-tolerant runtime systems.

This dissertation proposes that state become a first-class abstraction, and introduces stateful groupwise processing, a programming abstraction that integrates data-parallelism and state. Figure 1.6 shows the integration of state in the groupwise processing model. While groupwise processing hides the details of parallel execution, stateful groupwise processing abstracts the details of state management. Tasks like reliably storing and accessing state are left to the underlying runtime system, making stateful applications easier to program and more portable. At the same time, by explicitly modeling state, this abstraction gives the system the opportunity to optimize state management, resulting in reduced processing times and efficient resource usage.

group(r_in) → key
process(key, state_old, {r_in}) → (state_new, r_out)

Figure 1.6: Stateful groupwise processing extends the groupwise processing construct by integrating state in the model. A user-specified operation now has access to state that can be used to save and re-use computations.

In this dissertation, we explore the application of stateful groupwise processing in two distinct phases of the data management lifecycle: (i) the extraction of data from the sources and online analysis, and (ii) the storage and follow-on analysis. The analytics in both phases present the general challenges outlined above: they must perform sophisticated, update-driven analysis in a scalable, efficient, and fault-tolerant manner. However, the analytics in these scenarios serve different purposes and admit different solutions.
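As an illustration of the signature in Figure 1.6, the following minimal Java sketch applies stateful groupwise processing to the incremental URL count of Figure 1.5. The code is purely illustrative: the per-group state here is simply the previously computed count, which a runtime system would normally store and supply on the application's behalf.

    import java.util.*;

    // Sketch of stateful groupwise processing (Figure 1.6) applied to the
    // incremental URL count of Figure 1.5. Only new records are processed; the
    // previously computed count is carried as per-group state.
    public class StatefulDomainCount {

        // group(r_in) -> key: group by URL domain, as before.
        static String group(String url) {
            return java.net.URI.create(url).getHost();
        }

        // process(key, state_old, {r_in}) -> (state_new, r_out)
        // The state of a group is simply its previously computed count.
        static long process(String domain, long oldCount, List<String> newUrls,
                            List<String> output) {
            long newCount = oldCount + newUrls.size();   // update instead of recompute
            output.add(domain + "," + newCount);         // r_out
            return newCount;                             // state_new, persisted by the runtime
        }

        public static void main(String[] args) {
            Map<String, Long> state = new HashMap<>();   // stands in for runtime-managed state
            state.put("nbc.com", 2L);                    // count saved from a previous execution
            List<String> output = new ArrayList<>();

            // A new record arrives (as in Figure 1.5): only the affected group is updated.
            List<String> newData = Collections.singletonList("http://nbc.com/main.htm");
            String key = group(newData.get(0));
            state.put(key, process(key, state.getOrDefault(key, 0L), newData, output));

            output.forEach(System.out::println);         // prints "nbc.com,3"
        }
    }

The point of the sketch is the division of labor: the user writes only group and process, while storing, locating, and recovering the per-group state is left to the runtime.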
1.4.1 Continuous ETL analytics

The first phase in data analytics is the extraction of data from their sources and preparation for storage and follow-on analysis, what is often referred to as an Extract-Transform-Load (ETL) process [69, 76]. The ETL process collects raw data from sources, like click logs from web servers and news feeds from social network servers. Such raw data must usually undergo an initial analysis that includes filtering, transforming data into an appropriate format, and summarizing data before they are stored.

The ETL process is often used to obtain quick insights by analyzing data in an online fashion [44, 32]: online analytics often require the analysis to return results before all data have been collected, to provide timely results. Online analysis is important for applications like failure detection that must be responsive even when not all data are available. A fundamental challenge in online analytics is the ability to assess the accuracy of the analysis when data are incomplete. At the same time, unlike "one-shot" batch processing, the ETL process must continuously update the result of the analysis as data are generated. A challenge here is how to program ETL analytics on data streams.

To address these challenges, this dissertation proposes In-situ MapReduce (iMR), a model and architecture for online ETL analytics. iMR (i) provides a programming model that combines stateful groupwise processing with windowed processing for unstructured data, (ii) introduces a novel metric that helps users assess the accuracy of their analysis in online analytics and allows applications to trade accuracy for timeliness, and (iii) proposes an architecture for moving ETL analytics from dedicated compute clusters to the data sources themselves to avoid costly data migration.

1.4.2 Stateful bulk data processing

Once data are collected by the ETL process, users run analytics to obtain deeper insights. In this phase, the interest shifts from the ability to obtain quick insights toward the ability to do richer analysis on bulk data. Unlike ETL analytics that filter or summarize data, applications now run sophisticated dataflows that must continuously integrate large batches of data in the analysis. Iterative analytics, like PageRank [62], must make multiple passes over large data sets. A fundamental challenge that programmers face now is how to program these sophisticated analytics and how to execute them efficiently on bulk data.

To address these challenges, this dissertation introduces Continuous Bulk Processing (CBP), a programming model and architecture for sophisticated stateful analytics. CBP (i) introduces a model for stateful groupwise processing on bulk data, (ii) provides primitives that allow users to orchestrate sophisticated stateful dataflows, and (iii) introduces an architecture that leverages the programming model to execute these dataflows in a scalable, efficient, and fault-tolerant manner.

1.5 Contributions

In summary, this dissertation presents two complementary architectures that incorporate state to address the distinct challenges in the two phases of data management. This dissertation makes the following contributions:

• State as a first-class abstraction: This dissertation introduces stateful groupwise processing, an abstraction that integrates data-parallelism and state, allowing users to write sophisticated, easily parallelizable stateful analytics. By abstracting the notion of state, users no longer have to manage state manually, and the underlying system can optimize state management.

• Model for online ETL analytics: This dissertation presents In-situ MapReduce (iMR), a programming model that supports stateful groupwise processing tailored for continuous ETL analytics on unstructured data. The iMR model extends the MapReduce [34] programming model with a sliding window construct, a concept that arises naturally in analytics on data streams. By combining state and windows, the model allows efficient processing through incremental updates to the analysis.

• Architecture for in-situ processing: The iMR architecture processes data in-place, obviating the costly and often unnecessary migration of data from their sources to centralized compute clusters. This approach reduces resource usage and allows faster analysis by processing data as they are generated. We evaluate our in-situ architecture and show how it can provide efficient analysis in real application scenarios.
• Metric for systematic characterization of incomplete data: To support online analysis, this dissertation introduces a novel fidelity metric that (i) allows users to assess the impact of incomplete data on the fidelity of the analysis and (ii) provides a flexible way to express fidelity requirements and trade fidelity for result availability. Further, we provide general guidelines on using the metric. Through our evaluation we validate its usefulness in real applications.

• Model for stateful bulk data analytics: The core component of Continuous Bulk Processing (CBP) is a generic stateful groupwise operator that allows users to write incremental and iterative analytics. By abstracting state management, CBP simplifies programming. CBP extends the concept of grouping with new constructs to efficiently support a wide range of iterative analytics, such as graph mining. Further, the CBP model provides primitives that allow the composition and orchestration of sophisticated stateful dataflows.

• Efficient stateful groupwise processing: The fundamental mismatch between current DISC models and stateful computations results in inefficient processing. The CBP runtime leverages the explicit modeling of state to optimize state management. Through a series of optimizations CBP improves processing times and reduces computation and network usage. By comparing against state-of-the-art DISC systems, our evaluation validates the benefits in performance and efficiency that the explicit modeling of state offers. In many cases, CBP reduces the running time and the network usage by at least 50%.

1.6 Outline

The rest of the dissertation is organized as follows. Chapter 2 presents basic concepts and systems that this work builds upon, like groupwise processing and DISC architectures, and reviews related work on stateful analytics. Chapter 3 introduces the iMR programming model for continuous ETL analytics, while Chapter 4 presents the implementation and evaluation of the iMR prototype. In Chapter 5, we introduce the CBP model for stateful analytics on bulk data. Chapter 6 describes the design and implementation of CBP, as well as an evaluation with real-world applications. Finally, Chapter 7 summarizes the dissertation.
Chapter 2

Background and related work

To provide scalable analytics, this work builds upon the concept of data-parallelism and the architectural principles of DISC systems, like MapReduce and Hadoop. This chapter reviews the basic programming abstractions and architectural principles that allow scalable analytics on unstructured data. We describe the groupwise processing abstraction for expressing data-parallel analytics in the context of the MapReduce programming model. Further, this chapter gives a brief overview of the design of DISC systems, like MapReduce and Hadoop. Lastly, this chapter discusses related work in stateful large-scale analytics.

2.1 Groupwise processing

Grouping is a concept that appears naturally in many real-world data analysis scenarios [26] and allows parallel execution of the analytics. In Section 1.2 we illustrated this with a simple web analytics example. Because of its importance in data analysis and the opportunity for scalable execution, grouping is a core construct in various programming models. The GROUP BY operation in the SQL language is an example. Grouping also underlies the programming models in DISC systems, like MapReduce [34] and Hadoop [8], as well as the higher-level, SQL-like languages layered on top of these processing systems [82, 81, 61, 9].

While groupwise processing appears in various flavors in these programming models, there are two basic parts of every groupwise operation: (i) specifying how to partition input records into groups and (ii) specifying the function to apply on the records of every group. These operations were shown in Figure 1.2. The output of a groupwise operation is a list of group-value pairs, with every pair corresponding to a distinct group and the result of applying the user-specified function on the group. This was shown in the example of Figure 1.3(a).

For instance, in the SQL language a user partitions an input table by specifying one of the table columns as the partitioning attribute. Part of the group operation specification is also an aggregate function, like SUM, to apply on each group. The result of a GROUP BY query in SQL is a table with a row for each group that contains the result of the aggregate function. Parallel DBMSs may leverage this construct to distribute the evaluation of GROUP BY aggregate queries across multiple machines.

Notice that the GROUP BY construct in SQL assumes some structure in the data. For instance, the grouping operation assumes the existence of a column in the table that is used as the partitioning attribute. Next, we show how the MapReduce programming model extends the basic groupwise processing abstraction with constructs that allow analysis on unstructured data.

2.2 The MapReduce programming model

MapReduce [34] is a programming model designed by Google to mine large web datasets, like crawled pages, and it has been used for tasks like generating Google's search index and ranking web pages. The motivation for the MapReduce model was the need for a simple abstraction that can leverage the power of hundreds of thousands of machines available in large data centers, without exposing the complexities of parallel processing in such an environment. At its core, a MapReduce program performs groupwise processing, providing the tools for partitioning data and performing arbitrary computations on a per-partition basis.

A MapReduce program consists of two user-specified functions, the Map and the Reduce function. Input data in a MapReduce program consist of a set of input records, modeled as key-value pairs.
The Map function takes an input key-value pair and emits zero or more intermediate key-value pairs. MapReduce applies the Map function on every input key-value pair, and subsequently groups intermediate key-value pairs according to their key. The Reduce function accepts as input an intermediate key and all the intermediate values that share the same key. The Reduce function may aggregate or transform the group of values in an arbitrary way and emit one or more output values. MapReduce applies the Reduce function on every distinct intermediate key. Figure 2.1 shows the definitions of the Map and Reduce functions.

map(k, v) → {(k', v')}
reduce(k', {v'}) → {v'}
combine(k', {v'}) → {v'}

Figure 2.1: A MapReduce computation is expressed using two main functions. The Map is used to extract information from raw data and determine the partitioning of data into groups. The Reduce function is used to aggregate data in the same group. The Combine function is used as an optimization to reduce the size of the Map output.

In Figure 2.2, we show how a user can write a MapReduce program to count the frequency of each word in a large body of documents. Here an input key-value pair represents a document, where the key is the document name and the value is the document contents. The Map function extracts the words from the documents and outputs a set of intermediate key-value pairs, one for each word, where the key is the word itself and the value is the number of occurrences of the word, here the number '1', indicating a single occurrence. These intermediate key-value pairs are grouped according to key. The Reduce function sums the number of occurrences and emits the total sum for every word.

Map(Key k, Value text)
  foreach word w in text
    emit(w, "1");

Reduce(Key k, Value vals[])
  count := 0;
  foreach v in vals[]
    count++;
  emit(k, count);

Figure 2.2: A MapReduce example program that counts word occurrences in text. The Map function receives as input the document text and extracts words from it. For every word, it emits an intermediate key-value pair, with the key set equal to the word and the value set to "1", indicating a single occurrence. For every intermediate key, the Reduce function counts the number of occurrences of the corresponding word.

While the Map and Reduce functions comprise the core of the model, users may optionally specify a Combine function. The MapReduce system may use the Combine function as an optimization to decrease the amount of data shipped across the network. As Section 2.3 shows, MapReduce systems send the output of the Map functions across the network from the processes that execute the Map function (Map tasks) to the processes that execute the Reduce function (Reduce tasks). MapReduce uses the Combine function to calculate partial results from the Map output before they are passed to the Reduce function. Often, the same key appears multiple times in the output of a Map task. Reduce functions that are commutative and associative allow the final result to be computed by combining partial results. The Combine function may decrease the size of the Map output before the Reduce function is applied.

The Map function is generally used to extract information from raw data (e.g., words from text), while the Reduce function is typically used to aggregate groups of values from the intermediate data (e.g., count or sum values). Although in the previous example the Map and Reduce functions perform simple operations, a user may implement arbitrary application logic. This flexible model gives users the ability to implement sophisticated analyses, leveraging the capabilities of general-purpose languages, like C++ and Java. This makes the MapReduce model more expressive than SQL, allowing a wider range of data analytics. For instance, it has been used to do satellite image stitching, render map images, and create inverted indices from web documents [34, 35].

Programmers also use the Map function to determine how to partition input data into groups. The Map function essentially implements the group operation shown in Figure 1.2 by specifying the intermediate key.
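The pseudocode of Figure 2.2 corresponds directly to the signatures in Figure 2.1. The following self-contained Java sketch is one possible rendering of the same word count (it is illustrative only and not the API of any particular MapReduce implementation); it also shows a Combine function, which, because counting is commutative and associative, can simply reuse the reducer's logic on partial results.

    import java.util.*;
    import java.util.function.BiConsumer;

    // Illustrative rendering of the word count of Figure 2.2 using the
    // map/reduce/combine signatures of Figure 2.1.
    public class WordCount {

        // map(k, v) -> {(k', v')}: emit (word, 1) for every word in the document text.
        static void map(String docName, String text, BiConsumer<String, Integer> emit) {
            for (String word : text.split("\\s+")) {
                emit.accept(word, 1);
            }
        }

        // reduce(k', {v'}) -> {v'}: sum the occurrences of a word.
        static int reduce(String word, List<Integer> counts) {
            int sum = 0;
            for (int c : counts) sum += c;
            return sum;
        }

        // combine(k', {v'}) -> {v'}: counting is commutative and associative, so the
        // combiner can compute partial sums with the same logic as the reducer.
        static int combine(String word, List<Integer> partialCounts) {
            return reduce(word, partialCounts);
        }

        public static void main(String[] args) {
            // The grouping step that the runtime would normally perform.
            Map<String, List<Integer>> groups = new HashMap<>();
            map("doc1", "to be or not to be",
                (w, one) -> groups.computeIfAbsent(w, k -> new ArrayList<>()).add(one));
            groups.forEach((w, vals) -> System.out.println(w + " " + reduce(w, vals)));
        }
    }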
Unlike grouping in relational databases, where the grouping attribute must be one of the table columns, the Map function can be used to group input data in an arbitrary way. For instance, a Map function that extracts URLs from web pages may specify the URL itself as the grouping key (e.g., to count per URL), or it may extract the URL domain as the grouping key (e.g., to count per domain), as shown in Figure 1.3. This flexibility is a key aspect of the MapReduce model. It allows users to write analytics without assuming any structure on the data. Unlike databases, where data have a well-defined schema, determined in advance, MapReduce allows users to interpret input data in an ad-hoc manner.

2.3 MapReduce architecture

While the MapReduce programming model allows users to express parallelism in the analysis, the role of the MapReduce runtime is to hide the details of parallel processing from the user. The runtime system handles tasks like program execution, machine communication and fault tolerance transparently, allowing programmers to focus on the application logic. In this section, we review basic design principles of the MapReduce architecture that allow MapReduce to execute programs in a scalable and fault-tolerant manner.

2.3.1 Program execution

The main tasks of a MapReduce runtime are to (i) execute the Map function on the input data, (ii) group the intermediate data according to key, and (iii) call the Reduce function for every group.

The execution of the Map function on the input data is a data-parallel operation. Input data are typically partitioned into input splits that are distributed across machines. An input split may be, for instance, a document. For every such input split a Map task is responsible for reading the split, converting it into input key-value pairs, and calling the Map function for every input pair. This way, the Map task generates a set of intermediate key-value pairs that are stored on the local disk. Every machine outputs a subset of all the intermediate key-value pairs that must collectively be grouped and reduced.

Figure 2.3: The execution of a MapReduce program. Data flow between Map and Reduce tasks. Map tasks process input splits in parallel and output intermediate key-value pairs. Intermediate key-value pairs from each Map output are partitioned across multiple Reduce tasks according to their key. Reduce tasks are also executed in parallel.

In order to parallelize the grouping and reduction as well, the system executes in parallel a number of Reduce tasks, with each Reduce task being responsible for grouping and reducing a partition of all the intermediate key-value pairs. Every Map task partitions its intermediate key-value pairs into R sets according to a partitioning function, usually a hash on the key. The system starts R Reduce tasks and assigns each Reduce task to a partition. Every Reduce task fetches its corresponding partitions from all the Map tasks, and groups the key-value pairs in them according to key. This operation is called the shuffle phase. After the shuffle phase has completed, the Reduce task can execute the Reduce function on every group of values. Every Reduce task writes its output in its own output partition file. Figure 2.3 displays the execution of a MapReduce program.

Because both the Map and Reduce operations are data-parallel, the system can easily scale to large data by using more machines. As the input data increase, the system distributes the input splits across more machines. The runtime can then divide the execution of the Map function across more Map tasks. Similarly, to scale the Reduce phase, the runtime can increase the number of partitions and the corresponding Reduce tasks.
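A minimal sketch of such a partitioning function is shown below: hashing the intermediate key modulo the number of Reduce tasks R guarantees that all pairs sharing a key are assigned to the same Reduce task. The code is our own illustration, although hash partitioning of this form is the common default in MapReduce implementations.

    // Illustrative shuffle partitioning: every intermediate key-value pair is
    // assigned to one of R Reduce tasks by hashing its key, so all pairs that
    // share a key meet at the same Reduce task.
    public class HashPartitioner {

        static int partition(String key, int numReduceTasks) {
            // Mask the sign bit so the result is a valid partition index in [0, R).
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }

        public static void main(String[] args) {
            int R = 4;
            for (String key : new String[] {"cnn.com", "fox.com", "nbc.com"}) {
                System.out.println(key + " -> reduce task " + partition(key, R));
            }
        }
    }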
2.3.2 Fault tolerance

The MapReduce system scales by distributing the analysis to potentially thousands of commodity machines. Because failures are common in such an environment, it is critical for the runtime to tolerate failures.

To provide fault-tolerant computations, the MapReduce system must ensure that (i) input data are not lost due to failures, and (ii) a MapReduce program can complete despite failures. To ensure input data availability, MapReduce architectures leverage distributed file systems like the Google File System (GFS) [39] and the Hadoop Distributed File System (HDFS) [8] that replicate data across multiple machines.

To ensure that a program completes in the event of failures, MapReduce employs a restart mechanism. A task is the granularity at which MapReduce restarts processing. MapReduce leverages the partitioning of the analysis into independent operations, executed by Map and Reduce tasks, and handles failures by re-executing individual tasks. The MapReduce runtime monitors the status of both machines and running tasks across the compute cluster, and can re-start all the tasks that did not complete successfully on a new machine.

The MapReduce fault tolerance mechanism ensures that a program will always make progress toward completion even under high failure rates. Map tasks typically save their output to local disks and, in the event of a Reduce task failure, the restarted Reduce task can simply re-fetch the Map output. This simple mechanism prevents long running programs from aborting execution and re-starting from scratch because of single machine failures.

To achieve fault tolerance at a scale of thousands of commodity machines, a basic assumption of the MapReduce fault tolerance model is that the execution of Map and Reduce tasks is free of side-effects. For instance, tasks are not allowed to communicate with each other through files, or with external services. This simplifies fault tolerance and ensures that re-starting a task does not require coordination with other tasks or external services, a process that can be complicated and error-prone.

However, the MapReduce model allows users to implement arbitrary logic inside a MapReduce program.
For instance, as pointed out in Section 1.3.2, users may abuse this flexibility to implement stateful computations by storing state to external storage systems. This introduces side-effects, and may leave a program in an inconsistent state in the event of a task failure and restart. Ensuring correctness under failures is now the responsibility of the user, adding programming complexity. In Chapter 6, we show how the explicit modeling of state eliminates this problem.

2.4 Stateful analytics on unstructured data

While the MapReduce system provides a scalable platform for large-scale analysis, it is not designed for stateful computations. Recent work has investigated the use of state for efficient incremental or iterative analysis on large-scale unstructured data. These approaches vary in the target application scenarios and the way they incorporate state in the programming model. Several approaches are targeted toward iterative analytics, while others focus on efficient processing through incremental analytics. Additionally, certain approaches incorporate state in a manner transparent to the user, while others allow user-specified stateful programs. This section gives an overview of these approaches and compares them with the proposed solution of this thesis.

2.4.1 Automatic incrementalization

Incremental analysis through computation re-use is critical for the efficient processing of data that change over time. Various systems aim to incrementally update the analysis in an automatic manner [43, 42, 67, 22]. Unlike the approach proposed in this thesis, these systems do not require users to write custom incremental computations using state. Instead, users specify analytics as one-shot dataflows. The runtime systems are responsible for maintaining any necessary state and for automatically transforming the dataflow to an incremental one.

These approaches are based on the concept of memoization, the caching and re-use of intermediate results. In general, these systems represent analytics as dataflows consisting of multiple sub-computations and cache the results of individual sub-computations. When only part of the input data changes across dataflow invocations, the input of some sub-computations in the dataflow may remain the same as in previous invocations. Memoization avoids re-running these sub-computations by re-using the cached results.

These techniques may be applied, for instance, to MapReduce programs. A MapReduce program can be represented as a dataflow consisting of Map and Reduce tasks as in Figure 2.3. The output of such a program can be incrementally updated by memoizing the output of Map and Reduce tasks. For example, if only one of the input splits changes, we need to re-execute only the corresponding Map task. Reduce tasks can re-use the memoized output of the rest of the Map tasks. Similarly, because only a fraction of the intermediate data may change, we may need to re-run only a fraction of the Reduce tasks.

While these approaches do not require users to devise incremental analytics, making programming easier, they restrict the range of workloads that may benefit from incremental processing. For instance, even if a single input split changes in a MapReduce program, it is possible that all Reduce tasks have to be re-executed despite the caching of intermediate results. As we show in Chapter 5, explicit incremental programming using state, as described in the example of Section 1.3, can improve efficiency in such cases by incrementally evaluating the Reduce functions using previous results.

2.4.2 Iterative analytics

Iterative analyses, like several machine learning algorithms, are inherently stateful. They repeatedly refine the analysis by combining a static input set with the analysis result of previous iterations. These analytics must re-use both the static input set and the analysis result across iterations.

The Spark [83] system supports iterative analysis by allowing users to specify distributed read-only state, used to store the static input set. The modeling of this static state allows the system to optimize iterative analytics by avoiding the re-loading of the input data from a file system in every iteration.
Instead, Spark maintains this distributed dataset in memory, allowing processing tasks to access it fast. Twister [36] is an extension to the MapReduce system that also supports iterative analytics. Like Spark, it gives programs access to static state. In Twister, Map and Reduce tasks persist across iterations, amortizing the cost of state loading.

While these approaches improve the efficiency of iterative analytics, they restrict the ability to re-use computation only to the static input dataset. Instead, our approach provides a more general abstraction of state that allows the update of the state. Iterative analytics may use this state abstraction to model not only the static input set, but also the iteratively refined result of the analysis.

2.4.3 Models for incremental analytics

Unlike the approaches described above, certain systems require users to write custom incremental programs. Like the solution proposed in this thesis, these systems give users access to state, allowing them to store and re-use computation results. Because transparent incremental analysis is not possible for certain analytics, the explicit access to state allows a wider range of analytics to leverage the opportunity for efficient processing through incremental analysis.

The Percolator system [66] adopts this approach. Percolator is motivated by the need to incrementally maintain an index of the web as web documents change over time. In Percolator, programs are structured as sequences of observers, pieces of code that are triggered when user-specified data are updated. For instance, a change to an existing document may trigger an observer that updates the link counts, as in the example of Section 1.3.1, of the documents affected by the change. An observer can access and modify the data repository. Such modifications may subsequently trigger downstream observers. Percolator allows multiple observers to access and modify the repository in a shared manner. Because multiple observers may be triggered at the same time, Percolator provides transactions to allow users to reason about the consistency of the data.

In Percolator, data are stored in tables that persist across time. For instance, the web indexing application may use the tables to store web documents. Analytics can access and modify these tables when changes in the input happen. Tables provide random access, allowing the selective update of analysis results when small changes occur in the input data. For example, newly crawled web documents might affect only part of the stored documents in the index. This avoids accessing the entire web document body when small changes happen. As we also show in Chapters 5 and 6, the ability to selectively access state is critical for efficient incremental processing.

While Percolator allows multiple applications to share the same state, stateful groupwise processing provides a simpler state model where state is private to a single application and concurrent state modifications are not allowed. This simplifies programming and obviates the need for transactions that can limit scalability [66]. At the same time, our model leverages the groupwise processing abstraction to easily expose data-parallelism in analytics.
Chapter 3

Stateful online analytics

The purpose of an ETL process is to continuously extract data from its sources and prepare data for follow-on analysis. Analytics in this phase include filtering, transforming raw data into an appropriate format for storage, and summarizing data. ETL analytics are intended to allow applications to prepare data. By analyzing data in an online fashion, the ETL process also allows users to get quick insights from the data.

The dominant approach for ETL analytics today is to migrate data from their sources to centralized clusters dedicated to running analytics. Typically, applications store data on distributed file systems, like the Google File System (GFS) [39] or the Hadoop Distributed File System (HDFS) [8], and execute the analytics with batch processing systems, like MapReduce and Hadoop. This approach is illustrated on the left-hand side of Figure 3.1.

However, this approach has certain drawbacks. First, these batch processing systems are not designed for continuous data processing. Their programming models support only one-shot processing on well-defined input, making programming continuous applications harder and less efficient. For instance, users may have to devise custom and often complicated application logic to repeatedly update the analysis with new data. Instead, continuous ETL analytics require programming models that directly support processing on streams of unstructured data. At the same time, these analytics are update-driven and present opportunities for efficient processing through incremental computation that one-shot programming models fail to leverage.
Figure 3.1: The in-situ MapReduce architecture avoids the cost and latency of the store-first-query-later design by moving processing onto the data sources.

3.1 Design

iMR is designed to provide scalable, online ETL analytics. It is meant for analytics that filter or transform data either for immediate use or before loading it into a distributed storage system (e.g., HDFS) for follow-on analysis. Moreover, today's batch processing queries exhibit characteristics that make them amenable to continuous, in-network processing. For instance, many analytics are highly selective. A 3-month trace from a Microsoft large-scale data processing system showed that filters were often highly selective (17-26%) [43], and the first step for many Facebook log analytics is to reduce the log data by 80% [11]. Additionally, many of these queries are update-driven, integrate the most recent data arrivals, and recur on an hourly, daily, or weekly basis.

Below we summarize the goals of the iMR system and the design principles to meet these goals:

Scalable: The target operating environment consists of thousands of servers in one or more data centers, each producing KBs to MBs of log data per second. In iMR, MapReduce jobs run continuously on the servers themselves (shown on the right in Figure 3.1). This provides horizontal scaling by simply running in-place, i.e., the processing node count is proportional to the number of data sources. This design also lowers the cost and latency of loading data into a storage cluster by filtering data on site and using in-network aggregation, if the user's reduce implements an aggregate function [41].

Responsive: Today the latency of ETL analytics dictates various aspects of a site's performance, such as the speed of social network updates or accuracy of ad-targeting. The iMR architecture builds on previous work in stream processing [15, 23, 13] to support low-latency continuous data processing. Like stream processors, iMR MapReduce jobs can process over sliding windows, updating and delivering results as new data arrives.

Available: iMR's lossy data model allows the system to return results that may be incomplete. This allows the system to improve result availability in the event of failures or processing and network delays. Additionally, iMR may pro-actively reduce processing fidelity through load shedding, reducing the impact on existing server tasks. iMR attaches a metric of result quality to each output, allowing users to judge the relative accuracy of processing. Users may also explicitly trade fidelity for improved result latency by specifying latency and fidelity bounds on their queries.

Efficient: A data processing architecture should make parsimonious use of computational and network resources. iMR explores the use of sub-windows or panes for efficient continuous processing. Instead of re-computing each window from scratch, iMR allows incremental processing, merging recent data with previously computed panes to create the next result. And adaptive load-shedding policies ensure that nodes use compute cycles for results that meet latency requirements.

Compatible: iMR supports the traditional MapReduce API, making it trivial to "port" existing MapReduce jobs to run in-situ. It provides a single extension, uncombine, to allow users to further optimize incremental processing in some contexts (Section 3.1.4).
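For concreteness, the following sketch shows one plausible shape of that programming interface in Java. The generic types and method names are illustrative assumptions rather than the actual iMR interface, but they mirror the map, combine, uncombine, and reduce functions discussed in this chapter.

    import java.util.List;
    import java.util.Map;

    // Illustrative only: a hypothetical interface for a continuous iMR job.
    public interface ContinuousMapReduceJob<K1, V1, K2, V2> {
        // Per-record transform applied at each log server (the map function).
        Iterable<Map.Entry<K2, V2>> map(K1 inputKey, V1 inputValue);

        // Merges new values for a key into a partial value; enables combining
        // inside the network before data reach the root.
        List<V2> combine(K2 key, List<V2> partialValues, List<V2> newValues);

        // The single extension mentioned above: "subtracts" expired values
        // from a partial value, for differential (sliding-window) queries.
        List<V2> uncombine(K2 key, List<V2> currentValues, List<V2> oldValues);

        // Produces the final output values for a key over one window.
        Iterable<V2> reduce(K2 key, List<V2> values);
    }

Existing one-shot MapReduce jobs only need to supply map, combine, and reduce; uncombine is purely optional and is discussed further in Section 3.1.4.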
3.1.1 Continuous MapReduce

Programming in the iMR system is in principle similar to the MapReduce programming model. The iMR model maintains the flexibility and ability to easily expose parallelism that comes with MapReduce. However, unlike traditional MapReduce programs that process a well-defined input dataset, in our application scenarios data are continuously generated and iMR must continuously update the analysis.

Analyzing such infinite data streams requires a way to bound the amount of data that iMR can process. iMR borrows the concept of windows used in stream processors [15]. A window specification consists of a range and a slide. A window range R specifies the amount of data processed each time. Typically, users define the window range in terms of time. For instance, in a web log analysis scenario, a user may want to count the user clicks generated over the last 5 minutes (R = 5 min). A window range may even be defined in terms of data size in bytes (e.g. the last 10MB of click data), record count, or any user-defined logical sequence number. A window slide S defines how frequently to update the analysis with new data. In the previous example, a user may specify that the click count must be updated every minute (S = 1 min). Just like the input, the output of an iMR program is also an infinite stream of results, one for each window of data.

Semantically, the result of executing an iMR program is the same as executing traditional MapReduce programs continuously on overlapping datasets. For every such window, data go through the same map-group-reduce process as in MapReduce and the final output is a list of key-value pairs. However, this naive approach of evaluating sliding windows by reprocessing overlapping data may result in inefficient processing. Section 3.1.4 shows how iMR optimizes windowed processing for more efficient analysis.
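As a concrete illustration of a window specification, the small Java sketch below captures the range R and slide S of the click-count example above. It is illustrative only; the actual iMR query syntax is not reproduced here.

    import java.util.concurrent.TimeUnit;

    // Hypothetical representation of a time-based window specification.
    public class WindowSpec {
        public final long rangeMillis;  // R: how much data each result covers
        public final long slideMillis;  // S: how often a new result is produced

        public WindowSpec(long range, long slide, TimeUnit unit) {
            this.rangeMillis = unit.toMillis(range);
            this.slideMillis = unit.toMillis(slide);
        }

        public static void main(String[] args) {
            // "Count user clicks over the last 5 minutes, updated every minute."
            WindowSpec clicks = new WindowSpec(5, 1, TimeUnit.MINUTES);
            System.out.println("R=" + clicks.rangeMillis + "ms, S=" + clicks.slideMillis + "ms");
        }
    }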
3.1.2 Program execution

The power of the MapReduce model lies not only in its flexibility, but also in the ability to parallelize the analysis in a scalable manner. Here we show how iMR analyzes data in-situ by distributing the execution of a program across the data sources.

Figure 3.2: This illustrates the physical instantiation of one iMR MapReduce partition as a multi-level aggregation tree.

In Chapter 2, we saw that a MapReduce system is tasked with three fundamental operations: mapping input records, grouping intermediate data according to key, and reducing groups to produce the final output. Map tasks run in parallel, to apply the map function and produce the intermediate data. Intermediate data are shuffled, grouped according to key and partitioned, with each partition containing a subset of the keys. Subsequently parallel reduce tasks, one for each partition, apply the reduce function for every group in the corresponding partition, to produce the final result.

iMR distributes the work of a MapReduce job across multiple trees, one for each reducer partition. Figure 3.2 shows how a single tree operates. An iMR tree consists of a number of processing nodes, one for every data source. Every processing node is responsible for mapping input records that are obtained from the local source, and grouping the intermediate data according to their key. Just like in MapReduce, a processing node may also leverage the combine API, to produce partial values and decrease the size of the data that exit a processing node. These partial values are then shipped to the root of the tree that is hosted, not on the data sources, but on a dedicated processing cluster. The root is responsible for merging partial values and applying the reduce function. The final result of the reduce function is stored on the dedicated cluster.

3.1.3 Using in-network aggregation trees

Like in MapReduce, iMR leverages the combine API to decrease the data shipped from mappers to reducers. However, iMR can use multi-level aggregation trees to further decrease the size of the data transferred by aggregating in-network, like the Dryad bulk processing system [46, 47]. This requires decomposable functions [55, 81] that can benefit from in-network aggregation. Operations like a sum or max are examples of such functions that present the greatest opportunity for data reduction. Such operations may leverage the combine API to merge partial values produced by processing nodes in a tree-like manner. In contrast, holistic functions [41], like union and median, always produce partial values with size proportional to the size of the input data and, therefore, do not benefit from aggregation trees.

Apart from reducing network traffic, a multi-level aggregation tree may also distribute the processing load of the root. The root is responsible for receiving and merging partial values from all the sources. As the number of sources or the size of the partial values grows, the root may not be able to process them in a timely manner. Instead, partial values can be merged in a hierarchical way before they reach the root.

Note that the effectiveness of an aggregation tree in reducing network traffic and distributing processing load depends not only on the appropriateness of the function but also on the input data. Recall that data in the MapReduce model are structured as lists of key-value pairs. Each processing node maps input data to produce such a list of intermediate key-value pairs and the combine function is applied on a per-key basis. Therefore, merging key-value lists in an in-network fashion is effective only if there is a significant overlap in the intermediate keys produced between nodes. If, instead, the lists are disjoint in the keys they contain, the result of a combine function is simply a concatenation of two lists, an operation that does not reduce the size of the data. This may actually increase the network traffic compared to a one-level tree since lists may traverse the network multiple times in the aggregation tree instead of being sent directly to the root. Sampling the input data may be used as a way to infer the key distribution and determine whether a tree is necessary.

As in MapReduce, reducers may become a performance bottleneck as they aggregate data from an increasing number of mappers. Adding more partitions and reducers addresses this bottleneck. In a similar way, iMR may run multiple such aggregation trees, one for each partition. Section 4.1.1 describes in more detail how iMR distributes the workload to multiple trees.

Figure 3.3: iMR nodes process local log files to produce sub-windows or panes. The system assumes log records have a logical timestamp and arrive in order.

3.1.4 Efficient window processing with panes

Sliding windows capture a natural property in many online analytics applications: the analysis must continuously be updated with the most recent data and old data become obsolete. Recall from Section 3.1.1 that, with the exception of the sliding window specification, an iMR program is in principle similar to a MapReduce program. An obvious way to update the analysis with new data is to continuously execute the same iMR program on every new window of data.

However, this approach may waste resources, introducing unnecessary processing and data transfers. Between successive windows there are overlapping input
records. In general, every input record may belong in R/S processing windows, where R is the range and S is the slide of the window. This means that every input record is mapped, grouped, combined and transmitted to the network multiple times by an iMR processing node.

To eliminate these overheads, iMR uses panes, a concept introduced in [50] for efficient window processing on single-node stream processors. Panes divide a processing window into non-overlapping sub-windows that the system processes individually, creating partial results. The system merges partial results to calculate the result for an entire window. This section shows how iMR adapts this technique for distributed in-situ execution of MapReduce programs.

Pane management

A pane is a quantum of re-usable computation. A processing node creates a pane by mapping, grouping and combining raw input records in the corresponding sub-window. A node processes raw data in a pane only once. Figure 3.3 illustrates a 2-hour window divided into two 1-hour panes. Every pane is annotated with a logical index that corresponds to the sub-window it represents. Figure 3.3 shows panes P0 and P1. After processing the raw input records, a pane contains a partial result that is essentially a key-value list.

Instead of sending entire windows, nodes in an iMR aggregation tree now send processed panes to their parents. Interior nodes in a tree merge and combine panes with the same index. As the panes reach the root in an aggregation tree, the root combines these partial results into a window result. By sending panes up the tree instead of whole windows, the system sends a single copy of a key-value pair produced at a processing node, reducing network traffic.

The size of the panes is a parameter that affects the ability of the system to decrease network traffic. By default, iMR sets the pane size to R/S, but it can be any common divisor of R and S. By increasing the size of the pane, we allow a processing node to aggregate more data in a single pane through the combine operation, potentially reducing the amount of the data sent up the tree. The ability to reduce network traffic by increasing the pane size depends on the distribution of the keys across a processing window. For instance, the existence of many unique keys favors small pane sizes as there is little opportunity to reduce the data size. Instead, the bigger the overlap in the keys across the window, the more the system can reduce data size.

However, as we describe in Section 3.2.1, panes also represent the granularity at which iMR may proactively shed processing load, or the granularity at which failed nodes restart processing. Reducing the size of the pane gives iMR finer control when shedding load and may reduce the amount of lost data when nodes fail and restart.

[Figure 3.4 shows the two window-creation strategies, W1 = P1 + P2 or W1 = W0 − P0 + P2, with child nodes A and B contributing per-node panes that are aggregated in the network.]

Figure 3.4: iMR aggregates individual panes Pi in the network. To produce a result, the root may either combine the constituent panes or update the prior window by removing an expired pane and adding the most recent.
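To make the pane mechanics concrete, the following minimal sketch in Java shows how a record's timestamp can be mapped to a pane index and how two partial panes with the same index can be merged. It assumes millisecond timestamps and a pane size that divides both the range and the slide; it is illustrative only, not the iMR implementation.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class PaneSketch {
        // Logical pane index for a record, given the pane size in milliseconds.
        static long paneIndex(long timestampMillis, long paneSizeMillis) {
            return timestampMillis / paneSizeMillis;
        }

        // Merge pane 'src' into 'dst' by concatenating the value lists per key;
        // a user-supplied combine function could then shrink each list.
        static void merge(Map<String, List<Integer>> dst, Map<String, List<Integer>> src) {
            for (Map.Entry<String, List<Integer>> e : src.entrySet()) {
                dst.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                   .addAll(e.getValue());
            }
        }
    }

Because a record's pane index depends only on its timestamp, every node assigns the same index to data from the same sub-window, which is what allows interior nodes to merge panes produced independently at different sources.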
Window creation

The root of an iMR job is responsible for calculating the final result for the entire window from the individual panes. The root must group and combine all keys in the window before executing the reduce function. Figure 3.4 shows two strategies to do so. Here, a window is divided in two panes and for simplicity there are only two nodes sending panes to the root.

The first strategy leverages panes to allow incremental processing by simply using the traditional MapReduce API. In this approach, the root maintains a list of outstanding panes. Each such pane is the result of combining corresponding panes from nodes in the tree as they reach the root. For instance, pane P1^{a+b} at the root is the result of combining P1^a from node A and P1^b from node B. Window W1 consists of panes P1 and P2, and the root can calculate the final result for W1 by combining the key-value lists in P1^{a+b} and P2^{a+b}. This improves efficiency by re-using the partial values in a pane for every window. Merging panes is cheaper than repeatedly mapping and combining input values for every window. Note that the benefit from merging panes depends again on the key distribution. The more values per key that iMR has to combine in a single pane, the more work it saves by re-using the pane. This is because the cost of creating a pane from raw data is large relative to the cost of merging panes to calculate a window. Importantly, this optimization is transparent to the user. The system leverages the same MapReduce combine API.

However, for sliding windows it is sometimes more efficient to remove old data and then add new data to the prior window. For instance, consider a query with a 24-hour window that updates every 1 hour. This means that for every window the root must combine 24 panes, even though there is only one pane that is old and one pane that is new each time and the majority of the panes remain the same. In contrast, the root can simply remove and add a pane's worth of keys to the prior window, reducing the amount of processing for every window. Figure 3.4 shows how to compute window W1 from W0. Assuming that the cost of removing and adding keys to a window is equivalent, this strategy is always more efficient than merging all constituent panes in a window when the slide S is less than half the range R of the window.

However, applying this strategy requires differential functions [50, 14]. A differential function allows users to update the analysis by "subtracting" old data and adding new data. The count and sum functions are examples of differential functions, while max and min are not differential. To define how to subtract old data, the iMR model extends MapReduce with an uncombine function. Figure 3.5 shows how iMR can use the uncombine and combine functions to calculate a new window from the prior one.

uncombine(k′, {v′}_current, {v′}_old) → {v′}_partial
combine(k′, {v′}_partial, {v′}_new) → {v′}

Figure 3.5: iMR extends the traditional MapReduce interface with an uncombine function that allows the specification of differential functions. The uncombine function subtracts old data and the combine function adds new data to produce the final result.
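As an illustration of a differential query, the sketch below implements a count whose partial value per key is a single running total, following the combine/uncombine shape of Figure 3.5. It is a minimal example in Java, not the iMR source.

    import java.util.Collections;
    import java.util.List;

    public class DifferentialCount {
        // combine(k, partial, new) -> partial': add the values of a new pane.
        static List<Long> combine(String key, List<Long> partial, List<Long> fresh) {
            return Collections.singletonList(sum(partial) + sum(fresh));
        }

        // uncombine(k, current, old) -> partial: subtract an expired pane.
        static List<Long> uncombine(String key, List<Long> current, List<Long> old) {
            return Collections.singletonList(sum(current) - sum(old));
        }

        private static long sum(List<Long> values) {
            long s = 0;
            for (long v : values) s += v;
            return s;
        }
    }

With such a pair of functions, the root can compute W1 = W0 − P0 + P2 with a constant amount of merging per slide, independent of how many panes the window spans.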
3.2 Fidelity-latency tradeoffs

This section describes the features of iMR that allow applications to accommodate data loss. Data loss may occur because of node or network failures, or as a consequence of result latency requirements. In such cases, an iMR job may need to report a result before the system has had time to process all the data in the window. The key challenges we address here are (i) how to represent and calculate result quality to allow users to interpret partial results, and (ii) how to use this metric to trade result fidelity for improved result latency.

3.2.1 Measuring data fidelity

A useful metric for data fidelity should not only inform users that data is missing, but also allow them to assess the impact of data loss on the accuracy of the analytics. Here we introduce C2, a fidelity metric that exposes the spatial and temporal properties of data, allowing users to better understand incomplete data.

Figure 3.6: C2 describes the set of panes each node contributes to the window. Here we show two different ways in which C2 represents 50% of the total data: all the nodes process half their data or half the nodes process all their data.

Data are naturally distributed across space, the nodes that generate the data, and time, the processing window. The C2 metric annotates results with spatial and temporal information about the lost data. Spatial information describes the source nodes from which data were lost. Temporal information describes the time periods during which data were lost. Such information can often provide valuable insights about the impact of the lost data on the analysis. For instance, losing data from certain nodes may be of less significance if they are not active. Similarly, there may be certain time periods of inactivity on a specific node. Being able to distinguish these cases can help users understand the impact of data loss on the accuracy of the analysis.

As a comparison, one fidelity metric that has been proposed is completeness, the fraction of nodes whose data are included in the result [53, 60]. Notice that such a coarse metric cannot differentiate between a node that produces data that span the entire window and a node that does not. Completeness cannot describe situations where a node fails intermittently, for a small period of time during a window, and loses only a part of the data. An alternative proposed metric is progress, the percentage of data processed, which is used by systems like Hadoop Online [32]. This metric too does not describe the source of the lost data or the time during which it was lost.

Instead, the C2 metric leverages the concept of panes to describe the spatio-temporal properties of data. A pane represents a fraction of the total window and is annotated with the corresponding temporal information, the range of time within the window it represents. Additionally, a pane carries information about the source node that created the pane from raw data.

Panes are the quantum of accuracy. A pane is included in the result either in its entirety or not at all. By varying the size of a pane, we can control the granularity in the temporal dimension at which we want to measure fidelity. Smaller panes allow finer information in the event, for instance, of a very short node failure. However, as described in Section 3.1.4, a pane that is too small may hurt the ability to combine data locally, increasing the amount of data that a node ships to the network.
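One way to picture the C2 annotation of Figure 3.6 is as a per-window scoreboard over (source node, pane) pairs. The Java sketch below is illustrative only; the names and the flat-set representation are assumptions, and the actual iMR implementation tracks counts and source identifiers per pane, as described next.

    import java.util.HashSet;
    import java.util.Set;

    public class C2Scoreboard {
        private final Set<Long> received = new HashSet<>(); // encodes (node, pane)
        private final int nodes, panesPerWindow;

        public C2Scoreboard(int nodes, int panesPerWindow) {
            this.nodes = nodes;
            this.panesPerWindow = panesPerWindow;
        }

        // Record that 'nodeId' contributed the pane with index 'paneIdx'.
        public void markReceived(int nodeId, int paneIdx) {
            received.add((long) nodeId * panesPerWindow + paneIdx);
        }

        // Fraction of the window's (node, pane) cells present in the result.
        public double completeness() {
            return (double) received.size() / ((double) nodes * panesPerWindow);
        }
    }

Both scenarios in Figure 3.6 would report a completeness of 0.5, but the scoreboard itself reveals which nodes and which time periods are missing.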
To inform users about the spatio-temporal properties, each delivered result is annotated with a logical scoreboard that indicates which panes were successfully received. Figure 3.6 shows two example scoreboards. Such a scoreboard has two dimensions. It shows which panes across a time window (temporal dimension) are successfully included in the result for every individual node (spatial dimension). As an example, Figure 3.6 shows two different scenarios that process the same amount of data, in this case 50% of the total data, but have different spatio-temporal properties. In the first case, all the nodes process half of their data, while in the second case, half of the nodes process all their data. In general, there are many different ways in which the system can process 50% of the data, and C2 allows users to differentiate between them.

To maintain the C2 metric in an aggregation tree, when interior nodes merge panes, they also merge their corresponding C2 metric. Essentially each pane maintains a count and the IDs of the data sources that have contributed to the pane. As panes are merged inside the aggregation tree, the list of source IDs is merged as well. Note that we can compactly summarize the IDs of the data sources using a bloom filter. This approach is useful when maintaining a large number of source IDs per pane adds significant space overhead. This may happen when the size of the panes is small and there are many data sources. While bloom filters add the possibility of false positives when testing for the existence of a source node, we can control the probability of false positives, making it practically negligible.

3.2.2 Using C2 in applications

The goal of the C2 metric is to allow users to better understand the impact of data loss on the analysis, but also to allow users to trade result fidelity for result latency. Here, we describe how users can achieve this by appropriately specifying latency and fidelity requirements using C2.

In general, the C2 metric gives users flexibility in specifying fidelity requirements. For instance, a user may require simply a minimum amount of data, as a percentage of the total data, to be processed. Alternatively, users may require
the processed data to have specific spatial and temporal properties. For instance, users may simply require a minimum of 50% of the data to be processed, or put more constraints on it. They may require that all of the nodes should return at least half of their data. This specification is illustrated in the left-hand side of Figure 3.6. This is only one of many possibilities, and users may arbitrarily specify which panes they require to be processed, by specifying the exact layout of the C2 scoreboard.

Specifications with different spatio-temporal properties, like the ones in Figure 3.6, may affect the accuracy of the analysis, but also the result latency, in completely different ways, even if they specify the same percentage of data. The impact of the different C2 specifications may depend on factors like the particular operation applied on the data and the distribution of data across nodes and across the time window. Users should set the C2 in a way that allows them to determine the result quality when there is data loss, but also to reduce the result latency. We have identified four general C2 specifications that allow different fidelity/latency tradeoffs and may be useful for a wide range of applications.

Minimum volume with earliest results

This C2 specification gives the system the most freedom to decrease result latency. Users specify a percentage X% of the data they require to process and the system will return the first X% of the panes available. This type of specification is suitable for applications where the accuracy of the analysis depends on the relative frequency of events, and the data are uniformly distributed. Here, an event is any distinct piece of information of interest appearing within the dataset. As an example, consider an application that analyzes click logs from web servers to count users' clicks. If events, clicks from users, are uniformly distributed across the servers and the time window, then processing any percentage of the total data can still summarize the relative frequency of the events, and give users a good estimate of the impact of data loss on the analysis.

However, this specification may provide poor estimates if data are not uniformly distributed. For instance, if events are associated mainly with the remaining panes that were not processed, this specification may not capture the relative
frequency of the events.

Minimum volume with random sampling

This C2 specification ensures that the system will process a minimum percentage X% of the data that is randomly sampled across the entire dataset. This specification gives less freedom to decrease latency since the sampled data may not be the earliest available, but it can reproduce the relative frequency of events, even if data are not uniformly distributed.

Since a pane is the quantum of accuracy in C2, random sampling occurs at the granularity of panes. To perform random sampling, every node decides whether to process a pane with a probability proportional to the percentage X. Note that although this guarantees a random sample, it is possible that due to conditions that the system has no control of, like node or network failures, the returned result may not satisfy the user's criteria for randomness. For instance, pane sampling may appear biased toward specific nodes. However, users can still leverage the C2 scoreboard to verify the appropriateness of the sample.

Temporal completeness

This C2 specification ensures that a minimum percentage X% of the nodes process 100% of the panes in a window. This specification is suitable for applications that must correlate events on a per-server basis. For example, an application may analyze server logs to count how often individual servers produce log errors. This implicitly partitions the analysis also on a per-server basis. As a consequence, losing data from a node does not affect the accuracy of the analysis on the rest of the servers. In the previous example, the resulting error counts for the rest of the servers will be accurate. This specification is also useful for applications in which it is difficult to estimate the accuracy of the analysis based on incomplete data, and conclusions can be made only by processing all the data. The specification guarantees that a node will not pollute the analysis with incomplete data.

Note that the notion of temporal completeness can be applied not only on
42amountoftimethesystemspendscomputingeachsuccessivewindow.Ifthetimeoutperiodexpires,theoperatorevictsthewindowregardlessofC2.Beforethetimeout,therootmayevictearlyunderthreeconditions:ifthewindowiscompletebeforethetimeout,ifitmeetstheoptionalfidelityboundC2,orifthesystemcandeducethatfurtherdelayswillnotimprovefidelity.Liketheroot,interiornodesalsoevictbasedontheuser’slatencydeadline,butmaydosobeforethedeadlinetoensureadequatetimetotraveltotheroot[55].Fidelityeviction:Thefidelityevictionpolicydeliversresultsbasedonaminimumwindowfidelityattheroot.Aspanesarrivefromnodesinthenetwork,therootupdatesC2forthecurrentwindow.Whenthefidelityreachestheboundtherootmergestheexistingpanesinthewindowandoutputstheanswer.Failureeviction:Justasthesystemevictsresultsthatare100%com-plete,thesystemmayalsoevictresultsifadditionalwaittimecannotimprovefidelity.Thisoccurswhennodesareheavilyloadedorbecomedisconnectedorfail.iMRemploysboundarypanes(wheretraditionalstreamprocessorsuseboundarytuples[70])todistinguishbetweenfailednodesandstalledoremptydatastreams1.Nodesperiodicallyissueboundarypanestotheirparentswhenpaneshavebeenskippedbecauseofalackofdataorloadshedding(Section4.1.3).Boundarypanesallowtheroottodistinguishbetweenmissingdatathatmayarrivelaterandmissingdatathatwillneverarrive.iMRmaintainsboundaryinformationonaper-panebasisusingtwocounters.ThefirstcounteristheC2completenesscount;thenumberofsuccessfulpanemerges.Evenifachildhasnolocaldataforapane,itsparentintheaggregationtreemayincreasethecomplete-nesscountforthispane.However,childrenmayskippaneseitherbecausetheyre-startedlaterinthestream(Section4.1.5)orbecausetheycanceledprocessingtoshedload(Section4.1.3).Inthesecases,theparentnodeincreasesanincom-pletenesscounterindicatingthenumberofnodesthatwillnevercontributetothispane.Bothinteriornodesandtherootusethesecountstoevictpanesorentirewindowsrespectively.Interiornodesevictearlyifthepanesarecompleteorthe1Inreality,allpanescontainboundarymetadata,butnodesmayissuepanesthatareother-wiseemptyexceptforthismetadata. 
43sumofthesetwocountersisequaltothesumofthechildreninthissubtree.Therootdetermineswhetherornottheuser’sfidelityboundcaneverbemet.Bysimplysubtractingincompletenessfromthetotalnodecount(perfectcompleteness),therootcansetanupperboundonC2foranyparticularwindow.IfthisestimateofC2everfallsbelowtheuser’starget,therootevictsthewindow.Notethattheuseoffidelityandlatencyboundspresumesthattheusereitherreceivedausableresultorcannotwaitlongerforittoimprove.Thus,unlikeotherapproaches,suchastentativetuples[16]orre-runningthereductionphase[32],iMRdoesnot,bydefault,updateevictedresults.iMRonlysupportsthismodefordebuggingordeterminingaproperlatencybound,asitcanbeexpensive,forcingthesystemtorepeatedlyre-process(re-reduce)awindowonlateupdates.3.3Relatedwork3.3.1“Online”bulkprocessingiMRfocusesonthechallengesofmigratinginitialdataanalyticstothedatasources.Adifferent(andcomplementary)approachhasbeentooptimizetraditionalMapReducearchitecturesforcontinuousprocessingthemselves.Forinstance,theHadoopOnlinePrototype(HOP)[32]canruncontinuously,butre-quirescustomreducefunctionstomanagetheirownstateforincrementalcompu-tationandframingincomingdataintomeaningfulunits(windows).iMR’sdesignavoidsthisrequirementbyexplicitlysupportingslidingwindow-basedcomputa-tion(Section3.1.1),allowingexistingreducefunctionstoruncontinuouslywithoutmodification.LikeiMR,HOPalsoallowsincompleteresults,producing“snapshots”ofreduceoutput,wherethereducephaseexecutesonthemapoutputthathasaccu-mulatedthusfar.HOPdescribesincompleteresultswitha”progress”metricthat(selfadmittedly)isoftentoocoarsetobeuseful.Incontrast,iMR’sC2framework(Section3.2)notonlyprovidesbothspatialandtemporalinformationabouttheresult,butmaybeusedtotradeparticularaspectsofdatafidelityfordecreased 44processingtime.Dremel[57]isanothersystemthat,likeiMR,aimstoprovidefastanalysisonlarge-scaledata.WhileiMRtargetscontinuousrawlogdata,Dremelfocusesonstaticnesteddata,likewebdocuments.Itemploysanefficientcolumnarstorageformatthatisbeneficialwhenafractionofthefieldsofthenesteddatamustbeaccessed.LikeHOP,Dremelusesacoarseprogressmetricfordescribingearly,partialresults.3.3.2LogcollectionsystemsAsystemcloselyrelatedtoiMRisFlume[6],adistributedlogcollectionsystemthatplacesagentsin-situonserverstorelaylogdatatoatierofcollectors.Whileauser’s“flows”(i.e.,queries)maytransformorfilterindividualevents,iMRprovidesamorepowerfuldataprocessingmodelwithgrouping,reduction,andwindowing.WhileFlumesupportsbest-effortoperation,usersremaininthedarkaboutresultqualityorlatency.However,Flumedoesprovidehigherreliabilitymodes,recoveringeventsfromawrite-aheadlogtopreventdataloss.Whilenotdiscussedhere,iMRcouldemploysimilarupstreambackup[16]techniquestobettersupportqueriesthatspecifyfidelitybounds.3.3.3LoadsheddingindatastreamprocessorsiMR’sloadshedding(Section4.1.3)andresultevictionpolicies(Section3.2.3)builduponthevariousloadsheddingtechniquesexploredinstreamprocess-ing[23,75,74].Forinstance,iMR’slatencyandfidelityboundsarerelatedtotheQoSmetricsfoundintheAurorastreamprocessor[23].Auroraallowsuserstoprovide“graphs”whichseparatelymapincreaseddelayandpercenttupleslostwithdecreasingoutputquality(QoS).iMRtakesadifferentapproach,allowinguserstospecifylatencyandfidelityboundsabovewhichthey’dbesatisfied.Ad-ditionally,iMRleveragesthetemporalandspatialnatureoflogdatatoprovideusersmorecontrolthanpercenttupleslost.Manyoftheseloadsheddingmechanismsinserttupledroppingoperators 
45intoqueryplansandcoordinatedropprobabilities,typicallyviaacentralizedcon-troller,tomaintainresultqualityunderhigh-loadconditions.Incontrast,ourloadsheddingpoliciesactlocallyateachoperator,sheddingsub-windows(panes)astheyarecreatedormerged.These“panedrop”policiesaremorecloselyrelatedtotheprobabilistic“windowdrop”operatorsproposedbyTatbul,etal.[75]foraggregateoperators.Incontrast,iMR’soperatorsmaydroppanesbothdetermin-isticallyorprobabilisticallydependingontheC2fidelitybound.3.3.4DistributedaggregationAggregationtreeshavebeenexploredinsensornetworks[55],monitoringwirednetworks[79],anddistributeddatastreamprocessing[53,45].MorerecentworkexploredavarietyofstrategiesfordistributedGroupByaggregationrequiredinMapReduce-styleprocessing[81].Ouruseofsub-windows(panes)ismostcloselyrelatedtotheirAccumulator-PartialHashstrategy,sinceweaccumulate(throughcombining)key-valuepairsintoeachsub-window.Whiletheyevictedthesubwindowbasedonitsstoragesize(experiencingahashcollision),iMRusesfixed-sizedpanes.3.4AcknowledgmentsChapter3,inpart,isreprintofthematerialpublishedintheProceedingsoftheUSENIXAnnualTechnicalConference2011.Logothetis,Dionysios;Trezzo,Chris;Webb,KevinC.;Yocum;Ken.Thedissertationauthorwastheprimaryinvestigatorandauthorofthispaper. Chapter4Anarchitectureforin-situprocessingThischapterdescribesthedesignoftheiMRarchitecture.Here,weshowhowiMRcanexecutecontinuousMapReduceprogramsinascalableandefficientmanner.WealsodescribethemechanismsthatallowiMRanalyticstorunin-situ.Thisincludesloadsheddingmechanismstoaccommodateserverload,aswellasfaulttolerancemechanisms.ThischapterpresentsanevaluationofiMRthroughmicrobenchmarks.Furthermore,wevalidatetheusefulnessoftheC2metricinunderstandingincompletedataandtradingfidelityfortimelinessintheanalysisthroughexperimentswithrealapplications.4.1ImplementationTheiMRdesignbuildsuponMortar,adistributedstreamprocessingarchi-tecture[53].WesignificantlyextendedMortar’scorefunctionalitytosupportthesemanticsofiMRandtheMapReduceprogrammingmodelalongfouraxes:•ImplementtheiMRMapReduceAPIusinggenericmapandreduceMortaroperators.•Pane-basedcontinuousprocessingwithflowcontrol.46 
47•Loadshedding/cancellationandpane/windowevictionpolicies.•Fault-tolerancemechanisms,includingoperatorre-startandadaptiveroutingschemes.4.1.1Buildinganin-situMapReducequeryMortarcomputescontinuousin-networkaggregatesacrossfederatedsys-temswiththousandsofnodes.Thisisanaturalfitforthemap,combine,andreducefunctionssincetheyareeitherlocalper-recordtransforms(map)oroftenin-networkaggregates.AMortarqueryconsistsofasingleoperator,oraggregatefunction,whichMortarreplicatesacrossnodesthatproducetherawdatastreams.Thesein-situoperatorsgiveiMRtheopportunitytoactivelyfilterandreducein-termediatedatabeforeitissentacrossthenetwork.Eachqueryisdefinedbyitsoperatortypeandproducesasingle,continuousoutputdatastream.Operatorspush,asopposedtothepull-basedmethodusedinHadoop,recordsacrossthenetworktootheroperatorsofthesametype.Mortarsupportstwoquerytypes:localandin-networkqueries.Alocalqueryprocessesdatastreamsindependentlyateachnode.Incontrast,in-networkqueriesuseatreeofoperatorstoaggregatedataacrossnodes.Eitherquerytypemaysubscribetoalocal,rawdatasourcesuchasalogfile,ortotheoutputofanexistingquery.Userscomposethesequerytypestoaccomplishmoresophisticatedtasks,suchasMapReducejobs.Figure4.1illustratesaniMRjobthatconsistsofalocalqueryformapoperatorsandanin-networkqueryforreduceoperators.Mapoperatorsrunonthelogserversandpartitiontheiroutputamongco-locatedreduceoperators(heretherearetwopartitions,hencetworeducetrees).Thereduceoperatordoesmostoftheheavylifting,groupingkey-valuepairsissuedbythemapoperatorsbeforecallingtheuser’scombine,uncombine,andreducefunctions.UnliketraditionalMapReducearchitectures,wherethenumberofreducersisfixedduringexecution,iMRmaydynamicallyadd(orsubtract)reducersduringprocessing. 48"theforthe""theforthe"NodeANodeBSourceSourceMapMapReduce1Reduce2Reduce1Reduce2NodeCNodeDReduce1Reduce2Partition1Partition2FinalOutputFinalOutputFigure4.1:EachiMRjobconsistsofaMortarqueryforthemapandaqueryforthereduce.HeretherearetwoMapReducepartitions(r=2),whichresultintwoaggregationtrees.Awordcountexampleillustratespartitioningmapoutputacrossmultiplereduceoperators.4.1.2MapandreduceoperatorsLikeotherstreamprocessors,MortarusesprocessingwindowstoboundcomputationandprovidesasimpleAPItofacilitateprogrammingcontinuousop-erators.WeimplementedgenericmapandreduceoperatorsusingthisAPItocalluser-definedMapReducefunctionsattheappropriatetimeandproperlygroupthekey-valuepairs.WemodifiedoperatorinternalssothattheyoperateonpanesasdescribedinSection3.1.4.Operatorstakeasinputeitherrawrecordsfromalocalsourceortheyreceivepanesfromupstreamoperatorsintheaggregationtree.Internally,iMRrepresentspanesas(possiblysorted)hashmapstofacilitatekey-valuegrouping.IniMRoperatorshavetwomaintasks:panecreation,creatinganinitialpanefromalocaldatasource,andpanemerging,combiningpanesfromchildreninanaggregationtree.Panecreationoperatesonarecord-by-recordbasis,addingnewrecordsintothecurrentpane.Incontrast,panemergingcombineslocally 
49producedpaneswiththosearrivingfromthenetwork.Becauseofdifferencesinprocessingtimeandnetworkcongestion,operatorsmaintainasequenceofpanesthatthesystemisactivelymerging(theyhavenotyetbeenevicted).WecallthistheactivepanelistorAPL.ToadaptMortarforMapReduceprocessing,weintroduceimmutabletimes-tampsintothesystem.Mortarassumeslogicallyindependentoperatorsthattimes-tampoutputrecordsatthemomentofcreation.Incontrast,iMRdefinesprocess-ingwindowswithrespecttotheoriginaltimestampsontheinputdata,notwithrespecttothetimeatwhichanoperatorwasabletoevictapane.iMRassignsatimestamptoeachdatarecordwhenitfirstentersthesystem(e.g.usingapre-existingtimestampembeddedinthedata,orthecurrentrealtime).Thistimestampremainswiththedataasittravelsthroughsuccessivequeries.Thusnetworkingorprocessingdelaysdonotalterthewindowinwhichthedatabelongs.ThemapoperatorThesimplicityofmappingallowsastreamlinedmapoperator.Theoperatorcallstheuser’smapfunctionforeacharrivingrecord,whichmaycontainoneormorelogentries1.Foreachrecord,themapoperatoremitszeroormorekey-valuepairs.Weoptimizedthemapoperatorbypermanentlyassigningitawindowwitharangeandslideequaltoonerecord.Thisallowedustoremovewindow-relatedbufferinganddirectlyissuerecordscontainingkey-valuepairstosubscribedoperators.Finally,themapoperatorpartitionskey-valuepairsacrosssubscribedreduceoperators.ThereduceoperatorThereduceoperatorhandlesthein-networkfunctionalityofiMRincludingthegrouping,combining,sortingandreducingofkey-valuepairs.Theoperatorsmaintainahashmapforeachpaneintheactivepanelist.Herewedescribehowthereduceoperatorcreatesandmergespanes.1LikeHadoop,iMRincludeshandlersthatinterpretlogrecords. 50Afterareduceoperatorsubscribestoalocalmapoperatoritbeginstore-ceiverecords(containingkey-value{k,v}pairs).Thereduceroperatorfirstchecksthelogicaltimestampofeach{k,v}pair.Ifitbelongstothecurrentpane,thesysteminsertsthepairintothehashtableandcallsthecombiner(ifdefined).Whena{k,v}pairarriveswithatimestampforthenextpane,thesysteminsertsthepriorpaneintotheactive-panelist(APL).Theoperatormayskippanesforwhichthereisnolocaldata.Inthatcase,theoperatorinsertsboundarypanesintotheAPLwithcompletenesscountsofone.Loadsheddingoccursduringpanecreation.Asrecordsarrive,theoperatormaintainsanestimateofwhenthepanewillcomplete.Theoperatorperiodicallyupdatesthisestimate,maintainedasanExponentiallyWeightedMovingAverage(EWMA)biasedtowardsrecentobservations(α=0.8),anddetermineswhethertheuser’slatencydeadlinewillbemet.Foraccuracy,theoperatorprocesses30%ofthepanebeforethefirstestimateupdate.Forresponsiveness,theoperatorperiodicallyupdatesandcheckstheestimate(everytwoseconds).Foreachskippedpanetheoperatorissuesaboundarypanewithanincompletenesscountofone.TheAPLmergeslocallyproducedpaneswithpanesfromotherreduceoperatorsintheaggregationtree.Thereduceoperatorcallstheuser’scombinerforanygroupwithnewkeysinthepane’shashmap.TheoperatorperiodicallyinspectstheAPLtodeterminewhetheritshouldevictapane(basedonthepoliciesinSection3.2.3).Reduceoperatorsoninternalorleafnodesforwardthepanedownstreamoneviction.Iftheoperatorisatthetree’sroot,ithastheadditionalresponsibilityofdeterminingwhentoevicttheentirewindow.Theoperatorchecksevictionpoliciesonperiodictimeouts(theuser’slatencyrequirement)orwhenanewpanearrives(possiblymeetingthefidelitybound).Atthatpoint,theoperatormayproducethefinalresulteitherbyusingtheoptionaluncombinefunctionorbysimplycombiningtheconstituentpanes(strategiesdiscussedinSection3.1.4).Afterthiscombiningstep,theoperatorcallstheuser-definedreducefunctionforeachkeyinthewindow’shashmap. 
514.1.3LoadcancellationandsheddingWhentherootevictsincompletewindows,nodesintheaggregationtreemaystillbeprocessingpanesforthatwindow.Thismaybeduetopaneswithinordinateamountsofdataorserversthatareheavilyloaded(havelittletimeforlogprocessing).Thustheyarecomputingandmergingpanesthat,oncetheyarriveattheroot,willnolongerbeused.Thissectiondiscussesmechanismsthatcancelorshedtheworkofcreatingandmergingpanesintheaggregationtree.NotethatiMRassumesthatmechanismsalreadyexisttoapportionserverresourcesbetweentheserver’snormaldutiesandiMRjobs.Forinstance,iMRmayruninaseparatevirtualmachine,lettingtheVMschedulerallocateresourcesbetweenlogprocessingandVMsrunningsiteservices.HereourgoalistoensurethatiMRnodesusetheresourcestheyaregiveneffectively.iMR’sloadcancellationpoliciestrytoensurethatinternalnodesdonotwastecyclescreatingormergingpanesthatwillneverbeused.WhentherootevictsawindowbecauseithasmettheminimumC2fidelityrequirement,thereisalmostsurelyoutstandingworkinthenetwork.Thus,oncetherootdeterminesthatitwillnolongeruseapane,itrelaysthatpane’sindexdowntheaggregationtree.Thisinformstheothernodesthattheymaysafelystopprocessing(creat-ing/merging)thepane.Incontrast,iMR’sloadsheddingstrategyworkstopreventwastedeffortwhenindividualnodesareheavilyloaded.Herenodesobservetheirlocalprocessingratesforcreatingapanefromlocallogrecords.Iftheexpectedtimetocompletionexceedstheuser’slatencybound,itwillcancelprocessingforthatpane.Itwillthenestimatethenextprocessingdeadlinethatitcanmeetandskiptheinterveningpanes(andsendboundarypanesintheirplace).Internalnodesalsospendcycles(andmemory)mergingpanesfromchildrenintheaggregationtree.Hereinteriornodeseitherchoosetoproceedwithpanemergingor,intheeventthatitviolatestheuser’slatencybound,“fastforward”thepanetoitsimmediateparent.AsweshallseeinSection6.6,thesepoliciescanimproveresultfidelityinthepresenceofstragglernodes. 
524.1.4PaneflowcontrolRecallthatthegoalofloadsheddinginiMRisnottouselessresources,buttousethegivenresourceseffectively.Givensomelargeinputdataatasource,loadsheddingchangestheworkdone,notitsprocessingrate.Thus,itisstillpossibleforsomenodestoproducepanesfasterthanothers,eitherbecausetheyhavelessdataperpaneormorecyclesavailable.Inthesecases,thelocalactivepanelist(APL)couldgrowinanunboundedfashion,consumingservermemoryandimpactingitsclient-facingservices.WecontroltheamountofmemoryusedbytheAPLbyemployingawindow-orientedflowcontrolscheme.Eachoperatormonitorsthememoryused(bytheJVMinourimplementation)andissuesapauseindicatorwhenitreachesauser-definedlimit.Theindicatorcontainsthelogicalindexoftheyoungestpaneintheoperator’sAPL.Internally,panecreationwaitsuntiltheindicatorisgreaterthanthecurrentindexortheindicatorisremoved.Pauseindicatorsarealsopropagatedtop-downintheaggregationtree,ensuringthatoperatorssendevictedpanesupwardonlywhentheindicatorisgreaterthantheevictedindicesoritisnotpresent.4.1.5MapReducewithgaprecoveryWhileloadsheddingandpaneevictionpoliciesimproveavailabilityduringprocessingandnetworkdelays,nodesmayfailcompletely,losingtheirdataandcurrentqueries.WhiletraditionalMapReducedesigns,suchasHadoop,canrestartmaporreducetasksonanynodeinthecluster,iMRdoesnotassumeasharedfilesystemthatcanreliablestoredata.Instead,iMRprovidesgaprecovery[16],meaningthatthesystemmaydroprecords(i.e.,panes)intheeventofnodefailures.Multi-treeaggregationMortaravoidsfailednetworkelementsandnodesbyroutingdataupmul-tipletrees.Nodesroutedataupasingletreeuntilthenodestopsreceivingheartbeatsfromitsparent.Ifaparentbecomesunreachable,itchoosesanothertree 53(i.e.,anotherparent)torouterecordsto.Forthiswork,weuseasingletree;thissimplifiesourimplementationoffailureevictionpoliciesbecauseinternalnodesknowthemaximumpossiblecompletenessofpanesarrivingfromtheirchildren.Mortaremploysnewroutingrulestoretainadegreeoffailureresilience.Ifaparentbecomesunreachable,thechildforwardsdatadirectlytotheroot.Thispolicyallowsdatatobypassfailednodesattheexpenseoffeweraggregationopportunities.Mortaralsodesignsitstreesbyclusteringnetworkcoordinates[33],andweusethesamemechanisminourexperiments.Weleavemoreadvancedroutingandtree-buildingschemesasfuturework.Operatorre-installiMRguaranteesthatqueries(operators)willbeinstalledandremovedonnodesinaneventuallyconsistentmanner.Mortarprovidesareconciliational-gorithmtoensurethatnodeseventuallyinstall(orun-install)queryoperators.Thus,whennodesrecoverfromafailure,theywillre-installtheircurrentsetofoperators.Whilewelosethedataintheoperator’sAPLatthetimeoffailure,weneedtore-startprocessingatanappropriatepointtoavoidduplicatedata.Todoso,operators,duringpanecreation,maintainasimpleon-diskwrite-aheadlogtoindicatethenextsafepointinthelogtobeginprocessingonre-start.Formanyqueriesthecostofwritingtothislogissmallrelativetopanecomputation,andwesimplypointtothenextpane.4.2EvaluationInthesection,weevaluatethescalabilityofiMRthroughmicrobenchmarksbutalsoitsabilitytodeliverhigh-fidelityresultsinatimelymannerunderfailuresorconstrainedcomputationalresources.WealsoassesstheusefulnessoftheC2metricinunderstandingincompletedataandtradingdatafidelityforresultlatency. 
54141root122roots103roots4roots8642Milliontuplespersecond0051015202530WorkersFigure4.2:Dataprocessingthroughputasthenumberofworkersandrootsincreases.Whentherootofthequerybecomesthebottleneck,iMRscalesbypartitioningdataacrossmoreroots.4.2.1ScalabilityOneofthedesigngoalsofiMRistoscaletolargedatasets.Here,weevalu-atetheabilityofiMRtoscaleasweincreasethecomputationalresourcesavailable.AsdescribedinSection3.1,intheiMRin-networkarchitecture,machinesmayplayoneoftwodistinctroles:theyareeitherworkersthatprocessandsummarizerawdatafromthelocalsources,orrootsthatcombinesummariesfromchildnodes.Increasingthenumberofworkersshouldincreasedataprocessingthroughputuntilthepointthatrootsbecomeaperformancebottleneck.AswiththeMapReduceframework,iMRhandlesthisbottleneckbyincreasingthenumberofpartitions,thatis,thenumberofroots.Therefore,wemustverifytheabilityofiMRtoscalebyaddingmoreworkersandroots.Forthisexperiment,weevaluatediMRona40nodeclusterofHPDL380G6servers,eachwithtwoIntelE5520CPUs(2.27GHz),24GBofmemory,and16HP507750-B21500GB7,200RPM2.5SATAdrives.EachserverhastwoHPP410drivecontrollers,aswellasaMyricom10Gbpsnetworkinterface.Thenetworkinterconnectweuseisa52-portCiscoNexus5020datacenterswitch.TheserversrunLinux2.6.35.OurimplementationofiMRiswritteninJava. 55Inthisexperiment,weimplementawordcountquerythatrunsonsyntheticinputdata,asetofrandomlygeneratednumbers.Inourquery,themapfunctionimplementstheidentityfunction,whilethereducerimplementsacount.Thequeryspecifiesatumblingwindow,wheretherangeis150millionrecords.Thiswindowrangecorrespondstoprocessingapproximately1GBofinputdatapernode.Weallowthequerytorunforfiveminutesandcomputetheaveragethroughputacrossallwindowscomputed.InFigure4.2,weplotthroughput,thetotalnumberofrecordsprocessedpersecond,asweincreasecomputationalresources,thatis,workersandroots.Morespecifically,onthex-axisweincreasethenumberofworkersandeachlinecorrespondstoanincreasingnumberofroots.Weobservethataslongastherootisnotaperformancebottleneck,addingmoreworkersincreasesthroughputlinearly.Noticethatasinglerootcanhandletheincomingdatasentbyupto10workers.Atthispoint,bydoublingthenumberofroots,wecanalsodoublethethroughput.Giventheavailableresourcesinourexperimentalcluster,wewereabletouseupto30workersandweobservethatthreerootsareenoughtohandlethisload.4.2.2LoadsheddingiMRemploystechniquesthatshedprocessingloadwhennodesdonothavesufficientresourcesduetootherservicesandtheanalysishastodeliverresultsbeforeadeadline.Thesetechniquesaredesignedtomaximizeresultfidelityundergiventimeconstraints.Here,weevaluatetheeffectivenessofthesetechniquesinprovidinghigh-fidelityresultsunderlimitedCPUresults.Morespecifically,weverifythatasinglenodecanaccuratelyestimatetherightamountofdatatoshedundervaryingCPUload.Forthisandtheremainingexperimentsweuseda30-nodeclusterwithDualIntelXeon2.4GHzCPUs.Nodeshave4GBofRAMandareconnectedonagigabitEthernet.Inthisexperiment,weexecuteawordcountquery,wherethemapfunctionistheidentityfunctionandthereducefunctionimplementsacount.Thequery 
56120100Baseline10080BaselineTimeoutTimeout80Shedding60Shedding6040Fidelity(%)40Latency(min)202000020406080100020406080100Load(%)Load(%)(a)Fidelity(b)LatencyFigure4.3:Impactofloadsheddingonfidelityandlatencyforawordcountjobundermaximumlatencyrequirementandvaryingworkerload.specifiesatumblingwindowwith20millionrecordsandweconfigureiMRtouse20panesperwindow.Weinstallthequeryonasingleworkerthatdeliversresulttoasingleroot.TolimittheCPUavailabletoiMR,weusetheLinuxcpulimittool[2]ontheworker.Weexecutethequeryuntilitdelivers10resultsandreporttheaverageresultlatencyandfidelityaswevarytheCPUavailabletoiMR.InFigure4.3(a)weshowthefidelityofthedeliveredresultsastheCPUloadincreases,whileFigure4.3(b)showsthelatencyofthedeliveredresults.Thebaselinecaserepresentsaquerywithnolatencyrequirementsthatalwaysdeliversresultswith100%fidelity.Asexpected,theresultlatencyofthebaselinecasegrowshyperbolically2astheloadincreases.Next,wesetthelatencyrequirementofthequerytotheobservedbaselinewindowlatency,whichis160seconds.Basedonthistimeout,theidealmaximumfractionofrawdatathatcannotbeprocessedwithinthelatencyrequirementisequaltotheCPUloadpercentagewise.Theeffectivenessofourloadsheddingtechniqueisdeterminedbyhowmuchfidelityapproachesthisidealmaximum.Forcomparison,thetimeoutlinerepresentsaquerythatdoesnotemployanyshedding.Thenodekeepsprocessingdatauntilthetimeoutpasses,inwhich2LatencyasafunctionoftheCPUloadxisofthetypeL(x)=c1−x 574TimeoutFailureeviction32Panes/min10020406080100Failedworkers(%)Figure4.4:Applicationgoodputasthepercentageoffailedworkersincreases.Failureevictiondeliverspanesearlier,improvinggoodputbyupto64%.casetherootevictsanydatadelivereduptothatpoint.Weobservethatalthoughresultsmeetthelatencyrequirement,fidelitydropsquicklyastheloadincreases.Withoutloadshedding,thenodeattemptstoprocessallrawdataastheybecomeavailable.However,onlythefirstfewpanescanbedeliveredwithinthedeadlineandprocessingsubsequentpanesisuseless.Instead,byenablingloadsheddingworkersuseavailableCPUintelligently.Theyprocessonlypanesthatcanbedeliveredwithinthedeadline.Thisimprovesresultfidelityonaverageby242%.Additionally,resultfidelityisonaveragewithin10%oftheidealfidelity.Noticethatthehighertheloadis,thegreaterthediver-gencefromtheidealis.SheddingdataincurssomeCPUoverheadandincreasingtheCPUloadresultsinsheddingmoredata.Therefore,iMRspendsmoreCPUcyclesinsheddingratherthanusefulprocessing,causingthisdivergencefromtheidealfidelity.4.2.3FailureevictionApartfrommaximizingresultfidelitythroughloadshedding,iMRisde-signedtominimizeresultlatency.Throughthefailureevictionmechanism,iMRdetectsopportunitiestodeliverresultsearlywhenfidelitycannotbefurtherim-provedbywaitingforoverloadedorfailednodes.Here,weevaluatetheabilityofthefailureevictionmechanismtoimproveresultlatencywhennodesfail. 
58Inthisexperiment,weexecuteawordcountquerywithawindowof2millionrecordsand2panesperwindow.Wesetthequerylatencyrequirementto30seconds.Weexecutethequeryon10workersandemulatetransientfailuresbystoppinganincreasingnumberofworkersfor4minutesandthenresumingthem.Werunthequeryuntilitdelivers20results.Figure4.4plotstheapplicationgoodput,thenumberofpanesdeliveredtotheuserpertime.Notethatthismetricdoesnotmeasurehowfastworkerscanprocessrawdata.Insteaditreflectstheabilityofthesystemtodetectfailuresanddeliverpanestotheuserearly.Thehigherthemetric,thelesstheuserwaitstogetthesamenumberofpanes.Withoutfailureeviction,therootwaitsuntilthe30-secondtimeoutbeforeitdeliversincompleteresults,evenifalllivenodeshavedeliveredtheirdataandfidelitycannotimprove.Withfailureevictionenabled,therootdetectsfailedworkersanddeliversresultsbeforethetimeout,improvinggoodputby57-64%.4.2.4UsingC2Inthisexperiment,weshowhowusingtheC2metricuserscantraderesultfidelityforlatency.Wedisplayhowdependingontheapplicationrequirementsanddatadistributions,usersmayappropriatelysettheC2specification.Inpar-ticular,weexploretheuseofthreegeneralclassesofC2specifications:temporalcompleteness,spatialcompleteness,andminimumvolumewithrandompanese-lection.WealsoshowhowchoosingtherightC2specificationallowsuserstomakemoreusefulconclusionsforincompletedatathanwithcoarsermetrics,likesimpleprogress.Weperformexperimentswiththreedifferentapplicationscenarios:awordcountwithvaryingworddistributions,click-streamanalysis,andanHDFSanomalydetector.WordcountHere,weexecuteawordcountqueryonsyntheticdata.Wevarythetotalpercentageofdataincludedintheresultandmeasurehowtheaccuracyoftheresultchanges.WerepeatthisforthethreedifferentclassesofC2specifications. 59100100100%time,X%space100%time,X%spaceX%time,100%space80X%time,100%space80RandomX%RandomX%606040402020Relativecounterror(%)00020406080100Relativefrequencyerror(%)020406080100Datavolume(%)Datavolume(%)(a)Counterror(b)Frequencyerror.1601401201008060Latency(sec)40100%time,X%spaceX%time,100%space20RandomX%0020406080100Datavolume(%)(c)Resultlatency.Figure4.5:Theperformanceofacountstatisticondatauniformlydistributedacrossthelogserverpool.Therelativecounterrordropslinearlyasweincludemoredata.Becauseoftheuniformdatadistribution,boththecountandthefrequencydonotdependontheC2specification.Wereporttheaccuracyoftwoapplicationmetrics,thecountofthewordsandtherelativefrequency.Forbothmetricswereporttherelativeerror3.Additionally,wereporttheresultlatency.Thequeryspecifiesatumblingwindowwithasize-basedrangeof100MB.Eachwindowconsistsof10panes.Inthisexperiment,thereisnolatencybound.Thequeryisexecutedon10workers.Thedatasetistextconsistingofrandomwordschosenfromapoolof100Kwords.Wedistributethewordsacrossthe10workersandexperimentwith3TherelativeerrorofameasurementXwithrespecttoanidealvalueYis100Y−X%.Y 
60200100100%time,X%space180100%time,X%spaceX%time,100%space160X%time,100%space80RandomX%RandomX%14060120100408060204020Relativecounterror(%)00020406080100Relativefrequencyerror(%)020406080100Datavolume(%)Datavolume(%)(a)Counterror(b)Frequencyerror.1401201008060Latency(sec)40100%time,X%spaceX%time,100%space20RandomX%0020406080100Datavolume(%)(c)Resultlatency.Figure4.6:Theperformanceofacountstatisticondataskewedacrossthelogserverpool.Becauseofthespatialskew,enforcingeitherrandompaneselectionorspatialcompletenessallowsthesystemtobetterapproximatecountfrequenciesthantemporalcompleteness,andlowerresultlatency.differentdatadistributions,todisplayhowtheC2specificationimpactsourabilitytounderstandtheresultsindifferentscenarios.Wechangethedistributionofthewordfrequencybutalsothedistributionofwordsacrossdifferentnodes.Whilethisisasimpletextprocessingapplication,theresultsarerelevanttoanyapplicationthatcounts,insteadofwords,eventswithsimilardistributions.First,wecreateadatasetwherewordfrequencyisuniformlydistributedandalsowordsarespatiallydistributeduniformlyacrosstheworkers.Figures4.5(a)and4.5(b)plottherelativecountandfrequencyerrorsrespectively,whileFig-ure4.5(c)plotstheresultlatencyasthedatavolumeincludedintheresultin- 61creases.Theerrorreportedistheaverageacrossallwordsintheresult.Theverticalbarsareequaltoplusorminusonestandarddeviation.Asexpected,therelativecounterrordecreaseslinearlyasmoredataareincludeintheresult.Becauseoftheuniformspatialdistribution,allC2specifica-tionsresultinthesameerror.Duetotheuniformwordfrequencydistribution,thestandarddeviationoftheerroracrossallwordsisverysmall,sincetheaccuracyofeverywordcountisequallydegraded.Furthermore,duetotheuniformdistribution,therelativefrequencyerrorislowandalsoexhibitsasmallstandarddeviation.Thisisbecausetheabsolutecountofeverywordisreducedbythesamepercentageasthetotalvolumeofthedata.InFigure4.5(c),wenoticethatrequiringtemporallycompleteresultsimpliesthatwehavetowaituntilatleastoneoftheworkersprocessesalldatainawindow,thus,increasingresultlatency.Incontrast,byspecifyingspatiallycompletenessorrandompanedrops,wecanreducetheresultlatency.Therefore,whendataareuniformlydistributed,thisC2specificationispreferredsinceitreducesresultlatencyandachievesresultfidelityequaltotheotherspecifications.Next,wechangethespatialdistributionofthedata.Weskewthedistri-butionsothatsomewordsaremorelikelytoexistonsomeworkersthanothers.AsFigure4.6(a)shows,whiletheaveragerelativecounterrorissimilartothepreviouscase,noticethatthestandarddeviationismuchhigherhere,especiallyfortheC2specificationthatrequirestemporallycompleteresults.Inthiscase,windowsthatarenottemporallycompletearedroppedintheirentirety,anddatafromthecorrespondingworkersarecompletelylost.Therefore,wordsthatarelocatedonthosenodesexhibitahigherrelativecounterror,whichimpactsthestandarddeviation.Theeffectofthespatialskewismoreobviousintherelativefrequencyer-ror,showninFigure4.6(b).Byremovingentirewindowsfromspecificworkers,wereducethecountofthewordsthatexistonthoseworkersmorethanthecountofthewordsontherestoftheworkerspercentagewise.Thisaddstotherelativefrequencyerrorofthesewords.Instead,withspatiallycompletewindowsorran-domlydroppedpanes,thesystemsampleskeysfromtheentireserverpoolandthe 
62relativefrequencyerrorremainslowasinthepreviouscase.WeobservethatdifferentclassesofC2specifications,whichreturnthesamevolumeofdata,mayreturnqualitativelydifferentresults.TheC2metricexposesthespatio-temporalcharacteristicsofdata,allowinguserstoeitherunderstandtheeffectofdatalossontheiranalysis,orsettheC2specificationbasedonknowledgeaboutthedatadistribution.Click-streamanalysis100100100%time,X%space80X%time,100%space80RandomX%60604040100%time,X%spaceRelativeerror(%)20%userIDsfound20X%time,100%spaceRandomX%00020406080100020406080100Datavolume(%)Datavolume(%)(a)AverageerrorperuserID(b)PercentageuserID’sfound.141210864100%time,X%spaceLatency(sec)X%time,100%space2RandomX%0020406080100Datavolume(%)(c)Resultlatency.Figure4.7:EstimatingusersessioncountusingiMRanddifferentC2policies.Wepreservetheoriginaldatadistribution,whereclicksfromthesameusermayexistondifferentservers.RandompaneselectionandtemporalcompletenessprovidehigherdatafidelityandsamplemoreuserIDsthanwhenenforcingspatialcompleteness. 63100100100%time,X%space80X%time,100%space80RandomX%60604040100%time,X%spaceRelativeerror(%)20%userIDsfound20X%time,100%spaceRandomX%00020406080100020406080100Datavolume(%)Datavolume(%)(a)AverageerrorperuserID(b)PercentageuserID’sfound.141210864100%time,X%spaceLatency(sec)X%time,100%space2RandomX%0020406080100Datavolume(%)(c)Resultlatency.Figure4.8:EstimatingusersessioncountusingiMRanddifferentC2policies.Herewedistributedatasothatclicksfromthesameuserexistonasingleserver.Temporalcompletenessreturnssessionsthatareaccurate,butsamplesthesmallestpercentageofuserIDs.Instead,randomsamplingcansamplealargerspaceofuserIDs.Here,weimplementacommonclick-streamanalysisapplication.Inpar-ticular,wedevelopaquerythattakesasinputwebserverlogsthatcontainuserclicksandcomputesusersessions.Clicksaretheresultofusersbrowsingonasiteandareusuallydefinedby(i)auserID,theidentityoftheuserbrowsing,(ii)thepagethattheyclicked,and(iii)atimestampthatdenotesthetimeoftheclick.Althoughthesearethemostessentialinformation,clicklogsmaycontainotherusefulinformationaswell.Ausersessionisaperiodduringwhichauserisusingasiteandclick 
sessionization is the method of finding distinct user sessions by analyzing clicks. Such an analysis is useful for understanding user behavior, for instance, how much time users spend on a site during every visit. A common method to sessionize clicks is to compare the timestamps of subsequent clicks and group them into the same session if they occurred within a maximum amount of time.

We implemented an iMR query in which the map function extracts clicks from the logs and the reduce function performs the sessionization by grouping clicks as described above. The corresponding MapReduce query is also described in [38]. In this experiment, the reduce function summarizes the calculated sessions by reporting the number of sessions found per user (a simplified sketch of this per-user session counting is shown below).

As with the previous experiment, we want to evaluate how the different C2 specifications affect the accuracy and the latency of the analysis for varying amounts of data loss. Here, we measure accuracy as the relative error on the computed session count.

The dataset consists of 24 hours of server logs from the 1998 World Cup website [1]. These represent logs from the 32 web servers that comprise the site infrastructure, and have a size of 4.5GB in total. The query window is set to 2 hours, and we set the pane size to 6 minutes, allowing for 20 panes per window. We run the query for the entire dataset, which corresponds to 12 windows.

As with the previous experiment, we also want to explore how the data distribution affects the accuracy of the analysis for the different C2 specifications. Here the main characteristic of interest is the distribution of a user's clicks across servers. Clicks from the same user may either be distributed across servers, or exist on a single server. Initially, we retain the original data distribution, where a user's clicks exist across different servers.

In Figure 4.7(a), we plot the relative error in the session count as the percentage of data included in the analysis changes. We compute the average error across all users in a result. Note that this average does not take into account users for which data are completely missing from the result. However, to quantify the missing users, in Figure 4.7(b) we plot the percentage of user IDs discovered in the data. This graph shows the ability of the system to sample a wide range of the user space under the different C2 specifications.
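To make the sessionization step concrete, the sketch below shows one way the per-user grouping logic described above could be implemented: given one user's click timestamps, it counts sessions by starting a new session whenever consecutive clicks are separated by more than a maximum gap. This is an illustrative sketch only, not the iMR implementation; the class name, method signature, and the 30-minute threshold in the example are assumptions.

```java
import java.util.Collections;
import java.util.List;

// Illustrative only: counts the sessions in one user's click stream by
// splitting wherever the gap between consecutive clicks exceeds maxGapMillis.
// The names and the threshold are assumptions, not part of the iMR API.
public final class SessionCounter {

    public static int countSessions(List<Long> clickTimestamps, long maxGapMillis) {
        if (clickTimestamps.isEmpty()) {
            return 0;
        }
        List<Long> ts = new java.util.ArrayList<>(clickTimestamps);
        Collections.sort(ts);                  // order clicks by time
        int sessions = 1;                      // the first click opens a session
        for (int i = 1; i < ts.size(); i++) {
            if (ts.get(i) - ts.get(i - 1) > maxGapMillis) {
                sessions++;                    // gap too large: a new session starts
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Clicks at 0s, 10s, 40s, and 31 minutes; with a 30-minute gap threshold
        // this yields two sessions.
        List<Long> clicks = List.of(0L, 10_000L, 40_000L, 1_860_000L);
        System.out.println(countSessions(clicks, 30 * 60 * 1000L)); // prints 2
    }
}
```

In the iMR query, logic of this form sits in the reduce function, which then reports the resulting session count for each user ID.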
65WeseeinFigure4.7(a)thatalthoughallthreespecificationsprovidearel-ativelylowrelativeerror,temporalcompletenessandrandomsamplingprovidealowererrorthanspatialcompleteness.Inparticular,temporalcompletenessreducestherelativeerrorbyapproximately50%withrespecttospatialcomplete-ness.Becauseofthenon-uniformdistributionoftheclicksacrosstime,byrequiringspatiallycompletepanes,wemaymisspaneswithlargeramountsofclicks.Fig-ure4.7(b)showsthatspatiallycompleteresultsarenoteffectiveinsamplingalargenumberofkeys.Insteadrandomsamplingandtemporallycompleteresultscanreturnabetterrepresentationoftheuserspace.InFigure4.7(c)weplotthecorrespondinglatency.Whilerandomsamplingandspatialcompletenessallowtheanalysistofinishearlier,asexpected,duetothesmalldatasizethedifferencesinlatencyarenotsignificant.Next,weexperimentwithadifferentdatadistribution,whereclicksfromasingleuserexistonasingleserver.Figure4.8(a)showsthattemporalcompletenessreturnssessioncountsthatareperfectlyaccurate.Sinceauser’sclicksarelocaltoaserver,byretrievingalldatafromaserverweareabletoaccuratelyreconstructtheuser’ssessions.Instead,spatialcompletenessandrandomsamplingincurerrorssimilartothepreviousexperiment,asexpected,sinceonlythedistributionacrossspacehaschanged.However,inFigure4.8(b)weseethattemporalcompletenesssampleslessuserIDs.Asthegraphshows,wemayimprovethepercentageofuserIDdiscoveredby30-50%,simplybyrequiringrandomsampling.WeseethatC2providesaflexiblewayforapplicationstoreducefidelitydependingondatadistributionandtheobjective.Inthisscenario,wemaychoosetotradethefidelityoftheresultscomputedperuser,forawidersampleoftheuserspace.HDFSloganalysisHere,weimplementaquerythatanalyzeslogsfromtheHadoopDistributedFileSystem(HDFS)[8]service,todetectanalysis.IntheHDFSsystem,multiplefileservers,acceptrequestsfromclientstostoreblocksofafilelocally.Amongotherevents,HDFSserverslogthetimeittakestoserveclientrequeststowrite 661.6500RandomX%1.4X%time,100%space4001.2100%time,X%space13000.80.6200K−Sstatistic0.4Latency(sec)100%time,X%space100RandomX%0.2X%time,100%space00020406080100020406080100Datavolume(%)Datavolume(%)(a)KS-test(b)LatencyFigure4.9:(a)ResultsfromtheKolmogorov-SmirnovtestillustratetheimpactofreduceddatafidelityonthehistogramsreportedforeachHDFSserver.(b)ForHDFSanomalydetection,randomandspatialcompletenessC2improvelatencybyatleast30%.blockslocally.Bycomputingthedistributionofthesetimesonaperserverbasis,wecancomparepairsofserversanddetectanomalieswherethedistributionsdiffersubstantially[73].WeimplementedaniMRquerywithamapfunctionthatfiltersserverlogstofindallentriesthatsignifythebeginningorendofablockwriteoperation.Thereducefunctionmatchesthebeginningandendofablockwriteoperationforeachuniqueblock,calculatesthedurationofsuchanoperation,andmaintainsahis-togramoftheblockwriteservicetimesforeveryserver.Theresultinghistogramsrepresentthedistributionoftheservicetimes,andarecomparedaccordingto[73],todetectanomalies.Similarlytothepreviousexperiments,wemeasurehowdatalossaffectstheaccuracyandthelatencyoftheanalysisfordifferentC2specifications.Inthisexperiment,datalossaffectsthecalculatedservicetimedistributionofaserverand,essentially,theabilityoftheapplicationtodetectanomalies,resultingeitherinmissinganomaliesorinfalsealerts.Wemeasuretheaccuracyoftheanalysisbycomparingtheobserveddistributionforeveryserverwiththeidealone,that 
67is,thedistributionwhenthereisnodataloss.WecomparedistributionsusingtheKolmogorov-Smirnov(KS)statisticaltest[10].TheKS-testdetermineswhethertheobserveddistributionisdifferentfromtheidealone.Thedatasetinthisexperimentconsistsofa48-hourHDFSlogtracegener-atedbyrunningtheGridMixHadoopworkloadgenerator[7]ona30-nodecluster.Eachservergeneratedapproximately2.5GBofdata,resultinginatotalof75GB.TheiMRqueryspecifiesawindowof60minuteswith20panesperwindow.InFigure4.9(a),weshowthefractionoftheobservedhistogramsthataredifferentthantheidealones.Weseethatsincedatadistributionsarecomputedonaperserverbasis,temporalcompletenessreturnsperfectlyaccuratedata.Eventhoughdataformsomenodesmaybecompletelylost,therestofthenodesreportalltheirdata,resultinginaccuratedistributions.NoticethattheotherC2specifi-cationsrequiremorethan80%ofthedatatobeprocessedinordertoreturnresultswithgoodquality.However,thesespecificationscanreducetheresultlatencybyapproximately30%atthatdatavolume.Similarlytothepreviousapplications,centraltothechoiceoftherightC2specificationispriorknowledgeaboutthespatialpropertiesofthedata,inthiscase,thattimedistributionsarecalculatedonaperserverbasis.TheC2metricallowsuserstoleveragesuchknowledgewhenchoosingtherightC2specification.Atthesametime,usershavethechoicetotradeanalysisaccuracyforreducedlatency.4.2.5In-situperformanceOneofthemaindesignprinciplesofiMRistheco-locationofthedataanalysiswiththeservicesgeneratingthedata.ThisimpliesthatCPUresourcesmaybelimitedfordataanalysis,sinceservicesaretypicallyallocatedthemajorityoftheresources.Here,wewanttoverifythefeasibilityofrunninganalyticsin-situ.Specifically,wewillevaluate(i)theabilityofiMRtodeliverusefulresultswhilerunningside-by-sidewitharealservice,andundertimeconstraintsfortheanalysis,and(ii)theimpactontheco-locatedservice.Inparticular,weruniMRside-by-sidewithaHadoopinstallationona10- 68100100Shedding80Noshedding90608040Fidelity(%)70200Relativeperformance(%)6002468100246810nicenessniceness(a)Fidelity(b)HadoopperformanceFigure4.10:FidelityandHadoopperformanceasafunctionoftheiMRprocessniceness.Thehighertheniceness,thelessCPUisallocatedtoiMR.Hadoopisalwaysgiventhehighestpriority,nice=0.nodecluster.WesubmittoHadoopaworkloadthatconsistsofavarietyofHadoopjobs,generatedbytheGridMixworkloadgenerator[7].Hadoopisconfiguredtouseallthenodesinourcomputecluster.Atthesametime,iMRexecutesawordcountqueryonthesyntheticdatasetusedinSection4.2.4.Thequeryspecifiesawindowwith2millionrecords,20panesperwindow,anda60-secondtimeout.Inthisexperiment,wevarytheCPUallocatedtoiMRandmeasurethequalityofthedeliveredresultsunderthegiventimeconstraintof60seconds.AslessCPUisallocatedtoiMR,weexpectfidelitytodropsinceiMRwillnotbeabletoprocessanentirewindowwithinthetimeconstraint.WevarytheCPUallocatedbychangingtheiMRprocessniceness,thepriorityassignedbythekernelscheduler,andreportthefidelityofthereturnedresults.Here,fidelityisequaltothevolumeofdataprocessed.Additionally,wereporttherelativechangeintheHadoopperformance,intermsofjobscompletedpertime,astheiMRCPUallocationvaries.ThismetricmeasurestheimpactofrunningiMRin-situonHadoop.Figure4.10(a)showsthatwithoutloadsheddingiMRreturnspoorresultsandfidelitydropsquicklyasiMRisallocatedlessCPU.Inthiscase,iMRspends 
69mosttimetryingtoprocessdatathatwillneverbedeliveredbeforethetimeconstraint.Instead,theloadsheddingmechanismisabletouseavailableresourcesintelligently,improvingresultfidelitybyafactorofmorethan5.6×mostofthetime.NoticethanwhentheiMRnicenessissettogreaterthan9,CPUresourcesarenotsufficienttoprocessdatawithinthe60-secondtimeout,andfidelitydropssignificantly.Figure4.10(b)showstherelativechangeperformanceforHadoopastheCPUallocatedtoiMRvaries.WemeasureHadoopperformanceasthejobcom-pletionthroughput.Forreference,wheniMRandHadoopareassignedthesameprioritybythescheduler(niceness=0),thecostinHadoopperformanceisa17%-decreaseinjobthroughput.WhentheiMRnicenessissetto8,atwhichpointiMRcanstilldelivergoodqualityresults,thecostinHadoopperformanceislessthana10%-decreaseinjobthroughput.WeseethatiMRisabletodeliverusefulresultsevenwhenassignedonlyasmallfractionoftheCPU.AtthesametimeiMRincurslittleimpactontheco-locatedHadoopsystem,makingin-situprocessingapracticalapproach.4.3AcknowledgmentsChapter4,inpart,isreprintofthematerialpublishedintheProceedingsoftheUSENIXAnnualTechnicalConference2011.Logothetis,Dionysios;Trezzo,Chris;Webb,KevinC.;Yocum;Ken.Thedissertationauthorwastheprimaryinvestigatorandauthorofthispaper. Chapter5StatefulbulkprocessingInthepreviouschapterswepresentedasystemformanagingdataduringtheextractionfromtheirsourcesandearlyanalysis.Aftertheextraction,dataarestoredforfollow-on,richeranalysis.Analyticsinthisphasedonotsimplyfilterorsummarizedata.Instead,theymayperformmorecomplexcomputations,likeiterativegraphminingandmachinelearningalgorithms,oftenconsistingoflarge,multi-stepdataflows.Atthesametime,emphasisisgivenonextractinginformationfromalargebodyofdata,ratherthanobtainingquickinsightsoverthemostrecentdata.Intheseanalytics,stateremainsakeyrequirementforefficientprocessing.Forinstance,manyanalyticsmustoftenincorporatelargebatchesofnewlycol-lecteddatainanefficientmanner,andusestatetoavoidrecomputation.ThischapterpresentsCBP,acomplementaryarchitectureforstatefulanalyticsonbulkdata.CBPprovidesaprogrammingmodelandruntimesystemforsophisticatedstatefulanalytics.Here,wedescribethebasicconstructsofthemodelandshowhowitallowsuserstoprogramsophisticatedstatefulanalytics.WeillustratehowCBPcanbeusedtoprogram(i)incrementalanalyticsthatmustabsorblargebatchesofcontinuouslyarrivingdata,and(ii)iterativeanalytics.AcorecomponentofCBPisaflexible,statefulgroupwiseoperator,trans-late,thatcleanlyintegratesstateintodata-parallelprocessing.Thisbasicop-erationallowsuserstowritestatefulanalyticsandaffordsseveralfundamentalopportunitiesforminimizingdatamovementintheunderlyingprocessingsystem.70 71Additionally,CBPallowsuserstocomposesophisticateddataflowsusingthetranslateoperatorasthebuildingblock,andintroducesprimitivesfordataflowmanagement.Continuousdataflowsrequirecontrolfordeterminingstageexecutionandinputdataconsumption.TheCBPallowsdataflowcontrolthroughsimpleyetflexibleprimitives.Thesefeaturessimplifytheconstructionofincrementalanditerativeprogramsforlarge,evolvingdatasets.5.1Abasictranslateoperator(a)Basicgroupwiseprocessing.(b)Groupwiseprocessingwithaccesstostate.(c)Groupinginputwithstaterecords.Figure5.1:Theprogressionfromastatelessgroupwiseprocessingprimitivetostatefultranslation,T(·),withmultipleinputs/outputs,groupedstate,andinnergroupings.Webeginbystudyingtheincrementalcrawlqueue(Figure1.3.1)dataflowinmoredetail,whereeachstageisaseparatetranslationoperator.Weillustratetranslatewithasimplifiedversionofthecountin-linksstage,calledURLCount,thatonlymaintainsthefrequencyofobservedURLs.Thisstatefulprocessingstage 
72hasasingleinputthatcontainsURLsextractedfromasetofcrawledwebpages.TheoutputisthesetofURLsandcountsthatchangedwiththelastsetofinputrecords.Forillustration,Figure5.1presentsaprogressionfromastatelessgroup-wiseprimitive,suchasreduce,toourproposedtranslateoperator,T(·),whichwilleventuallyimplementURLCount.Figure5.1(a)showsasingleprocessingstagethatinvokesauser-definedtranslatefunction,T(·).Tospecifythegroupingkeys,userswriteaRouteByhrifunctionthatextractsthegroupingkeyfromeachinputrecordr.InthecaseofURLCount,RouteByextractstheURLasthegroupingkey.Whenthegroupwiseoperatorexecutes,thesystemreadsinputrecords,callsRouteBy,groupsbythekeyk,partitionsinputdata(weillustrateasingleparti-tion),andrunsoperatorreplicasinparallelforeachpartition.EachreplicathencallsT(·)foreachgroupingkeykwiththeassociatedrecords,r[].Wecalleachparallelexecutionofanoperatoranepoch.TomaintainafrequencycountofobservedURLs,theURLCounttranslatorneedsaccesstostatethatpersistsacrossepochs.Figure5.1(b)addsalogicalstatemodulefromwhichatranslatefunctionmayreadorwritevaluesforthecurrentgroupingkey.Inourcase,URLCountstorescountsofpreviouslyseenURLs,maintainingstaterecordsofthetype{url,count}.However,asthenextfigureshows,translateincorporatesstateintothegroupingoperationitselfandthesemanticsofreadingandwritingtothisstatemodulearedifferentthanusinganexternaltable-basedstore.Figure5.1(c)showsthefull-featuredtranslationfunction:T:hk,Fin,Fin,...,Fini,withmultiplelogicalinputandoutputflowsandgroupedS1nstate.Asthefigureshows,wefounditusefultomodelstateusingexplicit,loopbackflowsfromastageoutputtoastageinput.Thisallowstranslatetoprocessstaterecordslikeanyotherinput,andavoidscustomusercodeformanagingaccesstoanexternalstore.Italsomakesitsimpleforthesystemtoidentifyandoptimizeflowsthatcarrystaterecords.ForsimplestatefultranslatorslikeURLCountoneloopbacksuffices,FouttoFin.SSFigure5.2showspseudocodeforourURLCounttranslatefunctioncalled 73URLCountT(url,Fstatein[],Fin[])urls1newcnt←Fin.size()urls2ifFstatein[0]6=nullthen3newcnt←newcnt+Fstatein[0].cnt4Fstateout.write({url,newcnt})5Fout.write({url,newcnt})updatesFigure5.2:TranslatorpseudocodethatcountsobservedURLs.Thetranslatorreadsandupdatesthesavedcount.withinthisstage.Withmultiplelogicalinputs,itistrivialtoseparatestatefromnewlyarrivedrecords.Itcountsthenumberofinputrecordsgroupedwiththegivenurl,andwritestheupdatedcountstostateandanoutputflowfordown-streamstages.AtranslationstagemustexplicitlywriteeachstaterecordpresentinFintoFouttoretainthemforthenextprocessingepoch.ThusatranslatorSScandiscardstaterecordsbynotpropagatingthemtotheoutputflow.Notethatwritesarenotvisibleintheirgroupsuntilthefollowingepoch.WecanoptimizetheURLCounttranslatorbyrecognizingthatFinmayup-urlsdateonlyafractionofthestoredURLcountseachepoch.Currentbulk-processingprimitivesprovide“fullouter”groupings,callingthegroupwisefunctionforallfoundgroupingkeys.HereURLCounttakesadvantageoftranslation’sabilitytoalsoperform“inner”groupingsbetweenstateandotherinputs.Theseinnergroup-ingsonlycalltranslateforstaterecordsthathavematchingkeysfromotherinputs,allowingthesystemtoavoidexpensivescansoftheentirestateflow.However,toimproveperformancethisrequirestheunderlyingprocessingsystemtobeabletorandomlyreadrecordsefficiently(Section6.5.2).5.2ContinuousbulkprocessingWenowturnourattentiontocreatingmoresophisticatedtranslatorsthateitheriterateoveraninputor,inincrementalenvironments,continuouslyprocessnewlyarriveddata.AkeyquestionforCBPishowtomanagecontinuousdataarrivals.Forexample,anincrementalprogramtypicallyhasanexternalprocess 
74(URL,time)FrameBy=hourFinininstateFAFBRouteBy=URLSetDiff_T(url,S[],A[],B[]){ifS.hasNext()then{S.write(url);//propagatestate}else{S.write(url);//writenewstateif(A.hasNext()&&!B.hasNext())Adiff.write(url);if(B.hasNext()&&!A.hasNext())Bdiff.write(url);}}outoutoutFFFstateAdiffBdiffFigure5.3:AstageimplementingsymmetricsetdifferenceofURLsfromtwoinputcrawls,AandB.creatinginput.CBPsystemsmustdecidewhentoruneachstagebasedontherecordsaccumulatingontheinputflows.Insomecasestheymayactlikeexistingbulk-processingsystems,inwhichavertex(aDryadvertexoraMap-Reducejob)runswhenabatchofrecordsexistsoneachinput.Theymaybehaveinamannersimilartodatastreamprocessors[15],whichinvokeadataflowoperatorwhenanyinputhasasingletupleavailable.Ortheymaybehaveinsomehybridfashion.Duringeachprocessingepoch,thetranslator,T(·),readszeroormorerecordsfromeachinputflow,processesthem,andwriteszeroormorerecordstooutputflows.ThusaflowFisasequenceofrecordspassedbetweentwopro-cessingstagesovertime.Thesequenceofrecordsreadfromagiveninputflowiscalledaninputincrement,andaspecialinputframingproceduredeterminesthesizesoftheinputincrements.Thesequenceofrecordsoutputtoagivenflowduringoneepochformanoutputincrement.CBPcouplestheframingfunctionwithasecondfunction,runnability,whichgovernstheeligibilityofastagetorun(Section6.2)andalsocontrolsconsumptionofinputincrements.WeillustratetheseconceptsbyusingaCBPprogramtocomparetheoutputoftwoexperimentalwebcrawlers,AandB.Thestage,illustratedinFigure5.3,hasaninputfromeachcrawlerwhoserecordscontain(url,timestamp)pairs.Similarly,thereisanoutputfortheuniquepagesfoundbyeachcrawler.Thetranslator 75implementssymmetricsetdifference,andwewouldliketoreportthisdifferenceforeachhourspentcrawling.1First,thestageshouldprocessthesamehourofoutputfrombothcrawlersinanepoch.ACBPstagedefinesper-flowFrameByhrifunctionstohelpthesystemdeterminetheinputincrementmembership.Thefunctionassignsaframingkeytoeachrecord,allowingthesystemtoplaceconsecutiverecordswithidenticalframingkeysintothesameincrement.Anincrementisnoteligibletobereaduntilarecordwithadifferentkeyisencountered.2Here,FrameByreturnsthehouratwhichthecrawlerfoundtheURLastheframingkey.However,thestageisn’trunnableunlesswehaveanhour’sworthofcrawledURLsonbothFinandFin.Astage’srunnabilityfunctionhasaccesstothestatusABofitsinputflows,includingtheframingkeysofeachcompleteincrement.ThefunctionreturnsaBooleanvaluetoindicatewhetherthestageiseligibletorun,aswellasthesetofflowsfromwhichanincrementistobeconsumedandthesetfromwhichanincrementistoberemoved.Foroursymmetricsetdifferencestage,runnabilityreturnstrueiffbothinputflowscontaineligibleincrements.Ifbothinputflowincrementshavethesameframingkey,therunnabilityfunctionindicatesthatbothshouldberead.Ontheotherhand,iftheframingkeysdiffer,therunnabilityfunctionselectsonlytheonewiththesmallerkeytoberead.Thislogicpreventsalossofsynchronizationinthecasethatacrawlerproducesnodataforaparticularhour.Finally,thestage’stranslationfunction,SetDiffT,isreadytoprocessob-servedURLs,storingtheminstaterecords.Thisstage’sRouteByhrifunctionextractstheURLfromeachinputrecordasthegroupingkeyforstateandcrawlerrecords.Ifthereisastaterecordforthisurl,theniteitherwasreportedinapriorepochorbelongstobothcrawls(theintersection).Inthiscasethetranslatoronlyneedstomanuallypropagatethestaterecord.Otherwise,thisURLhasnotbeenseenanditiswrittentostate.Ifitwasseenexclusivelybyeithercrawl,weaddit1NotethatthisisthechangeinuniqueURLsobserved;theoutputswon’tincludere-crawledpages(thoughthatiseasilydone).2Theuseofpunctuations[15]canavoidhavingtowaitforanewkey,althoughwehaveno
timplementedthisfeature. 76totheappropriateoutputflow.Framingandrunnabilityareapowerfulcombinationthatallowsstagestodeterminewhatdatatopresenttoastage,andtosynchronizeconsumptionofdataacrossmultipleinputflows.Aswithframingfunctions,runnabilityfunctionsmaymaintainasmallamountofstate.Thusitmaycontainsignificantcontrollogic.Wehaveusedittosynchronizeinputs(e.g.,fortemporaljoins),properlyinterleavewritestoandreadsfromstate,andtomaintainstaticlookuptables(readbutnotremoveanincrement).Finally,applicationssuchasPageRankcanuseittotransitionfromoneiterativephasetoanother,asweshowinSection5.5.3.5.3SupportforgraphalgorithmsGroupwiseprocessingsupportsobviouspartitioningsofgraphproblemsbyassigningasinglegrouptoeachvertexoredge.Forexample,programmerscanwriteasingletranslatorthatprocessesallverticesinparallelduringeachprocessingepoch.Inmanycases,thoseper-vertextranslationinstancesmustaccessstateassociatedwithothervertices.Todoso,eachvertexsends“messages”toothervertices(addressedbytheirgroupingkey)sothattheymayexchangedata.Suchmessagepassingisapowerfultechniquefororchestratinglargecomputations(italsounderliesGoogle’sgraphprocessingsystem,Pregel[56]),andtheCBPmodelsupportsit.Translationcomplementsmessagepassinginanumberofways.First,usingasecondloopbackflowtocarrymessagesallowsaninnergroupingwiththestateusedtostorethegraph.Thusthesystemwillcalltranslateonlyforthegroupsrepresentingmessagedestinations.Second,messagepassingcantakeadvantageofthegeneralityoftheRouteByconstruct.Oftenacomputationatasinglevertexinthegraphaffectssomeoralloftheverticesinthegraph.Forexample,ourincrementalPageRanktranslator(Section5.5.3)mustbroadcastupdatesofrankfromdanglingnodes(nodesw/ochildren)toallothernodesinthegraph.Similarly,anupdatemayneedtobesenttoasubsetofthenodesinthegraph.WhileRouteBycanreturnanynumberof 77Figure5.4:Usersspecifyper-inputflowRouteByfunctionstoextractkeysforgrouping.Specialkeysenablethebroadcastandmulticastofrecordstogroups.HereweshowthatmulticastaddressmcXisboundtokeysk1andk3.groupingkeysfromwithinarecord,thereisnosimplewayforatranslatortowritearecordthatincludesallnodesinthegraph.Itisdifficulttoknowthebroadcast(ormulticast)keyseta-priori.Toaddressthisissue,RouteBysupportslogicalbroadcastandmulticastgroupingkeys.Figure5.4showsRouteByreturningthespecialALLbroadcastkeyfortheinputrecordonFin.Thisensuresthattherecordbbecomesassociated1withallgroupsfoundintheinputflows.Whilenotshown,itisalsopossibletolimitthebroadcasttoparticularinputflows,e.g.,onlygroupsfoundinstate.Translatorsmayalsoassociateasubsetofgroupingkeyswithasinglelogicalmulticastaddress.HereRouteByoninputflowFinreturnsamulticastaddress,0mcX,associatedwithgroupingkeysk1andk3.WedescribebothmechanismsinmoredetailinSection6.5.3.5.4SummaryofCBPmodelNaturally,multipletranslationstagesmaybestrungtogethertobuildmoresophisticatedincrementalprograms,suchastheincrementalcrawlqueue.Ingen-eral,aCBPprogramitself(likeFigure1.3.1)isadirectedgraphP,possiblycontainingcycles,oftranslationstages(thevertices),thatmaybeconnectedwithmultipledirectedflows(theedges).HerewesummarizethesetofdataflowcontrolprimitivesinourCBPmodelthatorchestratetheexecutionofstatefuldataflowprograms. 
Table 5.1: Five functions control stage processing. Default functions exist for each except for translation.

  Translate(Key, ΔF0in, ..., ΔFnin) → (ΔF0out, ..., ΔFnout)
      Per-Stage: Groupwise transform from input to output records. Default: none.
  Runnable(framingKeys, state) → (reads, removes, state)
      Per-Stage: Determines if the stage can execute and what increments are read/removed. Default: RunnableAll.
  FrameBy(r, state) → (Key, state)
      Per-Flow: Assigns records to input increments. Default: FrameByPrior.
  RouteBy(r) → Key
      Per-Flow: Extracts the grouping key from a record. Default: RouteByRcd.
  OrderBy(r) → Key
      Per-Flow: Extracts the sorting key from a record. Default: OrderByAny.

As our examples illustrate, CBP controls stage processing through a set of five functions, listed in Table 5.1. An application may choose these functions, or accept the system-provided defaults (except for translate). The default framing function FrameByPrior returns the epoch number in which the upstream stage produced the record, causing input increments to match the output increments generated by upstream stages. The default runnability function, RunnableAll, makes a stage runnable when all inputs have increments and then reads and removes each. The default RouteBy function, RouteByRcd, gives each record its own group for record-wise processing. Such translators can avoid expensive grouping operations, be pipelined for one-pass execution over the data, and avoid state maintenance overheads. Similarly, the OrderBy function, another key-extraction function that provides per-flow record ordering, has a default OrderByAny, which lets the system select an order that may improve efficiency (e.g., using the order in which the data arrives).

Figure 5.5: Incremental clustering coefficient dataflow. Each node maintains as state its adjacency list and its "friends-of-friends" list.

5.5 Applications

The collection of default behaviors in the CBP model supports a range of important incremental programs, such as the incremental crawl queue example, which uses RunnableAll and FrameByPrior for all its stages. Here we showcase the extra flexibility the model provides by building stateful, iterative algorithms that operate on graphs.

5.5.1 Mining evolving graphs

Many emerging data mining opportunities operate on large, evolving graphs. Instances of mining such graphs can be found in systems biology, data network analysis, and recommendation networks in online retail (e.g., Netflix). Here we investigate algorithms that operate over Web and social network graphs. The Web is perhaps the canonical example of a large, evolving graph, and we study an incremental version of the PageRank [62] algorithm used to help index its content. On the other hand, the explosive growth of community sites, such as MySpace or Facebook, has created extremely large social network graphs. For instance, Facebook has over 300 million active users (as of September 2009, see www.facebook.com/press). These sites analyze the social graph to support day-to-day operations, external querying (Facebook Lexicon), and ad targeting.

5.5.2 Clustering coefficients

We begin with a simple graph analysis, clustering coefficient, that, among other uses, researchers employ to ascertain whether connectivity in social networks reflects real-world trust and relationships [77]. This example illustrates how we load graphs into a stateful processing stage, how to use groupwise processing to iteratively walk across the graph, and how messages may be used to update neighbors' state.

The clustering coefficient of a graph measures how well a graph conforms to the "small-world" network model. A high clustering coefficient implies that nodes form tight cliques with their immediate neighbors. For a node n_i with N neighbors and E edges among those neighbors, the clustering coefficient is c_i = 2E/(N(N−1)). This is simple to calculate if each node has a list of its neighbors' neighbors. In a social network this could be described as a "friends-of-friends" (FoF) relation.

For graph algorithms, we create a grouping key for each unique node in the graph. This allows the calculation to proceed in parallel for each node during an epoch, and allows us to store state records describing each vertex. Figure 5.5 illustrates
80ClusteringCoefficientT(node,Fstatein,Fedgesin,FFoFin)1ifFstatein.hasNext()thenstate←Fstatein.next()2foreachedgeinFinedges3state.adj.add(edge.dst);4foreachedgeinFinedges5foreachtargetinstate.adj6Fout.write(target,edge.src,edge.dst);FoF7foreachupdateinFinFoF8state.adj[update.src].adj.add(update.dst);9ifFin.hasNext()thenFoF10recalcCo(state);Fout.write(node,state.co);Co11Fout.write(state);stateFigure5.6:Theclusteringcoefficientstranslatoraddsnewedges(2-3),sendsneighborsupdates(4-6),andprocessesthoseupdates(7-10).5.5.2ClusteringcoefficientsWebeginwithasimplegraphanalysis,clusteringcoefficient,that,amongotheruses,researchersemploytoascertainwhetherconnectivityinsocialnet-worksreflectsreal-worldtrustandrelationships[77].Thisexampleillustrateshowweloadgraphsintoastatefulprocessingstage,howtousegroupwiseprocessingtoiterativelywalkacrossthegraph,andhowmessagesmaybeusedtoupdateneighbor’sstate.Theclusteringcoefficientofagraphmeasureshowwellagraphconformstothe“small-world”networkmodel.Ahighclusteringcoefficientimpliesthatnodesformtightcliqueswiththeirimmediateneighbors.Foranodeni,withNneighborsandEedgesamongtheneighbors,theclusteringcoefficientci=2E/N(N−1).Thisissimpletocalculateifeachnodehasalistofitsneighbor’sneighbors.Inasocialnetworkthiscouldbedescribedasa“friends-of-friends”(FoF)relation.Forgraphalgorithms,wecreateagroupingkeyforeachuniquenodeinthegraph.Thisallowsthecalculationtoproceedinparallelforeachnodeduringanepoch,andustostorestaterecordsdescribingeachvertex.Figure5.5illustrates 81thesinglestatefulstageforincrementallycomputingclusteringcoefficients.3TheinputFincarrieschangestothegraphintheformof(src,dst)nodeIDpairsthatedgesrepresentedges.RecordsonthestateflowreferencethenodeanditsclusteringcoefficientandFoFrelation.Eachinput’sRouteByreturnsanodeIDasthegroupingkey.Figure5.6showsthetranslatorpseudocode.Thetranslatormustaddnewgraphnodes4,updateadjacencylists,andthenupdatetheFoFrelationsandclus-teringcoefficients.Line1retrievesanode’sstate(anadjacencylist,adj,ofad-jacencies).EachrecordonFinrepresentsanewneighborforthisnode.Linesedges2-3addthesenewneighborstothelocaladjacencylist.Whilethatcodealoneissufficienttobuildthegraph,wemustalsosendthesenewneighborstoeveryadjacentnodesothattheymayupdatetheirFoFrelation.Todoso,wesendarecordtoeachadjacentnodebywritingtotheloop-backflowFout(lines4-6).Duringthenextepoch,RouteByforFinroutestheseFoFFoFrecordstothenodedesignatedbytarget.Whenthesystemcallstranslateforthesenodes,lines7-10processrecordsonFin,updatingtheFoFrelationandrecal-FoFculatingtheclusteringcoefficient.Finally,line11propagatesanystatechanges.Notethattherunnabilityfunctionallowsthestagetoexecuteifinputisavailableonanyinput.Thusduringoneepoch,atranslateinstancemaybothincorporatenewedgesandoutputnewcoefficientsforpriorchanges.Thereareseveralimportantobservations.First,ittakestwoepochstoupdatetheclustercoefficientswhenthegraphchanges.Thisisbecause“messages”cannotberouteduntilthefollowingepoch.Second,Figure5.5showsstateasan“inner”flow.Thustranslationonlyoccursfornodesthathavenewneighbors(inputonFin)ormustupdatetheircoefficient(inputonFin).ThesetwoedgesFoFflowsactivelyselectthegraphnodesforprocessingeachepoch.Finally,whereasingleinputrecordintotheURLCounttranslatorcausesasinglestateupdate,heretheworkcreatedbyaddinganedgegrowswiththesizeofstate.AddinganedgecreatesmessagestoupdatetheFoFrelationforallthenode’sneighbors.Themessagecount(andsize)growsasthesizeandconnectivityofthegraphincrease.3Goingforwardwehidetheloopinstateloopbackflows.4Foreaseofexpositionwedonotshowedgedeletions. 
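The recalcCo step in Figure 5.6 is left abstract. The sketch below shows one way the coefficient c_i = 2E/(N(N−1)) could be computed from a node's neighbor set and its accumulated friends-of-friends information. The data layout (a set of neighbor IDs plus, per neighbor, the set of that neighbor's known neighbors) is an assumption for illustration and is not the exact state record format used by the translator.

```java
import java.util.Map;
import java.util.Set;

// Illustrative sketch: clustering coefficient of one node, given its neighbor
// set and a friends-of-friends (FoF) view. The representation is assumed here
// for illustration only.
public final class ClusteringCoefficient {

    public static double coefficient(Set<Integer> neighbors,
                                     Map<Integer, Set<Integer>> fof) {
        int n = neighbors.size();
        if (n < 2) {
            return 0.0;                       // coefficient undefined; report 0
        }
        long links = 0;                       // edges among the node's neighbors
        for (int u : neighbors) {
            for (int v : neighbors) {
                // count each neighbor pair once; check both FoF directions in
                // case an edge was only recorded on one side
                if (u < v && (fof.getOrDefault(u, Set.of()).contains(v)
                           || fof.getOrDefault(v, Set.of()).contains(u))) {
                    links++;
                }
            }
        }
        // c_i = 2E / (N * (N - 1))
        return (2.0 * links) / ((double) n * (n - 1));
    }

    public static void main(String[] args) {
        Set<Integer> nbrs = Set.of(1, 2, 3);
        Map<Integer, Set<Integer>> fof = Map.of(
                1, Set.of(2, 0),              // neighbor 1 also knows neighbor 2
                2, Set.of(1, 0),
                3, Set.of(0));
        System.out.println(coefficient(nbrs, fof)); // one edge among 3 neighbors -> 2/6 ≈ 0.33
    }
}
```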
82Figure5.7:IncrementalPageRankdataflow.Theloopbackflowsareusedtopropagatemessagesbetweennodesinthegraph.WeexploretheseimplicationsfurtherinSection6.6.3.5.5.3IncrementalPageRankPageRankisastandardmethodfordeterminingtherelativeimportanceofwebpagesbasedontheirconnectivity[62].IncrementalPageRankisimportantbecause(1)computingPageRankontheentirewebgraphstilltakeshoursonlargeclustersand(2)importantchangestothewebgraphoccuronasmallsubsetoftheweb(news,blogs,etc.).However,trulyincrementalPageRankischallengingbecausesmallchanges(addingalinkbetweenpages)canpropagatethroughouttheentiregraph.Hereweimplementtheapproximate,incrementalPageRankcomputationpresentedin[28],whichthresholdsthepropagationofPageRankupdates.Thisalgorithmtakesasinputasetoflinkinsertionsinthewebgraph;otherapproachesexisttoincorporatenodeadditionsandremovals[28].Figure5.7illustratesourincrementalPageRankdataflow,whichsharesmanyfeatureswithclusteringcoefficient.Itusesthesameformatforinputedges,groupsrecordsbyvertex,storesadjacencylistsinstaterecords,usesaninnerstateflow,andsends“messages”toothernodesonloopbackflows.Weskipthesundrydetailsoftranslation,andinsteadfocusonhowtomanageanalgorithmthathasseveraldistinctiterativephases.Atahighlevel,thealgorithmmustbuildthegraphW,findthesubgraphGaffectedbynewlyinsertededges,computetransitionprobabilitiestoasupernode 83IncrPageRankT(node,FSin,FEin,FRin,FWin,FCvgin,FΩin)1ifFEin.hasNext()thenmakeGraph();startWeight();2ifFWin.hasNext()thensendWeightToNeighbors();3ifFΩin.hasNext()thenupdateSupernode();4ifFCvgin.hasNext()thenresetRankState();5elseifFin.hasNext()thenR6doPageRankOnG();Figure5.8:PseudocodeforincrementalPageRank.Thetranslatoractsasaneventhandler,usingthepresenceofrecordsoneachloopbackflowasanindicationtorunaparticularphaseofthealgorithm.Ω(W−G),andthencomputePageRankforG(pagesinΩretaintheirrank).Thisalgorithmhasbeenshowntobebothfastandtoprovidehigh-qualityap-proximationsforavarietyofrealandsynthesizedwebcrawls[28].Figure5.8showshigh-levelpseudocodeforthePageRanktranslator.Inter-nally,thetranslatoractsasaper-nodeeventhandler,usingthepresenceofrecordsoneachloopbackflowasanindicationtorunaparticularphaseofthealgorithm.Heretherunnabilityfunctionplaysacriticalroleinmanagingphasetransitions;itexclusivelyreadseachsuccessivephase’sinputafterthepriorinputbecomesempty.ThusrunnabilityfirstconsumesedgesfromFin,thenFin(tofindG),edgesWthenFin(updatingthesupernode),andfinallyFin(tobeginPageRankonG).ΩRWhendoPageRankOnGconverges,thesecondstagewritesanALLrecordtoFout.CvgThiscausesthetranslatortoresetgraphstate,readyingitselfforthenextsetofedgeinsertions.ThisdesignattemptstominimizethenumberofcompletescansofthenodesinWbyusingboth“inner”stateflowsandthemulticastabilityoftheRouteByfunction.Forexample,whencalculatingPageRankforG,leavesinGmulticasttheirPageRanktoonlynodesinG.WediscussthemulticastAPImoreinSection6.5.3.Finally,notethatweplaceallthephasesinasingletranslator.Otherorganizationsarepossible,suchaswritingastageforeachphase,thoughthismaymakemultiplecopiesofthestate.Inanycase,weenvisionsuchanalyticsasjustonestepinalargerdataflow. 
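The phase transitions above are driven by the stage's runnability function, which reads each phase's loopback flow only after the previous phase's input is exhausted. The sketch below illustrates that selection policy in isolation; the flow names follow Figure 5.8, but the types and method names are assumptions, and the real runnability interface operates on flow connectors and framing keys as summarized in Table 5.1.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative sketch of a phase-ordering runnability policy for the
// incremental PageRank stage of Figure 5.8. Only the flow names come from the
// figure; everything else is assumed for illustration.
public final class PageRankRunnability {

    // Priority order: edge insertions, then weight propagation, then supernode
    // updates, then convergence notifications, and finally rank messages.
    private static final String[] PHASE_ORDER = {"E", "W", "OMEGA", "CVG", "R"};

    /** Returns the single flow to read this epoch, or empty if nothing is pending. */
    public static Optional<String> flowToRead(Map<String, Integer> pendingIncrements) {
        for (String flow : PHASE_ORDER) {
            if (pendingIncrements.getOrDefault(flow, 0) > 0) {
                return Optional.of(flow);     // earlier phases always win
            }
        }
        return Optional.empty();              // stage is not runnable
    }

    public static void main(String[] args) {
        Map<String, Integer> pending = new LinkedHashMap<>();
        pending.put("R", 3);                  // rank messages are waiting...
        pending.put("W", 1);                  // ...but a weight increment exists
        System.out.println(flowToRead(pending)); // Optional[W]: weights are read first
    }
}
```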
845.6RelatedworkNon-relationalbulkprocessing:Thisworkbuildsuponrecentnon-relationalbulkprocessingsystemssuchasMap-Reduce[34]andDryad[46].Ourcontributionsbeyondthosesystemsaretwo-fold:(1)aprogrammingabstractionthatmakesiteasytoexpressincrementalcomputationsoverincrementally-arrivingdata;(2)efficientunderlyingmechanismsgearedspecificallytowardcontinuous,incrementalworkloads.AcloselyrelatedefforttoCBPenhancesDryadtoautomaticallyidentifyredundantcomputation;itcachespriorresultstoavoidre-executingstagesortomergecomputationswithnewinput[67].Becausethesecachedresultsareoutsidethedataflow,programmerscannotretrieveandstorestateduringexecution.CBPtakesadifferentapproach,providingprogrammersexplicitaccesstopersistentstatethroughafamiliarandpowerfulgroupwiseprocessingabstraction.Ourworkalsocomplementsrecenteffortstobuild“online”Map-Reducesystems[32].WhiletheirdatapipeliningtechniquesforMap-ReducejobsareorthogonaltotheCBPmodel,theworkalsodescribesacontrollerforrunningMap-Reducejobscontinuously.Thedesignrequiresreducerstomanagetheirowninternalstate,presentingasignificantprogrammerburdenasitremainsoutsideofthebulk-processingabstraction.Thecontrollerprovideslimitedsupportfordecidingwhenjobsarerunnableandwhatdatatheyconsume.Incontrast,CBPdataflowprimitivesaffordarangeofpoliciesforcontrollingtheseaspectsofitera-tive/incrementaldataflows.Twister[36],acustomMap-Reducesystem,optimizesrepeatedlyrun(iter-ative)Map-Reducejobsbyallowingaccesstostaticstate.MapandReducetasksmaypersistacrossiterations,amortizingthecostofloadingthisstaticstate(e.g.,fromaninputfile).However,thestatecannotchangeduringiteration.Incon-trast,CBPprovidesageneralabstractionofstatethatsupportsinserts,updates,andremovals.Datastreammanagement:CBPoccupiesauniqueplacebetweentradi-tionalDBMSandstreamprocessing.Datastreammanagementsystems[15]focusonnear-real-timeprocessingofcontinuously-arrivingdata.Thisfocusleadstoan 85in-memory,record-at-a-timeprocessingparadigm,whereasCBPdealswithdisk-residentdataandset-orientedbulkoperations.Lastly,CBPpermitscyclicdataflows,whichareusefuliniterativecomputationsandotherscenariosdescribedbelow.Incrementalviewmaintenance:Traditionalview-maintenanceenviron-ments,likedatawarehousing,usedeclarativeviewsthataremaintainedimplicitlybythesystem[18,68].Incontrast,CBPcanbethoughtofasaplatformforgeneralizedview-maintenance;aCBPprogramisanexplicitgraphofdatatrans-formationsteps.Indeed,onecansupportrelationalviewmaintenanceontopofourframework,muchlikerelationalquerylanguageshavebeenlayeredontopofMap-ReduceandDryad(e.g.,DryadLINQ[82],Hive[9],Pig[61]).5.7AcknowledgmentsChapter5,inpart,isreprintofthematerialpublishedintheProceedingsoftheACMSymposiumonCloudComputing2010.Logothetis,Dionysios;Olston,Christopher;Reed,Benjamin;Webb,KevinC.;YocumKen.Thedissertationauthorwastheprimaryinvestigatorandauthorofthispaper. 
Chapter6CBPdesignandimplementationThischapterdescribesthedesignandimplementationoftheCBPruntimesystem.WedescribehowCBPallowstheorchestrationofdataflowsandhowitcanreliablyandefficientlyexecutethem.Furthemore,weillustratethefundamentalmismatchbetweenDISCsystemsandstatefulcomputationsbycomparingtwoimplementationsoftheCBPmodel:(i)a“black-box”implementationontopofastate-of-the-artDISCsystem,and(ii)adirectimplementationoftheCBPmodel.OurevaluationshowshowtheCPBruntimethatdirectlysupportsthemodelsignificantlyimprovesprocessingtimeandreducesresourcesusage.TheCBParchitecturehastwoprimarylayers:dataflowandphysical.Thephysicallayerreliablyexecutesandstorestheresultsofasinglestageofthedataflow.Aboveit,thedataflowlayerprovidesreliableexecutionofanentireCBPdataflow,orchestratingtheexecutionofmultiplestages.Itensuresreliable,orderedtransportofincrementsbetweenstagesanddetermineswhichstagesarereadyforexecution.Thedataflowlayermayalsocompilethelogicaldataflowintoamoreefficientphysicalrepresentation,dependingontheexecutioncapabilitiesofthephysicallayer.SuchautomatedanalysisandoptimizationofaCBPdataflowisfuturework.86 876.1ControllingstageinputsandexecutionThedataflowlayeracceptsaCBPdataflowandorchestratestheexecutionofitsmultiplestages.Theincrementaldataflowcontroller(IDC)determinesthesetofrunnablestagesandissuescallstothephysicallayertorunthem.TheIDCmaintainsaflowconnector,apieceofrun-timestate,foreachstage’sinputflow.Eachflowconnectorlogicallyconnectsanoutputflowtoitsdestinationinputflow.Itmaintainsalogical,orderedqueueofidentifiersthatrepresenttheincrementsavailableontheassociatedinputflow.Eachoutputflowmayhavemultipleflowconnectors,oneforeachinputflowthatusesitasasource.Afterastageexecutes,theIDCupdatestheflowconnectorsforeachoutputflowbyenqueueingthelocationandframingkeyofeachnewoutputincrement.Thedefault,withaDefaultFramingframingfunction,isforthestagetoproduceoneoutputincrementperflowperepoch.TheIDCusesastage’srunnablefunctiontodeterminewhetherastagecanberun.Thesystempassesthefunctionthesetofflowconnectorswithun-readincrementsandtheassociatedframingkeys,andanapplication-definedpieceofstate.Therunnablefunctionhasaccesstoeachflowconnector’smetadata(e.g.,numberofenqueuedincrements)anddeterminesthesetofflowconnectorsfromwhichtoread,readSet,andremove,removeSet,incrementsforthenextepoch.IfthereadSetisempty,thestageisnotrunnable.Aftereachepoch,theIDCupdateseachflowconnector,markingincrementsasreadorremovingincrementreferences.Incrementsmaybegarbagecollectedwhennoflowconnectorreferencesthem.6.2SchedulingwithbottleneckdetectionTheIDCmustdeterminethesetofrunnablestagesandtheorderinwhichtorunthem.Doingsowithpriorbulkprocessingsystemsisrelativelystraightforward,sincetheytakeaDAGasinput.Inthatcaseasimpleon-linetopologicalsortcandetermineavertex(stage)executionorderthatrespectsdatadependencies.However,CBPpresentstwoadditionalcriteria.First,Pmaycontaincycles,andtheschedulermustchooseatotalorderofstagestoavoidstarvationorhighresult 
88latency(makespan).Second,usingtherunnabilityfunction,stagescanpreferorsynchronizeprocessingparticularinputs.Thismeansthatincrementscan“backup”oninputflows,andthatthestagecreatingdataforthatinputnolongerneedstorun.Oursimpleschedulerexecutesinphasesandmaytesteachstage’srunnabil-ityfunction.Itcandetectstagestarvationandrespondtodownstreambackpres-sure(abottleneckstage)bynotrunningstagesthatalreadyhaveincrementsinalloutputs.Fulldetailsofthisalgorithmareavailableinourtechnicalreport[52]).6.3FailurerecoveryThedataflowlayerassumesthatthephysicallayerprovidesatomicexe-cutionofindividualstagesandreliablestorageofimmutableincrements.Withsuchsemantics,asinglestagemayberestartedifthephysicallayerfailstorunastage.Theexecutedstagespecifiesanamingconventionforeachproducedincre-ment,requiringittobetaggedbyitssourcestage,flowid,andincrementindex.Thesemaybeencodedintheon-diskpathandincrementname.OncethephysicallayerinformstheIDCofsuccess,itguaranteesthatresultincrementsareondisk.Dryadusedsimilartechniquestoensuredataflowcorrectnessunderindividualjobfailures[46].Next,theIDCupdatestherun-timestateofthedataflow.Thisconsistsofaddinganddeletingincrementreferencesonexistingflowconnectors.Thecon-trolleruseswrite-aheadloggingtorecorditsintendedactions;theseintentionscontainsnapshotsofthestateoftheflowconnectorqueue.Thelogonlyneedstoretainthelastintentionforeachstage.IftheIDCfails,itrebuildsstatefromtheXMLdataflowdescriptionandrebuildstheflowconnectorsandschedulerstatebyscanningtheintentions. 896.4CBPontopofMap-ReduceWedividethedesignandimplementationoftheCBPmodelintotwoparts.InthefirstpartwemaptranslateontoaMap-Reducemodel.ThisisareasonablestartingpointfortheCBPphysicallayerduetoitsdata-parallelismandfault-tolerancefeatures.However,thisprovidesanincompleteimplementationofthetranslateoperatorandCBPdataflowprimitives.Further,sucha“black-box”em-ulationresultsinexcessdatamovementandspaceusage,sacrificingthepromiseofincrementaldataflows(Section6.6).Thenextsectiondescribesourmodificationstoanopen-sourceMap-Reduce,Hadoop,thatsupportsthefullCBPmodelandoptimizesthetreatmentofstate.Thedesignofourbulk-incrementaldataflowenginebuildsuponthescal-abilityandrobustnesspropertiesoftheGFS/Map-Reducearchitecture[39,34],andinparticulartheopen-sourceimplementationcalledHadoop.Map-Reduceallowsprogrammerstospecifydataprocessingintwophases:mapandreduce.Themapfunctionoutputsanewkey-valuepair,{k1,v1},foreachinputrecord.Thesystemcreatesalistofvalues,[v]1,foreachkeyandpassesthesetoreduce.TheMap-Reducearchitecturetransparentlymanagestheparallelexecutionofthemapphase,thegroupingofallvalueswithagivenkey(thesort),andtheparallelexecutionofthereducephase.WenowdescribehowtoemulateasingleCBPstageusingasingleMap-Reducejob.1HerewedescribetheMapandReduce“wrapper”functionsthatexporttranslateT(·).InCBPapplicationsdataisopaquetotheprocessingsystem,andthesewrapperfunctionsencapsulateapplicationdata(arecord)in-sideanapplicationdataunit(ADU)object.TheADUalsocontainstheflowID,RouteByKey,andOrderByKey.WhiletheMap-Reducemodelhasonelogicalinputandoutput,currentim-plementationsallowaMap-Reducejobtoprocessmultipleinputandwritemultipleoutputfiles.InCBP,theflowIDswithineachADUlogicallyseparateflows,andthewrappercodeusestheflowIDtoinvokeper-flowfunctions,suchasRouteBy1AnefficientimplementationofCBPoveraMap-Reduceenvironmentrequiresdeterministicandside-effect-freetranslators. 
90andOrderBythatcreatetheroutingandorderingkeys.This“black-box”ap-proachemulatesstateasjustanotherinput(andoutput)fileoftheMap-Reducejob.•Map:ThemapfunctionwrapperimplementsroutingbyrunningtheRouteByfunctionassociatedwitheachinputflow.ItwrapseachinputrecordintoanADUandsetstheflowID,sothereducefunctioncanseparatedataoriginatingfromthedifferentflows.Mapfunctionsmayalsorunoneormorepreprocessorsthatimplementrecord-wisetranslation.TheoptionalMap-Reducecombinerhasalsobeenwrappedtosupportdistributiveoralgebraictranslators.•Reduce:TheHadoopreducerfacilitysortsrecordsbytheRouteByKeyembeddedintheADU.OurCBPreducewrapperfunctionmultiplexesthesortedrecordsintonstreams,upcallingtheuser-suppliedtranslatorfunctionT(·)withaniteratorforeachinputflow.Per-flowemitterfunctionsrouteoutputfromT(·)toHDFSfilelocationsspecifiedinthejobdescription.Likethemap,emitterfunctionsmayalsorunoneormoreper-recordpostprocessingstepsbeforewritingtoHDFS.Thusasinglegroupwisetranslatorbecomesajobwithamap/reducepair,whilearecord-wisetranslatorcanbeamap-onlyjob(allowedbyHadoop)orareducepostprocessor.6.4.1IncrementalcrawlqueueexampleWeillustratethecompilationofaCBPdataflowintoMap-ReducejobsusingourincrementalcrawlqueueexamplesfromFigure1.3.1.ThisdataflowiscompiledintotwoMap-Reducejobs:CountLinksandDecideCrawl.Figure6.1showsthetwojobsandwhichstageseachwrapperfunctionimplements.InbothjobsallinputflowsRouteBythesite,andorderinputbytheURL.Otherwiseallinputflowsusethedefaultframingandrunnabilityfunctions.ThefirstMap-Reducejobimplementsbothextractlinksandcountin-links.ItwritesstateADUswithbothsiteandURLroutingkeystomaintaincountsforeach.Thesecondjob 91Figure6.1:TheMap-ReducejobsthatemulatetheCBPincrementalcrawlqueuedataflow.placesbothscoreandthresholdaspostprocessingstepsonthegroupwisemergetranslator.ThisstateflowrecordsallvisitedsrcURLs.6.4.2IncrementmanagementMap-Reduceimplementationsusesharedfilesystemsasareliablemecha-nismfordistributingdataacrosslargeclusters.AllflowdataresidesintheHadoopdistributedfilesystem(HDFS).ThecontrollercreatesaflowdirectoryforeachflowFand,underneaththat,adirectoryforeachincrement.ThisdirectorycontainsoneormorefilescontainingtheADUs.AsdiscussedinSection6.2,whenHadoopsignalsthesuccessfulcompletionofastage,thecontrollerupdatesallaffectedflowconnectors.Weemulatecustom(non-default)framingfunctionsaspostprocessingstepsintheupstreamstagewhoseoutputflowthedownstreamstagesources.ThereducewrappercallstheframingfunctionforeachADUwrittentothatoutputflow.Bydefault,theincrementdirectorynameisthestage’sprocessingepochthatgeneratedtheseADUs.ThewrapperappendstheresultingFramingKeytotheincrementdirectorynameandwritesADUswiththatkeytothatdirectory.ThewrapperalsoaddstheFramingKeytothemetadataassociatedwiththisincrementintheinputflow’sflowconnector.Thisallowsastage’srunnablefunctiontocomparethosekeystosynchronizeinputincrements,asdescribedinSection5.2. 
926.5DirectCBPWenowmodifyHadooptoaccommodatefeaturesoftheCBPmodelthatareeitherinexpressibleorinefficientas“black-box”Map-Reduceemulations.Thefirstcategoryincludesfeaturessuchasbroadcastandmulticastrecordrouting.Thesecondcategoryoptimizestheexecutionofbulk-incrementaldataflowstoensurethatdatamovement,sorting,andbufferingworkareproportionaltoarrivinginputsize,notstatesize.6.5.1IncrementalshufflingforloopbackflowsThesystemmayoptimizestateflows,andanyloopbackflowingeneral,bystoringstateinper-partitionsidefiles.Map-Reducearchitectures,likeHadoop,transferoutputfromeachmapinstanceortasktothereducetasksintheshufflephase.EachmaptaskpartitionsitsoutputintoRsets,eachcontainingasubsetoftheinput’sgroupingkeys.Thearchitectureassignsareducetasktoeachpartition,whosefirstjobistocollectitspartitionfromeachmapper.Hadoop,though,treatsstatelikeanyotherflow,re-mappingandre-shufflingitoneachepochforeverygroupwisetranslator.Shufflingisexpensive,requiringeachreducertosourceoutputfromeachmapperinstance,andstatecanbecomelargerelativetoinputincrements.ThisrepresentsalargefractionoftheprocessingrequiredtoemulateaCBPstage.However,stateislocaltoaparticulartranslateinstanceandonlycontainsADUsassignedtothistranslatepartition.WhentranslatorsupdateorpropagateexistingstateADUsinoneepoch,thoseADUsarealreadyinthecorrectpartitionforthenextepoch.Thuswecanavoidre-mappingandre-shufflingthesestateADUs.Instead,thereducetaskcanwriteandreadstatefrom/toanHDFSparti-tionfile.Whenareducerstarts,itreferencesthefilebypartitionandmergesortsitwithdatafromthemaptasksinthenormalfashion.NotethatatranslatorinstancemayaddstateADUswhoseRouteBykeybelongstoaremotepartitionduringanepoch.Theseremotewritesmustbeshuffledtothecorrectpartition(translationinstance)beforethenextepoch.We 93accomplishthisbysimplytestingADUsintheloopbackflow’semitter,splittingADUsintotwogroups:localandremote.ThesystemshufflesremoteADUsasbefore,butwriteslocalADUstothepartitionfile.Wefurtheroptimizethisprocessby“pinning”reducetaskstoaphysicalnodethatholdsareplicaofthefirstHDFSblockofthepartitionfile.ThisavoidsreadingdatafromacrossthenetworkbyreadingHDFSdatastoredonthelocaldisk.Finally,thesystemmayperiodicallyre-shufflethepartitionfilesinthecaseofdataskeworachangeinprocessorcount.6.5.2RandomaccesswithBIPtablesHerewedescribeBIPtables(bulk-incrementalprocessingtables),asimpleschemetoindexthestateflowandproviderandomstateaccesstostate.Thisallowsthesystemtooptimizetheexecutionoftranslatorsthatupdateonlyafrac-tionofstate.Forexample,atranslatormayspecifyaninnerstateflow,meaningthatthesystemonlyneedstopresentstateADUswhoseRouteBykeysalsoex-istonotherinputs.Butcurrentbulk-processingarchitecturesareoptimizedfor“streaming”dataaccess,andwillreadandprocessinputsintheirentirety.ThisincludesdirectCBPwithstatepartitionfiles(describedabove),whichreadstheentirepartitionfileevenifthetranslatorisextremelyselective.However,thesuccessofthisapproachdependsonreadingandwritingmatchedkeysrandomlyfromatablefasterthanreadingandwritingallkeyssequentiallyfromafile.PublishedperformancefiguresforBigtable,atable-basedstorageinfrastructure[25],indicateafourtotentimesreductioninperformanceforrandomreadsrelativetosequentialreadsfromdistributedfilesystemslikeGFS[39]for1000-byterecords.Moreover,ourrecentinvestigationindicatesevenachievingthatperformancewithopen-sourceversions,suchasHypertable,isoptimistic,re-quiringoperationstoselectunder15%ofstatekeystoimproveperformance[52].Thedesignoutlinedbelowoutperformssequentialwhenretrievingasmanyas60%ofthestaterecords(Section6.6.2).BIPtablesleveragesthefactthatourCBPsystemnee
dsonlysimple(key,ADUs)retrievalandalreadypartitionsandsortsstateADUs,makingmuchofthefunctionalityinexistingtable-storesredundantorunnecessary.Atahighlevel, 94eachstatepartitionnowconsistsofanindexanddatafile.WhilesimilartoHDFSMapFilesorBigtable’sSSTablefiles,theyaredesignedtoexistacrossmultipleprocessingepochs.Logically,thedatafileisanappend-only,unsortedlogthatcontainsthestateADUswrittenoverthelastnepochs.BecauseHDFSonlysupportswrite-once,non-appendfiles,wecreateadditionalHDFSdatafileseachepochthatcontainthenewstateinsertsandupdates.Eachtranslateinstancereads/writestheentireindexfilecorrespondingtoitsstatepartitioneachepoch.Theyuseanin-memoryindex(likeBigtable)forlookups,andwritetheindexfileasasortedsetofkeyto{epoch,offset}pairs.TosupportinnerstateflowsusingBIPtables,wemodifiedreducetaskstoqueryforstateADUsinparallelwiththemergesortofmapperoutputandtostorereadsinanADUcache.Thisensuresthatcallstothetranslatewrapperdonotstallonindividualkeyfetches.Oursystemlearnsthesetofkeystofetchduringthemergeandissuesreadsinparallel.TheprocessendswhentheADUcachefills,limitingthememoryfootprint,orallkeysarefetched.ThereducetaskprobestheADUcacheoneachcalltothetranslatewrapper,andmissesfaultintheoffendingkey.6.5.3MulticastandbroadcastroutingTheCBPmodelextendsgroupwiseprocessingbysupportingabroadcastALLaddressanddynamicmulticastgroups.Herewedescribehowtodosoeffi-ciently,reducingduplicaterecordsinthedatashuffle.WesupportALLRouteBykeysbymodifyingmapperstosendALLADUstoeachreducetaskduringtheshufflephase.Atthispoint,thereducewrapperwilladdthesetuplestotheap-propriatedestinationflowbeforeeachcalltotranslate.Sincethepartitioncountisoftenmuchlessthanthenumberofgroupsinstate,thismovesconsiderablylessdatathanshufflingthemessagestoeachgroup.ALLmayalsospecifyanoptionalsetofinputflowstobroadcastto(bydefaultthesystembroadcaststoallinputs).Whilebroadcastinghasanimplicitsetofdestinationkeysforeachepoch,weprovidetranslatorauthorstheabilitytodefinemulticastgroupsdynamically.Theydosobycallingassociate(k,mcaddr),whichassociatesatargetkeykwithamulticastgroupmcaddr.Atranslatormaycallthisforanynumberofkeys,making 
95anykeyadestinationforADUswhoseRouteByreturnsmcaddr.Theassociationandmulticastaddressareonlyvalidforthisepoch;thetranslatormustwritetothismulticastaddressinthesameepochinwhichitassociateskeys.Underthehood,callstoassociateplacerecordsof{k,mcaddr}onady-namicallyinstantiatedandhiddenloopbackflownamedfmcaddr.ThesystemtreatsinputrecordsroutedtoamulticastaddressinasimilarfashiontoALLADUs,send-ingasinglecopytoeachreducetask.Thatrecordisplacedinanin-memoryhashtablekeyedbymcaddr.Whenthereducewrapperruns,itreadsthehiddenloop-backflowtodeterminethesetofmulticastaddressesboundtothiskeyandprobesthetabletoretrievethedata.6.5.4FlowseparationinMap-ReduceWhiletheFlowIDmaintainsthelogicalseparationofdataintheblack-boximplementation,theMap-ReducemodelandHadoopimplementationtreatdatafromallflowsasasingleinput.Thusthesystemsortsallinputdatabutmustthenre-separateitbasedonflowID.ItmustalsoordertheADUsoneachflowbythatflow’sOrderBykeys.Thisemulationcausesunnecessarycomparisonsandbufferingforgroupwisetranslation.Consideremulatingagroupwisetranslatorwithninputflows.AHadoopreducetaskscallsthereducefunctionwithasingleiteratorthatcontainsallrecords(ADUs)sharingaparticularkey.DirectCBPemulatestheindividualflowiteratorsofT(·)byfeedingfromasinglereduceiterator,readingtheflowiteratorsoutofflowIDorderforcesustobufferskippedtuplessothattheycanbereadlater.Areadtothatlastflowcausesthesystemtobufferthemajorityofthedata,potentiallycausingOutOfMemoryErrorsandabortedprocessing.Thisoccursinpractice;manyofourexamplesapplyupdatestostatebyfirstreadingallADUsfromaparticularflow.WeresolvethisissuebypushingtheconceptofaflowintoMap-Reduce.Re-ducetasksmaintainflowseparationbyassociatingeachmapperwithitssourcein-putflow.Whilethenumberoftransfersfromthemapperstoreducersisunchanged,thisreducesthenumberofprimary(andsecondary)groupingcomparisonsonthe 96RouteBy(andOrderBy)keys.ThisisasmallchangetotheasymptoticanalysisofthemergesortofrrecordsfrommmappersfromO(rlogm)toO(rlogm).ThisnspeedsupthesecondarysortofADUssharingaRouteByKeyinasimilarfashion;thereducetasknowemploysnsecondarysortsbasedonlyontheOrderByKey.ThisallowseachflowtodefineitsownkeyspaceforsortingandpermitsreadingflowsinanarbitraryorderthatavoidsunnecessaryADUbuffering.6.6EvaluationOurevaluationvalidatesthebenefitsofprogrammingincrementaldataflowsusingtheCBPmodel.Itexploreshowthevariousoptimizationsforoptimizingdatamovementimprovetheperformanceofourthreeexampleprograms:theincrementalcrawlqueue,clusteringcoefficients,andPageRank.WebuiltourCBPprototypeusingHadoopversion0.19.1,andtheimplementationconsistsof11klinesofcode.6.6.1IncrementalcrawlqueueThispartoftheevaluationillustratesthebenefitsofoptimizingthetreat-mentofstateforincrementalprogramsonanon-trivialclusterandinputdataset.TheseexperimentsusethephysicalrealizationoftheincrementalcrawlqueueshowninFigure6.1.Ourinputdataconsistsof27millionwebpagesthatwedivideintoteninputincrements(eachappr.30GB)forthedataflow.Weranourexperimentsonaclusterof90commoditydualcore2.13GHzXeonswithtwoSATAharddrivesand4GBofmemory.ThemachineshaveaonegigabitpersecondEthernetconnectiontoasharedswitchfabric.Thegoalofoursystemistoallowincrementalalgorithmstoachieveper-epochrunningtimesthatareafunctionofthenumberofstateupdates,notthetotalamountofstoredstate.Notethatfortheincrementalcrawlqueue,thenum-berofstaterecordupdatesisdirectlyproportionaltothenumberofarrivinginputrecords.Thus,asourtestharnessfeedstheincrementalcrawlqueuesuccessiveincrements,weexpecttherunningtimeofeachsuccessiveincrementtobealmost 
975007.5GB−Blackbox7.5GB−Direct40030GB−Blackbox30GB−Direct300200Timeelapsed(min)100000.20.40.60.8%dataprocessedFigure6.2:Cumulativeexecutiontimewith30GBand7.5GBincrements.Thesmallertheincrements,thegreaterthegainfromavoidingstatere-shuffling.constant.Tomeasuretheeffectivenessofouroptimizations,wecompareexecutionsofthe“black-box”emulationwiththatofdirectCBP.Forsomedataflows,includingtheincrementalcrawlqueue,thebenefitsofdirectCBPincreaseasincrementsizedecreases.Thisisbecauseprocessinginsmallerincrementsforcesstateflowstobere-shuffledmorefrequently.Figure6.2showsthecumulativeprocessingtimefortheblack-boxanddirectsystemswithtwodifferentincrementsizes:30GB(thedefault)and7.5GB(dividingtheoriginalincrementby4).Thoughtheper-stagerunningtimeofdirectCBPrises,itstillremainsroughlylinearintheinputsize(i.e.,constantprocessingtimeperincre-ment).However,runningtimeusingblack-boxemulationgrowssuperlinearly,becausethecumulativemovementofthestateflowslowsdownprocessing.Figure6.3showsasimilarexperimentusing30GBincrements,butre-portstheindividualepochruntimes,aswellastheruntimesfortheindivid-ualCountLinksandDecideCrawljobs.Thisexperimentincludesthestrawman,non-incrementalprocessingapproachthatre-computestheentirecrawlqueueforeacharrivingincrement.Inthiscasewemodifythedataflowsothatrunsdonotreadorwritestateflows.Asexpected,therunningtimeofthenon-incrementaldataflowincreaseslinearly,withthemajorityofthetimespentcountingin-links.Whiletheincrementaldataflowoffersalargeperformanceimprovement(seenin 98120120TotalTotal100CountLinks100CountLinksDecideCrawlDecideCrawl808060604040Runningtime(min.)20Runningtime(min.)200012345671234567IncrementIncrement(a)Non-incrementaldataflow.(b)Incrementaldataflow:black-box.120Total100CountLinksDecideCrawl806040Runningtime(min.)2001234567Increment(c)Incrementaldataflow:direct.Figure6.3:Theperformanceoftheincrementalversuslandmarkcrawlqueue.ThedirectCBPimplementationprovidesnearlyconstantruntime.Figure6.3(b)),theruntimestillincreaseswithincrementcount.Thisisbecausetheblack-boxemulationpaysalargecosttomanagingthestateflow,whichcon-tinuestogrowduringtheexecutionofthedataflow.Eventuallythisreaches63GBforthecountlinksstageatthe7thincrement.Figure6.3(c)showsruntimesforthedirectCBPimplementationthatusesincrementalshuffling(withreducerpinning)andflowseparation.Notethatstateisan“outer”flowintheseexperiments,causingtranslationtoaccessallstateADUseachepoch.Evenso,incrementalshufflingallowseachstagetoavoidmappingandshufflingstateoneachnewincrement,resultinginanearlyconstantruntime.Moreover,HDFSdoesagoodjobofkeepingthepartitionfileblocksatthepriorreducer.Atthe7thincrement,pinningindirectCBPallowsreducerstoread88% 
99oftheHDFSstateblocksfromthelocaldisk.6.6.2BIPtablemicrobenchmarks700600500400300Sequential200Sequential,nosortHypertable100Completiontime(sec)BIPTable00102030405060708090100%keysaccessedFigure6.4:Runningtimeusingindexedstatefiles.BIPTableoutperformsse-quentialaccessevenifaccessingmorethan60%ofstate.TheseexperimentsexplorewhetherrandomlyreadingasubsetofstateisfasterusingBIPtablethanreadingallofstatesequentiallyfromHDFS.Weidentifythebreak-evenhitrate,thehitratebelowwhichtherandomaccessoutperformsthesequentialaccess.Thetestusesastagethatstoresasetofuniqueintegersinaninnerstateflow;inputincrementscontainnumbersrandomlydrawnfromtheoriginalinput.Changinginputincrementsizechangestheworkload’shitrate,thefractionofaccessedstate.Werunthefollowingexperimentsona16-nodeclusterconsistingofDualIntelXeon2.4GHzmachineswith4GBofRAM,connectedbyaGigabitswitch.Wepre-loadedthestatewith1millionrecords(500MB).Heretranslationusesasingledatapartition,runningonasinglenode,thoughHDFS(orHypertable)runsacrossthecluster.Figure6.4comparesrunningtimesforfourconfigurations.BIPtableout-performsSequential,whichreadstheentirestatepartitionfile,foreveryselectivity.OnebenefitisthatBIPtabledoesnotsortitsrecords;ituseshashingtomatchkeysonotherinputs.Tomeasurethiseffect,sequential,nosortdoesnotsortthepartitionfile(andwillthereforeincorrectlyexecuteifthetranslatorwritesnew 100Non−incrementalshuffling300Non−incrementalshuffling200IncrementalshufflingIncrementalshufflingMulticastMulticast25015020010015010050Cumulativetime(min)50Cumulativedatamoved(GB)00051015202530051015202530EpochEpoch(a)Runningtime.(b)Datashuffled.Figure6.5:IncrementalclusteringcoefficientonFacebookdata.Themulticastoptimizationimprovesrunningtimeby45%andreducesdatashuffledby84%overtheexperiment’slifetime.keysduringanepoch).Inthiscase,BIPtablestilloutperformssequentialaccesswhenaccessingamajority(>60%)ofstate.Forreferenceweincludeapriorre-sult[54]usingHypertable;itfailedtoproducedatawhenreadingmorethan50%ofstate.Finally,itisrelativelystraightforwardforBIPtablestoleverageSSDstoimproverandomaccessperformance;adesignthatpromisestosignificantlyextendtheperformancebenefitofthisdesign[54].6.6.3ClusteringcoefficientsHereweexploretheperformanceofourclusteringcoefficienttranslator(Figure5.6).Thesegraphexperimentsuseaclusterof25machineswith160GBdrives,4GBofRAM,and2.8GHzdualcoreXeonprocessorsconnectedbygigabitEthernet.Weincrementallycomputeclusteringcoefficientsusingapubliclyavail-ableFacebookcrawl[77]thatconsistsof28millionedgesbetween“friends.”Werandomizethegraphedgesandcreateincrementscontaining50kedgesapiece.Theseareaddedtoaninitialgraphof50kedgesconnecting46kvertices. 
Figure 6.5(a) shows the cumulative running time for processing successive increments. We configure the translator to use full, outer groupings and successively enable incremental shuffling and multicast support. First note that, unlike the incremental crawl queue, running times with incremental shuffling are not constant. This is because the mapped and shuffled data consists of both messages and state. Recall that these messages must be materialized to disk at the end of the prior epoch and then shuffled to their destination groups during the next epoch. In fact, the message volume increases with each successive increment as the graph becomes increasingly more connected. Additionally, map tasks that emulate multicasting (i.e., by replicating an input record for each destination) take four to six times as long to execute as map tasks that operate on state records. Hadoop interleaves these longer map tasks with the smaller state map tasks; they act as stragglers until state becomes sufficiently large (around epoch 24). At that point incremental shuffling removes over 50% of the total shuffled data in each epoch, enough to impact running times. Even before then, as Figure 6.5(b) shows, incremental shuffling frees a significant amount of resources, reducing total data movement by 47% during the course of the experiment.

For this application the critical optimization is multicasting, which both eliminates the need for the user to emulate multicast in map tasks and removes duplicate records from the data shuffle. In this case, direct CBP improves cumulative running time by 45% and reduces data shuffled by 84% over the experiment's lifetime.

6.6.4 PageRank

This section explores the impact of the direct CBP optimizations on the incremental PageRank dataflow. We have verified that it produces identical results for smaller, 7k-node graphs using a non-incremental version. As input we use the "indochina-2004" web graph obtained from [19]; it contains 7.5 million nodes and 109 million edges. These experiments execute on 16 nodes in our cluster (described above). Here our incremental change is the addition of 2800 random edges (contained in a single input increment).

Figure 6.6: Incremental PageRank: (a) cumulative running time of our incremental PageRank translator adding 2800 edges to a 7 million node graph, and (b) cumulative data moved during incremental PageRank.

Figure 6.6(a) shows the cumulative execution time for this process. As Section 5.5.3 explained, the dataflow proceeds in three phases: computing PageRank on the original graph (epochs 1-3), finding the subgraph G (epochs 4-8), and re-computing PageRank for nodes in G (epochs 9-16). Here we have purposefully reduced the number of iterations in the first phase to highlight the incremental computation. For this incremental graph update, the affected subgraph G contains 40k nodes.

Here we evaluate the impact of incremental shuffling and of inner state flows via BIPtables. Note that this dataflow required the direct CBP implementation, specifically broadcast support for propagating weights from dangling nodes. Without it, local disks filled with intermediate data for even small graphs.

Unlike clustering coefficient, incremental shuffling improves cumulative running time by 23% relative to only using broadcast support. Improvements occur primarily in the last phase, as there are fewer messages and processing state dominates. After re-computing PageRank, incremental shuffling has reduced bytes moved by 46%. Finally, we see a significant gain from using inner state flows (BIPtables), as each epoch in the last phase updates only 0.5% of the state records. In this case our architecture reduced both network and CPU usage, ultimately cutting running time by 53%.
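The sketch below (Python, single-machine and deliberately simplified) illustrates the three-phase structure of this dataflow: an initial PageRank pass, a bounded traversal that finds the affected subgraph G around the new edges, and a final pass that re-runs PageRank only for the nodes in G while ranks outside G stay frozen. It omits dangling-node weight broadcast and the CBP grouping machinery, and all names are hypothetical.

from collections import deque

D = 0.85  # damping factor

def pagerank_step(out_edges, rank, update_set):
    """One power iteration that updates only nodes in update_set; ranks of
    all other nodes stay frozen but still contribute to their targets."""
    n = len(rank)
    incoming = {v: 0.0 for v in update_set}
    for u, targets in out_edges.items():
        if not targets:
            continue
        share = rank[u] / len(targets)
        for v in targets:
            if v in incoming:
                incoming[v] += share
    for v in update_set:
        rank[v] = (1 - D) / n + D * incoming[v]

def affected_subgraph(out_edges, seeds, hops):
    """Phase 2: collect nodes reachable from the new edges' endpoints
    within a bounded number of hops."""
    G, frontier = set(seeds), deque((s, 0) for s in seeds)
    while frontier:
        u, d = frontier.popleft()
        if d == hops:
            continue
        for v in out_edges.get(u, []):
            if v not in G:
                G.add(v)
                frontier.append((v, d + 1))
    return G

# Toy graph: phase 1 computes PageRank on the original graph.
out_edges = {"a": ["b"], "b": ["c"], "c": ["a"], "d": ["a"]}
rank = {v: 1.0 / len(out_edges) for v in out_edges}
for _ in range(20):
    pagerank_step(out_edges, rank, set(rank))

out_edges["d"] = ["a", "c"]                            # the edge increment
G = affected_subgraph(out_edges, {"d", "c"}, hops=2)   # phase 2
for _ in range(20):                                    # phase 3: only nodes in G
    pagerank_step(out_edges, rank, G)
print(sorted(G), rank)

In the actual dataflow these phases are CBP stages, and the final phase benefits from inner state flows (BIPtables) because each epoch reads only the small fraction of state belonging to G.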
6.7 Acknowledgements

Chapter 6, in part, is a reprint of the material published in the Proceedings of the ACM Symposium on Cloud Computing 2010. Logothetis, Dionysios; Olston, Christopher; Reed, Benjamin; Webb, Kevin C.; Yocum, Ken. The dissertation author was the primary investigator and author of this paper.

Chapter 7

Conclusion

We have been witnessing an unprecedented increase in the amount of unstructured data produced today. Exploiting these big data sets requires data management systems that allow users to gain valuable insights through rich analysis. This thesis is based on the observation that data analysis is no longer a "one-shot" process; rather, we view it as an update-driven process. Update-driven analytics arise in several scenarios, like continuous data processing or machine learning algorithms that iteratively refine the result of the analysis. We argue that there is a fundamental mismatch between current data-intensive systems and update-driven analytics that makes programming big data analytics harder and less efficient. This dissertation introduces a different programming approach that captures this update-driven nature, simplifying programming and allowing efficient analytics.

We observe that the concept of state arises naturally in these update-driven data analytics and is a fundamental requirement for efficient processing. Based on this observation, this dissertation proposes that state become a first-class abstraction in large-scale data analytics. To this end, this thesis introduces stateful groupwise processing, an abstraction that integrates data-parallelism for scale with state for efficiency. We use stateful groupwise processing to efficiently manage analytics in two phases of the data lifecycle: (i) online ETL analysis, and (ii) follow-on analysis.

Critical to building a practical system for online ETL analytics is the ability to assess the impact of incomplete data on the analysis fidelity. We found that knowledge about the natural spatial and temporal distribution of the data across their sources provides useful insights into the quality of the analysis results. By exposing this information to the users through the C2 metric, iMR allows a wide range of online analytics applications. At the same time, we provided general guidelines for using C2 to trade fidelity for latency depending on the different application requirements. Through our iMR prototype we validated the usefulness of the metric in a variety of real applications. We showed that it is possible to process incomplete data, to retain result availability, and still draw useful conclusions from the data.

To efficiently execute ETL analytics, the iMR architecture moves the analysis from dedicated clusters to the data sources, avoiding costly data migrations. While this in-situ processing architecture reduces network traffic, it requires iMR to make careful use of available resources to minimize the impact on collocated services. iMR's load shedding techniques use available resources intelligently, and provide useful results under constrained resources or latency requirements with little impact on collocated services.

While iMR is suitable for running ETL analytics, running richer analytics requires a different programming model and architecture. iMR is designed mainly for filtering and summarizing data, operations that are common as a first step in data analytics. However, several follow-on analytics may be expressed as complex, multi-step dataflows. Additionally, some of these analytics implement algorithms, like graph mining, that must iterate over datasets.

We designed the CBP model to be expressive enough to allow a variety of sophisticated stateful analytics. We exhibited the expressiveness of CBP by building a number of real applications, including web mining analytics and iterative graph algorithms. CBP's broadcast and multicast grouping constructs proved particularly useful in graph algorithms. Abstracting these common graph mining operations made programming easier and allowed the CBP runtime to optimize the execution of this type of analytics.
Furthermore, we found that running dataflows adds to the programming complexity, as users must coordinate the execution of individual steps. CBP assists users in this task by allowing the programmatic control of dataflows through simple, yet powerful primitives.

We validated the benefits of integrating state in the CBP programming model by comparing CBP against a prototype that implements stateful computations on top of the MapReduce model. Leveraging the explicit modeling of state, CBP can optimize state management, significantly reducing processing times and network resource usage. In many cases CBP reduces both processing time and network traffic by at least a factor of 2. Further, we verified that CBP's extended grouping constructs allow huge performance gains in graph mining analytics. In certain scenarios we found that executing these iterative analytics with current DISC systems was impossible due to the amount of data that has to be saved to disk and transferred across the network.

Our work is a first step toward addressing the basic challenges in managing update-driven analytics. At the same time, it lays the groundwork for investigating a variety of interesting issues in the future. There is a trend toward real-time analytics and a need to extract useful information from partial data. An interesting direction is to explore the use of fidelity metrics for online analysis in more sophisticated analytics, like machine learning and graph mining, not just ETL. Incomplete data impacts such analytics in non-obvious ways. Understanding and characterizing this impact is a step toward making online analysis applicable to a wider range of analytics.

Like MapReduce, one of the strengths of CBP is its flexibility. Even though this allows rich analytics, it may result in a lot of custom, complicated user code that is difficult to maintain and re-use [61]. Higher-level languages, like Pig [61] and DryadLINQ [82], that are layered on top of systems like MapReduce and Dryad allow users to compose dataflows from a restricted set of high-level operations (e.g., filtering, grouping, aggregations, joins), simplifying programming. A promising direction is to investigate the integration of stateful programming in a higher-level language.

Devising incremental algorithms that use state may sometimes be a hard programming task. Recent work has explored the ability to automatically detect opportunities for computation re-use [67, 42, 43, 17]. These approaches transparently modify "one-shot" dataflows to update the analytics in an incremental way. An interesting future study would be to investigate the sweet spot between the ease of programming that these systems provide and the increased performance of explicitly incremental programs.
Bibliography

[1] 1998 World Cup Web Server Logs. http://ita.ee.lbl.gov/html/traces.html.
[2] CPU Usage Limiter for Linux. http://cpulimit.sourceforge.net.
[3] List of companies using Hadoop. http://wiki.apache.org/hadoop/PoweredBy.
[4] Oops pow surprise... 24 hours of video all up in your eyes! http://youtube-global.blogspot.com/2010/03/oops-pow-surprise24-hours-of-video-all.html.
[5] The Apache Mahout machine learning library. http://mahout.apache.org.
[6] The Flume log collection system. https://github.com/cloudera/flume.
[7] The GridMix Hadoop Workload Generator. http://hadoop.apache.org/mapreduce/docs/current/gridmix.html.
[8] The Hadoop project. http://hadoop.apache.org.
[9] The Hive project. http://hadoop.apache.org/hive.
[10] The Kolmogorov-Smirnov test. http://en.wikipedia.org/wiki/Kolmogorov-Smirnovtest.
[11] Windows Azure and Facebook teams. Personal communications, August 2008.
[12] Supercomputers: 'Data Deluge' Is Changing, Expanding Supercomputer-Based Research. http://www.sciencedaily.com/releases/2011/04/110422131123.htm, April 2011.
[13] D. J. Abadi, Y. Ahmad, M. Balazinska, M. Cherniack, J.-H. Hwang, W. Lindner, A. S. Maskey, E. Rasin, E. Ryvkina, N. Tatbul, Y. Xing, and S. Zdonik. The design of the Borealis stream processing engine. In Conference on Innovative Data System Research, Asilomar, CA, Jan. 2005.
[14] A. Arasu and J. Widom. Resource sharing in continuous sliding-window aggregates. In International Conference on Very Large Data Bases, pages 336–347, Toronto, Canada, Aug. 2004.
[15] B. Babcock, S. Babu, M. Datar, R. Motwani, and J. Widom. Models and issues in data stream systems. In ACM Symposium on Principles of Database Systems, page 1, Madison, WI, June 2002. ACM Press.
[16] M. Balazinska, A. Rasin, U. Cetintemel, M. Stonebraker, and S. Zdonik. High-Availability Algorithms for Distributed Stream Processing. In International Conference on Data Engineering, pages 779–790, Tokyo, Japan, Apr. 2005. IEEE.
[17] P. Bhatodia, A. Wieder, I. E. Akkus, R. Rodrigues, and U. Akar. Large-scale Incremental Data Processing with Change Propagation. In Workshop on Hot Topics in Cloud Computing, Portland, OR, June 2011.
[18] J. A. Blakeley, P.-A. Larson, and F. W. Tompa. Efficiently updating materialized views. In ACM SIGMOD International Conference on Management of Data, volume 15, pages 61–71, June 1986.
[19] P. Boldi and S. Vigna. The WebGraph Framework I: Compression Techniques. In International World Wide Web Conference, pages 595–601, Manhattan, NY, 2004.
[20] T. Brants, A. C. Popat, and F. J. Och. Large Language Models in Machine Translation. In Empirical Methods in Natural Language Processing and Computational Natural Language Learning, volume 1, pages 858–867, Prague, Czech Republic, June 2007.
[21] R. E. Bryant. Data-Intensive Supercomputing: The case for DISC. Technical report, Carnegie Mellon University, Pittsburgh, PA, May 2007.
[22] Y. Bu, B. Howe, M. Balazinska, and M. D. Ernst. HaLoop: efficient iterative data processing on large clusters. Proceedings of the VLDB Endowment, 3(1-2):285–296, Sept. 2010.
[23] D. Carney, U. Çetintemel, M. Cherniack, C. Convey, S. Lee, G. Seidman, M. Stonebraker, N. Tatbul, and S. Zdonik. Monitoring streams: a new class of data management applications. In International Conference on Very Large Data Bases, pages 215–226, Hong Kong, China, Aug. 2002.
[24] P. Chan, W. Fan, A. Prodromidis, and S. Stolfo. Distributed data mining in credit card fraud detection. IEEE Intelligent Systems, 14(6):67–74, Nov. 1999.
[25] F. Chang, J. Dean, S. Ghemawat, W. C. Hsieh, D. A. Wallach, M. Burrows, T. Chandra, A. Fikes, and R. E. Gruber. Bigtable: a distributed storage system for structured data. In USENIX Symposium on Operating Systems Design and Implementation, OSDI '06, pages 205–218, Seattle, WA, Nov. 2006. USENIX Association.
[26] D. Chatziantoniou and K. A. Ross. Groupwise Processing of Relational Queries. In International Conference on Very Large Data Bases, pages 476–485, Athens, Greece, Aug. 1997.
[27] Y. Chen, D. Pavlov, and J. F. Canny. Large-scale behavioral targeting. In ACM SIGKDD International Conference on Knowledge Discovery and Data Mining, page 209, New York, New York, USA, June 2009. ACM Press.
[28] S. Chien, C. Dwork, R. Kumar, D. R. Simon, and D. Sivakumar. Link Evolution: Analysis and Algorithms. Internet Mathematics, 1:277–304, 2004.
[29] C.-T. Chu, S. K. Kim, Y.-A. Lin, Y. Yu, G. Bradski, A. Y. Ng, and K. Olukotun. Map-Reduce for Machine Learning on Multicore. In Neural Information Processing Systems, Dec. 2006.
[30] E. F. Codd. A relational model of data for large shared data banks. Communications of the ACM, 26(1):64–69, Jan. 1983.
[31] J. Cohen. Graph Twiddling in a MapReduce World. Computing in Science & Engineering, 11(4):29–41, 2009.
[32] T. Condie, N. Conway, P. Alvaro, J. M. Hellerstein, K. Elmeleegy, and R. Sears. MapReduce online. In USENIX Symposium on Networked Systems Design and Implementation, page 21, San Jose, CA, Apr. 2010.
[33] F. Dabek, R. Cox, F. Kaashoek, and R. Morris. Vivaldi: A Decentralized Network Coordinate System. In ACM SIGCOMM Conference on Applications, Technologies, Architectures and Protocols of Computer Communications, volume 34, page 15, Portland, OR, Oct. 2004.
[34] J. Dean and S. Ghemawat. MapReduce: simplified data processing on large clusters. In USENIX Symposium on Operating Systems Design and Implementation, page 10, San Francisco, CA, Dec. 2004.
[35] J. Dean and S. Ghemawat. MapReduce: a flexible data processing tool. Communications of the ACM, 53(1):72, Jan. 2010.
[36] J. Ekanayake, X. Qiu, T. Gunarathne, S. Beason, and G. Fox. High Performance Parallel Computing with Cloud and Cloud Technologies. In International Conference on Cloud Computing, June 2009.
[37] M. Ester, H.-P. Kriegel, J. Sander, M. Wimmer, and X. Xu. Incremental Clustering for Mining in a Data Warehousing Environment. In International Conference on Very Large Data Bases, pages 323–333, New York City, NY, Aug. 1998.
[38] E. Friedman, P. Pawlowski, and J. Cieslewicz. SQL/MapReduce: a practical approach to self-describing, polymorphic, and parallelizable user-defined functions. Proceedings of the VLDB Endowment, 2(2):1402–1413, Aug. 2009.
[39] S. Ghemawat, H. Gobioff, and S.-T. Leung. The Google file system. In ACM Symposium on Operating Systems Principles, volume 37, page 29, Bolton Landing, New York, Dec. 2003.
[40] J. Ginsberg, M. H. Mohebbi, R. S. Patel, L. Brammer, M. S. Smolinski, and L. Brilliant. Detecting influenza epidemics using search engine query data. Nature, 457(7232):1012–4, Feb. 2009.
[41] J. Gray, A. Bosworth, A. Layman, and H. Pirahesh. Data Cube: A Relational Aggregation Operator Generalizing Group-By, Cross-Tab, and Sub-Total. In International Conference on Data Engineering, New Orleans, LA, Mar. 1996.
[42] P. K. Gunda, L. Ravindranath, C. A. Thekkath, Y. Yu, and L. Zhuang. Nectar: Automatic Management of Data and Computation in Datacenters. In USENIX Symposium on Operating Systems Design and Implementation, Oct. 2010.
[43] B. He, M. Yang, Z. Guo, R. Chen, B. Su, W. Lin, and L. Zhou. Comet: batched stream processing for data intensive distributed computing. In ACM Symposium on Cloud Computing, pages 63–74, Indianapolis, IN, June 2010. ACM.
[44] J. Hellerstein, P. Haas, and H. Wang. Online Aggregation. In ACM SIGMOD International Conference on Management of Data, Tucson, AZ, June 1997.
[45] R. Huebsch, J. M. Hellerstein, N. Lanham, B. T. Loo, S. Shenker, and I. Stoica. Querying the internet with PIER. In International Conference on Very Large Data Bases, pages 321–332, Berlin, Germany, Sept. 2003.
[46] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks. In EuroSys European Conference on Computer Systems, volume 41, page 59, Lisbon, Portugal, June 2007.
[47] M. Isard and Y. Yu. Distributed data-parallel computing using a high-level programming language. In 35th International Conference on Management of Data, page 7, Providence, Rhode Island, 2009.
[48] Z. Ivezic, J. A. Tyson, R. Allsman, J. Andrew, and R. Angel. LSST: From Science Drivers to Reference Design and Anticipated Data Products. Evolution, page 29, May 2008.
[49] R. Johnson. Scaling Facebook to 500 Million Users and Beyond. http://www.facebook.com/note.php?noteid=409881258919.
[50] J. Li, D. Maier, K. Tufte, V. Papadimos, and P. A. Tucker. No pane, no gain: efficient evaluation of sliding-window aggregates over data streams. ACM SIGMOD Record, 34(1), 2005.
[51] J. Lin and M. Schatz. Design Patterns for Efficient Graph Algorithms in MapReduce. In Workshop on Mining and Learning with Graphs, 2010.
[52] D. Logothetis, C. Olston, B. Reed, K. Webb, and K. Yocum. Programming Bulk-Incremental Dataflows. Technical report, University of California, San Diego, 2009.
[53] D. Logothetis and K. Yocum. Wide-Scale Data Stream Management. In USENIX Annual Technical Conference, Boston, MA, June 2008.
[54] D. Logothetis and K. Yocum. Data Indexing for Stateful, Large-scale Data Processing. In 5th International Workshop on Networking Meets Databases (NetDB '09), Big Sky, MT, Oct. 2009.
[55] S. Madden, M. J. Franklin, J. M. Hellerstein, and W. Hong. TAG: a Tiny AGgregation service for ad-hoc sensor networks. In USENIX Symposium on Operating Systems Design and Implementation, volume 36, page 131, Boston, MA, Dec. 2002.
[56] G. Malewicz, M. H. Austern, A. J. Bik, J. C. Dehnert, I. Horn, N. Leiser, and G. Czajkowski. Pregel: a system for large-scale graph processing. International Conference on Management of Data, 2010.
[57] S. Melnik, A. Gubarev, J. J. Long, G. Romer, S. Shivakumar, M. Tolton, and T. Vassilakis. Dremel: Interactive Analysis of Web-Scale Datasets. Proceedings of the VLDB Endowment, 3, Sept. 2010.
[58] D. Metzler and E. Hovy. Mavuno: A Scalable and Effective Hadoop-Based Paraphrase Acquisition System. In KDD Workshop on Large-scale Data Mining: Theory and Applications, San Diego, CA, Aug. 2011.
[59] C. Monash. Facebook, Hadoop, and Hive, 2009.
[60] R. N. Murty and M. Welsh. Towards a dependable architecture for internet-scale sensing. In Workshop on Hot Topics in System Dependability, HotDep '06, pages 8–8, Berkeley, 2006. USENIX Association.
[61] C. Olston, B. Reed, U. Srivastava, R. Kumar, and A. Tomkins. Pig latin: a not-so-foreign language for data processing. In ACM SIGMOD International Conference on Management of Data, page 1099, Vancouver, BC, Canada, June 2008. ACM Press.
[62] L. Page, S. Brin, R. Motwani, and T. Winograd. The PageRank Citation Ranking: Bringing Order to the Web. Technical report, Stanford InfoLab, Nov. 1999.
[63] C. Palmisano, A. Tuzhilin, and M. Gorgoglione. User profiling with hierarchical context: an e-Retailer case study. In International and Interdisciplinary Conference on Modeling and Using Context, pages 369–383, Aug. 2007.
[64] B. Panda, J. S. Herbach, S. Basu, and R. J. Bayardo. PLANET: massively parallel learning of tree ensembles with MapReduce. Proceedings of the VLDB Endowment, 2(2):1426–1437, Aug. 2009.
[65] B. Pariseau. IDC: Unstructured data will become the primary task for storage, Oct. 2008.
[66] D. Peng and F. Dabek. Large-scale incremental processing using distributed transactions and notifications. In USENIX Symposium on Operating Systems Design and Implementation, pages 1–15, Vancouver, BC, Canada, Oct. 2010.
[67] L. Popa, M. Budiu, Y. Yu, and M. Isard. DryadInc: Reusing work in large-scale computations. In USENIX Workshop on Hot Topics in Cloud Computing, page 21. USENIX Association, 2009.
[68] X. Qian and G. Wiederhold. Incremental recomputation of active relational expressions. IEEE Transactions on Knowledge and Data Engineering, 3(3):337–341, Sept. 1991.
[69] A. Simitsis, P. Vassiliadis, S. Skiadopoulos, and T. Sellis. Data Warehouse Refreshment. In R. Wrembel and C. Koncilia, editors, Data Warehouses and OLAP: Concepts, Architectures and Solutions. IRM Press, 2006.
[70] U. Srivastava and J. Widom. Flexible time management in data stream systems. In ACM Symposium on Principles of Database Systems, page 263, Paris, France, June 2004. ACM Press.
[71] M. Stonebraker, C. Bear, U. Cetintemel, M. Cherniack, T. Ge, N. Hachem, S. Harizopoulos, J. Lifter, J. Rogers, and S. Zdonik. One size fits all? Part 2: benchmarking results. In Conference on Innovative Data System Research, Asilomar, CA, Jan. 2007.
[72] M. Stonebraker, S. Madden, D. J. Abadi, S. Harizopoulos, N. Hachem, and P. Helland. The end of an architectural era: (it's time for a complete rewrite). In International Conference on Very Large Data Bases, pages 1150–1160, Vienna, Austria, Sept. 2007.
[73] J. Tan, X. Pan, S. Kavulya, R. Gandhi, and P. Narasimhan. SALSA: analyzing logs as state machines. In 1st USENIX Workshop on Analysis of System Logs, page 6, Dec. 2008.
[74] N. Tatbul, U. Çetintemel, and S. Zdonik. Staying FIT: efficient load shedding techniques for distributed stream processing. In International Conference on Very Large Data Bases, page 11, Vienna, Austria, Sept. 2007.
[75] N. Tatbul and S. Zdonik. Window-aware load shedding for aggregation queries over data streams. In International Conference on Very Large Data Bases, Seoul, Korea, Sept. 2006.
[76] P. Vassiliadis and A. Simitsis. Extraction, transformation, and loading. In L. Liu and M. T. Özsu, editors, Encyclopedia of Database Systems. Springer, 2009.
[77] C. Wilson, B. Boe, A. Sala, K. P. Puttaswamy, and B. Y. Zhao. User interactions in social networks and their implications. In ACM European Conference on Computer Systems (EuroSys '09), page 205, New York, New York, USA, 2009. ACM Press.
[78] H. Wu, B. Salzberg, and D. Zhang. Online event-driven subsequence matching over financial data streams. In ACM SIGMOD International Conference on Management of Data, page 23, Paris, France, June 2004. ACM Press.
[79] P. Yalagandula and M. Dahlin. A scalable distributed information management system. In ACM SIGCOMM Conference on Applications, Technologies, Architectures and Protocols of Computer Communications, volume 34, page 379, Portland, OR, Oct. 2004.
[80] A. Yoo and I. Kaplan. Evaluating use of data flow systems for large graph analysis. In Workshop on Many-Task Computing on Grids and Supercomputers, MTAGS '09, pages 1–9, New York, New York, USA, 2009. ACM Press.
[81] Y. Yu, P. K. Gunda, and M. Isard. Distributed aggregation for data-parallel computing: interfaces and implementations. In ACM Symposium on Operating Systems Principles, page 13, Big Sky, MT, Oct. 2009.
[82] Y. Yu, M. Isard, D. Fetterly, M. Budiu, U. Erlingsson, P. K. Gunda, and J. Currey. DryadLINQ: a system for general-purpose distributed data-parallel computing using a high-level language. In USENIX Symposium on Operating Systems Design and Implementation, pages 1–14, San Diego, CA, Dec. 2008.
[83] M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica. Spark: Cluster Computing with Working Sets. In Workshop on Hot Topics in Cloud Computing, Boston, MA, June 2010.