Data Microservices in Apache Spark using Apache Arrow Flight


– Hi everyone, thanks for coming to the webinar. My name is Ryan Murray. I've been working at Dremio for about a year as a developer, doing some open source projects, some research, that kind of stuff. So today I wanted to share with you one of the cooler things we've been doing lately, which is our work on Apache Arrow Flight, and in particular for this talk, some of the stuff with the Spark connector and how that can apply to building data microservices.

For those of you who aren't aware, I thought I'd level set on what Apache Arrow is and get everyone on the same page before we go any further. Arrow has really become the industry standard for in-memory data. I'm not sure how many people have heard about it, but as you can see, it's in tons of different applications: it's used in Spark and in Dremio, and NVIDIA is doing some interesting stuff with it on GPUs. It's spreading all over the place, and that's really by design. One of the goals we had when we first formed Arrow as a community was to make it a lingua franca for data: the idea that if you store data in Arrow format, which we'll get to in a few minutes, there are all kinds of tools you can leverage to do calculations on that data, and there are understood, clear, standardized ways to move data between processes, applications, machines, and whatnot.

Since the first release of Arrow back in December or so of 2016, it's been growing exponentially; every month there are even more downloads. Part of that is the broad language support. As you can see, there are half a dozen or more languages it's implemented in now; some of these bind to the C++ libraries and others are native implementations. That really helps with the lingua franca part, because every programming language speaks the same language when it comes to data. And as I said, the community is very active: over 300 developers are doing a lot of interesting stuff, making Arrow work on CPUs, GPUs, and more recently FPGAs. So, what is Arrow?
Well, simply, it's an in-memory specification for data. It tells you how to lay your data out in memory, in a binary format that makes it extremely efficient for large analytical workloads, irrespective of whether you're on CPUs, GPUs, or more exotic things. Aside from that, it's a set of tools: you have the standard, and then we've built a lot of tools in the community to help you manipulate data in that standard. You can think of them as Lego bricks for building novel data applications. Some examples are IO, getting data into and out of Arrow from various formats, whether it's Avro or Parquet or something like that; others are compute kernels or engines, which help you do calculations on Arrow; even things like Flight, which is an RPC mechanism, and other ways of trading data with other applications or processes.

It's important to say what Arrow isn't. It isn't an installable system as such; you can't go and download a copy of Arrow like you would Spark and run it. Rather, it's a library: Spark uses Arrow to be efficient with columnar data. Nor is it a memory grid or an in-memory database or anything like that. While you can build these sorts of things with Arrow, it doesn't have any of them in it on its own; it's more a set of primitives. And finally, it's important to mention that it's not really designed for streaming single records. You're not sending single-row Arrow batches around; you tend to have dozens or hundreds or thousands or millions of rows in a record batch. That's mostly for efficiency: the columnar data structure, which we'll describe in a second, is designed for larger datasets, and the overhead of small, single-record operations is too high to make it useful for record-at-a-time streaming.

So that's what Arrow is. What does the format look like? As I mentioned, the important thing here is that it's a columnar data format.

Compared to the traditional memory format, down in the bottom right there, where every row in your table is a block of contiguous memory: say you wanted to take the average or the maximum of session ID, something like that. To do that, you're going to have to read row one, and then skip ahead to row two to read the next value of session ID. And because the fields are interleaved and might be variable length, strings or whatever, you can't really hop to the next row; you have to read all those bytes. So you end up reading the entire data set just to pull out a single column, and that's really expensive for the CPU: you have to load a lot of stuff into your cache, you're constantly breaking your pipelines with lots of branches, and it's not really efficient. Arrow does something that will look familiar to people who know pandas: it stores everything in a columnar layout. There you have all your session IDs in one contiguous block of memory. Now when you want to do a calculation, you can load a good chunk of that block directly into the CPU cache and do the calculation in one go, sometimes leveraging SIMD (Single Instruction, Multiple Data) operations, where you're actually doing these calculations in single CPU cycles. So you get a huge efficiency gain from this locality of data, and it allows you to do all kinds of interesting things. Like I said, you can leverage some of the interesting architectural aspects of GPUs; there are lots of scatter/gather I/O style operations you can do to perform these computations really efficiently.

So that's the format. What are some of the building blocks? These are the Arrow Lego bricks I mentioned before; here's a quick survey of four of the most interesting ones. First off are our Parquet readers and writers. These are designed to get data from Parquet to Arrow, or from Arrow to Parquet, very fast. This is done natively at the C++ level, so it's an extremely fast operation. It's particularly useful because the Parquet format is so similar to the Arrow format; you can think of them as cousins. So it becomes very efficient to stream Parquet into Arrow, and when the rest of your data pipeline is Arrow, you're not constantly changing data structures and formats; you're never marshaling data once it's out of Parquet and into Arrow. Similarly, a simpler, more ephemeral implementation is Feather. If you're familiar with the work of Wes McKinney, one of the Arrow founders, you might have seen Feather. It's a good way to get data between R and Python very fast. It's meant to be ephemeral; it's not durable storage like you'd expect from Parquet, but it's extremely fast. And then on the compute kernel side, you have something called Gandiva. Say you have some arbitrary expression over Arrow data, maybe some filters from a SQL statement, or you're multiplying some columns together and dividing by another column, something like that. You can take that expression and feed it into Gandiva, and Gandiva will spit out LLVM bytecode for that expression; LLVM will then just-in-time compile it into machine code for your native platform. So you're able to run arbitrary expressions on your Arrow data, and you really get the speed of machine code regardless of your starting language. It doesn't matter if you're in Java, JavaScript, Ruby, or whatever else; you're going to get full machine-level speed on these compute kernels.
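Before we move on, here's a minimal sketch of a couple of these building blocks using the pyarrow Python library; the column names and file paths are just for illustration:

```python
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.feather as feather
import pyarrow.parquet as pq

# Build an in-memory Arrow table; each column is a contiguous buffer
table = pa.table({"session_id": [101, 102, 103, 104],
                  "duration_ms": [250, 310, 120, 480]})

# Compute kernels operate on a whole column at once
print(pc.mean(table["duration_ms"]))  # 290.0

# Round-trip through Parquet (durable) and Feather (fast, ephemeral)
pq.write_table(table, "sessions.parquet")
table_again = pq.read_table("sessions.parquet")
feather.write_feather(table, "sessions.feather")
```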
And finally, there's Arrow Flight, and that's really why we're here today. It's the newest member of these building blocks, and it's our RPC mechanism. So what is Arrow Flight? Simply, again, it's a protocol: a high-performance protocol that defines how to move data between two systems. And the key here is that it's a bulk transfer system. One of the reasons it's so much faster than other implementations is that you take an Arrow buffer and write it directly into your network buffer, so you don't have to translate your data before writing it onto the network. Similarly, the client receives the Arrow data from the network and materializes it directly into an Arrow buffer. So you're not dealing with all the marshaling and the expensive CPU work of constantly changing the format your data is in.

So that's where the speed comes from. This is also sort of the last piece in our interoperability puzzle. As I said, one of the founding goals of Arrow is to be a lingua franca for data, and what Arrow Flight does is allow any system, on any operating system, in most any programming language, to talk to any other in an understood, known language; we never have to marshal, change, or transform data. And it's built from the ground up to support parallel streams, which I'll get to in a few minutes, and security. So out of the box, in maybe a dozen lines of Python, you can write a Flight server that is SSL encrypted and has security attached to it. The features you'd expect are built in right out of the box.

So what does this protocol actually look like? Well, the underlying transport is gRPC, Google's RPC mechanism, if you're not familiar with it. The idea there is that you define a concrete set of operations you know how to do, and then clients come in and request those operations, either by supplying you data or by asking you to perform something on it. One of the core concepts in gRPC is streams: rather than "I give you some data, you give me back some data", I can open up a stream and continuously feed you data or receive data. So a client-server interaction looks something like: I'm going to send you data; here's a batch, here's a batch, here's a batch; I'm done. The efficiency and ease of use of gRPC help us do that, and it's going to be really fast. We also interact with a relatively low layer of gRPC, which is how we're able to get the zero copy between the network buffer and the in-memory process buffers. The kinds of things Flight can support right now: you can do PUTs and GETs, so you can give a server data and get data from a server, and a recent addition is bidirectional streams, a constant interchange of data. And everything is initiated by the client; the client is in control of how these transactions happen, so you're never standing around waiting for a server to contact you.

This is what originally got us thinking about the concept of microservices. Say you have a Spark cluster and a Dremio cluster, and you want to move a very large data set from Dremio into Spark for some data science application. You use the Flight connector to stream in parallel and bring back a very large data set very quickly. Then you train your model and expose that model as a FlightEndpoint, so a further downstream consumer is able to send you a matrix and get back a prediction vector, something like that. So you start to get this concept of a bunch of small servers doing really concrete things. Something more esoteric might be a machine learning mixing model: you ask the system for a prediction, and it federates out to a bunch of different machine learning models. And since they all know how to talk Flight and are using Arrow under the hood, they can be TensorFlow or Spark or plain Python, it doesn't really matter. You ask all of those to run the data through their individual models, bring it all back, and mix it together in the upstream microservice before sending it back to the client. So you can start breaking down your data pipeline into a bunch of distinct components that can be reused, deployed, and developed individually.
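As a rough illustration of how small a Flight server can be, here's a sketch using the pyarrow.flight Python API; the port, dataset name, and table contents are made up, and the SSL and authentication handlers are left out for brevity:

```python
import pyarrow as pa
import pyarrow.flight as flight

class TinyFlightServer(flight.FlightServerBase):
    def __init__(self, location="grpc://0.0.0.0:8815"):
        super().__init__(location)
        # A single in-memory table, keyed by ticket bytes
        self._tables = {b"example": pa.table({"id": [1, 2, 3]})}

    def do_get(self, context, ticket):
        # Stream the requested table back as Arrow record batches
        return flight.RecordBatchStream(self._tables[ticket.ticket])

if __name__ == "__main__":
    TinyFlightServer().serve()
```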
So you can start seeing the sort of microservices architecture come out of this data world, and for me at least, I think this makes the concept of a data and machine learning pipeline much more palatable to manage, control, build, and maintain.

So that's the underlying idea. How do the parallel streams work? Well, when you ask a server for a data set, you're going to ask for FlightInfo, and you're going to get back a set of FlightEndpoints. Those FlightEndpoints represent your streams, and if you collect the data from all of them, you'll have your entire data set.
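Here's what that flow can look like from the client side in Python; the server location and SQL command are hypothetical, and a real consumer would hand each endpoint to a separate worker rather than looping over them serially:

```python
import pyarrow.flight as flight

client = flight.connect("grpc://dremio-coordinator:32010")

# Ask the coordinator to describe the data set
info = client.get_flight_info(
    flight.FlightDescriptor.for_command(b"SELECT * FROM sales"))

# Redeem each endpoint's ticket; endpoint.locations tells you where
# each piece of the data set actually lives
tables = []
for endpoint in info.endpoints:
    reader = client.do_get(endpoint.ticket)
    tables.append(reader.read_all())
```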

A FlightEndpoint is simply a Flight ticket, which is a token saying you're allowed to get that data and identifying a very specific portion of a specific data set, plus an endpoint location, so you know where to get the data from. When you get back this set of FlightEndpoints, you're able to break them up any way you want. If you get ten back from your Flight server, you can redeem the tickets one at a time if you want, or you can do them all at once: you can spread them across a parallel system, either in memory in a single process or across a bunch of Spark executors. You're really just linearly multiplying your ability to move data. Right now, the way you ask Flight for data is either a sort of dotted namespace, where you call ListFlights and it lists all of the namespace-separated data sets the Flight server knows about, or you can even send a full SQL query; the downstream server will execute that SQL query and send you the results back.

More concretely, the parallel system looks like this. If you have a Dremio cluster and, say, a Python client, the Python client issues a GetFlightInfo to the Dremio coordinator, which returns a set of endpoints; the client then knows to go to each of those endpoints with their specific tickets to get the data, in this case from a bunch of Dremio executors. And if the client is a Spark cluster, you can have your Spark executors facing off against your Dremio executors, each getting an individual piece of the data back into its own Spark executor.

So, that's Flight. Now let's talk about what the Spark source looks like. First, our Spark source is built on the relatively new DataSource V2 API, which came out a couple of Spark versions ago. It was a complete rewrite of the data source API, and it has a lot of really interesting, really exciting features for people building data sources. And as big a change as that was, there's another very large change going into Spark 3, so there's a lot of really cool stuff happening for these data sources. For us, some of the most important things are the columnar support, so you can do stuff with Arrow batches; transactions, which aren't very important for this example but are really powerful, because you can actually perform ACID transactions on your underlying data source, and if that's a Flight data source talking to a large-scale Dremio cluster or something like that, you're able to do some pretty powerful things; easier control of partitions, and of how data source partitions map onto Spark partitions; and better support for push downs, which is really important for sources like this.

So here's what the Spark source for Flight looks like. It leverages the columnar batch, as I said before, so you're actually pulling Arrow buffers directly from the Flight source into Spark's internal Arrow representation; the columnar batch is actually using Arrow under the hood. It also fully supports push down: if your Flight server is able to understand SQL, you can write generic query expressions against your Spark data frame, much as you would in pandas, and the source will translate them down into SQL before sending them off to the Flight server.
Currently, DataSource V2 is only capable of doing push downs for filters and predicates, so it doesn't support things like joins or aggregates. That does limit how much heavy computation you're able to push down to your downstream sources, but hopefully that will be added in the near future; then you'd really be able to send arbitrary SQL down to your Flight sources from your Spark data frames. And finally, for our use case, we partition by Arrow Flight ticket. The Flight server generates a bunch of Flight tickets, and those tickets are federated out as partitions in Spark, so the downstream system is actually able to control how best to partition the data given the query being sent.
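In PySpark, reading through a Flight source looks roughly like the sketch below; the format name and option keys are assumptions for illustration, so check the connector's README for the exact API:

```python
from pyspark.sql import SparkSession

# Assumes the Flight connector jar is on the classpath; the format
# identifier and option names here are illustrative, not exact.
spark = SparkSession.builder.appName("flight-demo").getOrCreate()

df = (spark.read
      .format("flight")
      .option("host", "dremio-coordinator")
      .option("port", "32010")
      .option("query", "SELECT * FROM sales")
      .load())

# A filter like this can be pushed down and executed by the Flight server
df.filter(df.amount > 100).count()
```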

So that's it for the Spark source. It's relatively simple; there's a link to the GitHub repo at the end of the talk, and I encourage everyone who's interested to go and give it a try. For now, let's talk about benchmarks, the fun stuff.

For this benchmark, we worked on AWS. We took the most recent version of EMR and faced it off against Dremio AWS Edition. In both cases we had four nodes, four relatively large nodes, so a relatively equal comparison of compute power. Then we ran a bunch of different data sizes, and for each data size we performed some non-trivial calculation on the Spark side. That's to make sure we use every row and every column, and that Spark and Dremio aren't playing games on us: when I was first working on this, Spark and Dremio, which are both relatively smart, were dropping columns that weren't being used in the calculation. What that gives us is two times: the first is the time for the entire calculation, Spark plus Dremio plus on-the-wire, and the time in brackets is the on-the-wire time alone. Everything is measured in seconds.

Just focusing on JDBC versus serial Flight, you can see significant performance improvements immediately, even in the single serial use case of Flight. So there's already a good argument for Flight as a replacement for JDBC or ODBC in the future. And when we start talking about the parallel stuff, we see massive performance improvements: orders of magnitude over JDBC, multiple orders of magnitude in some cases. For a point of reference, when we were using eight nodes of both EMR and Dremio, we were moving a billion records totaling something like 80 gigabytes of data, and that boiled down to something like four gigabits per second of bandwidth on each one of those nodes, which is actually a significant portion of the total EC2 bandwidth available. So at this point, we're moving data pretty much as fast as it can be moved on these modern cloud networks, which is really exciting. And I think that's what makes the concept of these microservices, distinct, isolated components all talking to each other, a real possibility.

So thanks everyone for watching and listening. I'm happy to take any questions, and here are some links to get you started with Flight and with Arrow and Spark.