Let’s Build A Hadoop Cluster, Part 1

I’m taking a short break from my Polybase series to start a series on setting up a Hadoop cluster you can put in a laptop bag.  For today’s post, I’m going to walk through the hardware.

My idea for my on-the-go cluster hardware comes from an Allan Hirt blog post, where I saw how powerful the Intel NUC NUC6i7KYK is:  it features a quad-core i7 processor and has room for 32 GB of RAM and two NVMe hard drives.  It’s also small, barely larger than a DVD case in length and width, and about an inch thick.  This thing fits easily in a laptop bag, and the power supply brick is about the size of a laptop power supply, so it’s not tiny but it’s still portable.  Note that if you want this to be an on-the-go device, you’ll need a stable source of power; it doesn’t have a laptop battery or any other built-in way to keep power going when it’s unplugged.

The Purchases List

Here’s what I ended up buying.  It’s not the same as Allan’s kit, but it does the job for me:

The NUC is a bare-bones kit with a case, motherboard, and CPU.  You need to buy and install RAM and hard drive, and you can also install an external video card if you’d like.  Of course, you’ll need a monitor, keyboard, and mouse, but I had those lying around.

Preparations

I decided to install Ubuntu 16.04 LTS as my base operating system.  It’s a reasonably new Ubuntu build but still stable.  Why Ubuntu?  Because Docker.  Yes, Windows Server 2016 has Docker support, but I’ll stick with Linux because I have the appropriate images and enough background with the operating system to set it up properly.  If you’re comfortable with some other Linux build (CentOS, Red Hat, Arch, whatever), go for it.  I’m most comfortable with Ubuntu.

I also had to grab the latest version of the NUC BIOS; Intel publishes installation instructions as well.

Where We’re Going From Here

On Wednesday, I’m going to walk us through setting up Docker and putting together the image.  On Friday, I’ll install a 5-node Hadoop cluster on my NUC.  I have a couple more posts planned out in the Hadoop series as well, so stay tuned.

Curated SQL At One Year

Curated SQL is just over a year old now.  I started work on it during PASS Summit 2015, and I’m happy with the results so far.

By The Numbers

I’m up to 2026 posts on Curated SQL.  My three largest categories are Administration, Cloud, and Hadoop.  I used to call the “Cloud” category Azure, but I kept running into relevant AWS posts.  I have 7 categories with just one post:  Containers (that’ll increase soon), Riak, Soft Skills, Durability, Hashing, Filestream, and Policy-Based Management.

I should also note that I’m pretty bad about tagging things and terrible about going back and tagging old posts with new categories, so even though I’ve mentioned Docker quite frequently, Containers has just one post.

I’ve had just over 79K unique visitor-days.  Unfortunately, the stats package I use doesn’t give me total unique visitors across all days.  There’s a nice positive trend in daily views, and I hope it continues to rise.

Over the past 365 days, the top country is the United States (46,361 unique visitor-days), followed by Germany, Ukraine, and France.

Where Curated SQL Is Going

I think I’ve landed in a groove with Curated SQL.  My theme shows 10 posts at a time, so I have a personal rule not to link to more than 10 things per day.  I’ve broken that rule a few times, but it is in the back of my mind.

Overall, I don’t have any major plans for changing Curated SQL.  I like the format and want it to be a site that you can hit on a coffee break and find an interesting blog post or two.

I do have a minor change that I’ve kicked around in my head a few times.  Right now, I schedule things to go out at 8 AM through 8:20 AM Eastern time.  That way, Europeans can hit the site during the mid-afternoon doldrums and Americans can check it out whenever they want.  I’ve thought about trying to spread the load, scheduling posts at 8 AM, 10 AM, noon, 2 PM, and 4 PM.  Doing that has its upsides and downsides, and I haven’t come to a firm decision just yet.

Regardless, I’m looking forward to year 2 of Curated SQL.  There’s a lot out there to read…

Loading Into Columnstore: Avoid Trickle Loads

I’m going to tell this one in story format, so here’s the short of it up front:

tl;dr — Clustered columnstore indexes don’t like the combination of wipe-and-replace with multi-threaded trickle loaders.  Avoid that pattern.

The Setup

In the olden days, we had a large fact table with a standard clustered index and some standard non-clustered indexes.  Because the primary use of this fact table was to aggregate fairly large amounts of data, performance was less than stellar.

Then came SQL Server 2014, with its promises of updatable, clustered columnstore indexes.  I jumped on this immediately and replaced my large fact table with one built off of a clustered columnstore index.  Initial testing looked great, giving me a 2-5X performance gain depending upon the query, and that was good enough for our PM to authorize moving forward with the project.

After that project went live, all was well…for about a week.  Then things started slowing down.  It took a while before we were able to tell that there was a problem and, from there, find the root cause.

The Root Cause

To understand the problem, let’s talk a little about our infrastructure.  We load the warehouse on a per-client, per-day basis and we have a number of processes which load data concurrently.  In other words, one process loads data for Client A on June 1st while another process may load data for Client B on June 1st and a third process loads data for Client C on June 2nd, all at the same time.  Loading data includes two steps:  deleting current data and loading new data.  The first step of deleting current data happens because it turned out to be much more efficient in our scenario to delete old records and then insert new records rather than trying to merge data in (either using the MERGE keyword or a combination of INSERT, UPDATE, and DELETE statements).
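
To make that pattern concrete, here is a minimal sketch of the wipe-and-replace step for a single customer-day combination.  The table and column names (FactSales, StagingSales, CustomerWK, DateWK) are hypothetical stand-ins for our real objects:

DECLARE
	@CustomerWK INT = 78296,
	@DateWK INT = 20160601;

BEGIN TRANSACTION;

-- Wipe the existing rows for this customer-day combination...
DELETE FROM dbo.FactSales
WHERE
	CustomerWK = @CustomerWK
	AND DateWK = @DateWK;

-- ...and replace them with the freshly-loaded rows.
INSERT INTO dbo.FactSales (CustomerWK, DateWK, Revenue, Cost)
SELECT s.CustomerWK, s.DateWK, s.Revenue, s.Cost
FROM dbo.StagingSales s
WHERE
	s.CustomerWK = @CustomerWK
	AND s.DateWK = @DateWK;

COMMIT TRANSACTION;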

There are two major issues that we experienced with this pattern against a clustered columnstore index in SQL Server 2014.  First, there was no way to reorganize or rebuild the index online in SQL Server 2014, meaning that the only way I could clean up deleted records would be to rebuild an entire partition.  Given that our hot load partitions are also hot access partitions (specifically, the current and prior months) and we’re a 24/7 company, rebuilding those partitions was pretty much out of the question.  This meant I couldn’t clean out partitions full of deleted records, which left my compressed columnstore rowgroups woefully under-populated.

At the same time, we experienced large numbers of open rowgroups in the deltastore, many of which contained just a few records.  My best understanding of why this happened is as follows:  when a process goes to delete records for a customer-day combination, that process can lock multiple deltastore rowgroups.  If other processes are trying to insert data into the deltastore while that first process tries to delete records, they’ll open new rowgroups because the current ones are locked.  After a while, we’d end up with hundreds or thousands of open rowgroups in the deltastore, many of which contained well under 10,000 rows apiece but which added up to tens of millions of records in total.  Given the way the deltastore works (it’s a big heap), having to scan a giant heap made our queries slower.  The worst part is that because these rowgroups tended not to grow much in size, the tuple mover wouldn’t do anything with them, so they’d just accumulate as new deltastore rowgroups get created and populated.
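
If you want to see the state of your rowgroups for yourself, the sys.column_store_row_groups DMV (available since SQL Server 2014) exposes the state, row count, and deleted row count of each rowgroup.  Here’s a quick sketch, assuming a hypothetical fact table named dbo.FactSales:

-- Hundreds of OPEN rowgroups with small total_rows values are the symptom described above.
SELECT
	csrg.partition_number,
	csrg.row_group_id,
	csrg.state_description,
	csrg.total_rows,
	csrg.deleted_rows
FROM sys.column_store_row_groups csrg
WHERE
	csrg.object_id = OBJECT_ID('dbo.FactSales')
ORDER BY
	csrg.partition_number,
	csrg.row_group_id;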

SQL Server 2016 gave me the ability to reorganize indexes online, which was a great addition:  it allowed us to keep those columnstore tables online while reorganizing the partitions, smashing together all of those open rowgroups, and combining the mostly-empty rowgroups.  But that’s not a real solution to the problem; it just buys a bit of time and masks the symptoms.
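
For reference, here is roughly what that 2016-era reorganize looks like.  The index name and partition number are placeholders; COMPRESS_ALL_ROW_GROUPS is the option which forces open deltastore rowgroups into compressed columnstore format:

-- Reorganize one hot partition online, compressing open deltastore rowgroups.
ALTER INDEX [CCI_FactSales] ON dbo.FactSales
REORGANIZE PARTITION = 84
WITH (COMPRESS_ALL_ROW_GROUPS = ON);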

Possible Solutions

Now we’re in the architecture portion of today’s post.  There are three potential solutions I want to bring up, two of which I’ve implemented in production at one time or another.

Rowstore Front-End

The first architecture involves putting a rowstore table in front of the columnstore.

[Figure: rowstore front-end architecture]

In this design, I have data coming from the transactional system, undergoing some ETL processing, and going into a staging table on the warehouse.  From there, I perform the remainder of the ETL work and insert into a rowstore table.  This rowstore table has the same attribute names and data types as the columnstore table, but instead of having a clustered columnstore index, it has a standard B-tree index and can have additional non-clustered indexes.  From there, I expose the combination table using a view which simply unions the two sets of data so the application doesn’t have to see rowstore versus columnstore tables.
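
Here is a minimal sketch of that view, using hypothetical names:  dbo.FactSales is the clustered columnstore table and dbo.FactSalesRowstore is the B-tree front-end.  UNION ALL works so long as the migration process guarantees a row lives in exactly one of the two tables at any given time:

CREATE VIEW dbo.FactSalesCombined
AS
SELECT fs.CustomerWK, fs.DateWK, fs.Revenue, fs.Cost
FROM dbo.FactSales fs
UNION ALL
SELECT fsr.CustomerWK, fsr.DateWK, fsr.Revenue, fsr.Cost
FROM dbo.FactSalesRowstore fsr;
GO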

To move data from the rowstore to the columnstore, I have an external migration process.  This migration process waits until one of two conditions is met:  either there are at least 250,000 records in a single partition, or there is data from at least 4 partitions ago.  In other words, for the last three months (including the current), I’d hold off on migrating data until I hit the magic number of 250K records, so that I could migrate that as a single batch and bulk insert the results, bypassing the deltastore altogether.  For older data, my expectation was that these are typically one-off or smaller data moves, and so waiting for 250K records was folly, as that might never come.  Instead, I move those immediately to keep the rowstore table compact.  The migration process I wrote looked at data by partition, so I could pull in data from 6 months ago while still waiting for the current partition to accumulate enough records to make that move worthwhile.
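
The check itself can be as simple as grouping the rowstore table by partition.  Here’s a sketch under the assumption that the table is partitioned on DateWK via a partition function named pfFactDate (both names hypothetical):

-- Row counts by partition in the hypothetical rowstore front-end table.
SELECT
	$PARTITION.pfFactDate(fsr.DateWK) AS PartitionNumber,
	COUNT(*) AS NumberOfRecords
FROM dbo.FactSalesRowstore fsr
GROUP BY
	$PARTITION.pfFactDate(fsr.DateWK);

-- The migration job moves a partition's rows in bulk once NumberOfRecords reaches 250K,
-- or immediately when the partition is older than the hot window.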

Advantages
  1. I get immediate access to the data once it makes its way into the rowstore.  This gets me access to the data earlier than the other alternatives.
  2. It solves one of the pain points I expressed above.  Hot records don’t go into the deltastore, so we don’t see a proliferation of open rowgroups.  Also, depending upon how quickly we reload data, it might solve the other problem as well:  if data doesn’t get reloaded very frequently, letting it sit in a rowgroup for a day or so means that if we delete and reinsert data, we aren’t deleting from the clustered columnstore index.
Disadvantages
  1. This is a relatively complex design to implement, especially with a zero down-time release and a significant amount of existing code looking for tables.
  2. If a huge number of rows get into the rowstore table, query performance won’t be that great because we’re unioning rowstore values and columnstore values and performing calculations later (which negates some of the effect of having a columnstore index).
  3. If I have to reprocess months worth of data, that columnstore gets hit hard.
  4. It seems harder to follow.  I had other database engineers regularly ask about these tables and some of our software engineers and testers found them vexing.
  5. You have to keep two tables in sync, so whenever I add a column to one side, I have to add the column to the other side and to the migration procedure and to the view.

I used this model for SQL Server 2014, but then removed it after we moved to SQL Server 2016 and went back to direct columnstore insertion.  My hope was that I would not need to re-implement something for 2016, but that ended up not being the case.

Staging Table Direct Insertion

Once I started experiencing the same problems in SQL Server 2016, I had to act.  Instead of once more putting a rowstore in front of my columnstore table, I decided to increase complexity in the ETL process to simplify application querying.  To wit, I created a two-step load process.  The first step of the load process, moving data from our transactional system into the warehouse, remains a trickle load, inserting records for a customer-date combination into a memory-optimized staging table.  Once that load process is complete, the ETL process inserts a row into a warehouse queue table, letting the next step know that this customer-day combo is ready to go.
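
A rough sketch of the two supporting objects, with hypothetical names and a pared-down column list:  the staging table is memory-optimized, as described above, and I’m assuming the queue table is a plain disk-based table tracking which customer-day combinations are ready to move:

-- Memory-optimized staging table for the trickle load.
CREATE TABLE dbo.FactSalesStaging
(
	StagingID BIGINT IDENTITY(1,1) NOT NULL,
	CustomerWK INT NOT NULL,
	DateWK INT NOT NULL,
	Revenue DECIMAL(19,4) NOT NULL,
	Cost DECIMAL(19,4) NOT NULL,
	CONSTRAINT [PK_FactSalesStaging] PRIMARY KEY NONCLUSTERED (StagingID)
)
WITH (MEMORY_OPTIMIZED = ON, DURABILITY = SCHEMA_AND_DATA);
GO
-- Queue table:  one row per customer-day combination which is ready to migrate.
CREATE TABLE dbo.WarehouseQueue
(
	CustomerWK INT NOT NULL,
	DateWK INT NOT NULL,
	QueuedTimeUTC DATETIME2(3) NOT NULL CONSTRAINT [DF_WarehouseQueue_QueuedTime] DEFAULT (SYSUTCDATETIME()),
	CONSTRAINT [PK_WarehouseQueue] PRIMARY KEY CLUSTERED (CustomerWK, DateWK)
);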

[Figure: staging table direct insertion architecture]

From there, I let the staging table grow a bit and run a job periodically to bulk move the staging table rows into the columnstore.  Now I delete and insert larger chunks of rows, usually in the 250-800K range.  This means that I avoid the deltastore completely and get a pretty decent row count in each compressed columnstore rowgroup.
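
The move job looks something like the following sketch:  grab everything the queue says is ready, delete and reinsert in one large batch (so the insert bypasses the deltastore), and then clear out the staging and queue rows.  All object names are, again, hypothetical:

-- Snapshot the set of customer-day combinations which are ready to move.
SELECT q.CustomerWK, q.DateWK
INTO #ReadyToMove
FROM dbo.WarehouseQueue q;

-- Delete the old rows from the columnstore in one pass...
DELETE f
FROM dbo.FactSales f
	INNER JOIN #ReadyToMove r
		ON f.CustomerWK = r.CustomerWK
		AND f.DateWK = r.DateWK;

-- ...and bulk insert the replacements, typically 250-800K rows at a time.
INSERT INTO dbo.FactSales (CustomerWK, DateWK, Revenue, Cost)
SELECT s.CustomerWK, s.DateWK, s.Revenue, s.Cost
FROM dbo.FactSalesStaging s
	INNER JOIN #ReadyToMove r
		ON s.CustomerWK = r.CustomerWK
		AND s.DateWK = r.DateWK;

-- Clean up the staging and queue rows we just processed.
DELETE s
FROM dbo.FactSalesStaging s
	INNER JOIN #ReadyToMove r
		ON s.CustomerWK = r.CustomerWK
		AND s.DateWK = r.DateWK;

DELETE q
FROM dbo.WarehouseQueue q
	INNER JOIN #ReadyToMove r
		ON q.CustomerWK = r.CustomerWK
		AND q.DateWK = r.DateWK;

DROP TABLE #ReadyToMove;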

Advantages
  1. Seamless post-ETL process.  No new tables and no views.
  2. Calculations don’t need to hit the rowstore table, so I can take full advantage of columnstore aggregations.
Disadvantages
  1. Data loads are no longer “instantaneous.”  I have to wait a bit longer for the second step of the process to commit before records show up in the warehouse.
  2. Additional ETL complexity means there are more things that could break.
  3. There is a possibility of weird data issues.  For example, if I’m trying to load a customer-day combo while the first phase of the ETL process is trying to re-insert that data, I could get inconsistent results.  I needed to add in checks preventing this from happening.

This is my current architecture.  I’m happier with it than with the original rowstore table architecture.

Bulk Insertion

The third architecture is simple:  don’t do trickle load at all.  For many companies, it’s okay to do a major load of data once a day or once every several hours.  If that’s the case, I’d recommend doing a bulk insertion over trying to implement a trickle load.  Just like the other two methods, bulk insertion bypasses the deltastore when you’re loading enough records.

Ideally, this bulk insertion would be a straight insert, never updating or deleting data.  If you can get away with it, the ideal pattern would be something like this:

[Figure: bulk swap architecture]

If I need to load data for July 5th, I’m going to load all of the data for the partition which contains July 5th into a new table with that partition, and then I’m going to swap out the corresponding partition on the columnstore side.  I would want to partition by the normal load unit—for example, if we load data monthly, I’d partition by month; if we load data daily, I’d partition by day if possible.
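
Here is roughly what that swap looks like in T-SQL, assuming all three tables sit on the same partition scheme with identical structures and that July 5th lives in partition 7; the table names and partition number are illustrative:

-- Load the rebuilt month into an empty, identically-structured load table.
INSERT INTO dbo.FactSales_Load (CustomerWK, DateWK, Revenue, Cost)
SELECT s.CustomerWK, s.DateWK, s.Revenue, s.Cost
FROM dbo.StagingSales s
WHERE s.DateWK BETWEEN 20160701 AND 20160731;

-- Optionally rebuild for maximum rowgroup density; nothing reads this table yet.
ALTER INDEX [CCI_FactSales_Load] ON dbo.FactSales_Load REBUILD PARTITION = 7;

-- Swap the stale partition out and the freshly-loaded partition in.
ALTER TABLE dbo.FactSales SWITCH PARTITION 7 TO dbo.FactSales_Old PARTITION 7;
ALTER TABLE dbo.FactSales_Load SWITCH PARTITION 7 TO dbo.FactSales PARTITION 7;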

Advantages
  1. No deletes from the columnstore means no fragmentation.
  2. Columnstore rowgroups are as packed as possible.  If there’s any doubt, we can run an index rebuild on the new partition before swapping it, as nothing reads that table.
  3. Just like the staging table direct insert, I don’t need to make any application changes or create new supporting objects outside of ETL.
Disadvantages
  1. Data loading must be periodic, and will probably be slower than trickle loads.  You’re probably loading once a day or once a month at this point.
  2. If just one row changes, you have to rebuild the entire partition.  That can be a time sink when partitions get very large.

This is the “ideal” solution, but making it work when customers expect data in less than 20 minutes is tricky.  The staging table direct insert architecture seems to be a reasonable compromise between spray loading data and waiting a long time for data.

Use SQL Server 2016

Regardless of the architecture, SQL Server 2016 is a must-have for clustered columnstore indexes.  The ability to reorganize indexes online is a life-saver.  These reorganizations can block queries for short periods of time, but online reorganization is a huge benefit if you do find yourself deleting a lot of data in the columnstore.

Conclusion

In this post, we looked at three architectures for loading data into columnstore indexes, with a special focus on trickle loading data.  The common denominator for all of these is good staging tables to absorb the first wave of changes and move data into the columnstore in bulk.

R For The DBA: Graphing Rowcounts

Something I am trying to harp upon is that R isn’t just a language for data analysts; it makes sense for DBAs to learn the language as well.  Here’s a really simple example.

The Setup

I have a client data warehouse which holds daily rollups of revenue and cost for customers.  We’ve had some issues with the warehouse lately where data was not getting loaded due to system errors and timeouts, and our services team gave me a list of some customers who had gaps in their data due to persistent processing failures.  I figured out the root cause behind this (which will show up as tomorrow’s post), but I wanted to make sure that we filled in all of the gaps.

My obvious solution is to write a T-SQL query, getting some basic information by day for each customer.  I could scan through that result set, but the problem is that people aren’t great at reading tables of numbers; they do much better looking at pictures.  This is where R comes into play.

The Solution

My solution is just a few lines of R code, as well as a few lines of T-SQL.  I’m using SQL Server 2016 but we don’t use SQL Server R Services (yet!), so I’m doing this the “old-fashioned” way by pulling data across the wire.  Here’s the code:

install.packages("ggplot2")
install.packages("RODBC")
library(RODBC)
library(ggplot2)

conn <- odbcDriverConnect("Driver=SQL Server;Server=MYSERVER;Initial Catalog=MYDATABASE;Provider=SQLNCLI11.1;Integrated Security=SSPI")

wh <- sqlQuery(conn, "SELECT fc.CustomerWK, fc.DateWK, COUNT(1) AS NumRecords, SUM(fc.Revenue) AS Revenue, SUM(fc.Cost) AS Cost FROM MYDATABASE.dbo.FactTable fc WHERE fc.CustomerWK IN (78296,80030,104098,104101,104104,108371) AND fc.DateWK > 20160901 GROUP BY fc.CustomerWK, fc.DateWK;")

wh$CustomerWK <- as.factor(wh$CustomerWK)
wh$DateWK <- as.Date(as.character(wh$DateWK), "%Y%m%d")
str(wh)

ggplot(wh, aes(x=DateWK, y=NumRecords, colour=CustomerWK, group=CustomerWK)) +
  geom_line() +
  xlab("Date") +
  ylab("Number Of Records")

Let’s walk through this step by step.  After installing and loading the two relevant packages (RODBC to connect to SQL Server and ggplot2 to help us create a pretty graph), I open a connection to my server (with the name replaced to protect the innocent).

Next, I create a data frame called wh and populate it with the results of a query to the warehouse.  This is a pretty simple SQL query which gets the number of rows by customer by day, and also shows me revenue and cost.  I’m not using revenue and cost in this graph, but did look at them as part of my sanity checks.

Next up, I want to “fix” a couple of data types.  CustomerWK is an int and represents the customer’s surrogate key.  Although this is an integer type, it’s really a factor:  I have a small, discrete set of categories, and there’s no mathematical relationship between CustomerWK 1 and CustomerWK 2.  Anyhow, I replace CustomerWK with this new factor attribute.

After taking care of the CustomerWK factor, I convert DateWK to a date type.  DateWK is an integer representation of the date in ISO format, so January 15th, 2016 would be represented as 20160115.  I need to convert this from an integer to a character string, and then I can convert it to a date.  I replace the DateWK value with this date type.  I included the str(wh) call to show that my data frame really does have the correct types.

Finally, I call ggplot, passing in my warehouse data frame.  I create an aesthetic, which tells the graphing engine what I want to see on the screen.  I want to see the number of records in the fact table per day for each customer, so my Y coordinate is defined by NumRecords, my X coordinate by DateWK, and my group by CustomerWK.  To make it easier to read, I color-code each customer.

After creating the aesthetic, I plot the results as a line graph using the geom_line() function, and then give meaningful X and Y axis labels.

The Results

What I get in return is a decent-enough looking graph:

[Figure: number of records by day for each customer]

I can easily see that customer 108371 experienced a major dropoff sometime in mid-October, and fell off the cliff in early November.  The other customers have been fairly stable, leading me to believe that just one customer (in this cohort) has an issue.  I was able to investigate the issue and determine the root cause of the falloff—that the customer stopped sending us data.

Conclusion

This is another example where knowing a little bit of R can be very helpful.  Even if you aren’t building predictive models or performing advanced regressions, the ability to throw results into a plot and quickly spot outliers makes the job much easier.  If I had to discern results from a Management Studio result set, I could still do the job, but I might have been thrown for a loop with customer 78296, whose counts fluctuated over a fairly wide band.

Creating External Tables

At this point, we’ve got Polybase configured and have created external data sources and file formats (and by the way, if you haven’t been keeping up, I am tracking my entire Polybase series there).  Now we want to create an external table to do something with all of this.  As usual, my first stop is MSDN, which gives us the following syntax for creating an external table from within SQL Server or Azure SQL Database.

Looking At Syntax

CREATE EXTERNAL TABLE [ database_name . [ schema_name ] . | schema_name. ] table_name   
    ( <column_definition> [ ,...n ] )  
    WITH (   
        LOCATION = 'folder_or_filepath',  
        DATA_SOURCE = external_data_source_name,  
        FILE_FORMAT = external_file_format_name  
        [ , <reject_options> [ ,...n ] ]  
    )  
[;]  
  
<reject_options> ::=  
{  
    | REJECT_TYPE = value | percentage  
    | REJECT_VALUE = reject_value  
    | REJECT_SAMPLE_VALUE = reject_sample_value  
  
}  
  
-- Create a table for use with Elastic Database query  
CREATE EXTERNAL TABLE [ database_name . [ schema_name ] . | schema_name. ] table_name   
    ( <column_definition> [ ,...n ] )  
    WITH ( <sharded_external_table_options> )  
[;]  
  
<sharded_external_table_options> ::=  
        DATA_SOURCE = external_data_source_name,   
        SCHEMA_NAME = N'nonescaped_schema_name',  
        OBJECT_NAME = N'nonescaped_object_name',  
        [DISTRIBUTION  = SHARDED(sharding_column_name) | REPLICATED | ROUND_ROBIN]]  
    )  
[;]  

For now, we’re going to care about the first version; I’ll look at Elastic Database later on in this series, but for this post, we’re looking at Hadoop.

The create statement itself is pretty easy, and the first half of the statement looks like what you’d expect from any other table, save for the term “EXTERNAL” in the create statement.  It’s when you get down to the WITH clause that things start getting a bit different.

First, you have to define a location, data source, and file format.  LOCATION is the path to the file or folder; in my case, the data lives in HDFS, so I want to specify that path.  For example, I have a file at ‘/tmp/ootp/secondbasemen.csv’, so I just put that in.  Note that you don’t need to specify the IP address, port, or anything else, because those details are already defined in the external data source.  If you don’t remember what they are, you can check the DMV:

SELECT *
FROM sys.external_data_sources eds
WHERE
	eds.name = 'HDP2';

The DMV’s location column already has the first half of the full path (the protocol, address, and port), so we don’t need to specify that part again.

The DATA_SOURCE and FILE_FORMAT options are easy:  pick your external data source and external file format of choice.

The last major section deals with rejection.  We’re going from a semi-structured system to a structured system, and sometimes there are bad rows in our data, as there are no strict checks of structure before inserting records.  The Hadoop mindset is that there are two places in which you can perform data quality checks:  in the original client (pushing data into HDFS) and in any clients reading data from HDFS.  To make things simpler for us, the Polybase engine will outright reject any records which do not adhere to the quality standards you define when you create the table.  For example, let’s say that we have an Age column for each of our players, and that each age is an integer.  If the first row of our file has headers, then the first row will literally read “Age” and conversion to integer will fail.  Polybase rejects this row (removing it from the result set stream) and increments a rejection counter.  What happens next depends upon the reject options.

There are two rejection types, value and percentage.  When you set REJECT_TYPE to VALUE, it’s pretty easy:  a query fails as soon as it passes REJECT_VALUE failed records.  For example, if you set REJECT_VALUE to 10, your query will succeed if the result set returned had up to 10 bad values, but as soon as you get that 11th bad row, the query fails with an error.

Msg 8680, Level 17, State 26, Line 35
Internal Query Processor Error: The query processor encountered an unexpected error during the processing of a remote query phase.

Looking at the compute node errors, we can see the issue:

SELECT TOP(10) *
FROM sys.dm_exec_compute_node_errors decne
ORDER BY
	decne.create_time DESC;

Look at the details column and we can see why this query failed:

Query aborted-- the maximum reject threshold (0 rows) was reached while reading from an external source: 1 rows rejected out of total 1 rows processed. (/tmp/ootp/secondbasemen.csv) Column ordinal: 0, Expected data type: INT, Offending value: Casey  (Column Conversion Error), Error: Error converting data type NVARCHAR to INT.

I set the reject value to 0 and the first row caused me to fail.

If I set the REJECT_TYPE to PERCENTAGE, then the REJECT_VALUE becomes the percent of records which need to be bad before the query fails, and I need to specify REJECT_SAMPLE_VALUE.  I ran a test with a sample value of 100 and a reject percentage of 1, and here’s what the results look like:

107091;Query aborted-- the maximum reject threshold (1 %) was reached while reading from an external source: 777 rows rejected out of total 777 rows processed.

The MSDN documentation states that it should try to run a check every REJECT_SAMPLE_VALUE rows, but in this case, it pulled all 777 and rejected all of them.

Focus On Data Types

The linked MSDN document above shows how each SQL data type converts to a Hadoop data type.  The mappings are fairly straightforward, though note that the money and smallmoney types convert to doubles in Hadoop, meaning that money is imprecise; decimal types in SQL Server convert to decimals in Hadoop, so if you want to avoid floating point irregularities, stick to decimal.

Sample Script

Here is a sample script which creates a table called SecondBasemen.

CREATE EXTERNAL TABLE [dbo].[SecondBasemen]
(
	[FirstName] [VARCHAR](50) NULL,
	[LastName] [VARCHAR](50) NULL,
	[Age] [INT] NULL,
	[Throws] [VARCHAR](5) NULL,
	[Bats] [VARCHAR](5) NULL
)
WITH
(
	DATA_SOURCE = [HDP2],
	LOCATION = N'/tmp/ootp/secondbasemen.csv',
	FILE_FORMAT = [TextFileFormat],
	REJECT_TYPE = VALUE,
	REJECT_VALUE = 5
);

By this point, there’s nothing special about this table creation statement, and there aren’t that many other options you can choose.  You could change the reject type to be percentage-based:

CREATE EXTERNAL TABLE [dbo].[SecondBasemen]
(
	[FirstName] [VARCHAR](50) NULL,
	[LastName] [VARCHAR](50) NULL,
	[Age] [INT] NULL,
	[Throws] [VARCHAR](5) NULL,
	[Bats] [VARCHAR](5) NULL
)
WITH
(
	DATA_SOURCE = [HDP2],
	LOCATION = N'/tmp/ootp/secondbasemen.csv',
	FILE_FORMAT = [TextFileFormat],
	REJECT_TYPE = PERCENTAGE,
	REJECT_VALUE = 15,
	REJECT_SAMPLE_VALUE = 500
);

Miscellany

External tables do not store their data in SQL Server; they are truly external, and every query is a remote query.  You can store statistics (which I’ve talked about in the past and will discuss in more detail later), but there’s no such thing as indexing external tables, so the only data we have about that table is metadata.
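
Creating a statistics object on an external table looks just like it does on a regular table.  Here’s a small example against the SecondBasemen table from above (the statistics name is arbitrary):

CREATE STATISTICS [st_SecondBasemen_Age] ON dbo.SecondBasemen (Age) WITH FULLSCAN;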

Speaking of metadata, when you create the table, it shows up in sys.tables like any other table.  Filter on the is_external flag and you can see all external tables in the current database:

SELECT *
FROM sys.tables
WHERE
	is_external = 1;

Conclusion

The script to create an external table is pretty easy, with relatively few options.  You will want to fine-tune those rejection counts, though.  The problem is that if you set the rejection count too low, you’ll run the risk of having queries fail after a very long time, likely without returning any data to you.  Imagine that you have a 20 billion row table and you’re returning 10 million records, of which 50 are bad.  If you set the reject value to 49, then you might have to do all of that work on 10 million records just to watch it fail at the very end.  But at the same time, if you have a large percentage of records failing, you’d want to know that (especially if you can fix the HDFS files) rather than setting a reject count to 100% and blissfully ignoring the problem.

If you need to drop an external table, it’s easy:  use the DROP EXTERNAL TABLE command.
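
For example, to get rid of the table we created above:

DROP EXTERNAL TABLE [dbo].[SecondBasemen];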

More Polybase With Tar

Last week, I looked at flat file compression formats and noted that when I tarred a file, I lost the top row.  Today, I’m going to run a short test in which I tar several files together and see what happens.

The Setup

I have four copies of my Second Basemen data set, each saved as a CSV.  Each copy has 777 records in it.  I put them all into one tar archive and uploaded it into HDFS as /tmp/tartest/SecondBasemen.tar.  I then created an external table like so:

CREATE EXTERNAL FILE FORMAT [CompressionTestFormat] WITH
(
    FORMAT_TYPE = DELIMITEDTEXT,
    FORMAT_OPTIONS
    (
        FIELD_TERMINATOR = N',',
        USE_TYPE_DEFAULT = TRUE
    ),
    DATA_COMPRESSION = 'org.apache.hadoop.io.compress.DefaultCodec'
);
GO
CREATE EXTERNAL TABLE [dbo].[SecondBasemenTar]
(
    [FirstName] [varchar](50) NULL,
    [LastName] [varchar](50) NULL,
    [Age] [int] NULL,
    [Throws] [varchar](5) NULL,
    [Bats] [varchar](5) NULL
)
WITH
(
    DATA_SOURCE = [HDP2],
    LOCATION = N'/tmp/tartest/SecondBasemen.csv',
    FILE_FORMAT = [CompressionTestFormat],
    REJECT_TYPE = VALUE,
    REJECT_VALUE = 5
);
GO
SELECT *
FROM dbo.SecondBasemenTar sb;
GO
SELECT *
FROM dbo.SecondBasemenTar sb
OPTION (FORCE EXTERNALPUSHDOWN);

Results

The select statement returned 3104 records, exactly 4 shy of the 3108 I would have expected (777 * 4 = 3108).  In each case, the missing row was the first, meaning that when I search for LastName = ‘Turgeon’ (the first player in my data set), I get zero rows.  When I search for another second baseman in the set, I get back four rows, exactly as I would have expected.

What’s really interesting is the result I get back from Wireshark when I run a query without pushdown:  it does actually return the row for Casey Turgeon.

[Figure: Wireshark capture showing the row for Casey Turgeon coming back over the wire]

When I run a predicate pushdown operation (forcing external pushdown), I get the same zero rows for Turgeon, but what comes back over the wire is a PPAX file, whose format I haven’t yet figured out how to decode (and the Polybase team probably wants that to remain the case!).

[Figure: Wireshark capture showing the PPAX-format response]

Conclusions

If you want to tar together delimited flat files, I recommend adding a header row.  There is a bit of irony here in that normally, you can’t skip header rows in Polybase.  If you’re feeling really cheeky, you might have gotten your workaround…

Flat File Compression In Polybase

For today’s post, I want to look at how well Polybase on Hadoop can handle different compression formats.  I’m going to use the same compression codec (org.apache.hadoop.io.compress.DefaultCodec) for each.  Each file will be a small, 777-record file set of second basemen, and I will run two queries, one which is a simple SELECT *, and one which is a SELECT * with forced external pushdown to ensure that I perform a MapReduce operation.

For all of my tests, I’m using the Hortonworks HDP 2.4 sandbox.

Create Statements

What follows is a simple script which I used for each of the file formats.  I include an external file format (which I reuse on the other examples), a table create statement, and a pair of statements to query that data.  For round one, I specify the file; for rounds two and three, I specify a folder containing a specified set of files.

USE [OOTP]
GO
CREATE EXTERNAL FILE FORMAT [CompressionTestFormat] WITH
(
	FORMAT_TYPE = DELIMITEDTEXT,
	FORMAT_OPTIONS
	(
		FIELD_TERMINATOR = N',',
		USE_TYPE_DEFAULT = TRUE
	),
	DATA_COMPRESSION = 'org.apache.hadoop.io.compress.DefaultCodec'
);
GO
CREATE EXTERNAL TABLE [dbo].[SecondBasemenCSV]
(
	[FirstName] [varchar](50) NULL,
	[LastName] [varchar](50) NULL,
	[Age] [int] NULL,
	[Throws] [varchar](5) NULL,
	[Bats] [varchar](5) NULL
)
WITH
(
	DATA_SOURCE = [HDP2],
	LOCATION = N'/tmp/compressionTests/SecondBasemen.csv',
	FILE_FORMAT = [CompressionTestFormat],
	REJECT_TYPE = VALUE,
	REJECT_VALUE = 2
);
GO
SELECT *
FROM dbo.SecondBasemenCSV sb;
GO
SELECT *
FROM dbo.SecondBasemenCSV sb
OPTION (FORCE EXTERNALPUSHDOWN);

Round One:  Single-File

In this first round of testing, I’m going to test a single file in a directory.  The following table includes the compression format I chose, whether I was able to perform a SELECT *, and whether I was able to perform a SELECT * with a MapReduce job.

Format               Simple Query      MapReduce Job
None                 Works             Works
7Zip (.7z)           Does Not Work     Does Not Work
Bzip2 (.bz2)         Works             Works
GZip (.gz)           Works             Works
Tar (.tar)           Partially Works   Partially Works
Tar-GZip (.tar.gz)   Partially Works   Partially Works
Zip (.zip)           Does Not Work     Does Not Work

This is a very interesting set of results.  First, 7Zip archived files do not work with the default encoding.  I’m not particularly surprised by this result, as 7Zip support is relatively scarce across the board and it’s a niche file format (though a very efficient format).

The next failure case is tar.  Tar is a weird case because it missed the first row in the file but was able to collect the remaining 776 records.  Same goes for .tar.gz.  I unpackaged the .tar file and the constituent SecondBasemen.csv file did in fact have all 777 records, so it’s something weird about the codec.

Finally, I’m surprised that Zip did not work either; I would have expected it to succeed, given the popularity of .zip files.  To give you an idea of what’s going on with Zip, I did the same Zip file ingestion into Hive and got these results:

[Figure: result of querying the Zip file in Hive]

That’s because there is no native Hive InputFormat which handles Zip files.  You can write your own, but Polybase couldn’t use it, so we’ll not try to go down that road today.

So at this point, my recommendation would be to use GZip and BZip2 instead of other file compression formats.  This makes rounds two and three simpler.

Round Two:  Two Files

For round two, I’m going to combine uncompressed files, GZip compressed files, and BZip2 compressed files together.  I already know it’s possible to combine multiple files in a folder and have them work (having done so with flight data in Bzip2 format), so there’s no need to test those cases here as well.

Format 1   Format 2   Simple Query   MapReduce Job
None       GZip       Works          Works
None       Bzip2      Works          Works
GZip       Bzip2      Works          Works

I’m happier about these results:  it appears that you can mix and match pairs of supported formats without error.  But now let’s try running this test on all three types.

Round Three:  Three Files

In the final round, I have only one test:  a file with no compression, a file with GZip compression, and a file with Bzip2 compression, all in the same directory.  Not surprisingly, this worked for both the simple query and the MapReduce job.

Conclusions

If you do plan on using delimited files in a Polybase solution, my recommendation is to stick with GZip or BZ2 compression.  Compression formats like Zip may be more popular and 7z may give you better compression ratios, but you’ll end up needing to decompress the files mid-stream.  Cloudera provides some guidance on file formats:  GZip is a bit less CPU-intensive (and thus faster) than BZ2, but BZ2 gets better compression ratios and BZ2 files can be split out and handled concurrently, making it easier to distribute data across data nodes and take advantage of Hadoop’s capabilities.

Hey, What About That Codec?

Remember that I used the DefaultCodec for everything today.  If you name your files correctly, then you can safely use DefaultCodec without an issue.  But if you take a .gz file and rename it to .csv:

Msg 8680, Level 17, State 26, Line 19
Internal Query Processor Error: The query processor encountered an unexpected error during the processing of a remote query phase.

This particular error happens because the file extension helps the engine figure out how to decompress.  I strongly advise you not to change file extensions, as it causes nothing but pain.