F# Zip Doesn’t Sort First

Here’s a problem I ran into as I worked through my Kafka code.  Near the end of the consumer project, I used the Seq.zip3 function to zip together three separate sequences of tuples and turn them into a sequence of quadruples.  It’s a great way to combine related sequences, but I kept getting results that looked like this:


I kept running into situations like the above.  With Iowa, for example, the process said that we had 1994 flights come in over the time frame, but there were 3719 flights delayed.  This…didn’t seem right.  Similarly, there were 30,772 flights to Vermont, of which only 871 were delayed?  I have trouble believing that there were more flights to Vermont than Connecticut or Kentucky (which hosts the Cincinnati airport).

To help debug the code, I checked each individual tuple and spit out the counts, and noticed that although the delay counts were fine, the totals were way off.  After digging into the zip function a bit more, it clicked.  Zip does not sort first!

When you call the zip method, you’re not “joining” in the relational sense; you’re simply taking the first element from the first set and attaching to it the first element from the second set; then you take the second element from each set, the third, etc.
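
To make that concrete, here’s a tiny F# illustration (with made-up state counts, not my real numbers).  Zip pairs elements strictly by position, so if the two sequences arrive in different orders, the keys get mismatched; sorting both sequences by their key first is what lines them up:

let totalFlights   = [ ("IA", 2000); ("VT", 800) ]    //Keyed by state
let delayedFlights = [ ("VT", 150); ("IA", 400) ]     //Same states, different order

//Zip pairs purely by position, so Iowa's total gets attached to Vermont's delay count:
Seq.zip totalFlights delayedFlights |> List.ofSeq
//[(("IA", 2000), ("VT", 150)); (("VT", 800), ("IA", 400))]

//Sorting both sequences by key first keeps each state's numbers together:
Seq.zip (totalFlights |> Seq.sortBy fst) (delayedFlights |> Seq.sortBy fst) |> List.ofSeq
//[(("IA", 2000), ("IA", 400)); (("VT", 800), ("VT", 150))]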

Once I understood that and corrected the code, life was good:


Thinking About Kafka Performance

Wrapping up my series on Kafka, I want to spend a few moments thinking about how my Kafka application has performed.  When I started this journey, I wanted to put together an application that is easy to understand and that performs well.  I think I’ve solved the first problem well enough, but the second still eludes me.  What follows are some assorted thoughts on this performance discrepancy.

Keep in mind throughout this that my simple CSV pump-and-dump averaged 18K records per second, my enricher averaged somewhere around 2K records per second, and my data aggregator averaged approximately 10K records per second.  Considering that the article which drove Kafka to prominence showed 2 million writes per second, I’m way under-shooting the mark.  Without performing any serious tests, here are a few things that might increase the throughput:

  1. I’ve run the test from a single consumer to a single broker from a single laptop.  That gives me several areas for parallelism:  I could have more consumers pushing to more brokers, each on dedicated hardware.
  2. I’m pushing to just one partition.  If I increase the number of brokers, I should increase the number of partitions to ensure that we’re writing to each of the brokers in parallel.
  3. I don’t believe that I’m batching messages to Kafka.  One way of increasing throughput is to bundle a number of small messages together:  TCP has per-message overhead that we have to treat as a fixed cost, and Kafka also pays a fixed cost to handle and write each message individually.  By packing more data in against those fixed costs, we reduce their relative burden.  (There’s a rough configuration sketch after this list.)
  4. I’m using a .NET library.  The .NET libraries for Kafka seem to be a lot slower than their Java counterparts.  In discussions with another person who pulled off a similar project, he noticed about a 5x improvement in switching from a .NET library to a Java library (100k records/sec to 500k).  I’m sure that’s a ‘Your Mileage May Vary’ scenario, but it makes sense to me, given that the relative focus across the entire Hadoop ecosystem is Java-first (and sometimes Java-only).
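
On the batching point, here’s a rough sketch of what producer-side batching settings might look like.  This assumes that rdkafka-dotnet lets you pass a Config to the Producer constructor and set arbitrary librdkafka properties through the Config indexer (assumptions on my part, so treat this as a sketch rather than tested code):

let config = new RdKafka.Config()
//Hypothetical batching-related librdkafka settings; the values are illustrative, not tuned.
config.["queue.buffering.max.messages"] <- "500000"   //How many messages may queue locally before the producer blocks
config.["queue.buffering.max.ms"] <- "100"            //Wait up to 100ms to fill a batch before sending
config.["batch.num.messages"] <- "10000"              //Maximum number of messages packed into a single request

use producer = new RdKafka.Producer(config, "sandbox.hortonworks.com:6667")
use topic = producer.Topic("Flights")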

After monkeying with some configuration settings, I was able to get about 25,000 messages per second into the Flights topic and could bump up the others a fair percentage as well.  When generating messages and shooting them directly into Kafka, I was able to get 30K messages per second, so it’s nowhere near 800K messages per second (as Jay Kreps was able to get in his single producer thread, no replication scenario), but it’s nothing to sneeze at.

Resetting Kafka Topics

Let’s say you’re working on a program to load a Kafka topic and you mess up and want to start over.  There are two good ways of doing this.  Both of these methods involve connecting to the name node and running shell scripts in /usr/hdp/[version]/kafka/bin (for the Hortonworks Data Platform; for some other distro, I leave it as an exercise to the reader to find the appropriate directory…mostly because I wouldn’t know where it was).

Method One:  Delete And Re-Create

The method that I’ve shown already is the delete and re-create method.  This one is pretty simple:  we delete the existing topic and then generate a new one with the same name.

./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

When you delete the topic, you’ll see the following warning message:


You can check this in Ambari by going to the Kafka -> Configs section:


Then, once we’ve deleted the topic, we can re-create it.

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Method Two:  Retention Policy Shenanigans

The first method works fine for non-production scenarios where you can stop all of the producers and consumers, but let’s say that you want to flush the topic while leaving your producers and consumers up (but maybe you have a downtime window where you know the producers aren’t pushing anything).  In this case, we can change the retention period to something very short, let the queue flush, and bring it back to normal, all using the kafka-configs shell script.

First, let’s check out our current configuration settings for the topic called test:

./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test


This might look odd at first, but it’s just the Kafka configuration script’s way of saying that you’re using the default settings.  Incidentally, our default setting has a retention period of 168 hours, as we can see in Ambari.


Now that we have the correct script, we can run the following command to set our retention policy to something a bit shorter:

./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --add-config retention.ms=1000


Now we can see that the retention period is 1000 milliseconds, or one second. Give that a minute or two to take hold and then we can run the following to remove the special configuration:

./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --delete-config retention.ms


And we’re back, with no real downtime.  As long as the producers were temporarily paused, we didn’t lose any data and our producers can go about their business like nothing happened.


There are at least two different methods for clearing out a Kafka topic.  Before you break out the hammer, see if monkeying with the retention period will solve your problem without as much disruption.

Console App 3: The Consumer

We’ve looked at two console applications thus far, one which reads data from a text file into Kafka; and one which reads from one Kafka topic, enriches the data with some information coming out of SQL Server, and puts the results into another topic.  Today’s post will create a consumer which answers the questions we had at the beginning of the series.

What Were Those Questions Again?

As a quick reminder, here’s what I want to know:

  1. By destination state, how many flights did we have in 2008?
  2. How many of those flights were delayed?
  3. How long were those delays in the aggregate?
  4. Given that we’re going to experience a delay, how long can we expect it to be?

We’ll read the data off of the EnrichedFlights Kafka topic and perform aggregation within F#, printing results to the console.

Back To The Projects

Our final console application weighs in at about 100 lines of code.  Just like the first two days’ console apps, we’re going to start with the driver function and work our way out from there.

[<EntryPoint>]
let main argv =
    let (config:RdKafka.Config) = new Config(GroupId = "Airplane Consumer")
    use consumer = new EventConsumer(config, "sandbox.hortonworks.com:6667")
    let topics = ["EnrichedFlights"]
    consumer.Subscribe(new System.Collections.Generic.List<string>(topics |> List.toSeq))

    let flights = new System.Collections.Generic.List<Flight>()

    processMessages consumer 1000 flights
    consumer.Start()
    printfn "Started reader. Press enter to finalize calculations and display results."
    System.Console.ReadLine() |> ignore
    consumer.Stop() |> ignore
    printfn "You've finished pulling enriched data.  Now aggregating data..."

    let flightTuple =
        List.ofSeq flights
        |> Seq.map(fun f -> (f.DestinationState, match f.ArrDelay with
                                                    | Some delay -> delay
                                                    | None -> 0))
    let results = delaysByState flightTuple
                    |> Seq.iter(fun(dest,flights,delayed,delay) ->
                            printfn "Dest: %A. Flights: %i.  Delayed: %i.  Total Delay (min): %i.  Avg When Delayed (min): %.3f" dest flights delayed delay (float(delay)/float(delayed)))

    printfn "Press enter to quit."
    System.Console.ReadLine() |> ignore

    0 // return an integer exit code

Our main function is just a couple dozen lines of code.  Just like before, we declare a consumer group and connect to Kafka; this time, however, we’re going to hit the EnrichedFlights topic.  The first major function that we hit is processMessages.

The processMessages function gives me an opportunity to start from the top:

module AirplaneConsumer

open RdKafka
open System
open FSharp.Collections.ParallelSeq
open Newtonsoft.Json

type Flight = { Year:int; Month:int; DayOfMonth:int; DayOfWeek:int; DepTime:Option<int>; CRSDepTime:int; ArrTime:Option<int>; CRSArrTime:int;
                UniqueCarrier:string; FlightNum:string; TailNum:string; ActualElapsedTime:Option<int>; CRSElapsedTime:Option<int>; AirTime:Option<int>;
                ArrDelay:Option<int>; DepDelay:Option<int>; Origin:string; Dest:string; Distance:int; TaxiIn:Option<int>; TaxiOut:Option<int>;
                Cancelled:bool; CancellationCode:string; Diverted:bool; CarrierDelay:int; WeatherDelay:int; NASDelay:int; SecurityDelay:int;
                LateAircraftDelay:int; OriginCity:string; OriginState:string; DestinationCity:string; DestinationState:string; }

let readFromBeginning (consumer:EventConsumer) =
    consumer.OnPartitionsAssigned.Add(fun(partitions) ->
        printfn "Starting from the beginning..."
        let fromBeginning = List.ofSeq partitions
                               |> List.map(fun(x) -> new TopicPartitionOffset(x.Topic, x.Partition, RdKafka.Offset.Beginning))
        let fb = new System.Collections.Generic.List<TopicPartitionOffset>(fromBeginning |> List.toSeq)
        //Assign the consumer to the beginning offset of each partition and close out the handler.
        consumer.Assign(fb))

let processMessages (consumer:EventConsumer) n (flights:System.Collections.Generic.List<Flight>) =
    //Always start from the beginning.
    readFromBeginning consumer

    consumer.OnMessage.Add(fun(msg) ->
        let messageString = System.Text.Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length)
        let flight = JsonConvert.DeserializeObject<Flight>(messageString)
        //Append each deserialized flight to the shared collection and close out the handler.
        flights.Add(flight))

The processMessages function takes three parameters:  a consumer, a count of the number of records before we spit out a message (which I’ve stripped out for simplicity’s sake), and a collection of Flights to which we append.
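
That message-count bit was nothing fancy.  A minimal sketch of that sort of progress counter, folded into the same OnMessage handler (a reconstruction, not the exact code I removed), looks something like this:

    //Sketch of the stripped-out progress reporting: count messages and report every n of them.
    let messageCount = ref 0
    consumer.OnMessage.Add(fun(msg) ->
        let messageString = System.Text.Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length)
        let flight = JsonConvert.DeserializeObject<Flight>(messageString)
        flights.Add(flight)
        incr messageCount
        if !messageCount % n = 0 then
            printfn "Read %i messages so far..." !messageCount)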

The readFromBeginning function looks pretty similar to Friday’s readFromBeginning, so I’ll just gloss over it here—we turn it on by default because I want to stream the entire data set.

From here, I hook into the OnMessage event just like before, and like before we decode the Kafka payload and turn it into a string.  Unlike before, however, I call Newtonsoft’s DeserializeObject method and return a Flight type, which I’ve defined above.  This is the same definition as in the Producer, so in a production-quality environment, I’d pull that out to a single location rather than duplicating it.

Going back to the main function, I call the consumer.Start() method and let ‘er rip.  When I’m ready to aggregate, I’ll hit the enter key and that’ll call consumer.Stop().  When that happens, I’m going to have up to 7 million records in a list called flights.  Out of all of this information, I only need two attributes:  the destination state and the arrival delay in minutes.  I get those by using the map function on my sequence of flights, taking advantage of F#’s match syntax to get all relevant scenarios safely and put the result into a tuple.  The resulting sequence of tuples is called flightTuple.  I pass that into the delaysByState function.

let delaysByState flightTuple =
    let totalFlights =
        flightTuple
        |> Seq.countBy fst
        |> Seq.sort
    let delayedFlights =
        flightTuple
        |> Seq.filter(fun(dest,delay) -> delay > 0)
        |> Seq.countBy fst
        |> Seq.sort
    let totalArrivalDelay =
        flightTuple
        |> Seq.groupBy(fun(dest,delay) -> dest)
        |> Seq.map(fun(dest,delays) -> (dest,delays |> Seq.sumBy snd))
        |> Seq.sort
    let results =
        Seq.zip3 totalFlights delayedFlights totalArrivalDelay
        |> Seq.map(fun((dest,flights),(_,delayed),(_,delay)) -> dest,flights,delayed,delay)
        |> Seq.sort
    results

This function is probably the most confusing function out of any of the three console apps for non-F# programmers.  I’m taking the sequence of tuples and processing them three times.  The first process gets a count of items grouped by the first element in the tuple (that is, destination state) and then sorting that tuple.  We call that totalFlights.  The second bit takes our sequence of flight tuples and retains only those with a delay of more than 0 minutes.  I then get the count of results by state and sort the tuple.  We call that delayedFlights.  Then, I take flightTuple one more time and group by destination, but instead of getting a count, I want to sum the second element in the tuple (delay in minutes), so I need to map my sequence, summing up by delay.  I sort that result set and call it totalArrivalDelay.  Finally, I build a results quadruple (n-tuple with 4 elements) and use the zip3 function to combine all three result sets together.  Don’t think of “zip” like compression; instead, think of it like a zipper:  it attaches elements of a data set together.  Here, we take the three tuples and zip them, leaving us with a quadruple.

Taking that results quadruple back to the main function, we iterate over each result in the sequence—that is, we iterate over each state—and spit out results to the console.  Here’s what it looks like:


We were able to process approximately 10,000 records per second and after approximately 1:45 of churn, we learned that New Jersey is the worst place to go with respect to arrival delays in 2008.


At this point, we now have a working three-piece system for processing data.  There are some things we can do to help make this a production-worthy system—for example, I could tighten up the Flights type to keep only the fields we actually need (so I don’t need to hold 6 GB of data in RAM).  But as it is, we have a working demo of Kafka interacting with .NET, and that’s good enough for me today.

Console App 1: The Producer

Today’s post will look at the first of three console applications.  This app is going to read in flight data from an external source (in my case, a flat file) and push messages out to a Kafka topic.

A Sidebar:  F#

If you look on the left-hand side, you’ll see the three console projects that we are going to use for this Kafka solution.  You may notice that these are all F# projects.

Whenever I present my Integrating Hadoop and SQL Server talk, I always get asked one question:  “Why are you using F#?”  I like getting this question.  The honest answer is, because I enjoy the language.  But there’s a bit more to it than that.

First, let me respond to the important subtext of the question:  no, there’s nothing in this project that we absolutely need F# for.  I can just as easily write C# code to get to the same answer, and if you’re more comfortable writing code in C#, then go for it.

So why, then, do I persist?  Here are my reasons, in no particular order:

  1. I want to learn things, and the best way to learn a language is to use it.  I’m not particularly good at writing F# code; I’ve been exposed to it for a couple of years now and can code my way out of a wet paper bag, but it’d be scary to jump onto a project as an F# developer.  I want to change that.
  2. The reason I want to change that is that I see the power of functional languages.  If you’re not sold on the importance of functional languages, I need only note that both C# and Java have made significant functional inroads with LINQ/lambdas, closures, pipelines (via the fluent interface in C#), etc.
  3. I want to show that the entryway from C# to F# is pretty easy.  A very complex block of F# code will leave me scratching my head, absolutely, but I can give working, tutorial-style code to people and have them easily understand it.  If I’m going to show you easy code, I’d rather show you easy code in a language you don’t know—that way, you’re learning two things at the same time.  This does run the risk of unnecessarily confusing people, admittedly, but I think the risk is relatively low given my target audience.
  4. I think there’s a synergy between functional languages and SQL.  I believe that having a background in SQL gives you an appreciation for set-based solutions and treating data and functions as first-class citizens.  A language like F# or Scala appeals to the relational database developer in me, as I’m using the pipeline to push data through functions rather than nesting for loops.
  5. The code is more concise.  My producer is barely 40 lines of code, including a big comment block.  This is probably a trivial enough application that I could write a similar C# app in not too many more lines of code, but one scales with respect to complexity much better than the other.

Those are my major reasons.  There are reasons not to use F#, but my belief is that the reasons to use this language in my demos outweigh the negatives.

Anyhow, with all of that said, let’s check out what I’m doing.

NuGet Packages

For the entire solution, I’m going to need a few NuGet packages.  Here are my final selections:


The first two are F#-specific packages.  We’ll see ParallelSeq in action today, but the SqlClient will have to wait until tomorrow, as will Newtonsoft.Json.

The last two libraries are Kafka libraries.  The first is rdkafka-dotnet.  This is a fairly thin wrapper around librdkafka.  Of all the .NET Kafka libraries I tried, this one made the most sense to me.

Other Kafka libraries for .NET that I looked at include:

  • Kafka Sharp, which is intended to be a fast implementation.
  • Franz, a Kafka library written in F# (but works just as well with C#).
  • Kafka4net, another implementation designed around Kafka 0.8.
  • CSharpClient-for-Kafka, Microsoft’s implementation.

You’re welcome to try out the different libraries and see what works best for you.  There was even an article written around writing your own Kafka library.

Loading Data

The console app is pretty simple.  Simple enough that I’m going to include the functional chunk in one code snippet:

module AirplaneProducer

open RdKafka
open System
open System.IO
open FSharp.Collections.ParallelSeq

let publish (topic:Topic) (text:string) =
    let data = System.Text.Encoding.UTF8.GetBytes(text)
    topic.Produce(data) |> ignore

let loadEntries (topic:Topic) fileName =
    File.ReadLines(fileName)   //Stream the file line by line
        |> PSeq.filter(fun e -> e.Length > 0)
        |> PSeq.filter(fun e -> not (e.StartsWith("Year")))   //Filter out the header row
        |> PSeq.iter(fun e -> publish topic e)

[<EntryPoint>]
let main argv =
    use producer = new Producer("sandbox.hortonworks.com:6667")
    let metadata = producer.Metadata(allTopics=true)
    use topic = producer.Topic("Flights")

    //Read and publish a list from a file
    loadEntries topic "C:/Temp/AirportData/2008.csv"

    printfn "Hey, we've finished loading all of the data!"
    Console.ReadLine() |> ignore
    0 // return an integer exit code

That’s the core of our code.  The main function instantiates a new Kafka producer and gloms onto the Flights topic.  From there, we call the loadEntries function.  The loadEntries function takes a topic and filename.  It streams entries from the 2008.csv file and uses the ParallelSeq library to operate in parallel on data streaming in (one of the nice advantages of using functional code:  writing thread-safe code is easy!).  We filter out any records whose length is zero—there might be newlines somewhere in the file, and those aren’t helpful.  We also want to throw away the header row (if it exists) and I know that that starts with “Year” whereas all other records simply include the numeric year value.  Finally, once we throw away garbage rows, we want to call the publish function for each entry in the list.  The publish function encodes our text as a UTF-8 bytestream and pushes the results onto our Kafka topic.

Running from my laptop, loading 7 million or so records took 6 minutes and 9 seconds.


That averages out to about 19k records per second.  That’s slow, but I’ll talk about performance later on in the series.  For real speed tests, I’d want to use independent hardware and spend the time getting things right.  For this demo, I can live with these results (though I’d happily take an order of magnitude gain in performance…).  In testing the other two portions, this is the fastest of the three, which makes sense:  I’m doing almost nothing to the data outside of reading and publishing.


We’ve built a console app which pushes data to a Kafka topic.  That’s a pretty good start, but we’re only a third of the way done.  In tomorrow’s post, I’m going to pull data off of the Flights topic and try to make some sense of it.  Stay tuned!

Kafka Shell Scripts

Today’s Kafka post will be a relatively simple one, where we use the built-in shell scripts to create a new topic, add some records, and view those records.  We’ll wrap it up by creating the topics I need for the rest of the process.

I’m going to use the Hortonworks Data Platform 2.4 sandbox for this.  You can use other versions, but I have this one readily available (thanks to my Polybase + Docker issues).

Is Kafka Actually On?

By default, Kafka is in maintenance mode on the Hortonworks sandbox.  I modified my local hosts file to make sandbox.hortonworks.com point to my sandbox, so to connect to Ambari on port 8080, I go to http://sandbox.hortonworks.com:8080:


Now that we know Kafka is on, I can check the Configs tab to see how to connect to Kafka:


There are three important things here:  first, our Zookeeper port is 2181.  Zookeeper is great for centralized configuration and coordination; if you want to learn more, check out this Sean Mackrory post.

The second bit of important information is how long our retention period is.  Right now, it’s set to 7 days, and that’s our default.  Remember that messages in a Kafka topic don’t go away simply because some consumer somewhere accessed them; they stay in the log until we say they can go.

Finally, we have a set of listeners.  For the sandbox, the only listener is on port 6667.  We connect to listeners from our outside applications, so knowing those addresses and ports is vital.

Creating A Topic

Now that we know where everything is, let’s connect to our sandbox and start using some Kafka shell scripts.  There are three scripts that I’ll use today:


The first script I plan to use is kafka-topics.sh.

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

The response is pretty simple:  Created topic “test”

If we want to know what topics already exist, I can use the list command:

./kafka-topics.sh --zookeeper localhost:2181 --list


You can see my “test” topic as well as a few others.

Publish To A Topic

We want to add a message to a topic, and the quickest way to do that is to use the built-in console producer:

./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic test

If you did it right, you’ll get a blinking cursor and the ability to enter text.  You publish a message by hitting the enter key, and you can quit with control + c.


Consume From A Topic

We have something to send messages, so it makes sense that we have something to receive messages.  That’s what the console consumer does:

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

This shell script reads from a topic, and the optional --from-beginning flag lets you start from the very first message.  If you leave that flag off, the consumer picks up the topic in mid-stream, showing whatever producers are currently pushing but not going back into history.


It’s hard to capture in screenshots, but if you have the publisher and consumer both up at the same time, you’ll see messages appear on the consumer almost instantaneously.

Cleanup:  Remove A Topic

When we don’t have any further use for a topic, we can remove it:

./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

The message we get back is interesting:


The delete.topic.enable configuration setting is false by default, so we have to flip it to true before we can actually delete a topic.  It’s easy enough to do in Ambari:


Once you change that configuration setting, you will need to restart the service.

Final Prep Work:  Creating Relevant Topics

Okay, we played around with a test topic a little bit, so let’s create the real topics that we’ll use for the rest of this journey.  I want to create two topics:  one for raw flight data and the second for enriched flight data.  The commands are straightforward:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Flights
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic EnrichedFlights

You’ll note that we already had these topics, almost like I’ve actually done the work first and am not winging it…

Wrapping Up

Today’s post was about understanding the basics of Kafka—the very basics. There are a lot of other things to learn, but I’m going to hold off on those, as we’ve got the bare minimum of what we need to get going.  We’ll start fresh tomorrow on the .NET side of the house.

Solving A Problem With Kafka

Today, we’re going to look at the specific problem I want to solve using Kafka.

The Data

I am going to use flight data because it is copious and free.  Specifically, I’m going to look at data from 2008, giving me approximately 7 million data points.

The Question

Given this data, I’d like to ask a couple of questions.  Specifically, by destination state, how many flights did we have in 2008?  How many of those flights were delayed?  How long were those delays in the aggregate?  And given that we’re going to experience a delay, how long can we expect it to be?

The Other Question:  Why Use Kafka?

Hey, I know how to solve this problem with SQL Server:  read the 2008 CSV file in and write a basic SQL query.  Or I could load the data in Hive and run the same query.

So why would I want to use Kafka?  Isn’t this overkill?  In this case it is.  But let’s spin the story a slightly different way:  instead of reading from a file, let’s pretend that our consumers are receiving messages from a web service or some other application.  We now want to stream the data in, possibly from a number of sources.  We can decouple message collection from message processing.

In the real world, my problem involves handling clickstream data, where we’re looking at hundreds of millions of messages per day.  So what I’m dealing with is certainly a toy problem, but it’s the core of a real problem.

The Infrastructure

So here’s my plan:  I’m going to have a console application read from my flight data file, putting messages into a single Kafka topic with a single partition.  The purpose of this app is to do nothing more than read text strings and push them onto the topic; nothing fancy.

From there, I intend to enrich the data using another application.  This will process messages on the raw topic, clean up messages (e.g., fixing known bad data), and perform a lookup against some data in SQL Server.  Once we’re done with that, we can put the results on an enriched data topic.  Our final process will read from the enriched topic and build up a data set that we can use for our solution.

What’s Next?

The next four posts are going to cover some specifics.  In the next post, I’m going to walk through some of the really simple parts of Kafka, including setting up topics and pushing & receiving messages.  From there, the next three posts will look at the three console applications I’m going to write to handle the different stages of this application.  I’ll wrap it up with a couple more Kafka-related posts.  So stay tuned!