Previously, we looked at basic querie which do not perform MapReduce operations.  Today’s post is going to look at queries which do perform MapReduce operations and we’ll see how they differ.

MapReduce Queries

In order for us to be able to perform a MapReduce operation, we need the external data source to be set up with a resource manager.  In addition, one of the following two circumstances must be met:

  1. Based on external table statistics, our query must be set up in such a way that it makes sense to distribute the load across the Hadoop data nodes and have the difference be great enough to make it worth waiting 15-30 seconds for a MapReduce job to spin up, OR
  2. We force a MapReduce job by using the OPTION(FORCE EXTERNALPUSHDOWN) query hint.

For the purposes of today’s post, I’m going to focus on the latter.  Regarding condition #1, my hunch is that there’s some kind of in-built cost optimizer the Polybase engine uses to determine whether to spin up a MapReduce job.  If this is in fact the case, it’s important to note that there is no “cost threshold for MapReduce” option that we can set, so if you’re looking for ways to fine-tune MapReduce operations, you may have to be liberal with the query hints to enable and disable external pushdown.

Our MapReduce query is pretty simple:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC
OPTION(FORCE EXTERNALPUSHDOWN);

The only thing I’ve done here compared to the basic query is to add a query hint to force external pushdown.  Let’s see how this changes things.

Step 1:  Communicating with Hadoop

Just like with the basic query, we can use Wireshark to see how the Polybase engine interacts with our Hadoop cluster.  This time around, there’s a lot more going on over the wire than in the basic query scenario; there are ten separate TCP streams which the Polybase engine initiated, and we’re going to take at least a quick look at each one.

mapreducepacketcapture
A quick look at the Wireshark packet capture.

Note that my names for these streams comes from the order in which they show up in Wireshark.  They might have specific internal names, but I’m not going to know what they are.

Stream 0

Stream 0 appears to act as a coordinator, setting everything up and building the framework for the rest of the network operations to succeed.  This first stream has three major sections.  The first section is around basic tasks, like making sure the file we are looking for exists:

mapreducesetup

Once we know it exists, we want to create a job.  Eventually, that job will complete (we hope).

mapreducecreate

Then, after everything completes, we get file info for the staging work:

mapreducegetfileinfo

Stream 0 runs through the entire operation, with the earliest packet being packet #7 and the last packet being #20118 of approximately 20286.  We communicate with the Hadoop name node on port 8020 for this stream.

Stream 1

The next stream is Stream 1.  This stream does two things.  The first thing, which you can see at the beginning of the stream text below, is to prep the MapReduce job, including running /bin/bash to set the CLASSPATH and LD_LIBRARY_PATH.

mapreducestream1

After doing that, it regularly calls YARN’s getApplicationReport method.  When it gets word that the MapReduce job is complete, it finishes its work.

mapreducestream1end

Stream 1 starts with packet #23 and ends with packet #20286.  But there’s a big gap between packets; stream 1 is responsible for packets 23-30, 128, and then most of 19989 through 20110.  This conversation takes place on port 8050 of the name node; remember that 8050 is the resource manager port for YARN.

Stream 2

Stream 2 is the single largest stream, responsible for 18,864 of the ~20,286 packets.  We can see exactly why by looking at the beginning and end sections of the stream.  First the beginning:

mapreducestream2start

And now the end:

mapreducestream2end

Stream 2 sends along 27 MB worth of data.  It’s packaging everything Polybase needs to perform operations, including JAR files and any internal conversion work that the Polybase engine might need to translate Hadoop results into SQL Server results.  For example, the sqlsort at the bottom is a DLL that performs ordering using SQL Server-style collations, as per the Polybase academic paper.

Stream 2 is responsible for almost every packet from 43 through 19811; only 479 packets in that range belonged to some other stream (19811 – 43 – 18864 – 425 = 479).  We send all of this data to the data node via port 50010.

Stream 3

