Pandora's Move to GCP With Composer (Cloud Next '19)

[MUSIC PLAYING] FENG LU: Hi, good morning, everyone. Welcome to the Pandora and Composer session. It's not a music session, sadly, but you will probably learn how Composer helps Pandora manage their music data. So I'm Feng, a software engineer at Google Cloud. Also joining me today are Daniel from Pandora and Wilson, also from the Google Cloud Composer team. We'll start the talk by giving folks an overview of what Cloud Composer is, what kind of problem we're trying to solve, and what some of the features in Cloud Composer are. Then Daniel will come onstage to talk about Pandora's experience migrating their workflow execution to Google Cloud, and we'll wrap up the talk with some of the newer features that were just recently released. So, we live in an increasingly complex world. In the context of data analytics, this means your data probably lives in many different places. Every once in a while, a new data processing framework gets announced, and once you're done with your data processing, there might be different places you need to upload your data processing results. You eventually realize that it's very unlikely you can satisfy all your data processing needs in a single system. It's very common that you have to stitch data processing jobs in different systems together, essentially in the form of a workflow. And once you have a workflow, there ultimately comes the need to schedule that workflow and to monitor that workflow. Really, if you think about a way to reduce complexity, it's through automation, and here it's really important that you have a tool that can offload and manage all this complexity of scheduling and monitoring workflows on your behalf. With that purpose in mind, we designed and developed Composer. It's built on top of the popular open source Apache Airflow, and it helps you create workflows with ease and manage them powerfully. So exactly how does Cloud Composer help you reduce your workflow management complexity and improve your workflow development?
We focus on a few areas. The first one is really about making sure Composer is the orchestration choice when it comes to GCP. What that means is, from Cloud Composer you can orchestrate and manage pretty much all your GCP-related workloads: [INAUDIBLE] jobs, Dataflow jobs, Dataproc jobs. Second, as I mentioned earlier, we realized that today your data is likely going to live in different places, on-prem and in the cloud. So we designed Composer from day one with the requirement that whatever orchestration solution we introduce needs to work well in a hybrid cloud environment. We also wanted to come up with higher-level abstractions and make it really easy for you to write your workflows, especially if your workflow has tens or hundreds or thousands of tasks. Again, we want to make it really easy for you to express workflows that consist of that many tasks. Finally, we wanted to increase reusability. If you have done all your development on-prem, we don't want it to be the case that when you move to Google Cloud, you realize, oh, now I need to redevelop all my workflows. So we really put open source at the core of Cloud Composer, and we make sure that whatever workflows you write or develop can run in GCP. Likewise, when you move certain workflows back on-prem, you find you don't have to modify anything; things should just work out of the box. With those requirements in mind, we started the design and development of Cloud Composer. One of the questions we often got was, how does Cloud Composer differ from self-managed Apache Airflow? I understand probably many of you have on-prem Airflow deployments, and you're wondering, what are some of the value-added pieces

if I move to Cloud Composer? There are quite a few of those; I'm going to highlight a few important ones. A lot of them really center around simplicity and helping you minimize your operational complexity. We have really convenient tooling, the Cloud Console and gcloud, for you to manage your Cloud Composer environment, making it easy to upload your workflows and inspect workflow status. Second, probably 70% to 80% of people rely on the Airflow web UI, and frankly speaking, that's one of the distinct factors that draws people to Apache Airflow. We understand the need, so we put a lot of thought into making it easier for you to host your Apache Airflow web server and making it extremely secure through our Cloud Identity-Aware Proxy. Then there is versioning: Airflow has different versions, and there are also different [INAUDIBLE] versions, since Airflow is developed in Python. In Cloud Composer, we're really happy to streamline and simplify the management of Python and Airflow versions, so that in Composer you can specify which Python and Airflow version you want to use, and there is also a path for you to upgrade from one Airflow version to another. It is often the case that when you write your workflow, you realize you may have external library dependencies or customized operators, so we also make it extremely simple for you to install and manage additional PyPI dependencies. Debugging and monitoring are common needs when [INAUDIBLE] workflow failures, so Composer comes out of the box with Stackdriver logging and monitoring integration. Whenever you have issues with a particular task, you will be able to find all the logs in Stackdriver. And finally, there is IAM for you to streamline your Composer environment management and manage permissions for who has access to what. We launched Cloud Composer last year during Google Next; it has been about nine months. Since then, we have added a lot more GCP operators, which is really what I want to highlight here. As you can see from the slide, we support pretty much the full set of GCP operators: you have Cloud ML, Translate, Cloud Vision, Kubernetes. On the data side, you have Cloud SQL, Bigtable, [INAUDIBLE], and Spanner. Those are all really trying to make it easier for you to write workflows, so you don't have to worry about how to interact with these new systems.
I also mentioned that writing workflows is easy, and Airflow has a very expressive mechanism for you to express your workflow. So in the next couple of slides, I just want to show you a quick overview of how a DAG, or a workflow, is developed in Composer or Airflow. A little bit of context and history on Apache Airflow: it's a project that was originally developed by Airbnb and subsequently donated to the Apache Software Foundation. Since then, it has gained a lot of popularity; it's a very, very active community with a few hundred contributors. Workflows are essentially expressed as Directed Acyclic Graphs, which is why people coming from the Airflow community talk a lot about DAGs. DAGs really essentially [INAUDIBLE] workflows. This is an example of a Hello World-ish workflow. It has three tasks that run different commands, and you have ways to specify the dependencies among your tasks. At the beginning, you do your standard Python imports; those really just tell Airflow, hey, I want to [INAUDIBLE] the DAG, and the DAG is the top-level workflow construct. After that, you just import whatever operators you think are useful. In this case, we use a bash operator, but there is a Python operator, and there are a lot of GCP operators, such as, say, the BigQuery query operator; many different operators are there. Once you're done with the imports, you put in all your orchestration and scheduling arguments: things like who the owner of this workflow is, how often you want it to run, when you want to start the workflow, when you want to end it, and so on. Once you have the arguments set up, writing the workflow itself is actually really straightforward. With the top-level DAG definition, you can then specify any tasks that you plan to include in that workflow. As I mentioned, in this example we use the bash operator, but you can in fact use any operator. The nice thing about Airflow is that it has the flexibility for you to introduce your own operators. This ties back to my earlier point about managing complexity: being able to come up with higher-level abstractions and hide that complexity from your internal users is really one way to navigate this complex space. Once you put everything together, you submit it to Cloud Composer, and from that point onwards, Composer will take care of scheduling, running, and monitoring your workflows.
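To make that walkthrough concrete, here is a minimal sketch of what such a Hello World-ish DAG might look like in Airflow 1.10-era Python. The task names, owner, schedule, and default arguments are illustrative placeholders, not the exact DAG shown on the slide.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.10.x import path

# Scheduling arguments: owner, start date, retry behavior, and so on.
default_args = {
    "owner": "data-team",
    "start_date": datetime(2019, 4, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Top-level workflow construct: run this DAG once a day.
with DAG("hello_world", default_args=default_args, schedule_interval="@daily") as dag:
    say_hello = BashOperator(task_id="say_hello", bash_command="echo hello")
    do_work = BashOperator(task_id="do_work", bash_command="date")
    say_goodbye = BashOperator(task_id="say_goodbye", bash_command="echo goodbye")

    # Dependencies: say_hello runs first, then do_work, then say_goodbye.
    say_hello >> do_work >> say_goodbye
```

Once a file like this lands in the environment, the scheduler picks it up and handles running and monitoring it.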

This is the typical Airflow web UI for you to access your workflows. From there, you can inspect the graph structure of your workflow, and you can also inspect the logs for individual tasks. There is also a rich set of dashboards: if you put all your task runs and workflow runs together, you can see how the processing time fluctuates across runs, and sometimes that helps you detect anomalies in your workflow. Behind the scenes, this is what happens: Cloud Composer creates a collection of GCP resources, and [INAUDIBLE] those resources are collectively known as a Cloud Composer environment. Within that environment, we host the Airflow scheduler, workers, web server, the database, and the workflow storage. The same workflow storage is also used for you to store workflow artifacts. Once you have your DAGs there, you will be able to orchestrate and connect to a whole set of external systems. Earlier I showed you a really simple example, but it can be as complicated as this. Although the graph itself looks complicated, the code behind the graph isn't that hard. I just want to say that this is a real DAG we use within the Cloud Composer team to dynamically generate Airflow Docker images. That's a quick overview of Composer and Airflow. Next I'll hand over to Daniel to talk about Pandora's move to Cloud Composer. DANIEL BARRETT: Good morning. My name is Daniel Barrett. I'm a staff engineer at Pandora, and I'm here to talk today about our move to GCP with Cloud Composer from on-prem Airflow. Just to give a little history on scheduling at Pandora: originally, when I started, we had a system of legacy Java schedulers. They were based in Java and used the Quartz framework as the actual scheduler, and they were completely developed in house. Because they were developed in house, it was very hard for engineers and analysts outside of analytics to create jobs in the framework. There wasn't a whole lot of online support either; it was all based off of whatever we documented internally. It had a limited UI that we developed ourselves, and if we wanted to add new features, we had to go out and develop them ourselves. That also had to happen while
you were doing other work and other tasks, so a lot of times development of the UI was very slow. About two years ago, we started our migration to Airflow. The Airflow UI is full of features and incredibly easy to use. It has support, as Feng showed you, for various technologies within Google Cloud, and also for the regular technologies we use to do our analytics processing, like Hive and Hadoop. It has a very large online support community: if you run into any problems, it's very easy to Google it, and somebody else has probably run into that same problem, which lets you figure out a solution to your problem.

It is all written in Python, and it is incredibly easy to develop new operators and sensors that do custom tasks for you. Things that you do over and over again, you can put into operators and allow other developers to use them much more simply, without having to reinvent the wheel every time. Because it is open source and there is a large community behind it, it was very easy to onboard new engineers that had experience using Airflow; there was not a lot of ramp-up time. With our previous system, there was quite a bit of ramp-up time and a period where people were getting used to the legacy Java scheduler. That didn't have to happen with Airflow. There was another issue we came across: a lot of our analysts at Pandora used to use cron to schedule their Hive scripts, which obviously is not a great solution. It was very easy for us to get the analysts to create Airflow tasks and jobs that could depend on other jobs and things of that nature, which made things much simpler and more robust than just running a Hive script from cron. We were also able to use a sensor to depend on the completion of other analytics jobs outside of your actual on-prem Airflow instance. After those two years, we now have approximately 90 instances between development and production at Pandora; Airflow has been adopted very widely and very quickly. There are five problems that we had to solve when we moved from Airflow to Composer. One of them was that during the migration, Composer and Airflow were both running in the cloud and on-prem, and there are issues with running them in two locations; there was the question of, how do we depend on a job finishing in either location? Then, Composer is a true distributed framework, while on-prem we made the decision to deploy Airflow to a single VM so that the deployment was a lot simpler and we did not have to manage a distributed deployment of Airflow on-prem. Then, how do we support analysts in creating jobs that run on Dataproc? Back on-prem, they were used to having a Hadoop cluster that was always on and that they could submit jobs to anytime; now they had to figure out how to actually run things on Dataproc itself. And finally, how do we deploy code to Composer from an on-prem Bitbucket repository?
So the first one that we solved was the job dependency problem. For that, we have a Postgres database on-prem; eventually we'll move that to the cloud. There is a meta service REST API container that sits in front of that Postgres database and handles requests for jobs or tables that are finished, or for code that is asking whether something is finished for the day. It allows jobs or tables to be set as complete for a specific day parameter, and it also allows you to get the status of a job or table for a specific day. Around that, we created an Airflow sensor that asks the meta service if a dependency is fulfilled. If you are depending on job x, you would create that sensor in your workflow, and it would ask, is job x done for April 1 of 2019? On the flip side, we also created an operator so that at the end of the Airflow workflow, you could set your dependency as complete, whether that's your table, your job, or whatever it is that you were running. This is what that looks like architecturally. Here we have two Composer instances running in GCP and two Airflow instances on-prem, plus the meta service with the Postgres database backing it. In this example, we have a job populating a table on Composer instance number one, and several jobs waiting on that table on Composer instance number two and on-prem instances three and four. After the table is populated on Composer one, it sends a done status to the meta service and the Postgres database, the meta service then tells two, three, and four that the job is now complete, and their workflows are able to move on from there.
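Below is a rough sketch of what a sensor and an operator like the ones described might look like. The meta service URL, endpoint paths, and response format are hypothetical placeholders, not Pandora's actual API.

```python
# Hypothetical meta-service integration: a sensor that waits for a dependency
# to be marked done for a given day, and an operator that marks one done.
import requests
from airflow.models import BaseOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator  # Airflow 1.10.x path
from airflow.utils.decorators import apply_defaults

META_SERVICE_URL = "http://metaservice.internal:8080"  # placeholder endpoint


class MetaServiceDependencySensor(BaseSensorOperator):
    """Polls the meta service until a job or table is complete for a given day."""

    template_fields = ("dependency_name", "target_date")

    @apply_defaults
    def __init__(self, dependency_name, target_date="{{ ds }}", *args, **kwargs):
        super(MetaServiceDependencySensor, self).__init__(*args, **kwargs)
        self.dependency_name = dependency_name
        self.target_date = target_date

    def poke(self, context):
        # e.g. GET /status?name=job_x&date=2019-04-01 -> {"done": true}
        resp = requests.get(
            META_SERVICE_URL + "/status",
            params={"name": self.dependency_name, "date": self.target_date},
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json().get("done", False)


class MetaServiceSetCompleteOperator(BaseOperator):
    """Marks a job or table as complete for a given day at the end of a workflow."""

    template_fields = ("dependency_name", "target_date")

    @apply_defaults
    def __init__(self, dependency_name, target_date="{{ ds }}", *args, **kwargs):
        super(MetaServiceSetCompleteOperator, self).__init__(*args, **kwargs)
        self.dependency_name = dependency_name
        self.target_date = target_date

    def execute(self, context):
        resp = requests.post(
            META_SERVICE_URL + "/complete",
            json={"name": self.dependency_name, "date": self.target_date},
            timeout=30,
        )
        resp.raise_for_status()
```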

There are a couple of drawbacks to this approach in GCP. One is that, because our Postgres database and some Airflow instances are on-prem, we need VPC networking for our Composer instances in GCP. Currently, due to the networking model we decided on in GCP, we use shared VPCs, and you can only have one shared VPC network per Composer instance. So in a specific project, that means we can only have one Composer instance that uses the job dependency framework. Also, if you have a table that needs historical processing, and for some reason those tables or jobs ran before we set up this framework and are not marked done in it, then we need some kind of dependency override to be able to process history where the dependencies were not set in the past. The next problem we dealt with was Celery. Celery is the asynchronous task queue used by the workers in Composer, and it has some drawbacks. Specifically, I had a scientist's job with three highly parallelized pipelines that we were going to move to GCP last year. I took these pipelines, knowing they had three distinct parts, and ran them manually on specific machines to determine what type of machine could handle each individual task. From that, I determined the actual machine type I was going to use to handle these pipelines. Originally, I set up Composer with three worker nodes of the very specific machine type that I knew could handle the task: an n1-standard-16, with 16 cores. When it came to actually scheduling those tasks, the Celery scheduler caused those nodes to be overwhelmed, because it would actually schedule two tasks on one node, one task on another node, and nothing on the third node. It was not an even scheduling of tasks, and because my node type couldn't handle more than one of the pipelines, it would freeze. This is what it looked like when it executed: the scheduler would run one task on worker one, two on worker two, and nothing on worker three. The solution I used to get around this was to create three VM instances of the machine type I wanted, named processor one, two, and three, and then shut those instances down. I then set up Composer with three normal nodes, n1-standard-4, and set up the Composer DAG to use those processor machines. What I would do is, my task would SCP fresh code from Composer over to the processor machines, and then use an SSH command to run the pipeline on each individual instance. This enabled the actual processing to happen on the processor machines and not on the Composer nodes themselves. So my scheduler would still schedule one task on worker one and two on worker two, but those workers would actually execute the code on the processor machines, and I ended up with one pipeline running on each of processors one, two, and three. This enabled the process to happen the way I actually wanted. At the end of the processing, when everything was done, I would shut down processors one, two, and three so we were no longer getting charged for those nodes. This is what the resulting DAG looked like. At the very beginning, you'll see a start VM instances task; this is where I would start up processors one, two, and three and wait for those instances to fully start up. I then had tasks to SCP the code over to each one of those VM instances, run the scripts, and then do some other processing and some local data cleanup on each one of those VM instances. At the very end, I would stop all three of those VM instances, because we don't need them running again until the next day.
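Here is a simplified sketch of that start-VMs / copy-code / run-over-SSH / stop-VMs pattern using plain BashOperators and gcloud commands. The project, zone, VM names, and script paths are placeholders, and it assumes the Composer workers can SSH to the processor VMs; it is not the actual Pandora DAG.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule

PROJECT = "my-project"      # placeholder
ZONE = "us-central1-a"      # placeholder
PROCESSORS = ["processor-1", "processor-2", "processor-3"]

default_args = {"owner": "data-science", "start_date": datetime(2019, 4, 1)}

with DAG("processor_vm_pipelines", default_args=default_args,
         schedule_interval="@daily") as dag:
    # Start the (normally stopped) dedicated processor VMs.
    start_vms = BashOperator(
        task_id="start_vm_instances",
        bash_command="gcloud compute instances start {} --project {} --zone {}".format(
            " ".join(PROCESSORS), PROJECT, ZONE),
    )
    # Stop them again when everything is done, even if a pipeline failed.
    stop_vms = BashOperator(
        task_id="stop_vm_instances",
        trigger_rule=TriggerRule.ALL_DONE,
        bash_command="gcloud compute instances stop {} --project {} --zone {}".format(
            " ".join(PROCESSORS), PROJECT, ZONE),
    )

    for vm in PROCESSORS:
        # Copy the latest pipeline code from the Composer data directory to the VM.
        copy_code = BashOperator(
            task_id="scp_code_to_{}".format(vm),
            bash_command=("gcloud compute scp --recurse /home/airflow/gcs/data/pipeline "
                          "{}:~/pipeline --project {} --zone {}").format(vm, PROJECT, ZONE),
        )
        # Run the pipeline on the processor VM, not on the Composer worker.
        run_pipeline = BashOperator(
            task_id="run_pipeline_on_{}".format(vm),
            bash_command=('gcloud compute ssh {} --project {} --zone {} '
                          '--command "bash ~/pipeline/run.sh"').format(vm, PROJECT, ZONE),
        )
        start_vms >> copy_code >> run_pipeline >> stop_vms
```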
So the next problem we had to look at was, how do we make Dataproc easier for analysts to use? Analysts, like I said before, are used to having a Hadoop cluster that is always on and accepts their jobs at any time. They may have to wait for resources, but they can just fire off queries and tasks and send them to this Hadoop cluster. Dataproc is a little bit different: you have to know a little to be able to create a Dataproc cluster. You have to understand it a little more, such as what worker nodes are and some of the properties needed to start up a Dataproc cluster. So like I said, it was a barrier to entry for analysts to use Dataproc to run their scheduled queries. What we ended up developing was a batch job framework, built with a Google contractor called Neudesic, that requires minimal coding and really hardly any Dataproc knowledge whatsoever.

It requires them to create config files that just define their SQL, the SQL tasks that they have, and the variables that get substituted into those SQL tasks, and then the DAG is dynamically constructed from those config files. The config files define external and inter-DAG dependencies, sensors based on other jobs running on other on-prem Airflow instances, the piece that I showed you earlier; you define those in the config files for your SQL. You also define which SQL files depend on other ones, so that they can be wired up correctly in the DAG. It also provides a way to instantiate a Dataproc cluster for each SQL statement run, using only the number of workers needed. To do this, there was heavy use of Dataproc workflow templates: the workflow template would start up a cluster for the batch job, run the SQL file on that cluster, and then delete the cluster afterwards. We found that this was the best way to use Dataproc; it was repeatable and less prone to error than some of the other ways we looked at for running SQL files on Dataproc. This was all the code that was required to create this dynamic DAG. The create batch DAG operator is what actually goes through all of the config files and constructs the DAG itself. This code can be copied from one project to the next, so the analysts do not really need to know any Python whatsoever in order to run their batch jobs, which could be Hive, Spark, or several other ways of running things. From that code came this DAG right here. This DAG was created using that code and the config files set up for it; like I said, it was a minimal amount of code to create something as complex as this. This is one of our largest jobs, called our nightly job, and it's a great name. You'll see on the left all of our external sensors, which are other jobs on other Airflow instances, or imports, or things like that, that we wait on to be complete in order for our process to start. Each one of those pieces has data quality checks involved, where after the table is completely loaded, it does duplicate checks or threshold checks or things of that nature, so that you can ensure your data is up to your standards at the end of the process. At the very end, it sets its status as complete for each individual table back in our analytics Postgres database, allowing other jobs to be kicked off from that.
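A heavily simplified sketch of that config-driven idea is shown below: read per-SQL config files, create one Dataproc workflow-template instantiation per config, and wire up the declared dependencies. The config format, file locations, and project/region values are assumptions; the real framework also adds external sensors, data quality checks, and the meta-service completion step.

```python
import glob
import json
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import (
    DataprocWorkflowTemplateInstantiateOperator,  # Airflow 1.10.x contrib path
)

CONFIG_DIR = "/home/airflow/gcs/data/batch_configs"  # placeholder location


def create_batch_dag(dag_id, config_dir, default_args):
    """Builds a DAG from per-SQL config files, one workflow-template run per config."""
    dag = DAG(dag_id, default_args=default_args, schedule_interval="@daily")

    configs = []
    for path in sorted(glob.glob(config_dir + "/*.json")):
        with open(path) as f:
            configs.append(json.load(f))

    # First pass: one task per config. The referenced workflow template creates a
    # right-sized cluster, runs the Hive/Spark SQL, and deletes the cluster.
    tasks = {}
    for cfg in configs:
        tasks[cfg["name"]] = DataprocWorkflowTemplateInstantiateOperator(
            task_id=cfg["name"],
            template_id=cfg["workflow_template"],
            project_id="my-project",   # placeholder
            region="us-central1",      # placeholder
            dag=dag,
        )

    # Second pass: wire up the inter-task dependencies declared in the configs.
    for cfg in configs:
        for upstream in cfg.get("depends_on", []):
            tasks[upstream] >> tasks[cfg["name"]]

    return dag


# Module-level assignment so the Airflow scheduler discovers the DAG.
dag = create_batch_dag(
    "nightly_batch",
    CONFIG_DIR,
    {"owner": "analytics", "start_date": datetime(2019, 4, 1)},
)
```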
So one of the other problems was, how do we deploy code from on-prem Pandora to Composer in the cloud? One of the ways we came up with is an on-prem Jenkins build that uses a plug-in Google provides for Jenkins to push code to the Composer DAGs bucket. All of your code in Composer is synced from a bucket: whenever you create a Composer instance, there is a Composer bucket assigned to that instance, and inside of it there is a dags folder and also a plugins folder. These are the two places you need to deploy your code in order to run it in Composer. You can also just drag and drop files manually into the Composer bucket using Google's console UI. That is a very simple way, but it's also a manual way. So the first option is more of an automated build, and the second is more of a manual process.
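Whichever route you take, deployment ultimately means copying files into the environment bucket's dags/ (or plugins/) folder. As a minimal sketch, this is what that copy might look like with the google-cloud-storage client; the bucket and file names are placeholders.

```python
from google.cloud import storage


def deploy_dag(bucket_name, local_path, dag_filename):
    """Copies a local DAG file into the dags/ folder of a Composer environment bucket."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob("dags/" + dag_filename)
    blob.upload_from_filename(local_path)


# e.g. deploy_dag("us-central1-my-env-1234-bucket", "./nightly_batch.py", "nightly_batch.py")
```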

But a lot of our analysts are not really comfortable with the command line, so something else we came up with at Pandora was a view that allows them to use a web page to sync a project from Bitbucket. The view looks like this: when you pull it up, it goes out to our internal Bitbucket repo and pulls all of the Pandora Bitbucket repositories that are publicly available within the organization. You can select the repository you want, select the branch you want, and then individually select the commit that you want. When you push Submit, it syncs that code to the DAGs folder on the server it is running on at Pandora. So we had to take this and move it to Composer to do the same kind of thing. On-prem, the view would build the git command and sync directly to the file system, because it would run the git command as an external process in Python. On Composer, the web server is not running on the Composer Kubernetes cluster, so the web server doesn't have access to the file system where your DAGs are located, or to the data directory you're allowed to work in on Composer. So we had to think about it a little differently: we had to create a DAG to take care of any web server action, and then send the DAG its arguments from the web server itself. That is because of this Composer architecture right here. You'll see that the workers, Redis, the Airflow scheduler, and the Cloud SQL proxy are all running on Kubernetes Engine, but the Airflow web server is running in the tenant project, so it does not have access to the file system on the Kubernetes cluster itself. The downsides to a DAG-based sync are that the Composer bucket syncs down to the container file system, but not back up. So you cannot just sync the code to the local file system where the DAGs are located, because it would be overwritten the next time Google synced your code from the bucket down to the file system. Another one is that Bitbucket projects must be made public within the organization in order to view them and pull a list of them. Pulls are a little slow because of the network path from on-prem to the bucket; this is to be expected, because you're pulling the code up to Composer in Google Cloud. You also have to sync the project code to /home/airflow/gcs/data before doing an rsync up to the Composer bucket; that data directory is just a temporary staging location, so we did not have to overwrite everything in the bucket every time we wanted to do a pull. And that is it for me; I will hand you over right now. WILSON LIAN: Hi, everybody. I'm Wilson Lian, a software engineer on the Cloud Composer team, and today I have the privilege of telling you all about some new features that we've rolled out recently in Cloud Composer. Something that we've heard a lot from our customers is, how can I monitor my environments to make sure that they're up and running and healthy and making progress on DAGs, as well as determine whether or not I've provisioned enough resources for my environment to handle the workload that I'm throwing at it?
And so to answer that question, our Composer data plane is now exporting metrics into Stackdriver. As you see here, we're exporting information about the quantity and latency of both DAG runs and task instances, so you can see if anything has gotten stuck. We're also exporting information about the health of the Celery queue, including the number of Celery workers that are up and the length of the queue. Now let's flip over to the demo screen, and I'll show you how you can use these metrics. OK, you can see here I built this dashboard earlier. In the left column we're showing the number of DAG runs and task runs over time, and in the right column we have the latency of DAG runs and task runs over time. If we zoom in on one hour, we can see that we have four DAG runs per hour for a single particular DAG, called sometimesslow, and we can see on the right-hand side that the latency of that DAG run varies over the course of an hour. In fact, once every hour it takes much longer than the other DAG runs in that hour. If we look down at the tasks, I happen to know that this DAG has two tasks. One of them is called consistent1; it's a bash operator, and it runs fairly consistently. The other one is called sometimesslow, and it's a Python operator. I cheated for this demo and made it so that sometimesslow checks the time, and once an hour makes itself one minute slower than every other time it runs. This is a bit of a contrived example, but you can see how, if you had some external dependency that was introducing latency into your DAG runs, you would be able to see this in the Stackdriver monitoring console. Additionally, you could go in and create an alerting policy, and let's do that right now. We'll create an alert so that whenever that task is slow, we get an email. We'll filter by the task name and say whenever sometimesslow takes longer than 60 seconds for one minute, we will get an email or some other notification. This is just standard Stackdriver monitoring, and then we can save that. Likewise, we could do a similar thing for the Celery queue. Normally a Celery queue length of zero is a good sign: it means that your workers are able to keep up with the offered load. So we could set up an alert on the Celery queue, and whenever it exceeds some threshold, we get an alert and realize that we need to do some reprovisioning. So now let's swap back to the slides, please.
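For reference, an alert like the one created in the demo through the console could also be defined programmatically. The sketch below uses the google-cloud-monitoring client; the Composer metric type and label names are assumptions, so check Metrics Explorer for the exact names before relying on something like this.

```python
from google.cloud import monitoring_v3

PROJECT_ID = "my-project"  # placeholder

client = monitoring_v3.AlertPolicyServiceClient()

policy = monitoring_v3.AlertPolicy(
    display_name="sometimesslow task too slow",
    combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
    conditions=[
        monitoring_v3.AlertPolicy.Condition(
            display_name="task run duration > 60s for 1 minute",
            condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                # Assumed Composer metric and label names; verify in Metrics Explorer.
                filter=(
                    'metric.type = "composer.googleapis.com/workflow/task/run_duration" '
                    'AND metric.label.task_name = "sometimesslow"'
                ),
                comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                threshold_value=60.0,
                duration={"seconds": 60},
            ),
        )
    ],
)

# Notification channels (email, etc.) can be attached just as in the console flow.
client.create_alert_policy(name="projects/" + PROJECT_ID, alert_policy=policy)
```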

So the next feature I'm going to talk about is one that you can use to help improve the security of your Cloud Composer environments. Whenever you create a Cloud Composer environment, as Dan pointed out, the Cloud Composer control plane creates a GKE cluster and a Cloud SQL instance for you. By default, the VMs for the GKE cluster and the Cloud SQL instance have public IPs, meaning that they are externally routable and can be reached by anyone on the internet. Moreover, the communication to your Airflow database in the Cloud SQL instance traverses the public internet. What this means is that your traffic potentially traverses the public internet, and if you have a zero-day vulnerability in some daemon running on your system, it could be exploited by someone on the internet. However, with private IP environments, we only assign internal, non-publicly-routable IP addresses to all of your GKE VMs and the Cloud SQL instance VM. Optionally, you can enable a feature which puts the Kubernetes master for your GKE cluster behind a private-only IP as well. So let's swap back to the demo screen, and I'll show you how you can create this. You can do this using the beta channel of the gcloud tool, but you can also do it in the GCP console by enabling beta features up in the top right corner. Let's just look at what a private IP environment looks like. OK, I precreated this private IP environment earlier. You can see that private environment is set to true, and I have master external access set to false, so the Kubernetes master should only have an internal IP. If we drill down into the GKE cluster associated with this, we can see that the master endpoint has an IP address of 172.16.0.2, which is an internal IP address. And if we look at the node pool to check out the GCE VMs that were allocated for this, we see that they have no external IPs. The last feature I'd like to touch on is one that we've been working on for a long time. Basically, let's say you're really excited, when you get out of here, to go and try out the Stackdriver monitoring feature or some of the new features you've seen us announce in recent releases. In the past, we would have asked you to create a new environment, move all your DAGs over, move all your Airflow metadata over to the new environment, and then take down the old environment. Now, with just the push of a button, you can upgrade your environments in place and preserve the VMs, the Cloud SQL instance, and all your Airflow metadata in the same environment, while running the latest version of Composer or even an upgraded version of Airflow. I can try to demo this, but we'll see if we're able to. Let's go back to the demo screen. I have this environment here that's ready to be upgraded; I've enabled beta features, and we can see that it's running Composer 1.3.0 and Airflow 1.9.0. If we just hit the Upgrade Image Version button, we can select a new image version; 1.6.1 is our most recent version, and let's upgrade to the latest version of Airflow. Submit. And then the upgrade is in flight; it'll complete in a few minutes, and then you'll be at the latest version. That's all I have. For more information on Cloud Composer or Airflow, you can check out these links. [MUSIC PLAYING]