Yesterday’s post introduced our first console application, which pushed data onto a Kafka topic entitled Flights.  Today’s console application will pull data from the Flights topic, give us a bit more information, and then push the final product onto an EnrichedFlights topic.

What Is This Enrichment Of Which You Speak?

Data enrichment is extremely common when dealing with streaming data.  What you have coming in is a fraction of the total useful data, and that’s by design:  you don’t want to send unnecessary reference data over the wire when you can just look that up locally.  If you’re processing 500 million records a day, saving a single byte on each record saves about 500 MB a day.

Also, sometimes the upstream sources don’t know all of the relevant information.  Let’s look at our initial data set for example.  Here’s a sample row:

2008,1,3,4,2003,1955,2211,2225,WN,335,N712SW,128,150,116,-14,8,IAD,TPA,810,4,8,0,,0,NA,NA,NA,NA,NA

In the middle, we see two airport codes:  IAD and TPA.  That tells us that this flight originated in Dulles (Virginia) and flew to Tampa (Florida).  We have the International Air Transport Association (IATA) airport code, but this data set doesn’t have any additional details, so we’ll need to do that work ourselves.  I have that data in a SQL Server table, so I’m going to import the table contents and perform a lookup to get the correct value.

Next up, look near the right-hand side of the text and you’ll see a lot of “NA” values.  This data set came from R, where NA means “not available” and marks a missing value.  If you look at the headers for this file, you’ll see that the string of NAs all ties to reasons for delay, such as weather, security incident, or mechanical issue.  If the flight takes off on time, those values don’t apply because there wasn’t a delay.
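
To make the layout of that row concrete, here is a small sketch that splits the sample row and picks out the fields discussed so far.  The positions (16 and 17 for the airport codes, 24 through 28 for the delay reasons) are taken from the buildFlight function later in this post; the variable names are just for illustration.

```fsharp
// Sample row copied from the post.
let sampleRow =
    "2008,1,3,4,2003,1955,2211,2225,WN,335,N712SW,128,150,116,-14,8,IAD,TPA,810,4,8,0,,0,NA,NA,NA,NA,NA"

// Split the comma-delimited row into an array of string fields.
let fields = sampleRow.Split(',')

// Positions 16 and 17 hold the origin and destination airport codes.
let origin = fields.[16]
let dest = fields.[17]

// Positions 24 through 28 hold the five delay-reason columns.
let delayReasons = fields.[24..28]

printfn "%d fields; %s -> %s; delay reasons: %A" fields.Length origin dest delayReasons
```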

In .NET land, we don’t want to leave these as NA values; we want to convert them.  If I were writing C# code, I’d use nullable ints, but because I’m in F#, I am going to use Option values.  Options are fantastic because if we use them correctly, they ensure that we never end up with a null reference; you’re required to fill out every scenario, and there are patterns around accessing data safely.  My intent is to convert each of these NA values to an Option of the appropriate type.
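
As a rough sketch of what “accessing data safely” looks like, the snippet below parses a raw field into an int option and then reads it back in three common ways.  The names parseDelay and describe are hypothetical, made up for this example; the post’s own conversion helper, filterNA, appears in the buildFlight section.

```fsharp
open System

// Hypothetical helper: "NA" becomes None, anything else is parsed as an int.
let parseDelay (raw: string) : int option =
    match raw with
    | "NA" -> None
    | value -> Some (Int32.Parse value)

// Pattern matching makes us handle both cases explicitly.
let describe (delay: int option) =
    match delay with
    | Some minutes -> sprintf "%d minutes" minutes
    | None -> "no value recorded"

// Option.map transforms the value only when there is one.
let hours = parseDelay "120" |> Option.map (fun m -> float m / 60.0)

// Option.defaultValue supplies a fallback when there is none.
let minutesOrZero = parseDelay "NA" |> Option.defaultValue 0

printfn "%s / %s" (describe (parseDelay "-14")) (describe (parseDelay "NA"))
```

There is no way to get at the integer without saying what should happen in the None case, which is the safety the Option type buys us.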

Finally, after handling airport codes and NA values, I want to make sure that downstream readers have an easier time handling this data, so I’m going to serialize my results as JSON.  I know that the Newtonsoft.Json library makes dealing with JSON easy.

Let’s Get To The Code

Now that we know what we want to do, the code is pretty straightforward.  It’s nearly 150 lines of code now, so I’m going to break it up into chunks.

The Main Method

Our main method is a couple dozen lines of code, but it packs a wallop.  Let’s walk through it piece by piece, introducing the rest of our codebase as we go along:

[<EntryPoint>]
let main argv =
    //Pull unrefined queue items from the flights queue.
    let (config:RdKafka.Config) = new Config(GroupId = "Airplane Enricher")
    use consumer = new EventConsumer(config, "sandbox.hortonworks.com:6667")
    let topics = ["Flights"]
    consumer.Subscribe(new System.Collections.Generic.List<string>(topics |> List.toSeq))

    //After enrichment, put the message onto the enriched flights queue
    use producer = new Producer("sandbox.hortonworks.com:6667")
    let metadata = producer.Metadata(allTopics=true)
    use topic = producer.Topic("EnrichedFlights")

    let conn = new System.Data.SqlClient.SqlConnection(connectionString)
    conn.Open()
    let airports = AirportSql.Create(conn).Execute() |> Seq.toList
    conn.Close()

    processMessages consumer topic true 1000 airports
    consumer.Start()
    printfn "Started enricher. Press enter to stop enriching."
    System.Console.ReadLine() |> ignore
    consumer.Stop() |> ignore

    printfn "You've finished enriching some data.  Hit Enter to close this app."
    System.Console.ReadLine() |> ignore

    0 // return an integer exit code

We’re doing a bunch of setup work here, so let’s take it from the top.  First, I declare a consumer group, which I’m calling “Airplane Enricher.”  Kafka uses the concept of consumer groups to allow consumers to work in parallel.  Imagine that we have ten separate servers available to process messages from the Flights topic.  Each flight message is independent, so it doesn’t matter which consumer gets it.  What does matter, though, is that multiple consumers don’t get the same message, as that’s a waste of resources and could lead to duplicate data processing, which would be bad.

The way Kafka works around this is to use consumer groups:  within a consumer group, only one consumer will get a particular message.  That way, I can have my ten servers processing messages “for real” and maybe have another consumer in a different consumer group just reading through the messages getting a count of how many records are in the topic.  Once you treat topics as logs rather than queues, consumer design changes significantly.
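
Here is a toy model of that idea in plain F#.  It is not how Kafka implements group coordination; the round-robin assignPartitions function, the Message type, and the consumer names are all assumptions made up for illustration.  The point is only that each group sees every message, while inside a group each message lands on exactly one consumer.

```fsharp
// Toy model only: real Kafka assigns partitions through its group coordinator,
// not through this round-robin function.
type Message = { Partition: int; Body: string }

// Give each partition to exactly one consumer in the group.
let assignPartitions (consumers: string list) (partitionCount: int) =
    [ for p in 0 .. partitionCount - 1 -> (p, consumers.[p % consumers.Length]) ]
    |> Map.ofList

// Work out which consumer in a group receives each message.
let deliver (consumers: string list) (partitionCount: int) (topicLog: Message list) =
    let owners = assignPartitions consumers partitionCount
    topicLog |> List.map (fun m -> (owners.[m.Partition], m.Body))

let topicLog =
    [ { Partition = 0; Body = "flight A" }
      { Partition = 1; Body = "flight B" }
      { Partition = 2; Body = "flight C" }
      { Partition = 0; Body = "flight D" } ]

// The enricher group splits the work between its two members.
let enricherGroup = deliver [ "enricher-1"; "enricher-2" ] 3 topicLog

// The counter group independently sees every message.
let counterGroup = deliver [ "counter-1" ] 3 topicLog

printfn "%A" enricherGroup
printfn "%A" counterGroup
```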

The next couple lines of code tell our Kafka instance that we’re going to enlist this app as a consumer, reading from the Flights topic.  We’re allowed to read from multiple topics, but I only need to read from one today.

Then, like the producer we saw yesterday, we need to publish to a new Kafka topic.  This time, the topic is EnrichedFlights.  I don’t want to write back to the same topic, as then my consumers might get confused.  Topics are cheap, so don’t be afraid to spin up new ones.

After looking at topics, I see something familiar:  a SQL Server connection.  I want to hit SQL Server once, at the beginning of the process, and grab my metadata. I’m far enough along that it’s time to show the very top of my F# console app.

Using the SQL Type Provider

module AirplaneEnricher

open RdKafka
open System
open Newtonsoft.Json
open FSharp.Data
open FSharp.Data.SqlClient

[<Literal>]
let connectionString =
    @"Data Source=LOCALHOST;Initial Catalog=Scratch;Integrated Security=True"

type AirportSql =
    SqlCommandProvider<"SELECT IATA, Airport, City, State, Country, Lat, Long FROM dbo.Airports", connectionString>

After the open statements, I declare a connection string pointing to the Scratch database on my local instance. I then create an F# type (which, if you want, you can incorrectly think of as a class, as in this case it’s close enough for government work) called AirportSql.  I am going to populate the AirportSql type using a type provider, one of the nicest things about F#.  Type providers give you easy access to structured documents, automatically creating relevant types.  In this case, I’m using the FSharp.Data.SqlClient library, which gives me a micro-ORM.  Now the AirportSql type will be built around this simple SELECT statement, and I’ll get a record type with properties named IATA, Airport, City, etc., each of which has the correct data type.

Going back to my main function, I call the Create method on the type to build a SqlCommandProvider, and then call its Execute method to get an IEnumerable of AirportSql.Record values.  This is a lazy-loaded operation, and if I don’t force loading now, I’ll have to load the table for every record I pull off the Kafka topic, and that’d be a terrible waste.  To prevent this waste, I pipe the command to Seq.toList, which generates a list which we call airports.
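
The difference between a lazy sequence and a materialized list is easy to see with an in-memory stand-in.  This sketch does not touch SQL Server or the type provider; the seq expression below just pretends to be a query and counts how often it runs.  The names queryRuns and lazyAirports are hypothetical.

```fsharp
// Counts how many times the stand-in "query" actually runs.
let queryRuns = ref 0

// A lazy sequence: the body runs again every time the sequence is enumerated.
let lazyAirports =
    seq {
        queryRuns.Value <- queryRuns.Value + 1
        yield ("IAD", "Virginia")
        yield ("TPA", "Florida")
    }

// Enumerating the lazy sequence twice runs the "query" twice.
lazyAirports |> Seq.length |> ignore
lazyAirports |> Seq.length |> ignore
let runsWhenLazy = queryRuns.Value

// Seq.toList enumerates once and keeps the results in memory.
let airports = lazyAirports |> Seq.toList
airports |> List.length |> ignore
airports |> List.length |> ignore
let runsAfterToList = queryRuns.Value

printfn "lazy: %d runs; after toList: %d runs" runsWhenLazy runsAfterToList
```

Two passes over the lazy sequence cost two runs, while any number of passes over the list cost only the one run that built it.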

Whew.  Now let’s get back to the main method.

Processing Messages

After we load the reference data from SQL Server, the next step is to call the processMessages function.  This function takes five parameters:

  1. The consumer we have set up to pull messages from Kafka.
  2. The topic to which we will publish enriched messages.
  3. A boolean flag which indicates whether we want to start from the beginning of the Flights topic or if we want to pick up mid-stream.
  4. An indicator of how far along we are.  We print a line to the console after every n messages we receive.
  5. A list of airports that we can use for reference data.

The processMessages function itself is pretty simple:

let processMessages (consumer:EventConsumer) (topic:Topic) startFromBeginning n (airports:System.Collections.Generic.IEnumerable<AirportSql.Record>) =
    if startFromBeginning then readFromBeginning consumer
    consumer.OnMessage.Add(fun(msg) ->
        let rawFlightMessage = System.Text.Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length)
        //build flight type -- enrich
        let flight = buildFlight rawFlightMessage airports

        //publish
        let jsonFlight = JsonConvert.SerializeObject(flight)
        publish topic jsonFlight
    )

I stripped out the section where we occasionally write to the console, as it’s not core to the solution.  The wrap-up to this series will have a link to the full code, which includes this (as well as some other niceties I stripped out for pedagogical purposes).

The processMessages function starts by checking to see if startFromBeginning is true.  If so, it calls the readFromBeginning function:

let readFromBeginning (consumer:EventConsumer) =
    consumer.OnPartitionsAssigned.Add(fun(partitions) ->
        printfn "Starting from the beginning..."
        let fromBeginning = List.ofSeq partitions
                               |> List.map(fun(x) -> new TopicPartitionOffset(x.Topic, x.Partition, RdKafka.Offset.Beginning))
        let fb = new System.Collections.Generic.List<TopicPartitionOffset>(fromBeginning |> List.toSeq)
        consumer.Assign(fb);
    )

If I were adding new instances of this enricher to an already-running process, I’d set startFromBeginning to false and just pick up mid-stream. Because I’m doing a full load, however, I set it to true.

Notice that the readFromBeginning function is primarily event-driven code, so it looks similar to what you’d see with C#.  We add a handler to the OnPartitionsAssigned event.  This handler will print a message to the console and look for the beginning offsets for each partition in the topic.  That list becomes fromBeginning, and we turn that into a .NET list and assign that list to the consumer.

Going back to the processMessages function, we hook into the consumer’s OnMessage event and add a new handler which does all of our work.  For each message that comes in, we convert it to a string, enrich the data, and then publish the enriched data.
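
If the event-handler shape is unfamiliar, this self-contained sketch mimics it with a plain F# Event in place of the Kafka consumer.  The onMessage event, the published buffer, and the placeholder “enrichment” are all assumptions for illustration; only the decode, enrich, publish sequence mirrors processMessages.

```fsharp
open System.Text

// Hypothetical stand-in for the consumer's OnMessage event; payloads arrive as bytes.
let onMessage = Event<byte[]>()

// Collects what would be published to the enriched topic.
let published = ResizeArray<string>()

onMessage.Publish.Add(fun payload ->
    // Convert the raw bytes to a string, as processMessages does.
    let raw = Encoding.UTF8.GetString(payload, 0, payload.Length)
    // Placeholder "enrichment": prefix the message with its field count.
    let enriched = sprintf "%d fields: %s" (raw.Split(',').Length) raw
    // Placeholder "publish": store the result in memory.
    published.Add(enriched)
)

// Simulate two incoming messages.
onMessage.Trigger(Encoding.UTF8.GetBytes("IAD,TPA,810"))
onMessage.Trigger(Encoding.UTF8.GetBytes("TPA,IAD,810"))

printfn "%A" (List.ofSeq published)
```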

Building A Flight

The function we use to enrich data is called buildFlight.  It has a few helper functions, so I’m going to include them all here.

let filterNA (str:string) =
    match str with
    | "NA" -> None
    | _ -> Some(Convert.ToInt32(str))

let convertNAint (str:string) =
    match str with
    | "NA" -> 0
    | _ -> Convert.ToInt32(str)

let convertNAbool (str:string) =
    match str with
    | "NA" -> false
    | "0" -> false
    | "1" -> true
    | _ -> false

let buildFlight (rawFlightMessage:string) (airports:System.Collections.Generic.IEnumerable<AirportSql.Record>) =
    let flightSplit = rawFlightMessage.Split(',')
    let origin = airports |> Seq.filter(fun f -> f.IATA.Value.Equals(flightSplit.[16])) |> Seq.head
    let destination = airports |> Seq.filter(fun f -> f.IATA.Value.Equals(flightSplit.[17])) |> Seq.head

    let flight = {
        Year = Convert.ToInt32(flightSplit.[0]);
        Month = Convert.ToInt32(flightSplit.[1]);
        DayOfMonth = Convert.ToInt32(flightSplit.[2]);
        DayOfWeek = Convert.ToInt32(flightSplit.[3]);
        DepTime = filterNA flightSplit.[4];
        CRSDepTime = Convert.ToInt32(flightSplit.[5]);
        ArrTime = filterNA flightSplit.[6];
        CRSArrTime = Convert.ToInt32(flightSplit.[7]);
        UniqueCarrier = flightSplit.[8];
        FlightNum = flightSplit.[9];
        TailNum = flightSplit.[10];
        ActualElapsedTime = filterNA flightSplit.[11];
        CRSElapsedTime = filterNA flightSplit.[12];
        AirTime = filterNA flightSplit.[13];
        ArrDelay = filterNA flightSplit.[14];
        DepDelay = filterNA flightSplit.[15];
        Origin = flightSplit.[16];
        Dest = flightSplit.[17];
        Distance = Convert.ToInt32(flightSplit.[18]);
        TaxiIn = filterNA flightSplit.[19];
        TaxiOut = filterNA flightSplit.[20];
        Cancelled = convertNAbool flightSplit.[21];
        CancellationCode = flightSplit.[22];
        Diverted = convertNAbool flightSplit.[23];
        CarrierDelay = convertNAint flightSplit.[24];
        WeatherDelay = convertNAint flightSplit.[25];
        NASDelay = convertNAint flightSplit.[26];
        SecurityDelay = convertNAint flightSplit.[27];
        LateAircraftDelay = convertNAint flightSplit.[28];
        OriginCity = origin.City.Value;
        OriginState = origin.State.Value;
        DestinationCity = destination.City.Value;
        DestinationState = destination.State.Value;
    }
    flight

Let’s talk about the three helper functions first.  All three convert NA to reasonable values.  The first makes use of the Option type, converting NA to None and non-NA values to some integer value.  The other two functions convert the string value to an appropriate integer or boolean, treating NA as 0 or false respectively.

Moving on from the helper functions, we have the main function, buildFlight.  First, we split the incoming comma-delimited message into an array of fields.  Next, we check the reference data set for our origin and destination airports.  Seq.filter returns a sequence, and we know that the IATA airport code is unique, so Seq.head gives us the one matching airport.  I also happen to know that all airport codes are in the data set, so I don’t need to put in error handling like I would in production code; if a code were missing, Seq.head would throw an exception on the empty sequence.
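
For what that production-style error handling might look like, here is one possible sketch.  It swaps the post’s Seq.filter and Seq.head for a Map with Map.tryFind, which returns an Option instead of throwing.  The Airport record and its rows are hypothetical stand-ins for the AirportSql.Record values (the city and state values are illustrative, not rows from my table), and lookupCity is a made-up name.

```fsharp
// Hypothetical stand-in for the AirportSql.Record rows loaded from SQL Server.
type Airport = { IATA: string; City: string; State: string }

// Illustrative values only.
let airports =
    [ { IATA = "IAD"; City = "Dulles"; State = "VA" }
      { IATA = "TPA"; City = "Tampa"; State = "FL" } ]

// Index the reference data once so each lookup is keyed rather than a full scan.
let airportsByCode =
    airports |> List.map (fun a -> (a.IATA, a)) |> Map.ofList

// Map.tryFind returns None for an unknown code instead of throwing.
let lookupCity (code: string) =
    match Map.tryFind code airportsByCode with
    | Some airport -> sprintf "%s, %s" airport.City airport.State
    | None -> "unknown airport"

printfn "%s" (lookupCity "TPA")
printfn "%s" (lookupCity "XXX")
```

The caller has to decide what an unknown code means, whether that is a default value, a logged warning, or a message routed to a dead-letter topic.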

Once we have the origin and destination airports, I can turn this comma-separated value array into an F# type.  Note that I call the various filter/convert functions as needed.

With that, we can bounce back to the main function and wrap this up.

Closing Out The Main Method

At this point, we can look at the last few lines of the main function.  First up is a call to consumer.Start().  We need to call the Start method before the consumer will begin listening.  It will continue to listen until we call the consumer.Stop() method.  In between, I’m going to let it run until we hit enter.

Here’s a copy of my enricher running through the 7 million record data set:

[Screenshot: Enricher.png]

There’s one important thing to note here:  it didn’t really take 3 hours and 15 minutes to burn through this data set.  In reality, it took a little less than one hour, enriching somewhere around 2000 records per second.  But the consumer’s work is not done until you hit Enter, meaning that if I were to load the 2009 data, it would pick right back up where it left off and start enriching again.  We’ve completely decoupled the producer from the enricher, which has benefits, but it also has this drawback:  the enricher can’t tell when the data set is finished.

Conclusion

As of now, we have enriched data, which includes the origin and destination airport cities and states.  We’ve also cleaned up all of those NA values and handled scenarios such as cancelled and diverted flights.  Our resulting records are in JSON format.  In case you’re interested, here’s what one of those looks like:

[Screenshot: enrichedflight]

Note that in the half-second or so that I let it run, it displayed 1256 messages on screen.  It’s pretty zippy.

On Monday, we’re going to look at writing a consumer for all of this enriched data, and will spend most of the rest of the week wrapping things up.