After the behemoth known as Stream 2, the rest of these are pretty small.

mapreducestream3

This stream appears to perform a block write operation.  It is responsible for most of the packets from 19830 to 19846.  This conversation takes place over the data node’s port 50010.

Stream 4

This stream follows up on the work Stream 3 did.

mapreducestream4

It appears to be writing metadata, and is responsible for most of the packets from 19853 through 19866.  This communication is happening over the data node’s port 50010.

Stream 5

After two tiny streams, we get back to a slightly larger stream.

mapreducestream5

As you can guess from the contents, this stream creates the YARN XML job and passes it over the wire.  The stream starts with packet 19875 and ends at 19968, using 83 packets to transmit the payload.  We connect to port 50010 to push the YARN config file to the data node.

Stream 6

Our next stream is of the “child on a long road trip” variety.

mapreducestream6

Basically this stream calls two methods as part of a loop:  getTaskAttemptCompletionEvents and getJobReport.  It looks like there might be a periodic call of getCounters or this may be triggered by something.  In any event, the loop continues until getTaskAttemptCompletionEvents gets back a result.  In my case, the result fired back $http://sandbox.hortonworks.com:13562 .”(.  Of note is that port 13562 is the default MapReduce shuffle port.

Stream 6 was responsible for 80 of the packets from 20103 through 20209.  One interesting thing to note is that the Hadoop-side port was 48311, which I believe is just a dynamic port and does not signify anything like many of the other ports we’re looking at.

Stream 7

This is another loop-until-done stream, but it ended pretty quickly.

mapreducestream7

This looped for getting file info and listing information by connecting to the Hadoop name node on port 8020.  It took packets 20210 though 20224, as well as a few wrapup packets at the end.

Stream 8

Stream 8 retrieved information on the finished MapReduced data.

mapreducestream8

It first looks for a file called part-m-00000.ppax.  The PPAX format is a proprietary data format Polybase uses (the link is a Powerpoint deck that the current Polybase PM presented at SQL Saturday Boston).

Stream 8 is a TCP connection to the name node on port 8020 and stretches from 20225-20234 and a few packets after that as well.

Stream 9

The final Polybase stream actually receives the data in PPAX format.

mapreducestream9

We connect to the Hadoop data node on port 50010 and retrieve the data file.  This takes 34 packets and stretches from 20235 through 20280.

Step 2:  MapReduce Logs

For MapReduce logs, you’ll want to go to port 8088 of the name node, so for example, http://sandbox.hortonworks.com:8088/cluster.  On this link, we can see the various “applications” that have been created.

mapreducelogapplication

Clicking on one of these brings up details on the application, and we can see logs for each attempt.

mapreduceloglogs

If you click the Logs link, you’ll get to see the various logs, starting in short form and with the option to drill in further.

mapreducelogsyslog

In this example, we can see that the stderr and stdout logs didn’t have much, but the syslog log had more than 4K, so if we want to see the full log, we can click a link.

Step 3:  The Rest Of The Story

Once we have the file available, it’s time for Polybase to wrap up the story.  We can see this in the DMV outputs:

mapreducedmv

We can see that the dm_exec_external_work DMV looks slightly different, but still performs a file split.  But this time, instead of splitting the CSV file directly, we split the MapReduced file /user/pdw_user/TEMP_DIR/5130041d4035420b80f88096f197d4b2/Output/part-m-00000.ppax.  The operations to load the table are pretty similar to before, though:  we create a temp table, build statistics on the table, and then import the data using a round robin approach.

Conclusion

Figuring out how the Polybase engine performs MapReduce operations has been interesting.  For this simple single-Hadoop-node, single-Polybase-node, single-file setup, we saw ten separate TCP streams, and understanding how they work in conjunction with DMV and MapReduce logs can give you a fuller understanding of the Polybase MapReduce process.

3 thoughts on “Running MapReduce Polybase Queries

Leave a comment