Solving A Problem With Kafka

Today, we’re going to look at the specific problem I want to solve using Kafka.

The Data

I am going to use flight data because it is copious and free.  Specifically, I’m going to look at data from 2008, giving me approximately 7 million data points.

The Question

Given this data, I’d like to ask a couple of questions.  Specifically, by destination state, how many flights did we have in 2008?  How many of those flights were delayed?  How long were those delays in the aggregate?  And given that we’re going to experience a delay, how long can we expect it to be?

The Other Question:  Why Use Kafka?

Hey, I know how to solve this problem with SQL Server:  read the 2008 CSV file in and write a basic SQL query.  Or I could load the data in Hive and run the same query.
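To make the question concrete, the SQL version looks something like the query below.  This is only a sketch:  the table and column names (Flights, Airports, Dest, IATA, ArrDelay) are placeholders of my own rather than a real schema, and it assumes the airport-to-state lookup already lives in SQL Server.

SELECT
	a.State AS DestinationState,
	COUNT(*) AS NumberOfFlights,
	SUM(CASE WHEN f.ArrDelay > 0 THEN 1 ELSE 0 END) AS NumberOfDelayedFlights,
	SUM(CASE WHEN f.ArrDelay > 0 THEN f.ArrDelay ELSE 0 END) AS TotalDelayMinutes,
	AVG(CASE WHEN f.ArrDelay > 0 THEN 1.0 * f.ArrDelay END) AS AverageDelayWhenDelayed
FROM dbo.Flights f
	INNER JOIN dbo.Airports a
		ON f.Dest = a.IATA
GROUP BY
	a.State
ORDER BY
	a.State;

The AVG over the CASE expression ignores non-delayed flights, which answers the “given that we’re going to experience a delay” question.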

So why would I want to use Kafka?  Isn’t this overkill?  In this case it is.  But let’s spin the story a slightly different way:  instead of reading from a file, let’s pretend that we’re receiving messages from a web service or some other application.  We now want to stream the data in, possibly from a number of sources.  We can decouple message collection from message processing.

In the real world, my problem involves handling clickstream data, where we’re looking at hundreds of millions of messages per day.  So what I’m dealing with is certainly a toy problem, but it’s the core of a real problem.

The Infrastructure

So here’s my plan:  I’m going to have a console application read from my flight data file, putting messages into a single Kafka topic with a single partition.  The purpose of this app is to do nothing more than read text strings and push them onto the topic, doing nothing fancy.

From there, I intend to enrich the data using another application.  This application will read messages from the raw topic, clean them up (e.g., fixing known bad data), and perform a lookup against some data in SQL Server.  Once we’re done with that, we can put the results on an enriched data topic.  Our final process will read from the enriched topic and build up a data set that we can use for our solution.
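As a sketch, the SQL Server lookup in that enrichment step would be something simple, presumably along the lines of mapping a destination airport code to its state.  The table and column names here are illustrative, not a real schema:

DECLARE @DestinationAirportCode CHAR(3) = 'RDU';

SELECT
	a.State
FROM dbo.Airports a
WHERE
	a.IATA = @DestinationAirportCode;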

What’s Next?

The next four posts are going to cover some specifics.  In the next post, I’m going to walk through some of the really simple parts of Kafka, including setting up topics and pushing & receiving messages.  From there, the next three posts will look at the three console applications I’m going to write to handle the different stages of this application.  I’ll wrap it up with a couple more Kafka-related posts.  So stay tuned!

What Is Kafka?

I’m working on a new talk entitled Kafka for .NET Developers, which I plan to make part of my 2017 rotation.  I’m going to give this talk first at the Triangle Area .NET User Group’s Data SIG on October 19th, so it’s about time I got started on this…

Who Is Your Broker And What Does He Do?

Apache Kafka is a message broker.  Its purpose is to accept messages from one or more publishers and push messages out to one or more subscribers.  It’s not an Enterprise Service Bus; it’s a stream data platform.

Maybe I just don’t have my architect hat on today, but that seems like a lot of words to describe what’s going on.  I’m going to describe it my own way (with the bonus of probably getting a bunch of stuff wrong too!).

The Basics

We have an application which needs to send messages somewhere.  We have a separate application which needs to consume messages from that sender.  The easiest way to do that is to have the two applications directly communicate:


In this case, we have an app connecting directly to a database.  For low-scale systems, this is fine.  In fact, it’s better than fine:  it’s easy to understand, easy to work with, and easy to fix when things go wrong.

But it has two fundamental problems:  it doesn’t scale, and it doesn’t expand.  If I have more users trying to connect to my app than my app server can handle, users will have to wait in line for service.  Similarly, if my database cannot handle the user load my app can send, people have to wait in line.

So we decide to expand out.  The easiest method of scaling out is simply to throw more app servers at the problem:


That’s cool and is also pretty easy.  But what happens if our database doesn’t have the oomph to handle all of this work?  Then we could scale out the databases.  That looks like this:


We now have 12 separate connection points and probably some middleware to figure out when to connect what to where.  Things get more complex, but you can work out some technical details and get this running fine.  This can even scale out to hundreds of servers against dozens of databases.  But the problem is that you need to know which database to hit.  If you can guarantee this, then life is good and you can scale indefinitely.  But let’s say that we don’t necessarily know which consumer should get the message.  Or let’s say one of the consumers goes down:


So what do we do with all of the messages that are supposed to go to that database?  In our current setup, the app either holds messages until the database is back or outright fails.


This is where a broker steps in.


The broker serves several purposes:

  1. Know who the producers are and who the consumers are.  This way, the producers don’t care who exactly consumes a message and aren’t responsible for the message after they hand it off.
  2. Buffer for performance.  If the consumers are a little slow at the moment but don’t usually get overwhelmed, that’s okay—messages can sit with the broker until the consumer is ready to fetch.
  3. Let us scale out more easily.  Need to add more producers?  That’s fine—tell the broker who they are.  Need to add consumers?  Same thing.
  4. What about when a consumer goes down?  That’s the same answer as #2 above:  the broker holds its messages until the consumer is ready again.

So brokers add a bit of complexity, but they solve some important problems.  The nice part about a broker is that it doesn’t need to know anything about the messages, only who is supposed to receive them.

Enter Kafka

Now that we have an idea of what message brokers do, let’s look at Kafka specifically.

In Kafka terms, we have producers and consumers.  Producers push messages and consumers read messages.  In between, we have brokers.

Messages belong to topics.  Topics themselves are broken up into partitions.  Think of an individual partition as a log.  Kafka accepts messages from producers publishing to a specific topic and writes them to a partition.  Consumers can read from a topic, pulling messages from one or more partitions.

Speaking of messages, it’s important to note that Kafka is not a queue.  With a queue, a producer puts a message on and a consumer pops a message off.  Instead, Kafka is more like a log:  producers add messages and the broker keeps track of them over time.  Consumers then read from the log starting at any point, which means that they’re allowed to re-read messages.


The purpose of this blog post was to give a basic introduction to Kafka and help understand where it fits in an architecture.  Tomorrow, we’re going to look at the problem I intend to solve.

If you want more of an overview on Kafka, Kevin Sookocheff’s blog post on the topic is great.

Polybase And HDP 2.5

I didn’t want the next post in my Polybase series to be such a downer, but here goes:

tl;dr version:  It does not look like the Hortonworks Data Platform 2.5 sandbox works with Polybase at this time.

Time For An Update

The Hortonworks Data Platform 2.5 is now generally available.  It brings along with it niceties such as Spark 2.0, updated versions of Kafka & Storm, and quite a few little tweaks.  Because it’s shiny and new, I want to use it.

My goal was to set up everything so that I can give my Integrating Hadoop and SQL Server presentation on HDP 2.5.  Things started out positively:  the HDFS interface has changed a little bit, but nothing big:


I could also get Sqoop, .NET file access, and Hive to work just fine.  But when I tried to run a Polybase query, the query would run for just over 90 seconds and then fail with the following error message:


Msg 8680, Level 17, State 26, Line 20
Internal Query Processor Error: The query processor encountered an unexpected error during the processing of a remote query phase.
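For context, the query that fails is nothing exotic:  it is a plain SELECT against an external table pointing at /tmp/ootp/secondbasemen.csv.  Here is a rough sketch of that setup; the data source location, file format, and column list are illustrative rather than my exact demo code.

CREATE EXTERNAL DATA SOURCE [HDP] WITH
(
	TYPE = HADOOP,
	LOCATION = 'hdfs://sandbox.hortonworks.com:8020'
);

CREATE EXTERNAL FILE FORMAT [CsvFileFormat] WITH
(
	FORMAT_TYPE = DELIMITEDTEXT,
	FORMAT_OPTIONS (FIELD_TERMINATOR = ',')
);

CREATE EXTERNAL TABLE dbo.SecondBasemen
(
	FirstName NVARCHAR(50),
	LastName NVARCHAR(50)
)
WITH
(
	LOCATION = '/tmp/ootp/secondbasemen.csv',
	DATA_SOURCE = [HDP],
	FILE_FORMAT = [CsvFileFormat]
);

SELECT TOP(10) * FROM dbo.SecondBasemen;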

Where To Go When You Know It’s Broken

That error message isn’t incredibly helpful, but fortunately, there’s a bit more information available to us.

Polybase DMVs

There are a few Polybase DMVs, but the two that are relevant here are sys.dm_exec_external_work and sys.dm_exec_compute_node_errors.

Because this is a quiet system, I didn’t really need to find the execution ID and could go straight to the compute node errors DMV, but I included both queries here because troubleshooting in a production environment likely will require both.

SELECT *
FROM sys.dm_exec_external_work w;

SELECT *
FROM sys.dm_exec_compute_node_errors e
WHERE
	e.execution_id = 'QID1136'
ORDER BY
	e.create_time ASC;


Now we’re onto something:

Could not obtain block: BP-61928784- file=/tmp/ootp/secondbasemen.csv Microsoft.SqlServer.DataWarehouse.Common.ErrorHandling.MppSqlException:
Could not obtain block: BP-61928784- file=/tmp/ootp/secondbasemen.csv
at Microsoft.SqlServer.DataWarehouse.DataMovement.Common.ExternalAccess.HdfsBridgeReadAccess.Read(MemoryBuffer buffer, Boolean& isDone)
at Microsoft.SqlServer.DataWarehouse.DataMovement.Workers.DataReader.ExternalMoveBufferReader.Read()
at Microsoft.SqlServer.DataWarehouse.DataMovement.Workers.ExternalMoveReaderWorker.ReadAndSendData()
at Microsoft.SqlServer.DataWarehouse.DataMovement.Workers.ExternalMoveReaderWorker.Execute(Object status)

So this looks pretty bad:  we can’t obtain a data block from one of the datanodes.  My first step was to try to pull the file some other way, so I checked three different methods to make sure that the file is available and that it isn’t corrupt:

  1. I accessed the file through the HDFS graphical interface in Ambari.  I was able to preview the first several dozen rows and download the entire file without a problem.
  2. I accessed the file through the WebHDFS interface (port 50070) from a .NET application, pulling down the entire 777-row file with no problems.
  3. I ran a curl command to grab the file through WebHDFS.  If you want to try this at home, you can build your own curl command against the WebHDFS endpoint; mine looked like the following, with the sandbox host as a placeholder:
# Basic information:  show the name node's response headers, including the redirect
curl -i "http://<sandbox host>:50070/webhdfs/v1/tmp/ootp/secondbasemen.csv?op=OPEN"
# Automatically redirect:  follow the redirect to the datanode and pull the file
curl -L "http://<sandbox host>:50070/webhdfs/v1/tmp/ootp/secondbasemen.csv?op=OPEN"


Note that all three of these methods go through the WebHDFS interface, so I didn’t need to do all three—I could have safely pulled the file from curl and gotten the same information.  Anyhow, now that we know that the file absolutely is there and is valid, let’s see what kind of story we can corroborate on our Hadoop cluster.

Checking Hadoop Logs

When I checked the Hadoop logs, I tried a few places.  If you want to play along at home and you have the Hortonworks sandbox, you can find these logs on the sandbox itself.

  1. The datanode logs had zero records for my Polybase query.  This told me that the datanode never received a request to do anything worth logging.
  2. The name node logs also had nothing of value.
  3. Because I have Ranger turned on, my audit log (hdfs-audit.log) does have something useful.
2016-09-28 03:05:15,675 INFO FSNamesystem.audit: allowed=true	ugi=pdw_user (auth:SIMPLE)	ip=	cmd=getfileinfo	src=/tmp/ootp/secondbasemen.csv	dst=null	perm=null	proto=rpc
2016-09-28 03:05:16,088 INFO FSNamesystem.audit: allowed=true	ugi=pdw_user (auth:SIMPLE)	ip=	cmd=open	src=/tmp/ootp/secondbasemen.csv	dst=null	perm=null	proto=rpc
2016-09-28 03:05:39,004 INFO FSNamesystem.audit: allowed=true	ugi=pdw_user (auth:SIMPLE)	ip=	cmd=open	src=/tmp/ootp/secondbasemen.csv	dst=null	perm=null	proto=rpc
2016-09-28 03:06:03,502 INFO FSNamesystem.audit: allowed=true	ugi=pdw_user (auth:SIMPLE)	ip=	cmd=open	src=/tmp/ootp/secondbasemen.csv	dst=null	perm=null	proto=rpc
2016-09-28 03:06:38,392 INFO FSNamesystem.audit: allowed=true	ugi=pdw_user (auth:SIMPLE)	ip=	cmd=open	src=/tmp/ootp/secondbasemen.csv	dst=null	perm=null	proto=rpc

First, it’s interesting to note that the Polybase engine uses “pdw_user” as its user account.  That’s not a blocker here because I have an open door policy on my Hadoop cluster:  no security lockdown because it’s a sandbox with no important information.  Second, the IP address in these entries belongs to my main machine (where SQL Server lives), not to the Hadoop sandbox’s name node.  These logs show that my main machine runs a getfileinfo command against /tmp/ootp/secondbasemen.csv.  Then, the Polybase engine asks permission to open /tmp/ootp/secondbasemen.csv and is granted permission.  Then…nothing.  It waits for 20-30 seconds and tries again.  After four failures, it gives up.  This is why it takes about 90 seconds to return an error message:  it tries four times.

Aside from this audit log, there was nothing interesting on the Hadoop side.  The YARN logs had nothing in them, indicating that whatever request happened never made it that far.

When Logs Just Won’t Do

Logs have failed us, but we still have one tool available to us:  network packet captures.  Fortunately, I have two machines with different versions of the Hadoop sandbox installed.  First, here’s my working machine.  In this case, the 137 IP address is my SQL Server VM, and my 149 IP address is my Hadoop sandbox/namenode/datanode combo.

The packet capture shows first that Polybase connects on port 8020.  We saw this earlier in the logs above, where the Polybase engine asks for file info (using the getBlockLocations method) and where the datanode lives.  Then, we get to the important part:  Polybase connects to the datanode on port 50010, and pulls the second basemen file from there.

By contrast, with my HDP 2.5 setup, Polybase tries to connect to the datanode at its internal Docker container address.  Every two seconds for the duration of my check, I get back the following message:  “Standard query response [hex ID] No such name PTR.”  So it’s trying to connect directly to the datanode but can’t, because the node is now inside a Docker container.  Even though I’ve exposed port 50010 as a pass-through port to Docker, the Polybase engine doesn’t care:  the response that came back from the TCP session on port 8020 says to connect to the container’s internal address as the datanode, not to my name node’s external IP.  Note that I don’t have a good packet capture image.

Conclusions (For Now)

Like I mentioned at the top, this is a downer of a conclusion.  I really want to be able to use Polybase + HDP 2.5 + Docker, but Polybase requires direct access to the datanodes.  This doesn’t much matter with a classic Hadoop setup where all of the nodes are physical and it’s just a matter of setting up iptables correctly.  But in the New Dockerized World, it makes implementing Polybase a lot trickier.  I’m still searching for a solution.  I have a question out on the Hortonworks forums and am trying to bang on some Microsoft doors, so hopefully there will be some traction there, but in the meantime, it looks like I’ll have two HDP VMs on my machine.

October Presentations

After a busy summer, October is slowing down a bit on the speaking front.  As of right now, I have only three presentations lined up:

  1. SQL Saturday Pittsburgh was October 1st.  I gave my talk on integrating Hadoop with SQL Server.
  2. SQL Saturday Orlando is October 8th.  I will be giving my talk on R for SQL Server developers.
  3. The Triangle area .NET User Group (TriNUG) Data SIG is Wednesday, October 19th.  My topic is Kafka for .NET developers.  This is the first chunk of work for a larger project built around streaming data in Hadoop.

With presentations quieting down a bit, I should have some time to work more on next year’s slate of talks, but I’m saving that for another post…

Get The Right Sandbox File

It looks like there was a new HDP 2.5 sandbox VM image for VMware added to the Hortonworks download page after the original release.  You can easily check the MD5 hash to make sure it matches up.

Because I originally downloaded the HDP 2.5 VM on the morning of September 22nd, I have an old version whose hash is 74b279aaddfa30bb1e3cf65ff4e87855.  In Windows, you can get that by running the certUtil utility:

certUtil -hashfile HDP_2.5_vmware.ova MD5

I noticed several problems with this initial build (which is probably why there’s a new one!).  For example, there were a series of missing files in /kafka-logs/ATLAS_ENTITIES-0 and /kafka-logs/ATLAS_HOOK-0.  There were also some missing files in /var/lib/ambari-metrics-collector/hbase-tmp and a few other directories.  These missing files would cause services like Kafka and Ambari Metrics to fail at startup.

With the new version of the VM, I was able to start Kafka with zero problems.

TIL: Database Mirroring, TDE, And Encryption

I recently had to build database mirroring for a database with Transparent Data Encryption enabled. Ahmad Yaseen had a very nice walkthrough which covered most of what I needed to know. There were a couple things I needed to do to get everything working, so I figured it was worth a blog post.

Set Up Certificates

The first thing I had to do was set up a master key and certificate on my primary instance:

USE master;
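--A sketch of the statements this step involves.  The password and certificate subject are
--placeholders; the certificate name matches the TDECertificate backed up below.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'Some Master Key Password';
GO
CREATE CERTIFICATE [TDECertificate] WITH SUBJECT = 'TDE certificate';
GO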

Then we need to turn encryption on for the database, which is a two-step process:

USE [SomeDatabase]
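--A sketch of the two steps:  create a database encryption key protected by the server
--certificate, then turn encryption on.  AES_256 is a typical algorithm choice, not
--necessarily the exact one from my setup.
CREATE DATABASE ENCRYPTION KEY
	WITH ALGORITHM = AES_256
	ENCRYPTION BY SERVER CERTIFICATE [TDECertificate];
GO
ALTER DATABASE [SomeDatabase] SET ENCRYPTION ON;
GO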

At this point, TDE is on for the primary instance.


Now it’s time to take some backups. First, let’s back up the various keys and certificates:

USE [master]
--Back up the service master key
--Note that the password here is the FILE password and not the KEY password!
BACKUP SERVICE MASTER KEY TO FILE = 'C:\Temp\ServiceMasterKey.key' ENCRYPTION BY PASSWORD = 'Service Master Key Password';
--Back up the database master key
--Again, the password here is the FILE password and not the KEY password.
BACKUP MASTER KEY TO FILE = 'C:\Temp\DatabaseMasterKey.key' ENCRYPTION BY PASSWORD = 'Database Master Key Password';
--Back up the TDE certificate we created.
--We could create a private key with password here as well.
BACKUP CERTIFICATE [TDECertificate] TO FILE = 'C:\Temp\TDECertificate.cert'
	WITH PRIVATE KEY (FILE = 'C:\Temp\TDECertificatePrivateKey.key', ENCRYPTION BY PASSWORD = 'Some Private Key Password');

Then we want to take a database backup and log file backup. I’ll let you take care of that part.
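If you want a minimal version of those backups, something like the following works.  The backup file names here are my own, though the log backup lines up with the SomeDatabase.trn file restored later:

BACKUP DATABASE [SomeDatabase] TO DISK = 'C:\Temp\SomeDatabase.bak';
BACKUP LOG [SomeDatabase] TO DISK = 'C:\Temp\SomeDatabase.trn';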

Now I want to get mirroring set up.

Mirror Certificates

On the mirror instance, let’s restore the various certificates. I’m assuming that this is a true mirroring instance and that you haven’t created any keys. I also moved the keys, certificates, and backups over to the mirroring instance’s C:\Temp folder.

USE [master]
--Test restoration of the keys and certificate.
RESTORE SERVICE MASTER KEY FROM FILE = 'C:\Temp\ServiceMasterKey.key' DECRYPTION BY PASSWORD = 'Service Master Key Password';
--For the master key, we need to use the file decryption password and then the original password used for key encryption.  Otherwise,
--your restoration attempt will fail.
RESTORE MASTER KEY FROM FILE = 'C:\Temp\DatabaseMasterKey.key'
	DECRYPTION BY PASSWORD = 'Database Master Key Password'
	ENCRYPTION BY PASSWORD = 'Some Master Key Password' FORCE;
CREATE CERTIFICATE [TDECertificate] FROM FILE = 'C:\Temp\TDECertificate.cert'
WITH PRIVATE KEY (FILE = 'C:\Temp\TDECertificatePrivateKey.key', DECRYPTION BY PASSWORD = 'Some Private Key Password');

I needed to use the FORCE directive when restoring the master key. Otherwise, this part went smoothly.

Database And Log Restoration

Before restoring the database files, I needed to open the master key file.
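As a sketch, that sequence looks something like the following; the full backup file name is my own placeholder, matching the backup sketch above.

OPEN MASTER KEY DECRYPTION BY PASSWORD = 'Some Master Key Password';

RESTORE DATABASE [SomeDatabase] FROM DISK = 'C:\Temp\SomeDatabase.bak' WITH NORECOVERY, REPLACE;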

RESTORE LOG [SomeDatabase] FROM DISK = 'C:\Temp\SomeDatabase.trn' WITH NORECOVERY, REPLACE;

Now we have the mirror database in Restoring mode, and the restored keys and certificate are in place, so TDE is ready to work on this database as well.

From there, I was able to finish setting up mirroring. Note that I didn’t need to do anything special to the witness server–it doesn’t need any certificates or keys to do its job.
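For reference, finishing the setup is the standard mirroring sequence.  A rough sketch, assuming mirroring endpoints already exist on all three instances and using made-up server names with the default port:

--On the mirror instance:
ALTER DATABASE [SomeDatabase] SET PARTNER = 'TCP://PrincipalServer.yourdomain.local:5022';
--On the principal instance:
ALTER DATABASE [SomeDatabase] SET PARTNER = 'TCP://MirrorServer.yourdomain.local:5022';
ALTER DATABASE [SomeDatabase] SET WITNESS = 'TCP://WitnessServer.yourdomain.local:5022';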

Additional Links

In addition to Ahmad’s post, I read through this post on the topic. I also read Don Castelino’s post on mirroring configuration failure scenarios. I ran into a couple of these errors, so it was a helpful article.

Finally, I want to leave you with this Simon McAuliffe post on the uselessness of Transparent Data Encryption. I’m not sure I’m 100% in agreement, but it’s a thought-provoking post.