SQL Saturday Raleigh

SQL Saturday Raleigh 2017 is official.  The event will be Saturday, March 11th at William Peace University in Raleigh, North Carolina.  SQL Saturdays are free, one-day training events for database administrators, database developers, and anybody in the broader data platform space.  Attendance is free, and there is an optional $15 charge for lunch.  Our call for speakers is open through Tuesday, January 10th, so sign up!

For this year’s SQL Saturday, I hope to get a fairly broad range of topics, covering plenty of core SQL Server administration and development topics, but also topics like Hadoop and R, bringing in some groups of people who might not normally think of going to a SQL Saturday.

F# Zip Doesn’t Sort First

Here’s a problem I ran into as I worked through my Kafka code.  Near the end of the consumer project, I used the Seq.zip3 function to zip together three separate sequences of tuples and turn them into a single sequence of quadruples.  This is a great function for flattening out a set of tuples, but I kept getting results that looked like this:


I kept running into situations like the above.  With Iowa, for example, the process said that we had 1994 flights come in over the time frame, but there were 3719 flights delayed.  This…didn’t seem right.  Similarly, there were 30,772 flights to Vermont, of which only 871 were delayed?  I have trouble believing that there were more flights to Vermont than Connecticut or Kentucky (which hosts the Cincinnati airport).

To help debug the code, I checked each individual tuple and spit out the counts, and noticed that although the delay counts were fine, the totals were way off.  After digging into the zip function a bit more, it clicked.  Zip does not sort first!

When you call the zip method, you’re not “joining” in the relational sense; you’re simply taking the first element from the first set and attaching to it the first element from the second set; then you take the second element from each set, the third, etc.
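To make the positional behavior concrete, here is a minimal sketch.  The state codes and counts are made up for illustration and are not taken from the flight data; the names totals, delayed, and delayedByState are hypothetical.  It shows zip pairing by position, sorting as a fix that works only when both sequences hold the same keys, and a keyed lookup that behaves more like a relational join.

```fsharp
// Illustrative data only: two sequences keyed by state, in different orders.
let totals  = [ ("VT", 10); ("IA", 20); ("CT", 30) ]
let delayed = [ ("CT", 3);  ("VT", 1);  ("IA", 2) ]

// Seq.zip pairs elements purely by position, so the keys do not line up.
let misaligned =
    Seq.zip totals delayed
    |> Seq.map (fun ((state, total), (_, late)) -> state, total, late)
    |> List.ofSeq
// misaligned = [("VT", 10, 3); ("IA", 20, 1); ("CT", 30, 2)]

// Sorting both inputs by key first lines the positions up,
// provided both sequences contain exactly the same set of keys.
let aligned =
    Seq.zip (List.sort totals) (List.sort delayed)
    |> Seq.map (fun ((state, total), (_, late)) -> state, total, late)
    |> List.ofSeq
// aligned = [("CT", 30, 3); ("IA", 20, 2); ("VT", 10, 1)]

// A keyed lookup acts like a left join and tolerates missing keys.
let delayedByState = Map.ofList delayed
let joined =
    totals
    |> List.map (fun (state, total) ->
        state, total, (delayedByState |> Map.tryFind state |> Option.defaultValue 0))
// joined = [("VT", 10, 1); ("IA", 20, 2); ("CT", 30, 3)]
```

The sorted version is the smallest change, but the keyed lookup is the safer choice whenever one sequence might be missing a key.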

Once I understood that and corrected the code, life was good:


Thinking About Kafka Performance

Wrapping up my series on Kafka, I want to spend a few moments thinking about how my Kafka application has performed.  When I started this journey, I wanted to put together an easy-to-understand application which performs well.  I think I’ve solved the first problem well enough, but still have the second problem.  What follows are some assorted thoughts I have regarding this performance discrepancy.

Keep in mind throughout this that my simple CSV pump-and-dump averaged 18K records per second, my enricher averaged somewhere around 2K records per second, and my data aggregator averaged approximately 10K records per second.  Considering that the article which drove Kafka to prominence showed 2 million writes per second, I’m way under-shooting the mark.  Without performing any serious tests, here are a few things that might increase the throughput:

  1. I’ve run the test from a single consumer to a single broker from a single laptop.  That gives me several areas for parallelism:  I could have more consumers pushing to more brokers, each on dedicated hardware.
  2. I’m pushing to just one partition.  If I increase the number of brokers, I should increase the number of partitions to ensure that we’re writing to each of the brokers in parallel.
  3. I don’t believe that I’m batching messages to Kafka.  One way of increasing throughput is to bundle a number of small messages together, with the idea being that there is messaging overhead with TCP, which we have to treat as a fixed cost.  By packing more data in, given a fixed cost, we reduce the relative burden of that fixed cost.  We also have certain fixed costs around Kafka handling individual messages and writing them individually.
  4. I’m using a .NET library.  The .NET libraries for Kafka seem to be a lot slower than their Java counterparts.  In discussions with another person who pulled off a similar project, he noticed about a 5x improvement in switching from a .NET library to a Java library (100k records/sec to 500k).  I’m sure that’s a ‘Your Mileage May Vary’ scenario, but it makes sense to me, given the relative focus in the entire Hadoop ecosystem is Java-first (and sometimes Java-only).
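The batching argument in point 3 is just fixed-cost amortization, and a toy cost model makes the shape of the curve visible.  Everything here is an assumption for illustration:  fixedCostMicros and perMessageMicros are invented numbers, not measurements from my laptop or from Kafka.

```fsharp
// Hypothetical cost model: each request pays a fixed overhead plus a per-message cost.
// Both numbers are invented for illustration; they are not measurements.
let fixedCostMicros = 100.0      // assumed per-request overhead (network round trip, etc.)
let perMessageMicros = 5.0       // assumed cost of handling one message

/// Estimated messages per second when sending batchSize messages per request.
let throughput (batchSize: int) =
    let requestMicros = fixedCostMicros + perMessageMicros * float batchSize
    float batchSize / requestMicros * 1000000.0

// Larger batches spread the fixed cost over more messages.
[1; 10; 100; 1000]
|> List.iter (fun n -> printfn "batch %4d -> %.0f msgs/sec" n (throughput n))
```

Under this model, throughput climbs with batch size but can never exceed 1,000,000 / perMessageMicros messages per second, which is why batching helps a lot at first and then levels off.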

After monkeying with some configuration settings, I was able to get about 25,000 messages per second into the Flights topic and could bump up the others a fair percentage as well.  When generating messages and shooting them directly into Kafka, I was able to get 30K messages per second, so it’s nowhere near 800K messages per second (as Jay Kreps was able to get in his single producer thread, no replication scenario), but it’s nothing to sneeze at.

Resetting Kafka Topics

Let’s say you’re working on a program to load a Kafka topic and you mess up and want to start over.  There are two good ways of doing this.  Both of these methods involve connecting to the name node and running shell scripts in /usr/hdp/[version]/kafka/bin (for the Hortonworks Data Platform; for some other distro, I leave it as an exercise to the reader to find the appropriate directory…mostly because I wouldn’t know where it was).

Method One:  Delete And Re-Create

The method that I’ve shown already is the delete and re-create method.  This one is pretty simple:  we delete the existing topic and then generate a new one with the same name.

./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

When you delete the topic, you’ll see the following warning message:


You can check this in Ambari by going to the Kafka -> Configs section:


Then, once we’ve deleted the topic, we can re-create it.

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Method Two:  Retention Policy Shenanigans

The first method works fine for non-production scenarios where you can stop all of the producers and consumers, but let’s say that you want to flush the topic while leaving your producers and consumers up (but maybe you have a downtime window where you know the producers aren’t pushing anything).  In this case, we can change the retention period to something very short, let the queue flush, and bring it back to normal, all using the kafka-configs shell script.

First, let’s check out our current configuration settings for the topic called test:

./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test


This might look odd at first, but it’s just the Kafka configuration script’s way of saying that you’re using the default settings.  Incidentally, our default setting has a retention period of 168 hours, as we can see in Ambari.


Now that we have the correct script, we can run the following command to set our retention policy to something a bit shorter:

./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --add-config retention.ms=1000


Now we can see that the retention period is 1000 milliseconds, or one second. Give that a minute or two to take hold and then we can run the following to remove the special configuration:

./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --delete-config retention.ms


And we’re back, with no real downtime.  As long as the producers were temporarily paused, we didn’t lose any data and our producers can go about their business like nothing happened.


There are at least two different methods for clearing out a Kafka topic.  Before you break out the hammer, see if monkeying with the retention period will solve your problem without as much disruption.

Console App 3: The Consumer

We’ve looked at two console applications thus far, one which reads data from a text file into Kafka; and one which reads from one Kafka topic, enriches the data with some information coming out of SQL Server, and puts the results into another topic.  Today’s post will create a consumer which answers the questions we had at the beginning of the series.

What Were Those Questions Again?

As a quick reminder, here’s what I want to know:

  1. By destination state, how many flights did we have in 2008?
  2. How many of those flights were delayed?
  3. How long were those delays in the aggregate?
  4. Given that we’re going to experience a delay, how long can we expect it to be?

We’ll read the data off of the EnrichedFlights Kafka topic and perform aggregation within F#, printing results to the console.

Back To The Projects

Our final console application weighs in at about 100 lines of code.  Just like the first two days’ console apps, we’re going to start with the driver function and work our way out from there.

[<EntryPoint>]
let main argv =
    let (config:RdKafka.Config) = new Config(GroupId = "Airplane Consumer")
    use consumer = new EventConsumer(config, "sandbox.hortonworks.com:6667")
    let topics = ["EnrichedFlights"]
    consumer.Subscribe(new System.Collections.Generic.List<string>(topics |> List.toSeq))

    let flights = new System.Collections.Generic.List<Flight>()

    processMessages consumer 1000 flights
    consumer.Start()
    printfn "Started reader. Press enter to finalize calculations and display results."
    System.Console.ReadLine() |> ignore
    consumer.Stop() |> ignore
    printfn "You've finished pulling enriched data.  Now aggregating data..."

    let flightTuple =
        List.ofSeq flights
        |> Seq.map(fun f -> (f.DestinationState, match f.ArrDelay.IsSome with
                                                    | true -> f.ArrDelay.Value
                                                    | false -> 0))
    let results = delaysByState flightTuple
                    |> Seq.iter(fun(dest,flights,delayed,delay) ->
                            printfn "Dest: %A. Flights: %i.  Delayed: %i.  Total Delay (min): %i.  Avg When Delayed (min): %.3f" dest flights delayed delay (float(delay)/float(delayed)))

    printfn "Press enter to quit."
    System.Console.ReadLine() |> ignore

    0 // return an integer exit code

Our main function is just a couple dozen lines of code.  Just like before, we declare a consumer group and connect to Kafka; this time, however, we’re going to hit the EnrichedFlights topic.  The first major function that we hit is processMessages.

The processMessages function gives me an opportunity to start from the top:

module AirplaneConsumer

open RdKafka
open System
open FSharp.Collections.ParallelSeq
open Newtonsoft.Json

type Flight = { Year:int; Month:int; DayOfMonth:int; DayOfWeek:int; DepTime:Option<int>; CRSDepTime:int; ArrTime:Option<int>; CRSArrTime:int;
                UniqueCarrier:string; FlightNum:string; TailNum:string; ActualElapsedTime:Option<int>; CRSElapsedTime:Option<int>; AirTime:Option<int>;
                ArrDelay:Option<int>; DepDelay:Option<int>; Origin:string; Dest:string; Distance:int; TaxiIn:Option<int>; TaxiOut:Option<int>;
                Cancelled:bool; CancellationCode:string; Diverted:bool; CarrierDelay:int; WeatherDelay:int; NASDelay:int; SecurityDelay:int;
                LateAircraftDelay:int; OriginCity:string; OriginState:string; DestinationCity:string; DestinationState:string; }

let readFromBeginning (consumer:EventConsumer) =
    consumer.OnPartitionsAssigned.Add(fun(partitions) ->
        printfn "Starting from the beginning..."
        let fromBeginning = List.ofSeq partitions
                               |> List.map(fun(x) -> new TopicPartitionOffset(x.Topic, x.Partition, RdKafka.Offset.Beginning))
        let fb = new System.Collections.Generic.List<TopicPartitionOffset>(fromBeginning |> List.toSeq)
        //Assign the rewound offsets back to the consumer.
        consumer.Assign(fb))

let processMessages (consumer:EventConsumer) n (flights:System.Collections.Generic.List<Flight>) =
    //Always start from the beginning.
    readFromBeginning consumer

    consumer.OnMessage.Add(fun(msg) ->
        let messageString = System.Text.Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length)
        let flight = JsonConvert.DeserializeObject<Flight>(messageString)
        //Append each deserialized flight to the shared collection.
        flights.Add(flight))

The processMessages function takes three parameters:  a consumer, the number of records to process between progress messages (I’ve stripped out that reporting code for simplicity’s sake), and a collection of Flights to which we append.

The readFromBeginning function looks pretty similar to Friday’s readFromBeginning, so I’ll just gloss over that; we turn it on by default because I want to stream the entire data set.

From here, I hook into the OnMessage event just like before, and like before we decode the Kafka payload and turn it into a string.  Unlike before, however, I call Newtonsoft’s DeserializeObject method and return a Flight type, which I’ve defined above.  This is the same definition as in the Producer, so in a production-quality environment, I’d pull that out to a single location rather than duplicating it.

Going back to the main function, I call the consumer.Start() method and let ‘er rip.  When I’m ready to aggregate, I’ll hit the enter key and that’ll call consumer.Stop().  When that happens, I’m going to have up to 7 million records in a list called flights.  Out of all of this information, I only need two attributes:  the destination state and the arrival delay in minutes.  I get those by using the map function on my sequence of flights, taking advantage of F#’s match syntax to get all relevant scenarios safely and put the result into a tuple.  The resulting sequence of tuples is called flightTuple.  I pass that into the delaysByState function.

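let delaysByState flightTuple =
    //Count of all flights by destination state.
    let totalFlights =
        flightTuple
        |> Seq.countBy fst
        |> Seq.sort
    //Count of flights with a positive arrival delay by destination state.
    let delayedFlights =
        flightTuple
        |> Seq.filter(fun(dest,delay) -> delay > 0)
        |> Seq.countBy fst
        |> Seq.sort
    //Sum of arrival delay minutes by destination state.
    let totalArrivalDelay =
        flightTuple
        |> Seq.groupBy(fun(dest,delay) -> dest)
        |> Seq.map(fun(dest,delay) -> (dest,delay |> Seq.sumBy snd))
        |> Seq.sort
    //Zip is positional, so all three inputs must be sorted by state first.
    let results =
        Seq.zip3 totalFlights delayedFlights totalArrivalDelay
        |> Seq.map(fun((dest,flights),(_,delayed),(_,delay)) -> dest,flights,delayed,delay)
        |> Seq.sort
    results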

This function is probably the most confusing function out of any of the three console apps for non-F# programmers.  I’m taking the sequence of tuples and processing it three times.  The first pass gets a count of items grouped by the first element in the tuple (that is, destination state) and then sorts the result.  We call that totalFlights.  The second pass takes our sequence of flight tuples and retains only those with a delay of more than 0 minutes.  I then get the count of results by state and sort it.  We call that delayedFlights.  Then, I take flightTuple one more time and group by destination, but instead of getting a count, I want to sum the second element in the tuple (delay in minutes), so I need to map my sequence, summing up by delay.  I sort that result set and call it totalArrivalDelay.  Finally, I use the zip3 function to combine all three result sets and map each zipped element to a results quadruple (an n-tuple with 4 elements).  Don’t think of “zip” like compression; instead, think of it like a zipper:  it attaches elements of a data set together.  Here, we take the three sorted sequences of tuples and zip them, leaving us with a sequence of quadruples.
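For comparison, here is a minimal sketch of an alternative that groups once and computes all three figures per state, so nothing depends on three sequences lining up.  This is not the code from the project; delaysByStateSinglePass and the sample rows are hypothetical, and the input is assumed to have the same shape as flightTuple (destination state, arrival delay in minutes).

```fsharp
// Sketch: aggregate once per destination instead of zipping three sorted sequences.
// Input mirrors flightTuple: (destination state, arrival delay in minutes).
let delaysByStateSinglePass (flightTuple: seq<string * int>) =
    flightTuple
    |> Seq.groupBy fst
    |> Seq.map (fun (dest, rows) ->
        let delays = rows |> Seq.map snd |> List.ofSeq
        let flights = List.length delays
        let delayed = delays |> List.filter (fun d -> d > 0) |> List.length
        let totalDelay = List.sum delays
        dest, flights, delayed, totalDelay)
    |> Seq.sort
    |> List.ofSeq

// Made-up sample rows, not real flight data.
let sample = [ ("NJ", 30); ("NC", 0); ("NJ", -5); ("NC", 12); ("OH", 0) ]
let summary = delaysByStateSinglePass sample
// summary = [("NC", 2, 1, 12); ("NJ", 2, 1, 25); ("OH", 1, 0, 0)]
```

Note the OH row:  a state with no delayed flights still gets a result with a delayed count of 0, whereas a filtered-and-zipped approach would have one fewer element in that sequence.  The average-delay calculation in main would then need a guard against dividing by zero.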

Taking that results quadruple back to the main function, we iterate over each result in the sequence—that is, we iterate over each state—and spit out results to the console.  Here’s what it looks like:


We were able to process approximately 10,000 records per second and after approximately 1:45 of churn, we learned that New Jersey is the worst place to go with respect to arrival delays in 2008.


At this point, we now have a working three-piece system for processing data.  There are some things we can do to help make this a production-worthy system; for example, I could tighten up the Flight type to keep only the fields we actually need (so I don’t need to hold 6 GB of data in RAM).  But as it is, we have a working demo of Kafka interacting with .NET, and that’s good enough for me today.

Console App 2: The Enricher

Yesterday’s post introduced our first console application, which pushed data onto a Kafka topic entitled Flights.  Today’s console application will pull data from the Flights topic, give us a bit more information, and then push the final product onto an EnrichedFlights topic.

What Is This Enrichment Of Which You Speak?

Data enrichment is extremely common when dealing with streaming data.  What you have coming in is a fraction of the total useful data, and that’s by design:  you don’t want to send unnecessary reference data over the wire when you can just look that up locally.  If you’re processing 500 million records a day, saving a single byte on each record adds up quickly.

Also, sometimes the upstream sources don’t know all of the relevant information.  Let’s look at our initial data set for example.  Here’s a sample row:


In the middle, we see two airport codes:  IAD and TPA.  That tells us that this flight originated in Dulles (Virginia) and flew to Tampa (Florida).  We have the International Air Transport Association (IATA) airport code, but this data set doesn’t have any additional details, so we’ll need to do that work ourselves.  I have that data in a SQL Server table, so I’m going to import the table contents and perform a lookup to get the correct value.

Next up, look near the right-hand side of the text and you’ll see a lot of “NA” values.  This data set came from R, where NA marks a missing (“not available”) value.  If you look at the headers for this file, you’ll see that the string of NAs all tie to reasons for delay, such as weather, security incident, or mechanical issue.  If the flight takes off on time, those values are simply not applicable:  there wasn’t a delay.

In .NET land, we don’t want to leave these as NA values; we want to convert them.  If I were writing C# code, I’d use nullable ints, but because I’m in F#, I am going to use Option values.  Options are fantastic because if we use them correctly, they ensure that we never end up with a null reference; you’re required to fill out every scenario, and there are patterns around accessing data safely.  My intent is to convert each of these NA values to an Option of the appropriate type.
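As a minimal sketch of what “accessing data safely” looks like, the snippet below parses a raw text field into an Option and then consumes it with pattern matching.  The names parseDelay, describe, and minutesOrZero are hypothetical, and this version uses TryParse, so any non-numeric text (not just NA) becomes None; that is an assumption of the sketch rather than a requirement of the data.

```fsharp
// Sketch: turn a raw text field into an Option and consume it safely.
let parseDelay (raw: string) : int option =
    match System.Int32.TryParse(raw) with
    | true, value -> Some value
    | false, _ -> None            // "NA" (or anything else non-numeric) becomes None

// The compiler warns if a match leaves a case unhandled, so None cannot be forgotten.
let describe (delay: int option) =
    match delay with
    | Some minutes when minutes > 0 -> sprintf "%d minutes late" minutes
    | Some _ -> "on time or early"
    | None -> "no delay recorded"

// When a default makes sense, Option.defaultValue avoids touching .Value directly.
let minutesOrZero = parseDelay "NA" |> Option.defaultValue 0
```

For example, describe (parseDelay "15") gives "15 minutes late", while describe (parseDelay "NA") gives "no delay recorded".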

Finally, after handling airport codes and NA values, I want to make sure that downstream readers have an easier time handling this data, so I’m going to serialize my results as JSON.  I know that the Newtonsoft.Json library makes dealing with JSON easy.

Let’s Get To The Code

Now that we know what we want to do, the code is pretty straightforward.  It’s nearly 150 lines of code now, so I’m going to break it up into chunks.

The Main Method

Our main method is a couple dozen lines of code, but it packs a wallop.  Let’s walk through it piece by piece, introducing the rest of our codebase as we go along:

[<EntryPoint>]
let main argv =
    //Pull unrefined queue items from the flights queue.
    let (config:RdKafka.Config) = new Config(GroupId = "Airplane Enricher")
    use consumer = new EventConsumer(config, "sandbox.hortonworks.com:6667")
    let topics = ["Flights"]
    consumer.Subscribe(new System.Collections.Generic.List<string>(topics |> List.toSeq))

    //After enrichment, put the message onto the enriched flights queue
    use producer = new Producer("sandbox.hortonworks.com:6667")
    let metadata = producer.Metadata(allTopics=true)
    use topic = producer.Topic("EnrichedFlights")

    let conn = new System.Data.SqlClient.SqlConnection(connectionString)
    let airports = AirportSql.Create(conn).Execute() |> Seq.toList

    processMessages consumer topic true 1000 airports
    consumer.Start()
    printfn "Started enricher. Press enter to stop enriching."
    System.Console.ReadLine() |> ignore
    consumer.Stop() |> ignore

    printfn "You've finished enriching some data.  Hit Enter to close this app."
    System.Console.ReadLine() |> ignore

    0 // return an integer exit code

We’re doing a bunch of setup work here, so let’s take it from the top.  First, I declare a consumer group, which I’m calling “Airplane Enricher.”  Kafka uses the concept of consumer groups to allow consumers to work in parallel.  Imagine that we have ten separate servers available to process messages from the Flights topic.  Each flight message is independent, so it doesn’t matter which consumer gets it.  What does matter, though, is that multiple consumers don’t get the same message, as that’s a waste of resources and could lead to duplicate data processing, which would be bad.

The way Kafka works around this is to use consumer groups:  within a consumer group, only one consumer will get a particular message.  That way, I can have my ten servers processing messages “for real” and maybe have another consumer in a different consumer group just reading through the messages getting a count of how many records are in the topic.  Once you treat topics as logs rather than queues, consumer design changes significantly.
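A toy model may help here.  In Kafka, the unit of parallelism inside a group is the partition:  each partition is read by exactly one consumer in a given group, while every group sees every partition.  The sketch below is only an illustration of that rule; the assign function, the round-robin scheme, and the consumer names are all hypothetical and are not Kafka’s actual assignment logic.

```fsharp
// Toy model only: this is not Kafka's real partition assignor.
// Within one group, each partition gets exactly one owner.
let assign (partitions: int list) (consumers: string list) =
    partitions
    |> List.map (fun p -> p, consumers.[p % consumers.Length])

// Two enrichers in one group split the partitions between them...
let enrichers = assign [0; 1; 2; 3] ["enricher-a"; "enricher-b"]
// enrichers = [(0, "enricher-a"); (1, "enricher-b"); (2, "enricher-a"); (3, "enricher-b")]

// ...while a separate group with one consumer independently sees every partition.
let counters = assign [0; 1; 2; 3] ["counter"]
// counters = [(0, "counter"); (1, "counter"); (2, "counter"); (3, "counter")]
```

One consequence of this model:  with a single partition, adding more consumers to the same group gives them nothing to do.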

The next couple lines of code tell our Kafka instance that we’re going to enlist this app as a consumer, reading from the Flights topic.  We’re allowed to read from multiple topics, but I only need to read from one today.

Then, like the producer we saw yesterday, we need to publish to a new Kafka topic.  This time, the topic is EnrichedFlights.  I don’t want to write back to the same topic, as then my consumers might get confused.  Topics are cheap, so don’t be afraid to spin up new ones.

After looking at topics, I see something familiar:  a SQL Server connection.  I want to hit SQL Server once, at the beginning of the process, and grab my metadata. I’m far enough along that it’s time to show the very top of my F# console app.

Using the SQL Type Provider

module AirplaneEnricher

open RdKafka
open System
open Newtonsoft.Json
open FSharp.Data
open FSharp.Data.SqlClient

let connectionString =
    @"Data Source=LOCALHOST;Initial Catalog=Scratch;Integrated Security=True"

type AirportSql =
    SqlCommandProvider<"SELECT IATA, Airport, City, State, Country, Lat, Long FROM dbo.Airports", connectionString>

After the open statements, I declare a connection string pointing to the Scratch database on my local instance. I then create an F# type (which, if you want, you can incorrectly think of as a class, as in this case it’s close enough for government work) called AirportSql.  I am going to populate the AirportSql type using a type provider, one of the nicest things about F#.  Type providers give you easy access to structured documents, automatically creating relevant types.  In this case, I’m using the FSharp.Data.SqlClient library, which gives me a micro-ORM.  Now the AirportSql type will be built around this simple SELECT statement, and I’ll get a type with attributes named IATA, Airport, City, etc., each of which has the correct data type.

Going back to my main function, I call the Create method on the type to build a SqlCommandProvider, and then call its Execute method to get an IEnumerable of AirportSql.Record values.  This is a lazy-loaded operation, and if I don’t force loading now, I’ll have to load the table for every record I pull off the Kafka topic, and that’d be a terrible waste.  To prevent this waste, I pipe the command to Seq.toList, which generates a list which we call airports.
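The difference between a lazy sequence and a materialized list can be shown without a database.  In this sketch, the loads counter is a hypothetical stand-in for a round trip to SQL Server, and lazyAirports stands in for the result of Execute; it demonstrates general F# seq behavior rather than anything specific to the type provider.

```fsharp
// Counter standing in for "how many times did we hit the database?"
let loads = ref 0

// A lazy sequence: the body runs again each time the sequence is enumerated.
let lazyAirports =
    seq {
        loads.Value <- loads.Value + 1
        yield "IAD"
        yield "TPA"
    }

// Enumerating the lazy sequence twice runs the body twice.
lazyAirports |> Seq.length |> ignore
lazyAirports |> Seq.length |> ignore
let loadsWhenLazy = loads.Value          // 2

// Forcing it into a list runs the body once; later lookups reuse the list.
loads.Value <- 0
let airports = lazyAirports |> Seq.toList
airports |> List.length |> ignore
airports |> List.contains "TPA" |> ignore
let loadsWhenCached = loads.Value        // 1
```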

Whew.  Now let’s get back to the main method.

Processing Messages

After we load the reference data from SQL Server, the next step is to call the processMessages function.  This function takes five parameters:

  1. The consumer we have set up to pull messages from Kafka.
  2. The topic to which we will publish enriched messages.
  3. A boolean flag which indicates whether we want to start from the beginning of the Flights topic or if we want to pick up mid-stream.
  4. An indicator of how far along we are.  We print a line to the console after every n messages we receive.
  5. A list of airports that we can use for reference data.

The processMessages function itself is pretty simple:

let processMessages (consumer:EventConsumer) (topic:Topic) startFromBeginning n (airports:System.Collections.Generic.IEnumerable<AirportSql.Record>) =
    if startFromBeginning then readFromBeginning consumer
    consumer.OnMessage.Add(fun(msg) ->
        let rawFlightMessage = System.Text.Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length)
        //build flight type -- enrich
        let flight = buildFlight rawFlightMessage airports

        let jsonFlight = JsonConvert.SerializeObject(flight)
        publish topic jsonFlight)

I stripped out the section where we occasionally write to the console, as it’s not core to the solution.  The wrap-up to this series will have a link to the full code, which includes this (as well as some other niceties I stripped out for pedagogical purposes).

The processMessages function starts by checking to see if startFromBeginning is true.  If so, it calls the readFromBeginning function:

let readFromBeginning (consumer:EventConsumer) =
    consumer.OnPartitionsAssigned.Add(fun(partitions) ->
        printfn "Starting from the beginning..."
        let fromBeginning = List.ofSeq partitions
                               |> List.map(fun(x) -> new TopicPartitionOffset(x.Topic, x.Partition, RdKafka.Offset.Beginning))
        let fb = new System.Collections.Generic.List<TopicPartitionOffset>(fromBeginning |> List.toSeq)
        //Assign the rewound offsets back to the consumer.
        consumer.Assign(fb))

If I were adding new instances of this enricher to an already-running process, I’d set startFromBeginning to false and just pick up mid-stream. Because I’m doing a full load, however, I set it to true.

Notice that the readFromBeginning function is primarily event-driven code, so it looks similar to what you’d see with C#.  We add a handler to the OnPartitionsAssigned event.  This handler will print a message to the console and look for the beginning offsets for each partition in the topic.  That list becomes fromBeginning, and we turn that into a .NET list and assign that list to the consumer.
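If the event-handler pattern is unfamiliar, it can be reproduced with F#’s built-in Event type and no Kafka at all.  In this sketch, onMessage and seen are hypothetical names; Trigger plays the role of a message arriving once the consumer has started.

```fsharp
// Stand-in for the consumer: an event source that knows nothing about its handlers.
let onMessage = Event<string>()
let seen = System.Collections.Generic.List<string>()

// Registering a handler does no work yet; it only records what to run later.
onMessage.Publish.Add(fun msg -> seen.Add(msg.ToUpperInvariant()))
let countBeforeTrigger = seen.Count      // 0

// Triggering the event plays the role of a message arriving after Start().
onMessage.Trigger "iad"
onMessage.Trigger "tpa"
let countAfterTrigger = seen.Count       // 2
```

This is why the order of operations in the console apps matters:  the handlers are wired up first, and nothing runs until messages start flowing.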

Going back to the processMessages method, we hook into the consumer’s OnMessage event and add a new handler which does all of our work.  For each message that comes in, we convert it to a string, enrich the data, and then publish the enriched data.

Building A Flight

The function we use to enrich data is called buildFlight.  It has a few helper functions, so I’m going to include them all here.

//Convert NA to None and anything else to Some integer.
let filterNA (str:string) =
    match str with
    | "NA" -> None
    | _ -> Some(Convert.ToInt32(str))

//Convert NA to 0 and anything else to an integer.
let convertNAint (str:string) =
    match str with
    | "NA" -> 0
    | _ -> Convert.ToInt32(str)

//Convert "1" to true; NA, "0", and anything else become false.
let convertNAbool (str:string) =
    match str with
    | "NA" -> false
    | "0" -> false
    | "1" -> true
    | _ -> false

let buildFlight (rawFlightMessage:string) (airports:System.Collections.Generic.IEnumerable<AirportSql.Record>) =
    let flightSplit = rawFlightMessage.Split(',')
    let origin = airports |> Seq.filter(fun f -> f.IATA.Value.Equals(flightSplit.[16])) |> Seq.head
    let destination = airports |> Seq.filter(fun f -> f.IATA.Value.Equals(flightSplit.[17])) |> Seq.head

    let flight = {
        Year = Convert.ToInt32(flightSplit.[0]);
        Month = Convert.ToInt32(flightSplit.[1]);
        DayOfMonth = Convert.ToInt32(flightSplit.[2]);
        DayOfWeek = Convert.ToInt32(flightSplit.[3]);
        DepTime = filterNA flightSplit.[4];
        CRSDepTime = Convert.ToInt32(flightSplit.[5]);
        ArrTime = filterNA flightSplit.[6];
        CRSArrTime = Convert.ToInt32(flightSplit.[7]);
        UniqueCarrier = flightSplit.[8];
        FlightNum = flightSplit.[9];
        TailNum = flightSplit.[10];
        ActualElapsedTime = filterNA flightSplit.[11];
        CRSElapsedTime = filterNA flightSplit.[12];
        AirTime = filterNA flightSplit.[13];
        ArrDelay = filterNA flightSplit.[14];
        DepDelay = filterNA flightSplit.[15];
        Origin = flightSplit.[16];
        Dest = flightSplit.[17];
        Distance = Convert.ToInt32(flightSplit.[18]);
        TaxiIn = filterNA flightSplit.[19];
        TaxiOut = filterNA flightSplit.[20];
        Cancelled = convertNAbool flightSplit.[21];
        CancellationCode = flightSplit.[22];
        Diverted = convertNAbool flightSplit.[23];
        CarrierDelay = convertNAint flightSplit.[24];
        WeatherDelay = convertNAint flightSplit.[25];
        NASDelay = convertNAint flightSplit.[26];
        SecurityDelay = convertNAint flightSplit.[27];
        LateAircraftDelay = convertNAint flightSplit.[28];
        OriginCity = origin.City.Value;
        OriginState = origin.State.Value;
        DestinationCity = destination.City.Value;
        DestinationState = destination.State.Value;
    }
    flight

Let’s talk about the three helper functions first.  All three convert NA to reasonable values.  The first makes use of the Option type, converting NA to None and non-NA values to some integer value.  The other two functions convert the string value to an appropriate integer or boolean.

Moving on from the helper functions, we have the main function, buildFlight.  First, we split the incoming message into a comma-delimited array.  Next, we check the reference data set for our origin and destination airports.  Seq.filter returns a sequence, and we know that the IATA airport code is unique.  I also happen to know that all airport codes are in the data set, so I don’t need to put in error handling like I would in production code.
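For a sense of what that production-style error handling might look like, here is a minimal sketch that indexes the airports by code once and returns an Option from the lookup.  The Airport record is a hypothetical, trimmed-down stand-in for AirportSql.Record, and the two sample rows are illustrative values based on the airports mentioned earlier, not rows from my table.

```fsharp
// Hypothetical, trimmed-down stand-in for the AirportSql.Record type.
type Airport = { IATA: string; City: string; State: string }

// Illustrative sample rows only.
let airports =
    [ { IATA = "IAD"; City = "Dulles"; State = "VA" }
      { IATA = "TPA"; City = "Tampa"; State = "FL" } ]

// Build the index once; each lookup is then a keyed search instead of a scan.
let airportsByCode = airports |> List.map (fun a -> a.IATA, a) |> Map.ofList

// Returns None rather than throwing when the code is unknown.
let tryFindAirport code = airportsByCode |> Map.tryFind code

let describeAirport code =
    match tryFindAirport code with
    | Some a -> sprintf "%s, %s" a.City a.State
    | None -> sprintf "Unknown airport code %s" code
```

Seq.head throws on an empty sequence, so the Option-returning lookup moves the “what if it’s missing?” decision to the caller, which could skip the record, log it, or route it to a separate topic.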

Once we have the origin and destination airports, I can turn this comma-separated value array into an F# type.  Note that I call the various filter/convert functions as needed.

With that, we can bounce back to the main function and wrap this up.

Closing Out The Main Method

At this point, we can look at the last few lines of the main function.  First up is a call to consumer.Start().  We need to call the Start method before the consumer will begin listening.  It will continue to listen until we call the consumer.Stop() method.  In between, I’m going to let it run until we hit enter.

Here’s a copy of my enricher running through the 7 million record data set:


There’s one important thing to note here:  it didn’t really take 3 hours and 15 minutes to burn through this data set.  In reality, it took a little less than one hour, enriching somewhere around 2000 records per second.  But the consumer’s work is not done until you hit Enter, meaning that if I were to load the 2009 data, it would pick right back up where it left off and start enriching again.  We’ve completely decoupled the producer from the enricher, which has benefits but it does have this drawback.


As of now, we have enriched data, which includes the origin and destination airport cities and states.  We’ve also cleaned up all of those NA values and handled scenarios such as cancelled and diverted flights.  Our resulting records are in JSON format.  In case you’re interested, here’s what one of those looks like:


Note that in the half-second or so that I let it run, it displayed 1256 messages on screen.  It’s pretty zippy.

On Monday, we’re going to look at writing a consumer for all of this enriched data, and will spend most of the rest of the week wrapping things up.

Console App 1: The Producer

Today’s post will look at the first of three console applications.  This app is going to read in flight data from an external source (in my case, a flat file) and push messages out to a Kafka topic.

A Sidebar:  F#

If you look on the left-hand side, you’ll see the three console projects that we are going to use for this Kafka solution.  You may notice that these are all F# projects.

Whenever I present my Integrating Hadoop and SQL Server talk, I always get asked one question:  “Why are you using F#?”  I like getting this question.  The honest answer is, because I enjoy the language.  But there’s a bit more to it than that.

First, let me respond to the important subtext of the question:  no, there’s nothing in this project that we absolutely need F# for.  I can just as easily write C# code to get to the same answer, and if you’re more comfortable writing code in C#, then go for it.

So why, then, do I persist?  Here are my reasons, in no particular order:

  1. I want to learn things, and the best way to learn a language is to use it.  I’m not particularly good at writing F# code; I’ve been exposed to it for a couple of years now and can code my way out of a wet paper bag, but it’d be scary to jump onto a project as an F# developer.  I want to change that.
  2. The reason I want to change that is that I see the power of functional languages.  If you’re not sold on the importance of functional languages, I need only note that both C# and Java have made significant functional inroads with LINQ/lambdas, closures, pipelines (via the fluent interface in C#), etc.
  3. I want to show that the entryway from C# to F# is pretty easy.  A very complex block of F# code will leave me scratching my head, absolutely, but I can give working, tutorial-style code to people and have them easily understand it.  If I’m going to show you easy code, I’d rather show you easy code in a language you don’t know; that way, you’re learning two things at the same time.  This does run the risk of unnecessarily confusing people, admittedly, but I think the risk is relatively low given my target audience.
  4. I think there’s a synergy between functional languages and SQL.  I believe that having a background in SQL gives you an appreciation for set-based solutions and treating data and functions as first-class citizens.  A language like F# or Scala appeals to the relational database developer in me, as I’m using the pipeline to push data through functions rather than nesting for loops.
  5. The code is more concise.  My producer is barely 40 lines of code, including a big comment block.  This is probably a trivial enough application that I could write a similar C# app in not too many more lines of code, but F# scales with respect to complexity much better than C# does.
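
To make the fourth point a little more concrete, here is a minimal sketch contrasting a for loop and mutable accumulator with a pipeline.  The flight tuples and the function names are made up for illustration; none of this is part of the Kafka solution.

```fsharp
// Hypothetical sample data: (destination state, was the flight delayed?)
let flights =
    [ ("NC", true); ("NC", false); ("OH", true); ("NC", true); ("OH", false) ]

// Loop style: a mutable dictionary updated inside a for loop.
let delayedByStateLoop () =
    let counts = System.Collections.Generic.Dictionary<string, int>()
    for (state, delayed) in flights do
        if delayed then
            match counts.TryGetValue(state) with
            | true, n -> counts.[state] <- n + 1
            | _ -> counts.[state] <- 1
    counts
    |> Seq.map (fun kv -> kv.Key, kv.Value)
    |> Seq.sortBy fst
    |> List.ofSeq

// Pipeline style: the data flows through one function per step,
// much like WHERE / GROUP BY / ORDER BY in a SQL query.
let delayedByStatePipeline () =
    flights
    |> List.filter snd      // keep only delayed flights
    |> List.countBy fst     // count per state
    |> List.sortBy fst      // order by state

printfn "%A" (delayedByStateLoop ())
printfn "%A" (delayedByStatePipeline ())
```

Both functions return the same list of (state, delayed count) pairs; the pipeline version just reads closer to how I would write the query in SQL.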

Those are my major reasons.  There are reasons not to use F#, but my belief is that the reasons to use this language in my demos outweigh the negatives.

Anyhow, with all of that said, let’s check out what I’m doing.

NuGet Packages

For the entire solution, I’m going to need a few NuGet packages.  Here are my final selections:


The first two are F#-specific packages.  We’ll see ParallelSeq in action today, but the SqlClient will have to wait until tomorrow, as will Newtonsoft.Json.

The last two libraries are Kafka libraries.  The first is rdkafka-dotnet.  This is a fairly thin wrapper around librdkafka.  Of all the .NET Kafka libraries I tried, this one made the most sense to me.

Other Kafka libraries for .NET that I looked at include:

  • Kafka Sharp, which is intended to be a fast implementation.
  • Franz, a Kafka library written in F# (but works just as well with C#).
  • Kafka4net, another implementation designed around Kafka 0.8.
  • CSharpClient-for-Kafka, Microsoft’s implementation.

You’re welcome to try out the different libraries and see what works best for you.  There is even an article about writing your own Kafka library.

Loading Data

The console app is pretty simple.  Simple enough that I’m going to include the functional chunk in one code snippet:

module AirplaneProducer

open RdKafka
open System
open System.IO
open FSharp.Collections.ParallelSeq

let publish (topic:Topic) (text:string) =
    let data = System.Text.Encoding.UTF8.GetBytes(text)
    topic.Produce(data) |> ignore

let loadEntries (topic:Topic) fileName =
    File.ReadLines(fileName)   //Stream the file lazily, one line at a time
        |> PSeq.filter(fun e -> e.Length > 0)   //Skip blank lines
        |> PSeq.filter(fun e -> not (e.StartsWith("Year")))   //Filter out the header row
        |> PSeq.iter(fun e -> publish topic e)

[<EntryPoint>]
let main argv =
    use producer = new Producer("sandbox.hortonworks.com:6667")
    let metadata = producer.Metadata(allTopics=true)
    use topic = producer.Topic("Flights")

    //Read and publish a list from a file
    loadEntries topic "C:/Temp/AirportData/2008.csv"

    printfn "Hey, we've finished loading all of the data!"
    Console.ReadLine() |> ignore
    0 // return an integer exit code

That’s the core of our code.  The main function instantiates a new Kafka producer and gloms onto the Flights topic.  From there, we call the loadEntries function, which takes a topic and a filename.  It streams entries from the 2008.csv file and uses the ParallelSeq library to operate in parallel on the data streaming in (one of the nice advantages of using functional code:  writing thread-safe code is easy!).  We filter out any records whose length is zero; there might be blank lines somewhere in the file, and those aren’t helpful.  We also want to throw away the header row (if it exists), and I know that it starts with “Year” whereas all other records start with the numeric year value.  Finally, once we throw away the garbage rows, we call the publish function for each remaining entry.  The publish function encodes our text as a UTF-8 byte array and pushes the result onto our Kafka topic.
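
If you want to see the filtering logic on its own, here is a minimal sketch that runs without Kafka or any NuGet packages.  It is not the producer itself:  the sample lines are made up, I use Seq in place of PSeq, and the cleanEntries and payloads names are only for illustration.

```fsharp
// Made-up sample input standing in for the contents of 2008.csv.
let sampleLines =
    [ "Year,Month,DayofMonth,DepDelay"
      "2008,1,3,8"
      ""
      "2008,1,4,-2" ]

// The same two filters as loadEntries, using Seq in place of PSeq
// so that the snippet has no NuGet dependency.
let cleanEntries (lines: seq<string>) =
    lines
    |> Seq.filter (fun e -> e.Length > 0)                 // drop blank lines
    |> Seq.filter (fun e -> not (e.StartsWith("Year")))   // drop the header row

// Stand-in for publish: encode each surviving line as UTF-8 bytes,
// but collect the byte arrays instead of sending them to a topic.
let payloads =
    sampleLines
    |> cleanEntries
    |> Seq.map (fun (e: string) -> System.Text.Encoding.UTF8.GetBytes(e))
    |> List.ofSeq

printfn "%d of %d lines would be published" payloads.Length sampleLines.Length
```

The header row and the blank line drop out, and only the two data rows make it to the encoding step.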

Running from my laptop, loading 7 million or so records took 6 minutes and 9 seconds.


That averages out to about 19k records per second.  That’s slow, but I’ll talk about performance later on in the series.  For real speed tests, I’d want to use independent hardware and spend the time getting things right.  For this demo, I can live with these results (though I’d happily take an order of magnitude gain in performance…).  In testing the other two portions, this is the fastest of the three, which makes sense:  I’m doing almost nothing to the data outside of reading and publishing.
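
For anybody who wants to check the arithmetic, here is a quick sketch.  It assumes a round 7 million records, which is only an approximation of the real file size.

```fsharp
// Approximate figures from the run above: "7 million or so" records
// loaded in 6 minutes and 9 seconds.
let records = 7000000.0
let elapsed = System.TimeSpan(0, 6, 9)   // hours, minutes, seconds

let recordsPerSecond = records / elapsed.TotalSeconds

printfn "%.0f records per second" recordsPerSecond
```

Six minutes and nine seconds is 369 seconds, so the result lands just under 19,000 records per second.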


We’ve built a console app which pushes data to a Kafka topic.  That’s a pretty good start, but we’re only a third of the way done.  In tomorrow’s post, I’m going to pull data off of the Flights topic and try to make some sense of it.  Stay tuned!