Advancing Serverless Data Processing in Cloud Dataflow (Cloud Next '18)



[MUSIC PLAYING] SERGEI SOKOLENKO: Hello and welcome. My name is Sergei Sokolenko. I am a product manager at Google Cloud, and today with me I have George Hilios, vice president at Two Sigma, talking to you about advancing serverless data processing in Cloud Dataflow by separating state and compute storage. But before we talk about separating compute and state storage, I wanted to do, very quickly, a historical retrospective of how the industry used to process really large data sets in the past. Imagine it's the year 2000, and you have a one petabyte data set that you want to process. Who am I kidding? In the year 2000, a really large data set was probably something you measured in terabytes. Well, still, you have a terabyte of data, you want to process it, and it's the year 2000. So you'd probably use an architecture something like this: a large box with lots of CPU, maybe a mainframe or a Unix box, with the storage provided through a storage area network. Then, come 2004, new distributed processing frameworks were released, and now you would probably be using a cluster of cheap Linux nodes, with storage actually being part of each processing node, so local storage. Then, as things moved into the cloud, you were still using these clusters of Linux nodes, but now you would be using network-attached storage provided by the cloud provider. In 2010, Google published a white paper, which we called the Dremel paper, that defined a new architecture for processing really large, web-scale data sets. That architecture was based on the following idea: you would have an independent, highly available cluster of compute, and you would have highly reliable storage facilities. In 2011, we launched a product based on this white paper, on this concept. We called it BigQuery, and you've probably heard about BigQuery many times at this conference. It became a very successful product. So this is the architecture of BigQuery. It has the concepts described in the Dremel white paper: it has a replicated storage layer and a highly available cluster of compute. They're separate, they work together, and they're connected through a very fast, petabyte-scale network. But there's a component in this architecture that was not part of the original Dremel paper: the distributed in-memory Shuffle. Now you might ask yourself, what is this Shuffle? Why was it added later on, when we actually launched the product into general availability?
So let's just quickly cover the concept of Shuffle. In data processing, when you have a big collection of key-value pairs and you want to do a grouping or a join with another data set, you need to sort these elements. When you have a single-node processing architecture, you can do that more or less easily: either you sort your data set in memory by key, or you use efficient on-disk sorting algorithms. Unfortunately, once your data sets explode and you have to move to a distributed framework or architecture where you have lots of nodes, it becomes somewhat more complex. Your end goal is to have everything sorted by key, with all data elements associated with a particular key co-located on a particular worker node. And what you end up doing is shuffling, physically moving the data elements associated with specific keys from one box to another. That's the process of shuffling. Now, as you can imagine, this is a very resource-intensive and time-intensive process, and as your data sets scale, it becomes more and more complex. That's why BigQuery ended up building a separate, dedicated component that was just responsible for shuffling big data sets, and it became a critical component connecting the storage and the compute nodes. Now, I am a product manager of Dataflow, and this is a Dataflow-related talk, so you might ask yourself, why am I spending so much time talking about Shuffle? We actually have the same problem in Dataflow. We are providing the same data transformations to customers.
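To make that grouping concrete, here is a minimal, hypothetical sketch in Scio, the Scala API for Apache Beam that comes up later in this talk, of the kind of key-based grouping that triggers a shuffle. The data, the output argument, and the object name are invented for illustration; this is not code from the talk.

```scala
import com.spotify.scio._

// A minimal, illustrative sketch (not code from the talk): a key-based grouping
// that forces a shuffle. All values for a given key must be moved to the same
// worker before the per-key aggregation can run.
object GroupByKeyExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.parallelize(Seq(("store-1", 10.0), ("store-2", 5.0), ("store-1", 7.5)))
      .groupByKey                            // the shuffle step: co-locate values by key
      .mapValues(_.sum)                      // aggregate once each key's values are together
      .map { case (k, total) => s"$k,$total" }
      .saveAsTextFile(args("output"))        // hypothetical output path argument

    sc.run()                                 // sc.close() on older Scio versions
  }
}
```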

We allow them to do groupings, and joins, and aggregations, and filtering, and projections, so the problem set is actually quite similar to what BigQuery is dealing with. Here's a screenshot of a real Dataflow pipeline. Let's just follow this pipeline as it lives in our service. Simplistically, it can be represented by this diagram: we have a couple of data sources; we have data transformations, the green boxes over there; then we have joins and group-bys, which are a special kind of data transformation; and finally, the data ends up in a data sink. As you submit this Dataflow job to our service, Cloud Dataflow, we are going to create a cluster of nodes to process it. These will be Compute Engine virtual machines, and we'll also use persistent disk storage. They're going to start reading from the sources. For the green boxes, the data transformations, they're going to run them on compute and use some local storage. Then, when things enter the joins and the group-bys, they're actually going to do a Shuffle, and we do that Shuffle today using persistent disk storage, either the magnetic type or the SSD type, whatever the customer specifies. Ultimately, the data gets written into the sink, and we shut down the cluster. That's, in a nutshell, how a Dataflow job works. Now, in Dataflow, as I said, we are dealing with the same kinds of volume and scalability issues that BigQuery is dealing with. We have huge volumes of data that customers want to process, and so we ended up implementing a very similar mechanism: a distributed in-memory Shuffle service in Dataflow. The service works like this. You have your compute cluster running your user code, which you write either in Java or Python. Then we have the Dataflow Shuffle, and the Dataflow Shuffle is deployed, for high-availability reasons, into multiple GCP regions. In each GCP region where Dataflow Shuffle is available, we also have replication of Shuffle in every zone; we have it deployed in every zone. We have a component we call auto zone placement, and this component decides, based on the available capacity and the job, which zone the job needs to be assigned to. So we take care of deciding which zone to run in. Then we have a tier of components which coordinate the actual Shuffle operation among themselves. We have a Shuffle proxy, which accepts the job, and we have two file systems: one is in-memory, and the other one is on-disk. If your job and the available capacity allow us to do a Shuffle entirely in memory, your data will never touch disk, and we'll quickly return results to you. But if your job is sized such that we have to cache some of this data on disk, we're going to transparently move some of your data from memory onto disk and then Shuffle it there. You don't have to worry about anything; we do transparent shuffling, and we just return results to you. This entire architecture requires no code changes from users. You tell us, just by specifying a parameter, that you want to use the Dataflow Shuffle, and we'll switch from the PD-based Shuffle, the one we discussed earlier in the talk, to a service-based Shuffle, transparently to you. Now, today I'm very happy to announce general availability of Dataflow Shuffle in two GCP regions: us-central1 and us-west1. Dataflow Shuffle has been in beta for a while; now it's generally available to all our users. Let's quickly go through the benefits of Dataflow Shuffle. The first benefit is that many of our customers tell us that Shuffle is now much faster than it used to be. If we take the same pipeline and compare its execution times using magnetic persistent disks, then SSD persistent disks, and then the Dataflow Shuffle service, you're going to see that the magnetic disks give you maybe 55 minutes of pipeline execution time.

The pipeline using SSD persistent disks will run in approximately 17 minutes, and the Dataflow Shuffle one will actually run in 10 minutes. These results are not always applicable to every use case, but many of our customers are telling us that's what they are seeing. The other benefit of Dataflow Shuffle is that we are now able to process and shuffle much larger data sets, and you're going to see in a demo the sizes of shuffles we can now support. If previously the untuned jobs, the ones that used magnetic disks, were able to shuffle tens of terabytes of data, and jobs that used SSD persistent disks were able to shuffle up to 50 terabytes, now we can push into hundreds of terabytes. So with this, I wanted to show you a demo that actually runs a Shuffle job. Here's how the job looks. I have two inputs, two GCS buckets, and I'm going to read a 50 terabyte data set from each of the inputs. I'm going to join them and write the result into a GCS bucket. The code for this pipeline is written in Scala using a framework that one of our customers developed. It's called Scio; Spotify are the original developers of this framework. And I like Scio because it allows me to very easily define my pipelines. The entire pipeline is these 10 to 15 lines of code. For those of you who might not be able to see it, let me just quickly explain what it does. In the main function, I define my first input and my second input, then I do a left outer join, and I write the output to files. That's all this pipeline does. Before we switch to the demonstration, you might ask yourself, what does the input look like? Here's a screenshot of one of the files that I will be joining. It consists of three columns. The first column is the key, the key I want to join on, and it's just randomly generated strings. Then I have a record ID; I have billions of records in my data set, so this record ID is pretty long, a very long string. And then I have the payload, the value that I want to associate with the key. So with this, let's switch to the demonstration, and let me run a few commands. Great, so I'm in bash, in a terminal. The first thing I want to show you is the contents of the bucket from which I will be reading files. I've defined a bash variable, input 50 TB, and this is the bucket that contains 50 terabytes' worth of files. I have 50,000 one-gigabyte files with randomly keyed data that I will be joining together. And just to show you that this bucket really contains 50 TB, I ran a listing command earlier, and here's my proof that it really does contain 50 terabytes. I'm not going to run it now, because listing 50,000 files would take several minutes; that's why I ran it beforehand. Now, my next command is the one that will create the pipeline. I'm going to use the Scala build tool, sbt, to execute from the command line the command that launches the Dataflow job. And as you can see, what I need to specify here is, what is my input?
Both of the inputs I'll be reading from, the sink bucket, that is, the output bucket, and the parameter instructing the Dataflow service to use the Shuffle service. So I'm going to run this command. Within a minute or two, the code will be packaged, all of the dependencies will be deployed, and I'll initiate my job. I'm going to give it a few seconds; depending on the Wi-Fi gods, if we're lucky, it will be done quickly. But in any case, I also ran a Shuffle just before the demo, a Shuffle that processed 100 terabytes.
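As a rough illustration of the pipeline and launch parameters just described, here is a hypothetical Scio sketch of a job with the same shape: two keyed inputs read from GCS, a left outer join, and a text output. The paths, argument names, and the launch command in the trailing comment are assumptions, not the presenter's actual code.

```scala
import com.spotify.scio._

// A hypothetical sketch of a pipeline shaped like the one in the demo (not the
// presenter's actual code): read two keyed data sets from GCS, left outer join
// them on the key, and write the joined records back to GCS.
object JoinDemo {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Assumed line format: key,recordId,payload
    def readKeyed(path: String) =
      sc.textFile(path).map { line =>
        val i = line.indexOf(',')
        if (i < 0) (line, "") else (line.substring(0, i), line.substring(i + 1))
      }

    val left  = readKeyed(args("input1"))    // e.g. gs://bucket-a/part-*
    val right = readKeyed(args("input2"))    // e.g. gs://bucket-b/part-*

    left
      .leftOuterJoin(right)                  // the large shuffle happens here
      .map { case (k, (l, r)) => s"$k,$l,${r.getOrElse("")}" }
      .saveAsTextFile(args("output"))        // e.g. gs://bucket-out/joined

    sc.run()                                 // sc.close() on older Scio versions
  }
}

// Launched with the usual Dataflow options plus, per the documentation around
// the time of this talk, the experiment that opts in to the Shuffle service,
// roughly:
//   sbt "runMain JoinDemo --project=my-project --region=us-central1 \
//        --runner=DataflowRunner --experiments=shuffle_mode=service \
//        --input1=gs://... --input2=gs://... --output=gs://..."
```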

So here's my job. Just to give you a few data points about this job: my data set had 500 billion elements; it was about 51 terabytes. In the second input, I also had 500 billion rows, another 51 terabytes. Then I joined them in this operation; in Dataflow, we call joins CoGroupByKey. It created, I think, 100 billion combinations of these keys; some of the keys overlapped, some of them didn't. Then I wrote them out into files, and I actually created 2.5 trillion lines in my output, 2.5 trillion, almost one petabyte of output. This job took about 7.5 hours, and I was able to process it with 10 lines of code. Back to my slides, please. So hopefully, I was able to show you that it's now very easy to do large-scale Shuffle processing with Cloud Dataflow. In addition to my 100 terabyte run, I also ran a few smaller jobs, just to show you the scalability of this process. I did a four terabyte run, a 20 terabyte run, and the 100 terabyte run, with a 5x data increase every time, and the execution time was pretty much linear, which is what you want to have. You don't want a quadratic escalation of your duration; if you can achieve linearity in execution time, that's a very good result. What you don't see on this slide is that the resources consumed during this execution, what we call Shuffle data processed, also scaled linearly. If your difference in data is 2x, you will only pay 2x more for such a job. It's another good property of the service: you are able to scale both duration and cost linearly as your data grows. Now, we talked about batch pipelines, and hopefully I was able to show you that in batch processing, being able to do efficient shuffling is important. But what about stream processing? Cloud Dataflow provides both batch and streaming capabilities, and perhaps, you might ask, you also need Shuffle in streaming. The answer is yes, it's also very important to be able to do shuffling in streaming pipelines, because customers want to join and group data elements. But in addition to shuffling, in streaming pipelines the other important thing is that you need to be able to efficiently store state. State, as I said, relates to windows, the time windows that you create when you run windowing aggregations on streaming data. So let's go through a similar life of a pipeline, but for a streaming use case. This pipeline reads from Pub/Sub, does a windowing operation by event time, does a grouping, a group by, does a count aggregation, and then writes into BigQuery. The first thing to note here is that when you submit such a job to Dataflow, we're going to divide it into three stages: everything that comes before the group by, everything that comes after the group by, and the actual Shuffle step. And once we divide it into these three portions, we start thinking about how to scale and distribute this processing.
Our way of distributing the incoming workload to multiple workers is by partitioning your data set by key. If your pipeline already has a key, through maybe a group by operation, we'll use that key. If it doesn't have a group by, then we'll auto-generate a key and partition your data that way. So in this case, for example, we have split the key spaces for the pre-Shuffle and the post-Shuffle operations into ranges. What are we going to do with these ranges? Well, we're going to assign them to workers: each key range will be assigned to an individual worker. Once we've done the assignment of key ranges to workers, we are going to persist the data, making specific persistent disk volumes responsible for storing the data associated with those keys and key ranges.
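Purely as an illustration of the idea, and not Dataflow's actual implementation, here is a toy sketch of what assigning key ranges to workers means: every key maps deterministically to one range, and each range, together with the state stored for its keys, is owned by a single worker.

```scala
// A toy illustration (not Dataflow's implementation) of key-range partitioning:
// keys hash into a fixed number of ranges, and each range is owned by one worker,
// so all elements and state for a given key always land on the same worker.
object KeyRangePartitioning {
  val numRanges = 4
  val workers   = Vector("worker-0", "worker-1", "worker-2", "worker-3")

  // Deterministically map a key to one of the ranges of the key space.
  def rangeOf(key: String): Int = Math.floorMod(key.hashCode, numRanges)

  // Each range is assigned to exactly one worker.
  def ownerOf(key: String): String = workers(rangeOf(key))

  def main(args: Array[String]): Unit =
    Seq("user-17", "user-42", "user-17").foreach { k =>
      println(s"$k -> range ${rangeOf(k)} -> ${ownerOf(k)}")
    }
}
```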

When we have to scale a pipeline in a streaming case, we're actually going to move an entire disk from one worker to another. We're not going to try to rearrange key spaces, compress them, or reassign them; we're going to take an entire persistent disk and reassign it to another worker. As you can imagine, that's another time-intensive operation. Streaming autoscaling has been working really well for many customers, but in some cases it might end up being a little bit sluggish because we have to reassign disks. Now, we talked about the group by; let's quickly cover the windowing operation. Here, it's important to remember that in streaming there are two important time dimensions you want to think about. The first is your business time dimension, the event time. This is when the sales transaction happened, if you're dealing with sales transactions, or it might be the time when the user clicked on a link. It's the business time; in our terminology, we call it event time. The other important time dimension is processing time. That's the time when the business transaction enters the processing pipeline, and as you can imagine, there can be delays between an event being generated by the source system and that same event entering the processing pipeline. Now, many customers don't want to deal with processing time as a unit or dimension of analysis. They actually want to deal with event time, and they want to organize the data elements into windows of either fixed duration, or configurable duration, or session-based windows, but grouped by event time. With this in mind, what we have to do on the Dataflow side to allow such analytical processing is buffer data elements on disk, because we have to store these elements until the window closes and we can initiate the processing of the elements in the window. So in addition to Shuffle, we also store data elements related to windows. So we asked ourselves, can we do something similar for streaming pipelines, as we did for batch pipelines with Shuffle?
And the answer is yes. I'm happy to announce that today we made the Streaming Engine available in beta in two regions: us-central1 and europe-west1. Quickly, about the benefits and the architecture of the Streaming Engine. Your user code, the code that you write that represents your business logic, continues to run on the worker nodes. But all the streaming plumbing that used to co-exist with your user code on the worker nodes has been moved to a dedicated service in Google's back end, and it is responsible for two things: window state storage and the streaming Shuffle. Your user code communicates with it transparently; you as a developer don't have to think about it or worry about it. Very similarly to Dataflow Shuffle in the batch case, you only have to provide us with a parameter that tells us you want to use the Streaming Engine. So your user code transparently communicates with our back end, and we do all the shuffling and all the state storage for you. The benefits: we don't have to move disks around anymore when you want to scale, so autoscaling in streaming became much better. We can also do maintenance on our service much more easily; it doesn't interrupt you, you can continue running your pipeline, and we can do maintenance and patches on the back end without interrupting you in most cases. And we also consume fewer resources on the worker nodes, so your user code that implements your business logic and processes your data elements has more CPU and memory available to it; it can produce more and crunch more data.
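Here is a minimal, hypothetical Scio sketch of the streaming pipeline shape described a moment ago: read from Pub/Sub, window into fixed one-minute windows, count per key, and hand the result to a sink. The topic and argument names are invented, the BigQuery write is left as a comment because the exact save call varies by Scio version, and the opt-in flag in the trailing comment reflects the documentation around the time of this talk rather than a guaranteed current interface.

```scala
import com.spotify.scio._
import org.joda.time.Duration

// A minimal, hypothetical sketch (not the presenter's code) of the streaming
// pipeline described above: Pub/Sub source, fixed one-minute windows based on
// each element's timestamp, a per-key count, and a sink.
object StreamingCounts {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.pubsubTopic[String](args("inputTopic"))       // e.g. projects/p/topics/clicks
      .withFixedWindows(Duration.standardMinutes(1)) // windowing by time
      .countByValue                                  // per-key count within each window
      .map { case (key, count) => s"$key,$count" }
      .debug()                                       // placeholder; the real job writes to BigQuery

    sc.run()                                         // sc.close() on older Scio versions
  }
}

// Launched as a streaming job; around the time of this talk the documented
// opt-in was, roughly:
//   --runner=DataflowRunner --streaming \
//   --experiments=enable_streaming_engine --region=us-central1
```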

Here's an example that shows you how streaming autoscaling works together with the Streaming Engine. What this diagram shows is an incoming stream of data that ran for about one hour and ranged in throughput between one megabyte per second and 10 megabytes per second. When I didn't use the Streaming Engine, Cloud Dataflow scaled my workers: initially it started with three workers, and then, once Dataflow sensed an increase in incoming data, it scaled the number of workers to 15. It kept this number for a while and then scaled down, then waited for another spike in inputs and scaled up again. Now, if you compare this graph to the graph with the Streaming Engine, as you can see, we used fewer workers and we were able to scale down much faster. Here are the two graphs overlaid, and they show quite nicely how the Streaming Engine avoids scaling up to high numbers of workers, uses fewer resources, and is also more responsive to variations in the incoming data rate. Of course, the best stories about Dataflow are told by our customers, and so I'm very happy to invite George Hilios from Two Sigma, who will be talking to you about how Two Sigma is using Cloud Dataflow. [APPLAUSE] GEORGE HILIOS: Thanks, Sergei. So my name is George Hilios. I'm a vice president at Two Sigma. Just a quick intro into who Two Sigma is: we are a Manhattan-based hedge fund, and we take a scientific approach to investing. We hire scientists, mathematicians, engineers, the works. Our mission is to find value in the world's data. In particular, I head our engineering group that deals with granular economic data. I'm probably getting some squints from most of you, so let me explain what that means. Here's an example. NOAA publishes a wealth of information publicly; there are weather forecasts for all regions of the United States. And so we have a big question: can we correlate weather activity with regional economics to predict financial outcomes? These are the types of questions data scientists and engineers in my group ask. And this data tends to be very large. I actually jumped slides a little too quickly, sorry. The NOAA data compresses to about a terabyte or so, but when you expand it and look at all the rows of data, it's into the hundreds of terabytes, so it's very, very hard to make sense of. So what do we do with all this data? We can do a lot of things with it: geospatial analysis, aggregating billions of rows of data, terabytes of data, ultimately to build alpha models. This type of work is very lucrative and very satisfying. However, there is a weakness there. We get a lot of our data from third-party vendors, so we're at risk of bad vendor data causing bad predictions. At Two Sigma, we take data quality very seriously; we build anomaly detection into all of our pipelines to guard ourselves against these bad outcomes. Now, when you're dealing with data at this scale, you have to build very complex, high-scale systems to detect these anomalies and protect us and our investors, and this is where Dataflow enters. We really don't want to be in the business of building infrastructure; we'd like to focus our energy on business logic, things nuanced to particular vendors and the types of quirks they throw our way. But also, we're not a particularly large team; we don't have the resources Google tends to throw at these infrastructure-type problems. So can a team of 10 to 20 engineers deal with 100, 200 terabyte data sets without batting an eye? Let me take a quick little sidebar here. How many of you know what RFC 4180 is?
I'd be surprised to see any hands out there. I do, and it's the format for CSV files, comma-separated values. It's the bread and butter in the industry for how files are distributed. Unfortunately, our vendors do not tend to know what that RFC is. Let me show you an example of what that really entails; these are real examples, obviously changed for presentation purposes. So that comma: is it separating two numbers, or is that 15,600?

In this case, it was actually 15,600. Good luck to the pipeline understanding that. Within the same file, dates are represented in different ways. Here is a newline character: is that a separate row, or is that the same string that happens to have a newline in it? A lot of these vendors get their data from sources that are third-party to them, and they're just passing it along, so if it's unescaped and unquoted, any off-the-shelf CSV parser will choke on it. And here's another one: we've noticed that as the days and months go by, vendors decide to expose new fields to us, so the schema changes. Imagine you're a data engineer, you're writing a pipeline, and you have your code for doing aggregation. Do you have a conditional in there that says, if the date is before January 1, 2016, I only expect four columns? All your pipelines would be littered with this vendor-specific logic, and it gets very unwieldy very quickly. So we wanted to use this opportunity with a tool like Dataflow to eliminate this problem entirely and get in front of it. Let me show you an example of what that looks like. We built some tooling on top of Dataflow, which we called the normalizer; very creative naming. The very first step really involves taking the list of files and a JSON file we call the schema as input. I'll go into the details of what that schema file looks like on the next slide, but it's things like: the first field has this name, and it has this type, like string or integer. It has hooks for Python user-defined functions for doing custom things, et cetera. In the second step, we implemented a custom file source. It takes a file as input from GCS and turns it into a PCollection, which is an Apache Beam / Dataflow concept, a PCollection of a generic type, which is an Avro object that has a schema. Again, we want to use open source where possible to leverage the innovations and contributions of the broader community. So we turn everything into an object of a type, and much later in the pipeline, we split. The stuff that goes in one direction is all the rows that we successfully parsed, the ones we applied our logic to in order to normalize everything to a schema, and we use the same Spotify library Sergei mentioned earlier, Scio, to save it as a Parquet file. There is another piece of functionality in there that I particularly appreciate: you can say, I want 5 million rows in every single Parquet file, which, for those of you who have done Spark, is actually kind of a hard thing to do generally. So when we output our Parquet files and process them downstream, with all the files being very evenly sized, you make much better use of your resources. The other output I want to call out, because this is the hard-earned experience our data engineering team has built up over time: we output an extra file that includes summary statistics. I'll show an example of this as well; it's things like the total number of rows, the number of error rows that we encountered, and so on. Even more importantly, say you're on support, you get paged in the middle of the night, and there's a failure, a spike in errors. You'd like to have a file you can go to that contains all of the rows you saw that were invalid. Maybe the vendor got bored and wanted to throw a new quirk your way. I'm laughing because it's true.
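For illustration only, here is a rough Scio-style sketch of the general shape such a normalizer stage could take: parse each raw line against a declared schema, split successes from failures, and write the good rows, the error rows, and summary counts as separate outputs. This is not Two Sigma's actual code; the schema format, field names, and outputs are invented, and plain text stands in for the Avro/Parquet files and JSON summary the real tool produces.

```scala
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import scala.util.Try

// Invented sketch of a "normalizer" pipeline shape, not Two Sigma's code:
// a declared schema drives parsing, rows split into successes and failures,
// and failures plus summary counts become separate outputs.
object NormalizerSketch {

  // Toy stand-in for the JSON schema file: field name and expected type.
  final case class Field(name: String, kind: String) // kind: "string" | "int"
  val schema: Seq[Field] =
    Seq(Field("id", "string"), Field("quantity", "int"), Field("ts", "string"))

  // Lenient parse of one raw line against the schema; Left = error, Right = normalized row.
  def normalize(line: String): Either[String, Map[String, String]] = {
    val cells = line.split(",", -1).toSeq
    if (cells.length != schema.length) Left(s"wrong field count: $line")
    else {
      val typed  = schema.zip(cells)
      val badInt = typed.exists { case (f, v) => f.kind == "int" && Try(v.toInt).isFailure }
      if (badInt) Left(s"non-numeric value: $line")
      else Right(typed.map { case (f, v) => f.name -> v }.toMap)
    }
  }

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val parsed: SCollection[Either[String, Map[String, String]]] =
      sc.textFile(args("input")).map(normalize)

    // Good rows: in the real pipeline these become Avro records saved as Parquet.
    parsed.collect { case Right(row) => schema.map(f => row(f.name)).mkString(",") }
      .saveAsTextFile(args("output"))

    // Error rows: kept separately so an on-call engineer can inspect them.
    parsed.collect { case Left(err) => err }
      .saveAsTextFile(args("errorOutput"))

    // Summary statistics, e.g. parsed vs. error row counts.
    parsed.map(e => if (e.isRight) "parsed" else "errors")
      .countByValue
      .map { case (k, n) => s"$k,$n" }
      .saveAsTextFile(args("summaryOutput"))

    sc.run()                                  // sc.close() on older Scio versions
  }
}
```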
And finally, I'm super-proud to announce something that came out of a partnership between us and Google. We really wanted to get Parquet loading directly into BigQuery, and Google pulled through big time here. So we can not only produce these Parquet files, we can also load them into BigQuery. Imagine having all your raw vendor data in BigQuery to do your research on, to investigate failures, et cetera. And then, if your aggregation is simple or SQL-based, you can just let the Google infrastructure handle it for you. Furthermore, since these are all open source technologies, Parquet, et cetera, if the data are small you can feed them into [INAUDIBLE], and if the data are big and nuanced, you can feed them to Spark. We have one thing up front that handles all of this; it's a big deal for us. So here's an example of the schema file I talked about earlier. There are a few different types here. The first one is a datetime in a very standard format: an integer containing the number of seconds since the Unix epoch, January 1, 1970, a very standard convention. Then there is the string hash: we have a string hash type that takes another column as its input.

We found that if your primary key or your foreign key is a string, it's very compute-intensive to do joins or group-bys on. So with our standard upfront processing, we can hash the string and add a new column. When it gets loaded into BigQuery, we have the hash as well as the source column, and then when we process in Spark, those group-bys are very fast. And finally, [INAUDIBLE]: some fields are optional, and we want to make it clear which ones should be treated as failures. So here's an example of our output, the summary file. I'm super-proud of the work the team did, because this particular vendor gave us over a trillion rows of data, and this is one day's worth of delivery. We were successfully able to parse 916 million records, and we did this in a matter of hours on Dataflow; it scaled very linearly. There are 185 Parquet files; you can kind of see behind the circle that they're very evenly sized, which is great for Spark processing. And the error row count is 0, because we had no failures that day. Very powerful. So let's tie this all together. We get the data from the vendor; they give it to us over FTPS, to GCS, by carrier pigeon, whatever they want to do. We first store it in GCS and then process it in Dataflow. This is where we do our normalization, turn it into Parquet, and output our summary statistics. We then run our ingestion checks. Like I mentioned earlier, we care deeply about data quality at Two Sigma, so we do things like fail if we have a spike or a sharp decline in the number of rows processed or the number of errors encountered. Ideally, we have zero errors, but sometimes, if you're processing a billion rows, you sort of shrug at five failed rows; that's just the reality of the situation. And last but not least, we load to BigQuery for analysis, research, and further processing. I do want to highlight some of the high-level benefits we get here. A small team like ours can process a ton of data with very little effort. That trillion-row data set example I told you about earlier, we tried it in Spark previously, and it took 1,000 workers two weeks to crank through everything, and this is a pretty simple algorithm, just parsing strings and turning them into Parquet. In Dataflow, we did the whole data set in one to two days. Another benefit is that we don't have to manage any of the infrastructure; we just write our Beam pipeline and send it to Google. And there's one more thing I want to highlight. Sergei talked earlier about Dataflow Shuffle; we found we should just turn it on. Either the data set is small enough that Shuffle is barely invoked, and you can see on the UI the number of bytes shuffled, so if that number is small, you don't really pay much of anything for it; or, when that number is big and your data set is very large, we've seen pipelines take much less time to run, which saves you on compute and storage costs. That's it. Thank you. [APPLAUSE] SERGEI SOKOLENKO: So let's wrap this up. Hopefully, you were able to see how having a separate, distributed, in-memory processing mechanism for Shuffle improves the efficiency of separating compute from storage. That was key for both batch and streaming pipelines. We have a new service available for you today, Dataflow Shuffle; a new component, sorry, not a new service, we are not launching a new service. So for batch pipelines, we have a new option for you, Dataflow Shuffle, that makes your pipelines run faster, and you are now able to process and shuffle much larger data sets. For the streaming pipelines, we have the Streaming Engine.
It improves the scalability of your streaming pipelines. Both of these features are now available in two GCP regions: us-central1 and europe-west1. Thank you very much, and we will be happy to take questions either over the microphone or later on, after the session. [MUSIC PLAYING]