We’ve looked at two console applications thus far: one which reads data from a text file into Kafka, and one which reads from one Kafka topic, enriches the data with some information coming out of SQL Server, and puts the results into another topic. Today’s post will create a consumer which answers the questions we had at the beginning of the series.
What Were Those Questions Again?
As a quick reminder, here’s what I want to know:
- By destination state, how many flights did we have in 2008?
- How many of those flights were delayed?
- How long were those delays in the aggregate?
- Given that we’re going to experience a delay, how long can we expect it to be?
We’ll read the data off of the EnrichedFlights Kafka topic and perform aggregation within F#, printing results to the console.
Back To The Projects
Our final console application weighs in at about 100 lines of code. Just like the first two days’ console apps, we’re going to start with the driver function and work our way out from there.
    [<EntryPoint>]
    let main argv =
        let (config:RdKafka.Config) = new Config(GroupId = "Airplane Consumer")
        use consumer = new EventConsumer(config, "sandbox.hortonworks.com:6667")
        let topics = ["EnrichedFlights"]
        consumer.Subscribe(new System.Collections.Generic.List<string>(topics |> List.toSeq))

        let flights = new System.Collections.Generic.List<Flight>()

        processMessages consumer 1000 flights
        consumer.Start()
        printfn "Started reader. Press enter to finalize calculations and display results."
        System.Console.ReadLine() |> ignore
        consumer.Stop() |> ignore
        printfn "You've finished pulling enriched data. Now aggregating data..."

        let flightTuple =
            List.ofSeq flights
            |> Seq.map(fun f -> (f.DestinationState, match f.ArrDelay.IsSome with | true -> f.ArrDelay.Value | false -> 0))

        let results =
            delaysByState flightTuple
            |> Seq.iter(fun(dest,flights,delayed,delay) ->
                printfn "Dest: %A. Flights: %i. Delayed: %i. Total Delay (min): %i. Avg When Delayed (min): %.3f" dest flights delayed delay (float(delay)/float(delayed)))

        printfn "Press enter to quit."
        System.Console.ReadLine() |> ignore

        0 // return an integer exit code
Our main function is just a couple dozen lines of code. Just like before, we declare a consumer group and connect to Kafka; this time, however, we’re going to hit the EnrichedFlights topic. The first major function that we hit is processMessages.
The processMessages function gives me an opportunity to start from the top:
    module AirplaneConsumer

    open RdKafka
    open System
    open FSharp.Collections.ParallelSeq
    open Newtonsoft.Json

    type Flight = {
        Year:int; Month:int; DayOfMonth:int; DayOfWeek:int;
        DepTime:Option<int>; CRSDepTime:int; ArrTime:Option<int>; CRSArrTime:int;
        UniqueCarrier:string; FlightNum:string; TailNum:string;
        ActualElapsedTime:Option<int>; CRSElapsedTime:Option<int>; AirTime:Option<int>;
        ArrDelay:Option<int>; DepDelay:Option<int>;
        Origin:string; Dest:string; Distance:int;
        TaxiIn:Option<int>; TaxiOut:Option<int>;
        Cancelled:bool; CancellationCode:string; Diverted:bool;
        CarrierDelay:int; WeatherDelay:int; NASDelay:int; SecurityDelay:int; LateAircraftDelay:int;
        OriginCity:string; OriginState:string; DestinationCity:string; DestinationState:string; }

    let readFromBeginning (consumer:EventConsumer) =
        consumer.OnPartitionsAssigned.Add(fun(partitions) ->
            printfn "Starting from the beginning..."
            let fromBeginning = List.ofSeq partitions |> List.map(fun(x) -> new TopicPartitionOffset(x.Topic, x.Partition, RdKafka.Offset.Beginning))
            let fb = new System.Collections.Generic.List<TopicPartitionOffset>(fromBeginning |> List.toSeq)
            consumer.Assign(fb);
        )

    let processMessages (consumer:EventConsumer) n (flights:System.Collections.Generic.List<Flight>) =
        //Always start from the beginning.
        readFromBeginning consumer

        consumer.OnMessage.Add(fun(msg) ->
            let messageString = System.Text.Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length)
            let flight = JsonConvert.DeserializeObject<Flight>(messageString)
            flights.Add(flight)
        )
The processMessages function takes three parameters: a consumer, the number of records to read between progress messages (I’ve stripped the progress message itself out for simplicity’s sake, so n goes unused here), and a collection of Flights to which we append.
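For anyone curious what that stripped-out progress message might look like, here is a minimal sketch. The helper name makeProgressReporter is hypothetical (it is not in the original code), and it assumes the callback fires on a single thread; you would call the returned function once per message inside the OnMessage handler.

```fsharp
// Hypothetical helper, not part of the original consumer: returns a function
// that counts its own calls and prints a progress line on every n-th call.
// Assumes it is only ever called from one thread at a time.
let makeProgressReporter (n: int) =
    let count = ref 0   // closures cannot capture `let mutable`, so use a ref cell
    fun () ->
        count.Value <- count.Value + 1
        if count.Value % n = 0 then
            printfn "Read %i records so far..." count.Value
        count.Value     // hand back the running count for convenience

// Usage: simulate 2,500 incoming messages with a progress line every 1,000.
let report = makeProgressReporter 1000
for _ in 1 .. 2500 do
    report () |> ignore
```

Run as a script, this prints a progress line at 1,000 and at 2,000 records.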
The readFromBeginning function looks pretty similar to Friday’s readFromBeginning, so I’ll gloss over it. We turn it on by default because I want to stream the entire data set.
From here, I hook into the OnMessage event just like before, and like before we decode the Kafka payload and turn it into a string. Unlike before, however, I call Newtonsoft’s DeserializeObject method and return a Flight type, which I’ve defined above. This is the same definition as in the Producer, so in a production-quality environment, I’d pull that out to a single location rather than duplicating it.
Going back to the main function, I call the consumer.Start() method and let ‘er rip. When I’m ready to aggregate, I’ll hit the enter key and that’ll call consumer.Stop(). When that happens, I’m going to have up to 7 million records in a list called flights. Out of all of this information, I only need two attributes: the destination state and the arrival delay in minutes. I get those by using the map function on my sequence of flights, taking advantage of F#’s match syntax to handle both cases safely: if a flight has an arrival delay value, I use it, and if it has none, I treat the delay as 0. The result goes into a tuple, and the resulting sequence of tuples is called flightTuple. I pass that into the delaysByState function.
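As an aside, the same projection can be written a little more compactly with Option.defaultValue from the F# core library. This is only a sketch: SlimFlight is a hypothetical two-field stand-in for the full Flight record, and the sample rows are made up for illustration.

```fsharp
// Hypothetical stand-in for Flight, keeping only the two fields used here.
type SlimFlight = { DestinationState: string; ArrDelay: int option }

// Project a flight to (state, delay), treating a missing delay as 0.
let toStateDelay (f: SlimFlight) =
    (f.DestinationState, f.ArrDelay |> Option.defaultValue 0)

// Made-up sample rows, not real data from the 2008 set.
let sampleFlights =
    [ { DestinationState = "NJ"; ArrDelay = Some 42 }
      { DestinationState = "OH"; ArrDelay = None }
      { DestinationState = "NJ"; ArrDelay = Some (-5) } ]

let sampleTuples = sampleFlights |> List.map toStateDelay
printfn "%A" sampleTuples
```

This behaves the same as the match on IsSome in main; which one reads better is a matter of taste. Either way, the resulting tuples feed delaysByState, shown next.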
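    let delaysByState flightTuple =
        // Number of flights per destination state.
        let totalFlights =
            flightTuple
            |> Seq.countBy fst
            |> Seq.sort
        // Number of flights per destination state with a delay above 0 minutes.
        let delayedFlights =
            flightTuple
            |> Seq.filter(fun(dest,delay) -> delay > 0)
            |> Seq.countBy fst
            |> Seq.sort
        // Sum of arrival delay minutes per destination state.
        let totalArrivalDelay =
            flightTuple
            |> Seq.groupBy(fun(dest,delay) -> dest)
            |> Seq.map(fun(dest,delay) -> (dest,delay |> Seq.sumBy snd))
            |> Seq.sort
        // Line the three sorted sequences up by position; F# will not let us bind
        // dest more than once in a single pattern, so the repeats are discarded with _.
        let results =
            Seq.zip3 totalFlights delayedFlights totalArrivalDelay
            |> Seq.map(fun((dest,flights),(_,delayed),(_,delay)) -> dest,flights,delayed,delay)
            |> Seq.sort
        results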
This function is probably the most confusing function out of any of the three console apps for non-F# programmers. I’m taking the sequence of tuples and processing it three times.
The first pass gets a count of items grouped by the first element in the tuple (that is, destination state) and then sorts the resulting sequence. We call that totalFlights. The second pass takes our sequence of flight tuples and retains only those with a delay of more than 0 minutes. I then get the count of results by state and sort the sequence. We call that delayedFlights. Then, I take flightTuple one more time and group by destination, but instead of getting a count, I want to sum the second element in the tuple (delay in minutes), so I need to map my sequence, summing up by delay. I sort that result set and call it totalArrivalDelay.
Finally, I build a sequence of results quadruples (n-tuples with 4 elements) by using the zip3 function to combine all three result sets together. Don’t think of “zip” like compression; instead, think of it like a zipper: it attaches elements of a data set together. Here, we take the three sorted sequences of tuples and zip them, leaving us with a sequence of quadruples. Because zip3 pairs elements up by position, this only works if every state appears in all three sequences; a state with no delayed flights at all would be missing from delayedFlights and throw the rows after it out of alignment.
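If that positional pairing makes you nervous, one alternative is to group once and compute all three numbers per state in the same pass, so nothing has to be lined up afterward. This is a sketch of a different technique rather than the function used in this post; the name delaysByStateSinglePass and the sample tuples are made up for illustration.

```fsharp
// Alternative sketch (not the post's delaysByState): group by state once and
// compute flight count, delayed count, and total delay together.
let delaysByStateSinglePass (flightTuple: seq<string * int>) =
    flightTuple
    |> Seq.groupBy fst
    |> Seq.map (fun (dest, rows) ->
        let delays = rows |> Seq.map snd |> Seq.toList
        let flights = List.length delays
        let delayed = delays |> List.filter (fun d -> d > 0) |> List.length
        let totalDelay = List.sum delays
        (dest, flights, delayed, totalDelay))
    |> Seq.sortBy (fun (dest, _, _, _) -> dest)
    |> Seq.toList

// Made-up sample: OH has no delayed flights, which is the case zip3 struggles with.
let zipSample = [ ("NJ", 30); ("OH", 0); ("NJ", -5); ("NJ", 10); ("OH", -2) ]
let zipSummary = delaysByStateSinglePass zipSample
printfn "%A" zipSummary
```

With this sample, OH still shows up in the output with a delayed count of 0, whereas the three-sequence approach would have no OH entry in delayedFlights to zip against.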
Taking that results quadruple back to the main function, we iterate over each result in the sequence—that is, we iterate over each state—and spit out results to the console. Here’s what it looks like:
We were able to process approximately 10,000 records per second and after approximately 1:45 of churn, we learned that New Jersey is the worst place to go with respect to arrival delays in 2008.
Conclusions
At this point, we now have a working three-piece system for processing data. There are some things we can do to help make this a production-worthy system. For example, I could tighten up the Flight type to keep only the fields we actually need (so I don’t need to hold 6 GB of data in RAM). But as it is, we have a working demo of Kafka interacting with .NET, and that’s good enough for me today.
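Taking the memory idea one step further than trimming the type, here is a rough sketch of keeping only running totals per state as messages arrive, instead of holding every flight in a list. This is not what the consumer above does: StateTotals and addFlight are hypothetical names, and the sketch assumes the handler is called from one thread at a time.

```fsharp
open System.Collections.Generic

// Hypothetical running totals for one destination state.
type StateTotals = { Flights: int; Delayed: int; TotalDelay: int }

// Fold one flight into the per-state totals; a missing delay counts as 0.
// In the real consumer this would be called from the OnMessage handler.
let addFlight (totals: Dictionary<string, StateTotals>) (dest: string) (arrDelay: int option) =
    let delay = defaultArg arrDelay 0
    let current =
        match totals.TryGetValue dest with
        | true, t -> t
        | false, _ -> { Flights = 0; Delayed = 0; TotalDelay = 0 }
    totals.[dest] <-
        { Flights = current.Flights + 1
          Delayed = current.Delayed + (if delay > 0 then 1 else 0)
          TotalDelay = current.TotalDelay + delay }

// Usage with made-up values.
let runningTotals = Dictionary<string, StateTotals>()
addFlight runningTotals "NJ" (Some 30)
addFlight runningTotals "NJ" None
addFlight runningTotals "OH" (Some (-5))
for kv in runningTotals do
    printfn "%s: %A" kv.Key kv.Value
```

The trade-off is that memory use then scales with the number of states rather than the number of flights, but you lose the ability to ask new questions of the raw records without re-reading the topic.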