Apache Tez – Present and Future

So everyone, welcome to this session. My colleague Rajesh and I will give a presentation about Apache Tez, its present and its future. How many of you have heard of Tez? Please raise your hand. Oh, thanks, that surprises me. I think many of you must know Pig, Hive, and Cascading, and many of you may already use these tools in your routine work, which means you have already used Tez unknowingly. This presentation will tell you what Tez is and how to use it to make your routine work more efficient. Here is today's agenda: I will cover the first two sections, the Tez introduction and the feature deep dive, and my colleague Rajesh will cover the next two sections.

First I will give one simple example to compare MapReduce and Tez. Here we have one Hive query. You may know that by default this Hive query will be translated to several MapReduce jobs: in the diagram we have four circles, which means four MapReduce jobs, and each MapReduce job writes its data to HDFS, where it is read by the next MapReduce job. Now let's see how Tez implements this Hive query. On the right side we can see one circle, which means it requires only one job, and there is no intermediate data written to HDFS; instead, each vertex reads data from its upstream vertex. This is just one simple comparison between MapReduce and Tez, to give you a first impression of what Tez is; of course Tez has a lot of other features and improvements compared to MapReduce, which I will cover later.

So first, what is Tez? Tez is a distributed execution engine framework. Here I want to emphasize that Tez is an execution engine framework; it is not an execution engine itself. That means you can use Tez to build an execution engine for your special purpose. Next, Tez tries to express the computation as a data flow graph; you can also call it a DAG.
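As a toy illustration of why the DAG form saves work (this is not Tez code; the function names and the accounting are hypothetical), compare the HDFS round-trips of a four-stage plan run as chained MapReduce jobs versus a single DAG:

```python
# Hypothetical sketch (not Tez code): count HDFS round-trips for a plan
# executed as chained MapReduce jobs versus a single Tez-style DAG.
def mapreduce_hdfs_io(num_jobs):
    # Every MR job writes its output to HDFS; every job after the first
    # reads the previous job's output back from HDFS.
    writes = num_jobs
    reads = num_jobs - 1
    return writes + reads

def tez_hdfs_io(num_vertices):
    # In a DAG, vertices stream data to downstream vertices directly,
    # so only the final result is written to HDFS.
    return 1

print(mapreduce_hdfs_io(4))  # 7 HDFS round-trips for the 4-job Hive query
print(tez_hdfs_io(4))        # 1
```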
Tez is designed to be highly customizable to meet a broad spectrum of use cases: you can use Tez for ETL, for interactive query, and for machine learning. You can run Tez wherever you run Hadoop, because Tez is built on top of YARN, so it fits naturally into the Hadoop ecosystem. Lastly, Tez is open source: it entered the Apache Incubator around two years ago, and around one year ago it became an Apache top-level project.

Just now I mentioned that Tez tries to express the computation as a DAG. So what is a DAG, and why a DAG? DAG means directed acyclic graph, which is a directed graph with no cycle. Actually, most of the workloads in the data area can be modeled as a DAG, and any complicated DAG can be composed of the following three basic paradigms: sequential, merge, and divide. Sequential means we have one dataset and we would like to transform it from one form into another, for example projection and filter. The second paradigm, merge, means we have multiple datasets and we would like to merge them by some rule, such as join, union, or intersection. The last paradigm is divide, which means we have one dataset and we would like to divide it into multiple datasets, for example the split operation in Pig.

So now let's see how to express a DAG in Tez. Tez has two APIs for that: one is the DAG API, the other is the runtime API. The DAG API allows the user to build a DAG which represents the topological structure of the data computation, while the runtime API helps you specify the application logic of each computation unit, as well as how to move, read, and write the data between the vertices. Let's first look at the DAG API. There are two main components in the DAG API: one is the vertex, the other is the edge. When you create a vertex you can specify the processor, the parallelism, the resources, and so on; when you create an edge you can also specify lots of properties, and here I list the most important one, the data movement type. Tez supports four kinds of data movement: scatter-gather, broadcast, one-to-one, and custom. Scatter-gather is very similar to the shuffle of MapReduce. As we know, the shuffle involves several steps: you first need to partition the data on the map side, then sort and merge the data on the reduce side; scatter-gather is usually used in group-by and join. We know that sometimes we do not need to partition, sort, and merge the data, and that is the reason Tez introduced the other kinds of data movement. For example, broadcast is used in the Pig replicated join and the Hive broadcast join, in which case we have one small dataset that we would like to join with a very large dataset. The other data movement type is one-to-one, which is used in the Pig order by; I will cover that later. And if none of these three data movement types fits your requirement, you can also implement your own, which we call custom.
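The real DAG API is Java; here is a minimal Python sketch of its shape (the class and enum names loosely imitate Tez's `Vertex`, `Edge`, and `DataMovementType` but are not the actual API), including the acyclicity check that makes it a DAG:

```python
from enum import Enum

class DataMovement(Enum):
    SCATTER_GATHER = 1  # like the MapReduce shuffle: partition + sort + merge
    BROADCAST = 2       # small dataset sent to every downstream task
    ONE_TO_ONE = 3      # task i feeds task i of the downstream vertex
    CUSTOM = 4          # user-defined routing

class Vertex:
    def __init__(self, name, processor, parallelism):
        self.name, self.processor, self.parallelism = name, processor, parallelism

class Edge:
    def __init__(self, src, dst, movement):
        self.src, self.dst, self.movement = src, dst, movement

class DAG:
    def __init__(self):
        self.vertices, self.edges = [], []
    def add_vertex(self, v):
        self.vertices.append(v); return self
    def add_edge(self, e):
        self.edges.append(e); return self
    def is_acyclic(self):
        # Kahn's algorithm: a DAG must admit a topological order.
        indeg = {v.name: 0 for v in self.vertices}
        for e in self.edges:
            indeg[e.dst.name] += 1
        ready = [n for n, d in indeg.items() if d == 0]
        seen = 0
        while ready:
            n = ready.pop(); seen += 1
            for e in self.edges:
                if e.src.name == n:
                    indeg[e.dst.name] -= 1
                    if indeg[e.dst.name] == 0:
                        ready.append(e.dst.name)
        return seen == len(self.vertices)

m = Vertex("map", "MapProcessor", 10)
r = Vertex("reduce", "ReduceProcessor", 2)
dag = DAG().add_vertex(m).add_vertex(r)
dag.add_edge(Edge(m, r, DataMovement.SCATTER_GATHER))
print(dag.is_acyclic())  # True
```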
Now let's look at the runtime API. The runtime API has three main components: the input, the processor, and the output; we can also call this the IPO model. The input is the data that a processor receives, and in Tez each vertex can have multiple inputs. This is an advantage compared to MapReduce: as we know, one MapReduce job can only have one input format. The next component is the processor, which holds the application logic that consumes your input data and produces the output. The last is the output, which is the data that a processor writes to an edge; similar to the input, each vertex can have multiple outputs. Tez ships with several built-in libraries of inputs, outputs, and processors. For example, MRInput can read data and is comparable to an InputFormat, and MROutput can write data and is comparable to an OutputFormat. Usually inputs and outputs appear in pairs, because an input/output pair represents the two sides of one edge: OrderedGroupedKVInput and OrderedPartitionedKVOutput are the input/output pair of a scatter-gather edge, and UnorderedKVInput and UnorderedKVOutput are the input/output pair of the broadcast and one-to-one edges. And of course each application has its own processor; for example, Pig has its PigProcessor and Hive has its HiveProcessor.
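To make the IPO model concrete, here is an illustrative Python sketch (not the real Tez runtime API, which is Java) of a processor that consumes two logical inputs, something a single MapReduce stage cannot do:

```python
# Illustrative sketch of the input / processor / output (IPO) model.
# The input names and record shapes are made up for the example.
def join_processor(inputs, output):
    """A toy processor: hash-join two logical inputs on their key."""
    left, right = inputs["users"], inputs["orders"]
    by_key = {}
    for key, value in left:
        by_key.setdefault(key, []).append(value)
    for key, value in right:
        for lv in by_key.get(key, []):
            output.append((key, lv, value))

result = []
join_processor(
    {"users": [(1, "alice"), (2, "bob")],
     "orders": [(1, "book"), (1, "pen"), (3, "mug")]},
    result,
)
print(result)  # [(1, 'alice', 'book'), (1, 'alice', 'pen')]
```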

Just now I talked about Tez's biggest feature, that it tries to express the computation as a DAG, so now let me list the benefits of the DAG. The first benefit is that it is much easier to express a computation as a DAG: as I just mentioned, there are three basic paradigms, and any complicated DAG can be composed of these three basic paradigms. If you have any experience with databases, you may know the concept of a query plan: each query is first translated to a query plan, which is submitted to the underlying engine, and a query plan actually is a DAG. The next benefit is that no intermediate data is written to HDFS. This also means we put less pressure on the NameNode, because when we write a large amount of data to HDFS we first have to go through the NameNode. The next benefit is less scheduling overhead and less resource contention. And the last benefit is that Tez has more optimization opportunities, because it has more global context: since we model the whole computation as a DAG, each vertex knows the context of its upstream vertices and its downstream vertices. For example, in Tez it is much easier to take advantage of data locality.

So far I have given you a basic introduction to Tez and talked about its biggest feature, expressing the computation as a DAG. Of course Tez also has a lot of other features, so next I will talk about several of the main ones. The first feature is container reuse; here the container means the YARN container. As we know, in MapReduce we have to allocate one container for each map or reduce task, and after the task completes we have to release that container. In Tez we do not need to release the container; we can keep using it for the next task. Container reuse brings lots of benefits. The first benefit is that we consume fewer resources: we reduce the overhead of launching the JVM, the overhead of negotiating with the ResourceManager, and the overhead of resource localization. The next benefit is that we can reduce the network I/O. How is that achieved? Say we have one container that has already run one task, and this task wrote its output to the local disk; then we reuse this container for another task, and this task happens to read the output of the last task. In this case we do not need to pull the data over the network; instead we can read it from the local disk. Tez encourages this kind of container reuse to take advantage of data locality. The last benefit is object caching: since multiple tasks can run in the same JVM, all these tasks can access the same data in that JVM.
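The container-reuse benefits can be illustrated with a toy simulation; the scheduling policy here (prefer an idle container on the node that already holds the input) is an assumption for illustration, not Tez's actual scheduler:

```python
# Hedged sketch: simulate scheduling tasks with and without container
# (JVM) reuse, counting JVM launches and local-disk reads.
def run(tasks, reuse):
    launches = 0
    local_reads = 0
    free = []  # nodes that have an idle, reusable container
    for node_with_input in tasks:
        if reuse and node_with_input in free:
            free.remove(node_with_input)
            local_reads += 1   # previous task's output is on local disk
        else:
            launches += 1      # allocate + localize + start a new JVM
        free.append(node_with_input)
    return launches, local_reads

tasks = ["n1", "n2", "n1", "n1", "n2"]
print(run(tasks, reuse=False))  # (5, 0): one JVM launch per task
print(run(tasks, reuse=True))   # (2, 3): containers reused, data read locally
```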

The next feature is the Tez session. As we know, for each MapReduce job we need to launch one AM, while in Tez we can reuse the same AM for multiple DAGs; that means we can reuse containers across DAGs and share data across DAGs. The next feature is dynamic parallelism estimation. As we know, one job may run on different sizes of data, and for each run the data distribution may be different: sometimes it may be even, and sometimes it may be very skewed. The parallelism is therefore very critical for performance, and Tez provides a very cool feature that lets you estimate the parallelism at runtime. The key here is the VertexManager. The VertexManager is much like a controller of the vertex: it can listen to the status of other vertices, schedule each task, and also perform communication between the vertices. Now let's see how the VertexManager does the dynamic parallelism estimation. On the right side we can see two vertices: one is the map vertex, the other is the reduce vertex. When a task in the map vertex completes, it sends a message to the VertexManager, and this message contains the output size information; the VertexManager then determines the right parallelism for the reduce vertex based on this information.

The next feature is the ATS integration; here ATS means the YARN Application Timeline Service. Tez pushes its runtime data to ATS, and the data in ATS serves mainly two purposes: one is monitoring, the other is performance analysis. My colleague Rajesh will talk about these two things later. The next feature is AM recovery. In Tez the AM may crash in some corner cases, for example due to out-of-memory or node failure. If the AM crashes, the next AM attempt is launched automatically and continues from the last checkpoint, and the whole recovery process is transparent to the user.

OK, just now I talked about lots of Tez features, including expressing the computation as a DAG, container reuse, Tez sessions, recovery, and ATS integration. Now I will give you one concrete example to show how Tez uses these features. Here we have one simple Pig script; it is an order by. You may know that the order by in this script will be translated to two MapReduce jobs: the first MapReduce job is for the histogram calculation, and the next MapReduce job uses the histogram data to build a customized partitioner to repartition the data, so that finally the data is globally sorted. Now let's see how Tez implements this Pig script. On the right side we can see four vertices. The vertex Load has two outputs: one output is for the histogram calculation, the other is for the repartitioning. The vertex Partition has two inputs, one of which is the histogram data; the Partition vertex uses the histogram data to build a custom partitioner to repartition the data, so that finally the data is globally sorted. Let's see which features we used here. The first is that we express the computation as a DAG, so there is no intermediate data written to HDFS. The next is container reuse: with this feature, many of the containers used by the vertex Load will be reused by the vertex Partition, so that it can take advantage of data locality.
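The two-phase order by just described can be sketched in miniature: build an equal-height histogram to pick partition boundaries, range-partition, and sort each partition locally so that the concatenation is globally sorted. This is a simplification (exact quantiles on the full data instead of sampling, and no Pig or Tez code):

```python
# Sketch of histogram-based global sort: cut points from an equal-height
# histogram, then range partitioning, then a local sort per partition.
def cut_points(data, num_partitions):
    s = sorted(data)
    # equal-height: boundaries at the 1/n, 2/n, ... quantiles
    return [s[len(s) * i // num_partitions] for i in range(1, num_partitions)]

def range_partition(data, cuts):
    parts = [[] for _ in range(len(cuts) + 1)]
    for x in data:
        i = sum(1 for c in cuts if x >= c)  # index of the range holding x
        parts[i].append(x)
    return [sorted(p) for p in parts]      # each reducer sorts locally

data = [42, 7, 99, 3, 56, 18, 77, 25, 61, 12]
parts = range_partition(data, cut_points(data, 3))
print(parts)
print([x for p in parts for x in p] == sorted(data))  # True: globally sorted
```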

OK, so I have given you an introduction to Tez and its features; next my colleague Rajesh will talk about the remaining two parts. Rajesh, please. Thanks, Jeff. I will concentrate mainly on the performance as well as the debuggability aspects. Large clusters can have heterogeneous hardware and a combination of good as well as bad nodes, so job run times can get heavily impacted if task attempts are scheduled on some of these bad nodes or on a slower set of nodes. When speculation is enabled, Tez tries to gather the runtime statistics of all the task attempts, and based on this information it identifies the stragglers, which run a lot slower than the other tasks in the same vertex. Based on this information, speculative attempts are scheduled, and if the speculative attempts finish first, the original attempts are automatically killed. This feature is very useful on large clusters.

Tez stores the intermediate data generated on the producer side as well as on the consumer side in the IFile format. To take an example: on the producer side, with ordered output or unordered output, the data is generated in the IFile format, and on the consumer side, as the data gets pulled from various sources, if there is memory pressure the data ends up spilled to disk, and at that point the IFile format is used as well. A typical record in the IFile format comprises the key length, the value length, the key bytes, and the value bytes. However, this may not be the best-suited format for all use cases. For instance, if you have a sorted stream of incoming data with duplicate keys, this format will write all the duplicate keys again and again, so it is going to bloat up the memory in the sort buffer, and the amount of disk space required for storing the IFile is going to be higher as well. In turn this also occupies network bandwidth: the larger amount of data on disk has to be transferred over to the consumer, so it takes up a lot of I/O too. In recent versions of Tez we modified this slightly so that it caters to both of the use cases I talked about, having a unique set of keys as well as having duplicate keys. It starts off by writing the normal IFile record, but the moment it encounters a duplicate key it writes an RLE marker followed by the values associated with that key, followed by an end marker; you can think of it more like a key followed by a list of values. With this, in our internal benchmarks we observed around 57 percent better memory efficiency and around a 23 percent reduction in the IFile size.

Also, when Tez serializes data into memory, it writes the key size as well as the key bytes, and the same applies for the value; and when it serializes the data from memory to disk, it used to write the same key size and value size again, followed by the data stored in memory. Essentially it was wasting 8 bytes per record when writing the data from memory to disk. In recent versions of Tez we introduced fast bytes serialization, which optimizes this particular case. It may not sound like a big improvement if you are processing tens or hundreds of rows, but it is a big improvement if you are processing millions or billions of records: in our experiments we observed around a 25 percent improvement in the IFile size, and it also reduces the serialization as well as the deserialization cost.

It is not uncommon to see containers of 4 GB or greater these days; however, the amount of memory we could allocate to the sort buffer was limited to 2 GB in earlier versions of Tez. In recent versions, with the modifications in the pipelined sorter, we are able to allocate a sort buffer greater than 2 GB, which means we are able to reduce the number of spills that happen in the case of large jobs. We also reduced the CPU cost involved in the key comparisons in the pipelined sorter, which helps the overall runtime of the job. As I mentioned earlier, consumers pull the data from various sources and start merging it together, so it is quite possible that a consumer is pulling duplicate keys from different sources as well.
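The producer-side IFile duplicate-key optimization described above can be sketched as follows; the byte layout here (4-byte lengths, a 1-byte marker) is a simplified assumption, not the exact IFile encoding:

```python
# Hedged sketch of the duplicate-key (key-RLE) IFile optimization:
# repeated keys are written once, with their values grouped, instead of
# re-writing every key.
def plain_size(records):
    # <key len><value len><key><value> per record; 4-byte lengths assumed.
    return sum(8 + len(k) + len(v) for k, v in records)

def rle_size(records):
    size, prev = 0, None
    for k, v in records:
        if k == prev:
            size += 1 + 4 + len(v)       # RLE marker + value len + value
        else:
            size += 8 + len(k) + len(v)  # full record for a new key
            prev = k
    return size

sorted_records = [(b"user:42", b"a"), (b"user:42", b"b"),
                  (b"user:42", b"c"), (b"user:99", b"d")]
print(plain_size(sorted_records))  # 64
print(rle_size(sorted_records))    # 44
```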
However, we did not have RLE enabled on the reducer side, in the reducer merge code path. In recent versions of Tez we enabled RLE in the reducer merge code path and saw around a 10 percent improvement; this helps in reducing both the I/O and the CPU cost. To give an example, in this particular DAG, reducer number 2 had taken almost 29 million CPU milliseconds, and the entire DAG took almost 691 seconds to execute; with the fix, the same reducer took only 22 million CPU milliseconds and the entire DAG completed in 621 seconds. That is a good improvement in terms of both the overall runtime and the CPU utilization.
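In the same spirit, a reducer-side merge that groups duplicate keys while merging sorted streams, rather than materializing every repeated key, can be sketched like this (simplified, not Tez code):

```python
import heapq

# Illustrative sketch: the consumer k-way merges sorted streams pulled
# from several sources, collapsing duplicate keys into one group as it
# goes instead of keeping every (key, value) pair separate.
def grouped_merge(*sorted_streams):
    merged = heapq.merge(*sorted_streams)  # k-way merge, keys stay sorted
    out = []
    for key, value in merged:
        if out and out[-1][0] == key:
            out[-1][1].append(value)       # duplicate key: extend the group
        else:
            out.append((key, [value]))
    return out

a = [(1, "x"), (3, "y")]
b = [(1, "z"), (2, "w")]
print(grouped_merge(a, b))  # [(1, ['x', 'z']), (2, ['w']), (3, ['y'])]
```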

The next improvement automatically scales the amount of memory allocated to the various inputs as well as outputs. In this particular DAG you can see that a couple of vertices have a large number of inputs as well as outputs. In older versions of Tez, the memory would be distributed pretty much equally across all the inputs and outputs, but that might not be the best option: you might want to allocate as much memory as possible to the sort buffers, because that is where the memory is really required, as opposed to the unordered use cases; if you do not allocate enough memory to the ordered outputs, you end up with a lot of spills. So in recent versions of Tez we introduced the weighted memory distributor, with which you can control the amount of memory allocated to the various inputs and outputs, giving you fine-grained control over the memory allocation. We saw around a 26 percent improvement in the task runtime in our internal benchmarks.

As Jeff pointed out, broadcast is one of the data movements supported in Tez: the source generates data which is sent to pretty much all the tasks in the downstream vertex. This is not a problem if you have one source and 10 or 100 downstream tasks; however, it can become a big issue if there are a thousand or ten thousand tasks in the downstream vertex, all trying to pull the data from the same source. Now, it is quite possible that many tasks pertaining to the same vertex are running on the same node, so with the fix, the task which gets launched first pulls the data from the source, and the subsequent tasks of the same vertex running on the same node check whether the data is already available locally and, if so, pull it from there instead of contacting the source; if it is not available, they contact the source anyway and download the data. This drastically reduced the number of TCP connections and also gave a good improvement in latency.

It is common to see data skew in production jobs. To give an example, it is quite possible that 20% of the tasks generate almost 80% of the output. In such a case even slow start may not help: even though slow start launches your reduce tasks early, unless and until the source tasks have completed generating the data, the reduce tasks cannot pull it; meanwhile a skewed source task generates a great many spills, and at the end it has to do a final merge, writing the data to disk, and only then are the events sent to the reducers so they can pull the data. So in recent versions of Tez we introduced the initial version of pipelined shuffle, wherein as and when a spill happens in the source task, an event is sent to the downstream vertex so that it can start pulling the data. This has two big benefits. One is that you no longer need to do the final merge in the source task, which drastically reduces the amount of I/O required at the source. The second is that as the spills are generated, the downstream vertex can already pull the data over and merge it, so you are overlapping the I/O with the CPU. In our internal benchmarks we saw around a 20 percent improvement for certain jobs involving heavy skew. To give an example, in this particular DAG, map number 7 executed one single task which generated a couple of GBs worth of data; it spilled many times, finally did a merge, generated somewhere around 11 GB of data, and took almost 925 seconds, while the downstream vertex, reducer number 7, was just waiting for the data to be generated by the source. With pipelined shuffle it took only 680 seconds, which is a big improvement in the overall runtime, and the biggest factor is that you no longer need the final merge in map number 7, which drastically reduces the I/O cost.
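Going back to the weighted memory distributor mentioned earlier in this section, the idea can be sketched as proportional allocation; the component names and weights here are made up for illustration and are not Tez defaults:

```python
# Hedged sketch of weighted memory distribution: instead of splitting a
# task's memory budget equally across all inputs and outputs, each one
# gets a share proportional to a configured weight (ordered sort buffers
# weighted heavily, unordered ones lightly).
def distribute(budget_mb, components):
    total_weight = sum(w for _, w in components)
    return {name: budget_mb * w // total_weight for name, w in components}

components = [
    ("ordered_output_sort_buffer", 8),  # spills if starved: weight it high
    ("unordered_output", 1),
    ("shuffle_input", 3),
]
print(distribute(1200, components))
# {'ordered_output_sort_buffer': 800, 'unordered_output': 100, 'shuffle_input': 300}
```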

Moving on to the debuggability aspects: starting with the 0.6 branch, a huge effort has gone into creating the Tez UI. In the earlier days we used to debug Tez job problems by mining the logs we obtained from HDFS, which was a huge pain. The source of truth for the Tez UI is the ATS data: as and when a DAG executes, it publishes its data to ATS, as Jeff pointed out, and the Tez UI pulls the data from ATS and renders that information. This is going to be a lot more useful not only for developers but for admins as well as performance analysts. You can clearly look at the set of DAGs that are running in the system or that ran on the system, look at the various tasks, task attempts, and counters, and do all sorts of performance analysis on top of this; that was not possible in earlier versions of Tez. Also, for debugging we used to dig through all the Tez logs to work out what error happened, but with the Tez UI it becomes a lot easier, because the error automatically pops up in the UI. One of the nice features of the Tez UI is that it is also possible to download the ATS data pertaining to a specific DAG by clicking the download data button. This is very useful if you are trying to provide support remotely: earlier we used to ask for all the logs, which could run into gigabytes, but now you can ask only for this particular data and mine that information to understand what really happened in that particular DAG.

The other feature we added recently is the local mode, where you can create Tez jobs and test them without having to spin up a Hadoop cluster. As long as you have an IDE and are able to write Tez code, you can debug through your Tez code without having to deploy it on a Hadoop cluster; the only caveat is that it will not go through the scheduler path or the RPC mechanisms.

A couple of questions arise while debugging a production system: whether your containers got launched properly, whether they were pre-warmed properly, whether your containers got reused properly, whether there were local reads or remote reads in the system, and what kind of tasks ran in the system and whether any of them took a lot longer. These questions keep coming up when you debug production systems, so we added the Tez swimlane, where you can see most of this information by looking at the swimlane graph itself. For instance, this is a Tez session which executed four different DAGs: the first band represents the pre-warming of the session, the bars marked in red are the remote reads, and you can also see the container reuse that happened in the system. We are also planning to incorporate a couple of other features, such as when the shuffle ended for a specific vertex: did my reducer task take a lot longer because the shuffle was running slower? That sort of detail can be added to this graph by mining the data we get from the download data button shown in the earlier slide.

Whenever a job completes, the task attempt logs, the application master logs, and all those details get aggregated and stored in HDFS. Earlier we were looking at these in a manual fashion, but we have now written a simple T-file parser which helps parse these T-files without having to modify anything, and it lets you do the parsing in Pig itself; in a way, we are analyzing the Tez logs using Hadoop itself. This graph represents one of the analyses we did: the y-axis represents the source machine, the x-axis represents the destination machine, and the z-axis represents the time taken for the download. In this case, as you can clearly see, any data downloaded from machine number 7 to any of the other machines took a lot longer when compared to the other machines. This sort of analysis helps a lot in identifying some of the slow nodes in the cluster.
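The slow-node analysis just described can be sketched as a small aggregation over (source, destination, seconds) download records; the record format and the threshold factor are assumptions for illustration:

```python
from collections import defaultdict
from statistics import mean

# Illustrative sketch of the log-mining analysis: rank shuffle-download
# sources by mean download time to surface suspect (slow) nodes.
def slow_sources(transfers, factor=2.0):
    by_src = defaultdict(list)
    for src, _dst, secs in transfers:
        by_src[src].append(secs)
    means = {src: mean(times) for src, times in by_src.items()}
    overall = mean(means.values())
    return sorted(src for src, m in means.items() if m > factor * overall)

transfers = [
    ("node1", "node2", 1.0), ("node2", "node3", 1.2),
    ("node7", "node1", 9.0), ("node7", "node3", 11.0),
]
print(slow_sources(transfers))  # ['node7']
```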

In this case the node turned out to have some network issues. However, it is not limited to this particular analysis: if you have a custom processor and you have enough logging added to the processor, you can also make use of the parser and do some analysis on top of it. Additional analysis could be done along these lines, for example analyzing the amount of data that got transferred from the source to the destination, which helps in identifying whether there were any skews in the system.

From the roadmap perspective, we are planning to add shared output edges, where the same output can be given to different vertices. We are planning local mode stabilization: currently in local mode we can execute only one task at a time, but we are planning to support more tasks. We are also planning to optimize the DAG at runtime, so that you will be able to add or remove vertices at runtime. And we are planning to add a partial-completion vertex manager. Currently the tasks within the same vertex cannot talk to each other, but it might be useful to have this kind of communication for certain scenarios: say you have a select query with a condition and a limit of 10; once the 10 records have been generated, you might want to stop the other running tasks in a graceful fashion, and for that we need the partial-completion vertex manager. Co-scheduling is something like this: if you can start the producer as well as the consumer at the same time, then instead of writing to disk you can directly stream the data to the downstream vertex. We also plan to add a lot of framework statistics for making better runtime decisions. From the adoption perspective, Hive has been using Tez for quite a number of releases, and Pig has been using Tez starting with its 0.14 release; there are a couple of other projects which make use of Tez as well, for instance Cascading and Flink. Here are some useful links for getting to know the community: the JIRA, the code repository, and the dev and user mailing lists. That's pretty much it.

Now, a question from the audience: a comparison between Tez and Spark. As far as I understand, the major features of Tez, like DAGs and storing data in memory, also exist in Spark. What is the niche of Tez, considering that Spark is very trendy right now and Tez is obviously outnumbered in the number of committers? OK, yeah, so actually Tez and Spark are for two different purposes. Spark is at the application layer: usually you use Spark to solve your application problem. Tez is at the execution engine layer: usually you use Tez to build an execution engine. You can also see the difference in the APIs between Spark and Tez. Spark exposes the RDD API and you do operations on the RDD, somewhat like what MapReduce provides, but the Tez API is focused on the DAG and on how to optimize the execution: in Tez you can specify the inputs, outputs, and processors, you can specify how to read and write the data, and you can do dynamic parallelism estimation, while in Spark there is no API for that. So these two things are at different layers, although they have some things in common. I hope that answers your question. As an end user, I really do not care whether one tool or framework sits at the execution layer and another at the application layer; I am solving a particular task.

For instance, I want to speed up my query or my processing engine job, and the rest for me is transparent. What I am trying to say is: what is the benefit for me of setting hive.execution.engine=tez instead of spark, given that Hive 1.1 can use Spark as an execution engine? So your question is how to decide, for your use case, between Hive on Tez and Hive on Spark? That's correct, and also what the benefits of Tez over Spark are. OK, so actually, for now I think that at least the scalability of Tez is much better. Right from the beginning Hive used MapReduce as its execution engine, and much of what is in Tez took its ideas from MapReduce; we tried to make it fully compatible with MapReduce, so everything MapReduce can do, Tez can do, and Tez can also do a lot of other things that MapReduce cannot. As for Hive on Spark, as far as I know it is still not mature and still has some scalability problems; maybe one or two years from now the situation will be better. Just to add to that: Hive on Spark started only recently, and in the case of Tez there are additional features like custom vertex managers. Hive is already making use of those, for instance using a custom vertex manager in Tez to do dynamic partition pruning in a better fashion; that set of features may not be available in Hive on Spark.

I have a question concerning one of the slides; can you please go back to the slide about building a histogram, where you had that small Pig job, I think towards the beginning, the example with two MapReduce jobs? Exactly, this one. If I understood it well, there are two MapReduce jobs for this order by; why not have just one, taking advantage of the shuffle and all its parts? Do you see my question? So, is it correct that two MapReduce jobs will run here to execute the order by? Yeah. Why not one? So, in MapReduce you cannot have multiple outputs, and you need multiple outputs from one stage for that to work: in MapReduce you can only have a pair of stages connected by a single edge, and the shuffle only gives you global order after you have partitioned. So you first generate your partition ranges, say 0 to 100 is the first partition, 100 to 200 the next; if you have random data you do not know where to partition, and you want each reducer to get a uniform share of the data, so you generate an equal-height histogram, chop the data up accordingly, and sort the partitions individually.

In the last slide, or the one before it, you also mentioned Apache Flink; how do you see the relationship between Flink and Tez? Do you see Flink more like an application layer that uses Tez as an execution engine?

Hi, you talked about the pipelined shuffle path. Are you saying that you can repartition the dataset in the pipeline without landing the records on disk? Can you please repeat the question? Are you saying you can repartition the dataset in the pipeline without landing the records on disk, is that what you are saying? No, it still writes the data to disk, but you no longer need the final merge. In earlier versions of Tez, when multiple spills happened, at the end there was a final merge: all the spills were merged together and a final merged file was written at the source, and only when that was done would an event be sent to the downstream vertices stating that they can consume the data. With pipelined shuffle we completely avoid that final merge step: as and when a spill happens, Tez sends an event to the downstream vertex to consume that particular spill. Could we imagine Spark running on top of Tez? I am asking because Spark uses its own DAG scheduling, and maybe it could use Tez as well. OK, yeah, there was an effort in terms of porting, running Spark on top of Tez; that was an internal effort, but I am not too sure whether the plan to open source it caught on.
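The final-merge saving described in this answer can be sketched with simple byte accounting (the numbers and the accounting model are illustrative, not measurements):

```python
# Hedged sketch: with the classic path, every spill is written, read
# back, and re-written by the final merge before consumers fetch; with
# pipelined shuffle each spill is announced by an event and fetched
# directly, so the final-merge I/O disappears.
def final_merge_io(spill_sizes):
    total = sum(spill_sizes)
    writes = total + total   # write each spill, then write the merged file
    reads = total            # read the spills back to merge them
    return writes + reads

def pipelined_io(spill_sizes):
    return sum(spill_sizes)  # each spill is written once and fetched as-is

spills = [400, 300, 300]     # MB
print(final_merge_io(spills))  # 3000 MB of producer-side disk I/O
print(pipelined_io(spills))    # 1000 MB
```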