Forcing External Pushdown

In the last post, we dug into how the MapReduce process works.  Today, I want to spend a little bit of time talking about forcing MapReduce operations and disabling external pushdown.

What Can We Force?

Before we get into the mechanics behind forcing pushdown, it’s important to note that predicate pushdown is somewhat limited in terms of what information we can pass to the MapReduce engine.

For example, let’s say that I have a set of flight details in Hadoop and a set of airports in SQL Server.  Suppose I want to run the following query as quickly as possible:

SELECT
    *
FROM dbo.Flights f
    INNER JOIN dbo.Airports ar
        ON f.dest = ar.IATA
WHERE
    ar.state = 'OH'
    AND ar.city = 'Columbus';

Ideally, the engine would start with the Airports table, filtering by state and city and getting back the 3 records in that table associated with Columbus airports.  Then, knowing the IATA values for those 3 airports, the Polybase engine would feed those into the remote query predicate and we’d get back the fewest rows possible.  There’s just one catch:

The Polybase engine does not work that way!

Unfortunately, my ideal scenario isn’t one that Polybase can give me (at least today; we’ll see if they can in the future).  The only predicates which the Polybase engine can push are those which are explicit, so we can’t feed a set of results from one part of a query plan into Hadoop.  If you did want to do something like that, I think the easiest solution would be to generate dynamic SQL and build an IN clause with the IATA/dest codes.
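
If you did go the dynamic SQL route, a minimal sketch might look like the following.  It assumes the dbo.Flights external table and dbo.Airports local table from the query above, and uses FOR XML PATH to build the IN list (STRING_AGG would work on newer versions); whether the resulting predicate actually gets pushed down still depends on the normal pushdown rules.

DECLARE @iataList NVARCHAR(MAX);

-- Resolve the airport codes locally first.
SELECT @iataList = STUFF(
(
	SELECT N',''' + ar.IATA + N''''
	FROM dbo.Airports ar
	WHERE
		ar.state = 'OH'
		AND ar.city = 'Columbus'
	FOR XML PATH('')
), 1, 1, N'');

-- Build a query with an explicit IN clause that the Polybase engine can see.
DECLARE @sql NVARCHAR(MAX) = N'
SELECT *
FROM dbo.Flights f
WHERE f.dest IN (' + @iataList + N');';

EXEC sp_executesql @sql;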

So with that in mind, let’s talk more about forcing and preventing pushdown.

Forcing Predicate Pushdown

As a reminder, in order for predicate pushdown to occur, we first need to hit a Hadoop cluster; we can’t use predicate pushdown against other systems like Azure Blob Storage.  Second, we need to have a resource manager link set up in our external data source.  Third, we need to make sure that everything is configured correctly on the Polybase side.  Once you have those items in place, you can use the FORCE EXTERNALPUSHDOWN command like so:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC
OPTION(FORCE EXTERNALPUSHDOWN);

You might want to force external pushdown when you don’t have good external table statistics but do have knowledge about the data set that the optimizer lacks.  For example, you might know that you’re asking for a tiny fraction of the total data set, but the optimizer may not be aware of this, and so it pulls the entire data set over to SQL Server for the operation.

Be wary of using this operation, however; it will spin up a MapReduce job, and that can take 15-30 seconds to start up.  If you can get your entire data set over the wire in 10 seconds, there’s little value in shunting the work off to your Hadoop cluster.

Disabling External Pushdown

The opposite of FORCE EXTERNALPUSHDOWN is DISABLE EXTERNALPUSHDOWN.  On Azure Blob Storage and other external sources, this is a no-op.  This is also a no-op if you have a Hadoop cluster but no resource manager set up.

With everything set up, it will ensure that you do not push down to your Hadoop cluster.  This might be a good thing if you consistently see faster times by just pulling the entire data set over the wire and performing any filters or operations in SQL Server.  Note that even with this option set, performance is still going to be a good bit better than using Hive over a linked server.
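
For completeness, here is the hint in action.  This is the same second basemen query from the FORCE EXTERNALPUSHDOWN example above, with only the hint changed:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC
OPTION(DISABLE EXTERNALPUSHDOWN);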

Pushdown-Related Errors

Let’s say that you want to run OPTION(FORCE EXTERNALPUSHDOWN) and you get an error as soon as the command starts:

Msg 7320, Level 16, State 110, Line 1
Cannot execute the query “Remote Query” against OLE DB provider “SQLNCLI11” for linked server “(null)”. Query processor could not produce a query plan because of the hints defined in this query. Resubmit the query without specifying any hints.

If you’re trying to perform this operation against a Hadoop cluster, you’ll want to re-read the configuration settings blog post.  There are several files which need to change and if you don’t get all of them, you’ll end up with strange-looking error messages.

Running MapReduce Polybase Queries

Previously, we looked at basic queries which do not perform MapReduce operations.  Today’s post looks at queries which do perform MapReduce operations, and we’ll see how they differ.

MapReduce Queries

In order for us to be able to perform a MapReduce operation, we need the external data source to be set up with a resource manager.  In addition, one of the following two circumstances must be met:

  1. Based on external table statistics, our query must be set up in such a way that it makes sense to distribute the load across the Hadoop data nodes and have the difference be great enough to make it worth waiting 15-30 seconds for a MapReduce job to spin up, OR
  2. We force a MapReduce job by using the OPTION(FORCE EXTERNALPUSHDOWN) query hint.

For the purposes of today’s post, I’m going to focus on the latter.  Regarding condition #1, my hunch is that there’s some kind of in-built cost optimizer the Polybase engine uses to determine whether to spin up a MapReduce job.  If this is in fact the case, it’s important to note that there is no “cost threshold for MapReduce” option that we can set, so if you’re looking for ways to fine-tune MapReduce operations, you may have to be liberal with the query hints to enable and disable external pushdown.

Our MapReduce query is pretty simple:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC
OPTION(FORCE EXTERNALPUSHDOWN);

The only thing I’ve done here compared to the basic query is to add a query hint to force external pushdown.  Let’s see how this changes things.

Step 1:  Communicating with Hadoop

Just like with the basic query, we can use Wireshark to see how the Polybase engine interacts with our Hadoop cluster.  This time around, there’s a lot more going on over the wire than in the basic query scenario; there are ten separate TCP streams which the Polybase engine initiated, and we’re going to take at least a quick look at each one.

mapreducepacketcapture

A quick look at the Wireshark packet capture.

Note that my names for these streams come from the order in which they show up in Wireshark.  They might have specific internal names, but if so, I don’t know them.

Stream 0

Stream 0 appears to act as a coordinator, setting everything up and building the framework for the rest of the network operations to succeed.  This first stream has three major sections.  The first section is around basic tasks, like making sure the file we are looking for exists:

mapreducesetup

Once we know it exists, we want to create a job.  Eventually, that job will complete (we hope).

mapreducecreate

Then, after everything completes, we get file info for the staging work:

mapreducegetfileinfo

Stream 0 runs through the entire operation, with the earliest packet being packet #7 and the last packet being #20118 of approximately 20286.  We communicate with the Hadoop name node on port 8020 for this stream.

Stream 1

The next stream is Stream 1.  This stream does two things.  The first thing, which you can see at the beginning of the stream text below, is to prep the MapReduce job, including running /bin/bash to set the CLASSPATH and LD_LIBRARY_PATH.

mapreducestream1

After doing that, it regularly calls YARN’s getApplicationReport method.  When it gets word that the MapReduce job is complete, it finishes its work.

mapreducestream1end

Stream 1 starts with packet #23 and ends with packet #20286.  But there’s a big gap between packets; stream 1 is responsible for packets 23-30, 128, and then most of 19989 through 20110.  This conversation takes place on port 8050 of the name node; remember that 8050 is the resource manager port for YARN.

Stream 2

Stream 2 is the single largest stream, responsible for 18,864 of the ~20,286 packets.  We can see exactly why by looking at the beginning and end sections of the stream.  First the beginning:

mapreducestream2start

And now the end:

mapreducestream2end

Stream 2 sends along 27 MB worth of data.  It’s packaging everything Polybase needs to perform operations, including JAR files and any internal conversion work that the Polybase engine might need to translate Hadoop results into SQL Server results.  For example, the sqlsort at the bottom is a DLL that performs ordering using SQL Server-style collations, as per the Polybase academic paper.

Stream 2 is responsible for almost every packet from 43 through 19811; only 479 packets in that range belonged to some other stream (19811 – 43 – 18864 – 425 = 479).  We send all of this data to the data node via port 50010.

Stream 3

After the behemoth known as Stream 2, the rest of these are pretty small.

mapreducestream3

This stream appears to perform a block write operation.  It is responsible for most of the packets from 19830 to 19846.  This conversation takes place over the data node’s port 50010.

Stream 4

This stream follows up on the work Stream 3 did.

mapreducestream4

It appears to be writing metadata, and is responsible for most of the packets from 19853 through 19866.  This communication is happening over the data node’s port 50010.

Stream 5

After two tiny streams, we get back to a slightly larger stream.

mapreducestream5

As you can guess from the contents, this stream creates the YARN XML job and passes it over the wire.  The stream starts with packet 19875 and ends at 19968, using 83 packets to transmit the payload.  We connect to port 50010 to push the YARN config file to the data node.

Stream 6

Our next stream is of the “child on a long road trip” variety.

mapreducestream6

Basically, this stream calls two methods as part of a loop:  getTaskAttemptCompletionEvents and getJobReport.  It looks like there might be a periodic call of getCounters, or that call may be triggered by something.  In any event, the loop continues until getTaskAttemptCompletionEvents gets back a result; in my case, the result referenced http://sandbox.hortonworks.com:13562.  Of note is that port 13562 is the default MapReduce shuffle port.

Stream 6 was responsible for 80 of the packets from 20103 through 20209.  One interesting thing to note is that the Hadoop-side port was 48311, which I believe is just a dynamic port and, unlike many of the other ports we’re looking at, does not signify anything in particular.

Stream 7

This is another loop-until-done stream, but it ended pretty quickly.

mapreducestream7

This stream looped, getting file info and listing information by connecting to the Hadoop name node on port 8020.  It took packets 20210 through 20224, as well as a few wrap-up packets at the end.

Stream 8

Stream 8 retrieved information on the finished MapReduced data.

mapreducestream8

It first looks for a file called part-m-00000.ppax.  The PPAX format is a proprietary data format Polybase uses (the link is a Powerpoint deck that the current Polybase PM presented at SQL Saturday Boston).

Stream 8 is a TCP connection to the name node on port 8020 and stretches from 20225-20234 and a few packets after that as well.

Stream 9

The final Polybase stream actually receives the data in PPAX format.

mapreducestream9

We connect to the Hadoop data node on port 50010 and retrieve the data file.  This takes 34 packets and stretches from 20235 through 20280.

Step 2:  MapReduce Logs

For MapReduce logs, you’ll want to go to port 8088 of the name node, so for example, http://sandbox.hortonworks.com:8088/cluster.  On this link, we can see the various “applications” that have been created.

mapreducelogapplication

Clicking on one of these brings up details on the application, and we can see logs for each attempt.

mapreduceloglogs

If you click the Logs link, you’ll get to see the various logs, starting in short form and with the option to drill in further.

mapreducelogsyslog

In this example, we can see that the stderr and stdout logs didn’t have much, but the syslog log had more than 4K, so if we want to see the full log, we can click a link.

Step 3:  The Rest Of The Story

Once we have the file available, it’s time for Polybase to wrap up the story.  We can see this in the DMV outputs:

mapreducedmv

We can see that the dm_exec_external_work DMV looks slightly different, but still performs a file split.  But this time, instead of splitting the CSV file directly, we split the MapReduced file /user/pdw_user/TEMP_DIR/5130041d4035420b80f88096f197d4b2/Output/part-m-00000.ppax.  The operations to load the table are pretty similar to before, though:  we create a temp table, build statistics on the table, and then import the data using a round robin approach.
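
If you want to check this on your own system, a quick look at the external work DMV should show the split, including the path to the PPAX output file.  This is just a sketch; the execution IDs, timings, and paths will differ on your system.

SELECT
	w.execution_id,
	w.step_index,
	w.[type],
	w.input_name,
	w.read_location,
	w.bytes_processed
FROM sys.dm_exec_external_work w
ORDER BY
	w.start_time DESC;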

Conclusion

Figuring out how the Polybase engine performs MapReduce operations has been interesting.  For this simple single-Hadoop-node, single-Polybase-node, single-file setup, we saw ten separate TCP streams, and understanding how they work in conjunction with DMV and MapReduce logs can give you a fuller understanding of the Polybase MapReduce process.

Running Basic Polybase Queries On Hadoop

Following the Polybase series, we have created an external table and we can now use it like any other table.  For example:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC;

In this query, dbo.SecondBasemen is an external table and TopSalaryByAge is a table in SQL Server.  I haven’t found any T-SQL syntax which does not work, and although I haven’t tried everything, I’ve tried a few wacky ideas.

With that in mind, let’s dig a little deeper into what’s actually happening when we kick off basic queries.

The Basic Query

For the purposes of discussion, I’m going to label “basic” queries as any query which does not generate MapReduce jobs.  What happens is that all of the data is streamed over the wire to our SQL Server instance and stored in a temp table (or multiple temp tables if you have a scale-out Polybase cluster).  The database engine then continues along its merry way, performing any joins or other T-SQL-specific operations.  Let’s look at an example in action by running the query in my intro section.

We’re going to look at a couple of sources to see what’s happening with our request:  a Wireshark packet capture and DMV outputs.

Step 1:  Communicating With Hadoop

First, let’s check out the Wireshark packet capture:

basicpacketcapture

We have three TCP streams running here, two from my port 49423 to the Hadoop node’s 8020, and one from my port 49439 to the Hadoop node’s 50010.  The first TCP stream connects to the name node and asks for details regarding a file located at /tmp/secondbasemen.csv.

nonmapreducefileinfo

The name node responds with some basic information which confirms that the file actually exists (and, incidentally, that the name node is up and running).  Then, the Polybase engine requests information on who owns copies of that file, specifically by calling the getBlockLocations method:

nonmapreducegetblocklocations

The name node returns information on which machines in the Hadoop cluster own which blocks (with block names starting with BP).  The Polybase engine also wants the server defaults to know of any particular client protocol oddities, and the name node returns its results.

The final stream is where the Polybase engine connects directly to the data node on port 50010 and requests the block contents.

nonmapreducetcpstream

In this case, it runs a non-MapReduce job, meaning it just wants the data node to stream its block to the Polybase engine.  The data node complies, sending our data in plaintext to our SQL Server instance.

Step 2:  Putting It Together

Once the Polybase engine has retrieved the data, it feeds that data into a temp table and continues the operation.  We can see this clearly from a pair of DMVs:

nonmapreduceworkresults

The dm_exec_external_work DMV tells us which execution we care about; in this case, I ended up running the same query twice, but I decided to look at the first run of it.  Then, I can get step information from dm_exec_distributed_request_steps.  This shows that we created a table in tempdb called TEMP_ID_14 and streamed results into it.  The engine also created some statistics (though I’m not quite sure where it got the 24 rows from), and then we perform a round-robin query.  Each Polybase compute node queries its temp table and streams the data back to the head node.  Even though our current setup only has one compute node, the operation is the same as if we had a dozen Polybase compute nodes.
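
If you want to trace this yourself, something like the following should work; it grabs the most recent Polybase execution from dm_exec_external_work and then walks through its request steps.

DECLARE @executionId NVARCHAR(32) =
(
	SELECT TOP(1) w.execution_id
	FROM sys.dm_exec_external_work w
	ORDER BY w.start_time DESC
);

SELECT
	s.step_index,
	s.operation_type,
	s.distribution_type,
	s.location_type,
	s.[status],
	s.total_elapsed_time,
	s.row_count,
	s.command
FROM sys.dm_exec_distributed_request_steps s
WHERE
	s.execution_id = @executionId
ORDER BY
	s.step_index;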

From there, we’ve satisfied the Remote Query operation in the execution plan, and the rest is just like any other SQL Server query:

nonmapreduceexecutionplan

Note that there are some interesting Remote Query elements in the execution plan, and I plan to look at them in more detail later in the series.  For now, though, this should suffice.

Conclusion

Today’s post covers how to figure out what’s going on with non-MapReduce queries in Polybase.  I originally intended to cover MapReduce queries as well, but I realized that there’s a lot more going on and that it deserves its own blog post, and this post will appear tomorrow.  Stay tuned!

Let’s Build A Hadoop Cluster, Part 3

Last time around, we installed Ubuntu and Docker on our Hadoop cluster-to-be.  Now we strike and install Hadoop.

Caochong

My project of choice for installing a Hadoop cluster using Docker is Weiqing Yang’s caochong.  It’s pretty easy to install, so let’s get started.  I’m going to assume that you have a user account and have pulled caochong into a folder called caochong.

Making Changes

After grabbing the code, I’m going to go into caochong/from-ambari and edit the run.sh file.  The reason is that by default, the installation does not forward any ports to the outside world.  That’s fine if you’re testing Hadoop on your own machine (which is, admittedly, the point of this Docker project), but in our case, we’re doing some basic remote development, so I want to expose a series of ports and set privileged=true.

caochongchanges

In case you want to follow along at home (or my default vim color scheme is hard to read), here’s the section I changed:

# launch containers
master_id=$(docker run -d --net caochong -p $PORT:8080 -p 6080:6080 -p 9090:9090 -p 9000:9000 -p 2181:2181 -p 8000:8000 -p 8020:8020 -p 42111:42111 -p 10500:10500 -p 16030:16030 -p 8042:8042 -p 8040:8040 -p 2100:2100 -p 4200:4200 -p 4040:4040 -p 8050:8050 -p 9996:9996 -p 9995:9995 -p 8088:8088 -p 8886:8886 -p 8889:8889 -p 8443:8443 -p 8744:8744 -p 8888:8888 -p 8188:8188 -p 8983:8983 -p 1000:1000 -p 1100:1100 -p 11000:11000 -p 10001:10001 -p 15000:15000 -p 10000:10000 -p 8993:8993 -p 1988:1988 -p 5007:5007 -p 50070:50070 -p 19888:19888 -p 16010:16010 -p 50111:50111 -p 50075:50075 -p 18080:18080 -p 60000:60000 -p 8090:8090 -p 8091:8091 -p 8005:8005 -p 8086:8086 -p 8082:8082 -p 60080:60080 -p 8765:8765 -p 5011:5011 -p 6001:6001 -p 6003:6003 -p 6008:6008 -p 1220:1220 -p 21000:21000 -p 6188:6188 -p 2222:22 -p 50010:50010 -p 6667:6667 -p 3000:3000 --privileged=true --name $NODE_NAME_PREFIX-0 caochong-ambari)
echo ${master_id:0:12} > hosts
for i in $(seq $((N-1)));
do
    container_id=$(docker run -d --net caochong --privileged=true --name $NODE_NAME_PREFIX-$i caochong-ambari)
    echo ${container_id:0:12} >> hosts
done

There’s a bit of downside risk to doing this:  I am forwarding all of these ports from my machine onto the name node’s Docker instance.  This means I have to install all of the Hadoop services on the name node, rather than splitting them over the various nodes (which is generally a smarter idea with a real cluster).

Anyhow, once that’s done, run ./run.sh --nodes=5 --port=8080 and you’ll get a lot of messages, one of which includes something like the following:

Hostnames.png

The box shows the five nodes that I’ve created.  Specifically, it gives us the Docker container names.  But which of those is the name node?

With caochong, you can tell which is the name node because it will have the name caochong-ambari-0 by default.  You can get that by running docker ps while the containers are running.

HostNode.png

Once we know the primary node, we’re good to go.  We’ll need to copy all of those hostnames that get created and keep note of which one’s the name node when we install Hadoop via Ambari.  If you’ve forgotten those names and have closed the install window, don’t fret:  you can get those host names in a file named caochong/from-ambari/hosts.

Installing Hadoop Via Ambari

Once this is set up, we can connect to Ambari to finish installation.  We have port forwarding set up, so from a laptop or other device, you can connect via web browser to port 8080 on your NUC device’s IP address and you’ll get a screen which looks like this:

0_ambariwizard

We don’t have a cluster yet, so we’ll need to click the “Launch Install Wizard” button.  This will prompt us for a cluster name:

1a_namecluster

The next step is to figure out which version of the Hortonworks Data Platform we want to install:

1b_repos

The underlying Linux installation is Ubuntu 14, so we’ll select that box.  Note that I tried to trick the installer into installing HDP 2.5 by putting in the public repo information for 2.5, but it ended up still installing 2.4.  There might be some trick that I missed that gets it to work, though.

After selecting the repo, you get to list the nodes.  This is the set that you’ll copy and paste from the hosts list:

2_installoptions

You can safely ignore any warnings you get about not using fully-qualified domain names; within the Docker virtual network that caochong sets up, these are all accessible names.  After entering the hosts, you’ll want to copy and paste the SSH private key which gets generated.  That’s in caochong/from-ambari/id_rsa.  Copy the contents of that file into the SSH private key box and you can register and confirm the nodes.

3_confirmhosts

This can take a couple minutes but eventually all of the bars should go green and you’ll be able to click the Next button to go to the next page, where you get to select the services you want to install on this cluster.

4a_chooseservices

I selected pretty much all of the services, although if you’re testing production clusters, you might want to match whatever’s in your production cluster.  Going to the next step, you get the chance to set master nodes for the various services.

5_assignmasters

Notice how 3bd shows up for pretty much all of these services.  This is not what you’d want to do in a real production environment, but because we want to use Docker and easily pass ports through, it’s the simplest way for me to set this up.  If you knew beforehand which node would host which service, you could modify the run.sh batch script that we discussed earlier and open those specific ports.

After assigning masters, we next have to define which nodes are clients in which clusters.

6_assignclients

We want each node to be a data node, and from there, I spread out the load across the five.  That way, if one node goes down (e.g., if I’m testing a secondary node failure), there’s at least one other node which can take up the slack.

Once we’ve assigned nodes, it’s time to modify any configurations.  The installer is pretty helpful about what you need to modify, specifically passwords.

7_customizeservices

Each one of the numbered notes above is a password or secret key which needs to be set.  Fill those out and you can go to the next step, which is reviewing all of your changes.

8_review

Assuming you don’t want to make any changes, hit the Deploy button and you’ll get to watch an installer.

9_install

This installer will take quite some time.  I didn’t clock installations, but I wouldn’t be shocked if it took 45 minutes or so for everything to install.  But once you’re finished, you officially have a Hadoop cluster of your own.

Conclusion

In this third part of the Hadoop on the Go miniseries, we created a five-node cluster.  There are a couple more blog posts around administration that I want to get to, particularly around rebooting the cluster and quickly rebuilding the cluster (something I think will come to me as I become more familiar with Docker).

Let’s Build A Hadoop Cluster, Part 2

In part 1 of this series, we bought some hardware.  After patiently(?) waiting for it, we have the hardware and installed Ubuntu, so let’s keep going.

Docker?  I hardly even know her!

Hadoop on Docker is a relatively new thing.  Thanks to Randy Gelhausen’s work, the Hortonworks Data Platform 2.5 sandbox now uses Docker.  That has negative consequences for those of us who want to use Polybase with the sandbox, but for this series, I’m going to forego the benefits of Polybase to get a functional 5-node cluster on the go.

Before I go on, a couple notes about Docker and Hadoop.  First, why would I want to do this?  Let’s think about the various ways we can install Hadoop:

  1. As a one-node, standalone server.  This is probably the worst scenario because it’s the least realistic.  The benefit of Hadoop is its linear scalability, so you’re never going to run a production system with a single, standalone node.
  2. As a one-node cluster.  This is what the various sandboxes do, and they’re useful for getting your feet wet.  Because they’re set up to be multi-node clusters, you get to think about things like service placement and can write code the way you would in production.  Sandboxes are fine, but they’re not a great way of showing off Hadoop.
  3. As multiple VMs using VirtualBox or VMware.  This works, but virtual machines burn through resources.  Even with 32 GB of RAM, putting three or four VMs together will burn 8-12 GB of RAM just for the operating systems, plus a full copy of the OS on disk for each VM.
  4. As multiple containerized nodes.  In other words, put together several nodes in Docker.  Each node has a much lower resource overhead than a virtual machine, so I can easily fit 5 nodes.  I also have one copy of the Ubuntu image and small marginal disk additions for the five nodes.

With these in mind, and because I want to show off Hadoop as Hadoop and not a glorified single-node system, I’m going to use Docker.

Installing Docker

After setting up Ubuntu, the first step is to install Docker.  It’s really easy to do on Ubuntu, and probably takes 10 minutes to do.  If you want to learn more about Docker, Pluralsight has a Container Management path which is extremely helpful.

Setting Up A Cluster

There are a few guides that you can follow for setting up a multi-node Hadoop cluster with Docker.  Here are a few I didn’t use, but which you might find helpful:

I used Kiwen Lau’s blog post first to understand how to do it, but I wanted to put together a Hortonworks Data Platform installation instead.  In Friday’s post, I’ll show you the project I used to set up a HDP cluster in Docker.

Let’s Build A Hadoop Cluster, Part 1

I’m taking a short break from my Polybase series to start a series on setting up a Hadoop cluster you can put in a laptop bag.  For today’s post, I’m going to walk through the hardware.

My idea for my on-the-go cluster hardware comes from an Allan Hirt blog post showing off the Intel NUC NUC6i7KYK, which features a quad-core i7 processor and the ability to slot in 32 GB of RAM and two NVMe hard drives.  It’s also small, barely larger than a DVD case in length and width, and about an inch thick.  This thing fits easily in a laptop bag, and the power supply brick is about the size of a laptop power supply, so it’s not tiny but it’s still portable.  Note that if you want this to be an on-the-go device, you’ll need a stable source of power; it doesn’t have a laptop battery or any other built-in way to keep power going when it’s unplugged.

The Purchases List

Here’s what I ended up buying.  It’s not the same as Allan’s kit, but it does the job for me:

The NUC is a bare-bones kit with a case, motherboard, and CPU.  You need to buy and install RAM and a hard drive, and you can also install an external video card if you’d like.  Of course, you’ll need a monitor, keyboard, and mouse, but I had those lying around.

Preparations

I decided to install Ubuntu 16.04 LTS as my base operating system.  It’s a reasonably new Ubuntu build but still stable.  Why Ubuntu?  Because Docker.  Yes, Windows Server 2016 has Docker support, but I’ll stick with Linux because I have the appropriate images and enough background with the operating system to set it up properly.  If you’re comfortable with some other Linux build (CentOS, Red Hat, Arch, whatever), go for it.  I’m most comfortable with Ubuntu.

I also had to grab the latest version of the NUC BIOS.  You can read the install instructions as well.

Where We’re Going From Here

On Wednesday, I’m going to walk us through setting up Docker and putting together the image.  On Friday, I’ll install a 5-node Hadoop cluster on my NUC.  I have a couple more posts planned out in the Hadoop series as well, so stay tuned.

Curated SQL At One Year

Curated SQL is just over a year old now.  I started work on it during PASS Summit 2015, and I’m happy with the results so far.

By The Numbers

I’m up to 2026 posts on Curated SQL.  My three largest categories are Administration, Cloud, and Hadoop.  I used to call the Cloud category “Azure,” but I kept running into relevant AWS posts.  I have 7 categories with just one post:  Containers (that’ll increase soon), Riak, Soft Skills, Durability, Hashing, Filestream, and Policy-Based Management.

I should also note that I’m pretty bad about tagging things and terrible about going back and tagging old posts with new categories, so even though I’ve mentioned Docker quite frequently, Containers has just one post.

I’ve had just over 79K unique visitor-days.  Unfortunately, the stats package I use doesn’t give me total unique visitors across all days.  There’s a nice positive trend in daily views and I hope that continues to rise.

Over the past 365 days, the top country is the United States (46,361 unique visitor-days), followed by Germany, Ukraine, and France.

Where Curated SQL Is Going

I think I’ve landed in a groove with Curated SQL.  My theme shows 10 posts at a time, so I have a personal rule not to link to more than 10 things per day.  I’ve broken that rule a few times, but it is in the back of my mind.

Overall, I don’t have any major plans for changing Curated SQL.  I like the format and want it to be a site that you can hit on a coffee break and find an interesting blog post or two.

I do have a minor change that I’ve kicked around in my head a few times.  Right now, I schedule things to go out at 8 AM through 8:20 AM Eastern time.  That way, Europeans can hit the site during the mid-afternoon doldrums and Americans can check it out whenever they want.  I’ve thought about trying to spread the load, scheduling posts at 8 AM, 10 AM, noon, 2 PM, and 4 PM.  Doing that has its upsides and downsides, and I haven’t come to a firm decision just yet.

Regardless, I’m looking forward to year 2 of Curated SQL.  There’s a lot out there to read…

Loading Into Columnstore: Avoid Trickle Loads

I’m going to tell this one in story format, so here’s the short of it up front:

tl;dr — Clustered columnstore indexes don’t like the combination of wipe-and-replace with multi-threaded trickle loaders.  Avoid that pattern.

The Setup

In the olden days, we had a large fact table with a standard clustered index and some standard non-clustered indexes.  Because the primary use of this fact table was to aggregate fairly large amounts of data, performance was less than stellar.

Then came SQL Server 2014, with its promises of updatable, clustered columnstore indexes.  I jumped on this immediately and replaced my large fact table with one built off of a clustered columnstore index.  Initial testing looked great, giving me a 2-5X performance gain depending upon the query, and that was good enough for our PM to authorize moving forward with the project.

After that project went live, all was well…for about a week.  Then things started slowing down.  It took a while before we were able to tell that there was a problem and, from there, find the root cause.

The Root Cause

To understand the problem, let’s talk a little about our infrastructure.  We load the warehouse on a per-client, per-day basis and we have a number of processes which load data concurrently.  In other words, one process loads data for Client A on June 1st while another process may load data for Client B on June 1st and a third process loads data for Client C on June 2nd, all at the same time.  Loading data includes two steps:  deleting current data and loading new data.  The first step of deleting current data happens because it turned out to be much more efficient in our scenario to delete old records and then insert new records rather than trying to merge data in (either using the MERGE keyword or combination INSERT/UPDATE/DELETE statements).
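
In case the shape of that load process isn’t clear, here is a simplified sketch.  The table, column, and procedure names are illustrative rather than our real schema.

CREATE PROCEDURE dbo.LoadClientDay
	@ClientID INT,
	@DateWK INT
AS
BEGIN
	SET NOCOUNT ON;

	BEGIN TRANSACTION;

	-- Step 1:  wipe the existing rows for this customer-day combination.
	DELETE f
	FROM dbo.FactRevenue f
	WHERE
		f.ClientID = @ClientID
		AND f.DateWK = @DateWK;

	-- Step 2:  load the new rows from staging.
	INSERT INTO dbo.FactRevenue
		(ClientID, DateWK, Revenue, Cost)
	SELECT
		s.ClientID,
		s.DateWK,
		s.Revenue,
		s.Cost
	FROM dbo.StagingRevenue s
	WHERE
		s.ClientID = @ClientID
		AND s.DateWK = @DateWK;

	COMMIT TRANSACTION;
END;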

There are two major issues that we experienced with this pattern against a clustered columnstore index in SQL Server 2014.  First, there was no way to reorganize or rebuild the index online in SQL Server 2014, meaning that the only way I could clean up deleted records would be to rebuild an entire partition.  Given that our hot load partitions are also hot access (specifically, the current and prior months) and we’re a 24/7 company, rebuilding those partitions is pretty much out of the question.  This meant that I couldn’t clean out partitions full of deleted records, leaving my compressed columnstore rowgroups woefully under-populated.

At the same time, we experienced large numbers of open rowgroups in the deltastore, many of which contained just a few records.  My best understanding of why this happened is as follows:  when a process goes to delete records for a customer-day combination, that process can lock multiple deltastore rowgroups.  If other processes are trying to insert data into the deltastore while that first process tries to delete records, they’ll open new rowgroups because the current ones are locked.  After a while, we’d end up with hundreds or thousands of open rowgroups in the deltastore, many of which contained well under 10,000 rows apiece but which added up to tens of millions of records in total.  Given the way the deltastore works (it’s a big heap), having to scan a giant heap made our queries slower.  The worst part is that because these rowgroups tended not to grow much in size, the tuple mover wouldn’t do anything with them, so they’d just accumulate as new deltastore rowgroups get created and populated.
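
You can watch this happen with the rowgroup DMVs.  The following query uses sys.dm_db_column_store_row_group_physical_stats, which showed up in SQL Server 2016 (on 2014, sys.column_store_row_groups paints a rougher picture); the table name is a placeholder.

SELECT
	rg.partition_number,
	rg.state_desc,
	COUNT(*) AS NumRowgroups,
	SUM(rg.total_rows) AS TotalRows,
	SUM(rg.deleted_rows) AS DeletedRows
FROM sys.dm_db_column_store_row_group_physical_stats rg
WHERE
	rg.object_id = OBJECT_ID('dbo.FactRevenue')
GROUP BY
	rg.partition_number,
	rg.state_desc
ORDER BY
	rg.partition_number,
	rg.state_desc;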

SQL Server 2016 gave me the ability to reorganize indexes online, which was a great addition, as it allowed us to keep those columnstore tables online while reorganizing the partitions, smashing together all of those open rowgroups, and combining the mostly-empty rowgroups.  But that’s not a real solution to the problem; it just buys a bit of time and masks the symptoms.
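
The reorganization itself looks something like the following; the index and table names are placeholders.  COMPRESS_ALL_ROW_GROUPS = ON closes and compresses open deltastore rowgroups, and a follow-up plain REORGANIZE merges small and mostly-deleted compressed rowgroups.

ALTER INDEX [CCI_FactRevenue] ON dbo.FactRevenue
	REORGANIZE WITH (COMPRESS_ALL_ROW_GROUPS = ON);

ALTER INDEX [CCI_FactRevenue] ON dbo.FactRevenue
	REORGANIZE;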

Possible Solutions

Now we’re in the architecture portion of today’s post.  There are three potential solutions I want to bring up, two of which I’ve implemented in production at one time or another.

Rowstore Front-End

The first architecture involves putting a rowstore table in front of the columnstore.

rowstorefrontend

In this design, I have data coming from the transactional system, undergoing some ETL processing, and going into a staging table on the warehouse.  From there, I perform the remainder of the ETL work and insert into a rowstore table.  This rowstore table has the same attribute names and data types as the columnstore table, but instead of having a clustered columnstore index, it has a standard B-tree index and can have additional non-clustered indexes.  From there, I expose the combination table using a view which simply unions the two sets of data so the application doesn’t have to see rowstore versus columnstore tables.
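
A minimal sketch of that view, assuming the two tables share the same column list (the names here are made up):

CREATE VIEW dbo.FactRevenueCombined
AS
SELECT ClientID, DateWK, Revenue, Cost
FROM dbo.FactRevenueHot	-- B-tree rowstore holding recent, trickle-loaded rows
UNION ALL
SELECT ClientID, DateWK, Revenue, Cost
FROM dbo.FactRevenue;	-- clustered columnstore holding migrated rows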

To move data from the rowstore to the columnstore, I have an external migration process.  This migration process waits until one of two conditions is met:  either there are at least 250,000 records in a single partition, or there is data from at least 4 partitions ago.  In other words, for the last three months (including the current one), I’d hold off on migrating data until I hit the magic number of 250K records, so that I could migrate that as a single batch and bulk insert the results, bypassing the deltastore altogether.  For older data, my expectation was that these are typically one-off or smaller data moves, and so waiting for 250K records was folly, as that might never come.  Instead, I move those immediately to keep the rowstore table compact.  The migration process I wrote looked at data by partition, so I could pull in data from 6 months ago while still waiting for the current partition to accumulate enough records to make that move worthwhile.

Advantages
  1. I get immediate access to the data once it makes its way into the rowstore.  This gets me access to the data earlier than the other alternatives.
  2. It solves one of the pain points I expressed above.  Hot records don’t go into the deltastore, so we don’t see a proliferation of open rowgroups.  Also, depending upon how quickly we reload data, it might solve the other problem as well:  if data doesn’t get reloaded very frequently, letting it sit in the rowstore table for a day or so means that if we delete and reinsert data, we aren’t deleting from the clustered columnstore index.
Disadvantages
  1. This is a relatively complex design to implement, especially with a zero down-time release and a significant amount of existing code looking for tables.
  2. If a huge number of rows get into the rowstore table, query performance won’t be that great because we’re unioning rowstore values and columnstore values and performing calculations later (which negates some of the effect of having a columnstore index).
  3. If I have to reprocess months worth of data, that columnstore gets hit hard.
  4. It seems harder to follow.  I had other database engineers regularly ask about these tables and some of our software engineers and testers found them vexing.
  5. You have to keep two tables in sync, so whenever I add a column to one side, I have to add the column to the other side and to the migration procedure and to the view.

I used this model for SQL Server 2014, but then removed it after we moved to SQL Server 2016 and went back to direct columnstore insertion.  My hope was that I would not need to re-implement something for 2016, but that ended up not being the case.

Staging Table Direct Insertion

Once I started experiencing the same problems in SQL Server 2016, I had to act.  Instead of once more putting a rowstore in front of my columnstore table, I decided to increase complexity in the ETL process to simplify application querying.  To wit, I created a two-step load process.  The first step of the load process, moving data from our transactional system into the warehouse, remains a trickle load, inserting records for a customer-date combination into a memory-optimized staging table.  Once that load process is complete, the ETL process inserts a row into a warehouse queue table, letting the next step know that this customer-day combo is ready to go.

stagingtabledirectinsertion

From there, I let the staging table grow a bit and run a job periodically to bulk move the staging table rows into the columnstore.  Now I delete and insert larger chunks of rows, usually in the 250-800K range.  This means that I avoid the deltastore completely and get a pretty decent row count in each compressed columnstore rowgroup.
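
Here is a rough sketch of that periodic job, with illustrative names.  If the staging table is memory-optimized, the statements touching it may also need snapshot-related hints or database settings, which I’ve left out for brevity.

BEGIN TRANSACTION;

-- Remove any prior versions of the customer-day combinations we are about to reload.
DELETE f
FROM dbo.FactRevenue f
WHERE EXISTS
(
	SELECT 1
	FROM dbo.WarehouseQueue q
	WHERE
		q.ClientID = f.ClientID
		AND q.DateWK = f.DateWK
);

-- Insert the staged rows in one large batch; with enough rows per thread,
-- this bypasses the deltastore entirely.
INSERT INTO dbo.FactRevenue WITH (TABLOCK)
	(ClientID, DateWK, Revenue, Cost)
SELECT
	s.ClientID,
	s.DateWK,
	s.Revenue,
	s.Cost
FROM dbo.StagingRevenue s
WHERE EXISTS
(
	SELECT 1
	FROM dbo.WarehouseQueue q
	WHERE
		q.ClientID = s.ClientID
		AND q.DateWK = s.DateWK
);

-- Clear out what we just moved.
DELETE s
FROM dbo.StagingRevenue s
WHERE EXISTS
(
	SELECT 1
	FROM dbo.WarehouseQueue q
	WHERE
		q.ClientID = s.ClientID
		AND q.DateWK = s.DateWK
);

DELETE FROM dbo.WarehouseQueue;

COMMIT TRANSACTION;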

Advantages
  1. Seamless post-ETL process.  No new tables and no views.
  2. Calculations don’t need to hit the rowstore table, so I can take full advantage of columnstore aggregations.
Disadvantages
  1. Data loads are no longer “instantaneous.”  I have to wait a bit longer for the second step of the process to commit before records show up in the warehouse.
  2. Additional ETL complexity means there are more things that could break.
  3. There is a possibility of weird data issues.  For example, if I’m trying to load a customer-day combo while the first phase of the ETL process is trying to re-insert that data, I could get inconsistent results.  I needed to add in checks preventing this from happening.

This is my current architecture.  I’m happier with it than with the original rowstore table architecture.

Bulk Insertion

The third architecture is simple:  don’t do trickle load at all.  For many companies, it’s okay to do a major load of data once a day or once every several hours.  If that’s the case, I’d recommend doing a bulk insertion over trying to implement a trickle load.  Just like the other two methods, bulk insertion bypasses the deltastore when you’re loading enough records.

Ideally, this bulk insertion would be a straight insert, never updating or deleting data.  If you can get away with it, the ideal pattern would be something like this:

bulkswap

If I need to load data for July 5th, I’m going to load all of the data for the partition which contains July 5th into a new table with that partition, and then I’m going to swap out the corresponding partition on the columnstore side.  I would want to partition by the normal load unit—for example, if we load data monthly, I’d partition by month; if we load data daily, I’d partition by day if possible.
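
A sketch of that swap, assuming an integer date key, a partition function named pf_FactDate, and load/scratch tables which share the columnstore table’s schema and partition scheme (all of those names are assumptions):

-- The target partition has to be empty before we can switch new data in.
DECLARE @partitionNumber INT = $PARTITION.pf_FactDate(20170705);

-- Switch the old data out to a scratch table...
ALTER TABLE dbo.FactRevenue
	SWITCH PARTITION @partitionNumber
	TO dbo.FactRevenue_Old PARTITION @partitionNumber;

-- ...and switch the freshly loaded partition in.
ALTER TABLE dbo.FactRevenue_New
	SWITCH PARTITION @partitionNumber
	TO dbo.FactRevenue PARTITION @partitionNumber;

TRUNCATE TABLE dbo.FactRevenue_Old;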

Advantages
  1. No deletes from the columnstore means no fragmentation.
  2. Columnstore rowgroups are as packed as possible.  If there’s any doubt, we can run an index rebuild on the new partition before swapping it, as nothing reads that table.
  3. Just like the staging table direct insert, I don’t need to make any application changes or create new supporting objects outside of ETL.
Disadvantages
  1. Data loading must be periodic, and will probably be slower than trickle loads.  You’re probably loading once a day or once a month at this point.
  2. If just one row changes, you have to rebuild the entire partition.  That can be a time sink when partitions get very large.

This is the “ideal” solution, but making it work when customers expect data in less than 20 minutes is tricky.  The staging table direct insert architecture seems to be a reasonable compromise between spray loading data and waiting a long time for data.

Use SQL Server 2016

Regardless of the architecture, SQL Server 2016 is a must-have for clustered columnstore indexes.  The ability to reorganize indexes online is a life-saver.  These reorganizations can block queries for short amounts of time, but being able to clean up online at all is a huge benefit if you find yourself deleting a lot of data in the columnstore.

Conclusion

In this post, we looked at three architectures for loading data into columnstore indexes, with a special focus on trickle loading data.  The common denominator for all of these is good staging tables to absorb the first wave of changes and move data into the columnstore in bulk.

R For The DBA: Graphing Rowcounts

Something I am trying to harp upon is that R isn’t just a language for data analysts; it makes sense for DBAs to learn the language as well.  Here’s a really simple example.

The Setup

I have a client data warehouse which holds daily rollups of revenue and cost for customers.  We’ve had some issues with the warehouse lately where data was not getting loaded due to system errors and timeouts, and our services team gave me a list of some customers who had gaps in their data due to persistent processing failures.  I figured out the root cause behind this (which will show up as tomorrow’s post), but I wanted to make sure that we filled in all of the gaps.

My obvious solution is to write a T-SQL query, getting some basic information by day for each customer.  I could scan through that result set, but the problem is that people aren’t great at reading tables of numbers; they do much better looking at pictures.  This is where R comes into play.

The Solution

My solution is just a few lines of R code, as well as a few lines of T-SQL.  I’m using SQL Server 2016 but we don’t use SQL Server R Services (yet!), so I’m doing this the “old-fashioned” way by pulling data across the wire.  Here’s the code:

install.packages("ggplot2")
install.packages("RODBC")
library(RODBC)
library(ggplot2)

conn <- odbcDriverConnect("Driver=SQL Server;Server=MYSERVER;Initial Catalog=MYDATABASE;Provider=SQLNCLI11.1;Integrated Security=SSPI")

wh <- sqlQuery(conn, "SELECT fc.CustomerWK, fc.DateWK, COUNT(1) AS NumRecords, SUM(fc.Revenue) AS Revenue, SUM(fc.Cost) AS Cost FROM MYDATABASE.dbo.FactTable fc WHERE fc.CustomerWK IN (78296,80030,104098,104101,104104,108371) AND fc.DateWK > 20160901 GROUP BY fc.CustomerWK, fc.DateWK;")

wh$CustomerWK <- as.factor(wh$CustomerWK)
wh$DateWK <- as.Date(as.character(wh$DateWK), "%Y%m%d")
str(wh)

ggplot(wh, aes(x=DateWK, y=NumRecords, colour=CustomerWK, group=CustomerWK)) +
  geom_line() +
  xlab("Date") +
  ylab("Number Of Records")

Let’s walk through this step by step.  After installing and loading the two relevant packages (RODBC to connect to SQL Server and ggplot2 to help us create a pretty graph), I open a connection to my server (with the name replaced to protect the innocent).

Next, I create a data frame called wh and populate it with the results of a query to the warehouse.  This is a pretty simple SQL query which gets the number of rows by customer by day, and also shows me revenue and cost.  I’m not using revenue and cost in this graph, but did look at them as part of my sanity checks.

Next up, I want to “fix” a couple of data types.  CustomerWK is an int and represents the customer’s surrogate key.  Although this is an integer type, it’s really a factor.  I have a small, unique set of categories; there’s no mathematical relationship between CustomerWK 1 and CustomerWK 2.  Anyhow, I replace CustomerWK with this new, factorized attribute.

After taking care of the CustomerWK factor, I convert DateWK to a date type.  DateWK is an integer representation of the date in ISO format, so January 15th, 2016 would be represented as 20160115.  I need to convert this from an integer to a character string, and then I can convert it to a date.  I replace the DateWK value with this date type.  I included the str(wh) call to show that my data frame really does have the correct types.

Finally, I call ggplot, passing in my warehouse data frame.  I create an aesthetic, which tells the graphing engine what I want to see on the screen.  I want to see the number of records in the fact table per day for each customer, so my Y coordinate is defined by NumRecords, my X coordinate by DateWK, and my group by CustomerWK.  To make it easier to read, I color-code each customer.

After creating the aesthetic, I plot the results as a line graph using the geom_line() function, and then give meaningful X and Y axis labels.

The Results

What I get in return is a decent-enough looking graph:

numrecordsbyday

I can easily see that customer 108371 experienced a major dropoff sometime in mid-October, and fell off the cliff in early November.  The other customers have been fairly stable, leading me to believe that just one customer (in this cohort) has an issue.  I was able to investigate the issue and determine the root cause of the falloff—that the customer stopped sending us data.

Conclusion

This is another example where knowing a little bit of R can be very helpful.  Even if you aren’t building predictive models or performing advanced regressions, the ability to throw results into a plot and quickly spot outliers makes the job much easier.  If I had to discern results from a Management Studio result set, I could still do the job, but I might have been thrown for a loop with customer 78296, whose counts fluctuated over a fairly wide band.

Creating External Tables

At this point, we’ve got Polybase configured and have created external data sources and file formats (and by the way, if you haven’t been keeping up, I am tracking my entire Polybase series there).  Now we want to create an external table to do something with all of this.  As usual, my first stop is MSDN, which gives us the following syntax for creating an external table from within SQL Server or Azure SQL Database.

Looking At Syntax

CREATE EXTERNAL TABLE [ database_name . [ schema_name ] . | schema_name. ] table_name   
    ( <column_definition> [ ,...n ] )  
    WITH (   
        LOCATION = 'folder_or_filepath',  
        DATA_SOURCE = external_data_source_name,  
        FILE_FORMAT = external_file_format_name  
        [ , <reject_options> [ ,...n ] ]  
    )  
[;]  
  
<reject_options> ::=  
{  
    | REJECT_TYPE = value | percentage  
    | REJECT_VALUE = reject_value  
    | REJECT_SAMPLE_VALUE = reject_sample_value  
  
}  
  
-- Create a table for use with Elastic Database query  
CREATE EXTERNAL TABLE [ database_name . [ schema_name ] . | schema_name. ] table_name   
    ( <column_definition> [ ,...n ] )  
    WITH ( <sharded_external_table_options> )  
[;]  
  
<sharded_external_table_options> ::=  
        DATA_SOURCE = external_data_source_name,   
        SCHEMA_NAME = N'nonescaped_schema_name',  
        OBJECT_NAME = N'nonescaped_object_name',  
        [DISTRIBUTION  = SHARDED(sharding_column_name) | REPLICATED | ROUND_ROBIN]]  
    )  
[;]  

For now, we’re going to care about the first version; I’ll look at Elastic Database later on in this series, but for this post, we’re looking at Hadoop.

The CREATE statement itself is pretty easy, and the first half looks like what you’d expect from any other table, save for the term “EXTERNAL.”  It’s when you get down to the WITH clause that things start getting a bit different.

First, you have to define a location, data source, and file format.  LOCATION is the location of the file or folder; in my case, it’s in HDFS, so I specify that path.  For example, I have a file at ‘/tmp/ootp/secondbasemen.csv’, so I just put that in.  Note that you don’t need to put in an IP address, port, or anything else, because those details are already defined in the external data source.  If you don’t remember what they are, you can check the DMV:

SELECT *
FROM sys.external_data_sources eds
WHERE
	eds.name = 'HDP2';

The location column of that DMV already has the first half of our path (the server and port), so we don’t need to specify it again here.

The DATA_SOURCE and FILE_FORMAT options are easy:  pick your external data source and external file format of choice.

The last major section deals with rejection.  We’re going from a semi-structured system to a structured system, and sometimes there are bad rows in our data, as there are no strict checks of structure before inserting records.  The Hadoop mindset is that there are two places in which you can perform data quality checks:  in the original client (pushing data into HDFS) and in any clients reading data from HDFS.  To make things simpler for us, the Polybase engine will outright reject any records which do not adhere to the quality standards you define when you create the table.  For example, let’s say that we have an Age column for each of our players, and that each age is an integer.  If the first row of our file has headers, then the first row will literally read “Age” and conversion to integer will fail.  Polybase rejects this row (removing it from the result set stream) and increments a rejection counter.  What happens next depends upon the reject options.

There are two rejection types, value and percentage.  When you set REJECT_TYPE to VALUE, it’s pretty easy:  a query fails once the number of failed records exceeds REJECT_VALUE.  For example, if you set REJECT_VALUE to 10, your query will succeed if the result set had up to 10 bad values, but as soon as you hit that 11th bad row, the query fails with an error.

Msg 8680, Level 17, State 26, Line 35
Internal Query Processor Error: The query processor encountered an unexpected error during the processing of a remote query phase.

Looking at the compute node errors, we can see the issue:

SELECT TOP(10) *
FROM sys.dm_exec_compute_node_errors decne
ORDER BY
	decne.create_time DESC;

Look at the details column and we can see the details behind why this query failed:

Query aborted– the maximum reject threshold (0 rows) was reached while reading from an external source: 1 rows rejected out of total 1 rows processed. (/tmp/ootp/secondbasemen.csv) Column ordinal: 0, Expected data type: INT, Offending value: Casey  (Column Conversion Error), Error: Error converting data type NVARCHAR to INT.

I set the reject value to 0 and the first row caused me to fail.

If I set the REJECT_TYPE to PERCENTAGE, then the REJECT_VALUE becomes the percent of records which need to be bad before the query fails, and I need to specify REJECT_SAMPLE_VALUE.  I ran a test with a sample value of 100 and a reject percentage of 1, and here’s what the results look like:

 107091;Query aborted– the maximum reject threshold (1 %) was reached while reading from an external source: 777 rows rejected out of total 777 rows processed.

The MSDN documentation states that it should try to run a check every REJECT_SAMPLE_VALUE rows, but in this case, it pulled all 777 and rejected all of them.

Focus On Data Types

The linked MSDN document above shows how each SQL data type converts to a Hadoop data type.  The mappings are fairly straightforward, though note that the money and smallmoney types convert to doubles in Hadoop, meaning that money is imprecise; decimal types in SQL Server convert to decimals in Hadoop, so if you want to avoid floating point irregularities, stick to decimal.

Sample Script

Here is a sample to create a table called SecondBasemen.

CREATE EXTERNAL TABLE [dbo].[SecondBasemen]
(
	[FirstName] [VARCHAR](50) NULL,
	[LastName] [VARCHAR](50) NULL,
	[Age] [INT] NULL,
	[Throws] [VARCHAR](5) NULL,
	[Bats] [VARCHAR](5) NULL
)
WITH
(
	DATA_SOURCE = [HDP2],
	LOCATION = N'/tmp/ootp/secondbasemen.csv',
	FILE_FORMAT = [TextFileFormat],
	REJECT_TYPE = VALUE,
	REJECT_VALUE = 5
);

By this point, there’s nothing special about this table creation statement, and there aren’t that many other options you can choose.  You could change the reject type to be percentage-based:

CREATE EXTERNAL TABLE [dbo].[SecondBasemen]
(
	[FirstName] [VARCHAR](50) NULL,
	[LastName] [VARCHAR](50) NULL,
	[Age] [INT] NULL,
	[Throws] [VARCHAR](5) NULL,
	[Bats] [VARCHAR](5) NULL
)
WITH
(
	DATA_SOURCE = [HDP2],
	LOCATION = N'/tmp/ootp/secondbasemen.csv',
	FILE_FORMAT = [TextFileFormat],
	REJECT_TYPE = PERCENTAGE,
	REJECT_VALUE = 15,
	REJECT_SAMPLE_VALUE = 500
);

Miscellany

External tables do not store their data in SQL Server; they are truly external, and every query is a remote query.  You can store statistics (which I’ve talked about in the past and will discuss in more detail later), but there’s no such thing as indexing external tables, so the only data we have about that table is metadata.
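
Creating statistics on an external table uses the normal CREATE STATISTICS syntax.  For example, on the SecondBasemen table above:

-- Polybase reads the external data to build the histogram; the statistics live in
-- SQL Server and don't update automatically, so re-create them when the data changes significantly.
CREATE STATISTICS [st_SecondBasemen_Age] ON dbo.SecondBasemen (Age) WITH FULLSCAN;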

Speaking of metadata, when you create the table, it shows up in sys.tables like any other table.  If you filter on the is_external flag, you can see all external tables in the current database:

SELECT *
FROM sys.tables
WHERE
	is_external = 1;

Conclusion

The script to create an external table is pretty easy, with relatively few options.  You will want to fine-tune those rejection counts, though.  The problem is that if you set the rejection count too low, you’ll run the risk of having queries fail after a very long time, likely without returning any data to you.  Imagine that you have a 20 billion row table and you’re returning 10 million records, of which 50 are bad.  If you set the reject value to 49, then you might have to do all of that work on 10 million records just to watch it fail at the very end.  But at the same time, if you have a large percentage of records failing, you’d want to know that (especially if you can fix the HDFS files) rather than setting a reject count to 100% and blissfully ignoring the problem.

If you need to drop an external table, it’s easy:  use the DROP EXTERNAL TABLE command.
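
For example:

-- This removes only the SQL Server metadata; the underlying file in HDFS stays put.
DROP EXTERNAL TABLE dbo.SecondBasemen;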