This is a continuation of my Polybase series.

In today’s Polybase post, I want to see if I can replicate “SQL 2000 partitioning” using Polybase.  Back before SQL Server 2005, there was no concept of partitioned tables; the only way you could partition a table was to separate each logical subset of data into its own table and create a view which unioned those subsets together.  The reason you might want to do this is that if there were constraints on each individual table, you would be able to perform partition elimination, only querying the table(s) with potentially relevant data.

With that in mind, let’s take a look at what we can do.

Statistics, Not Constraints

The first important thing to note is that we cannot create constraints after the fact.  Taking an external table with a NOT NULL column:

ALTER TABLE dbo.RemoteFixed ADD CONSTRAINT [PK_RemoteFixed] PRIMARY KEY CLUSTERED (ID);

I get the following error:

Msg 46518, Level 16, State 2, Line 3
The feature ‘ALTER TABLE’ is not supported with external tables.

In fact, I can’t create a primary key constraint on an external table either:

CREATE EXTERNAL TABLE [dbo].[RemoteFixedPK]
(
    ID INT NOT NULL PRIMARY KEY CLUSTERED,
    SomeNum INT NOT NULL,
    SomeChar CHAR(12) NOT NULL
)
WITH
(
    DATA_SOURCE = [WASBFlights],
    LOCATION = N'fixed/',
    FILE_FORMAT = [CsvFileFormat],
    REJECT_TYPE = VALUE,
    REJECT_VALUE = 5
);

Msg 103010, Level 16, State 1, Line 1
Parse error at line: 3, column: 21: Incorrect syntax near ‘PRIMARY’.

I leave it as an exercise to the reader to look at check constraints; I did and got the same basic error messages when trying to modify or create a table with a check constraint.

Why This Makes Sense

At first, you might think that this is a terrible limitation and something that needs “fixed.”  I don’t think I agree with that assessment, as these are external tables.  The value in primary key and check constraints comes from the fact that the database engine controls the rows which go into a table and can validate these constraints whenever you try to insert rows.  With external tables, however, there are multiple ways for data to get into that table:  it can come directly from SQL Server insert statements, but (much) more often, it will come from some external process.  That external process likely will not have the concepts of primary key or check constraints, and so data could get in which fails these constraints.

That said, I think there’s a fair argument in rejecting rows upon data retrieval based on constraints, but that behavior would differ from “normal” tables, so I’m not sure it would be the best choice.

Are Statistics The Key?

So knowing that we cannot create check constraints but we can create statistics, will that help?  I’ve created a series of flight tables, one for each year, which follow the following pattern:

USE [OOTP]
GO
CREATE EXTERNAL TABLE [dbo].[Flights2006]
(
    [year] int NULL,
    [month] int NULL,
    [dayofmonth] int NULL,
    [dayofweek] int NULL,
    deptime VARCHAR(100) NULL,
    crsdeptime VARCHAR(100) NULL,
    arrtime VARCHAR(100) NULL,
    crsarrtime VARCHAR(100) NULL,
    uniquecarrier VARCHAR(100) NULL,
    flightnum VARCHAR(100) NULL,
    tailnum VARCHAR(100) NULL,
    actualelapsedtime VARCHAR(100) NULL,
    crselapsedtime VARCHAR(100) NULL,
    airtime VARCHAR(100) NULL,
    arrdelay VARCHAR(100) NULL,
    depdelay VARCHAR(100) NULL,
    origin VARCHAR(100) NULL,
    dest VARCHAR(100) NULL,
    distance VARCHAR(100) NULL,
    taxiin VARCHAR(100) NULL,
    taxiout VARCHAR(100) NULL,
    cancelled VARCHAR(100) NULL,
    cancellationcode VARCHAR(100) NULL,
    diverted VARCHAR(100) NULL,
    carrierdelay VARCHAR(100) NULL,
    weatherdelay VARCHAR(100) NULL,
    nasdelay VARCHAR(100) NULL,
    securitydelay VARCHAR(100) NULL,
    lateaircraftdelay VARCHAR(100) NULL
)
WITH
(
    LOCATION = N'historical/2006.csv.bz2',
    DATA_SOURCE = WASBFlights,
    FILE_FORMAT = CsvFileFormat,
    -- Up to 5000 rows can have bad values before Polybase returns an error.
    REJECT_TYPE = Value,
    REJECT_VALUE = 5000
);

CREATE STATISTICS [st_flights2006_year] ON [dbo].[Flights2006]([year]);
GO

For each year, I created the table, pointed it to the relevant year file, and created a single statistics histogram based on the year.  For 2006, that histogram is simple:

flights2006statistics

We can see that there’s data just for one year.  So when I run the following query, my hope would be that the contradiction checker would say that this is not going to return any results and turn the query into a constant scan:

SELECT TOP 10
	*
FROM dbo.Flights2006
WHERE
	year = 2008;

The answer, a couple minutes later, was “close, but no cigar.”  As Benjamin Nevarez points out, contradiction detection is on a constraint, so just because our statistics say there are no records outside of 2006, doesn’t mean there really are no records outside of 2006, and so the contradiction detection logic won’t come into play here.

Creating A View

The next idea is to create a view, specifying our “constraints” in the WHERE clause of each segment:

CREATE VIEW dbo.AllFlights AS
SELECT * FROM dbo.Flights2006 WHERE year = 2006
UNION ALL
SELECT * FROM dbo.Flights2007 WHERE year = 2007
UNION ALL
SELECT * FROM dbo.Flights2008 WHERE year = 2008;

This does pass the obvious contradiction test:

contradiction

For a year outside of our range, we get no results back.  How about when we look at a contradiction in terms of two separate years?

contradiction2

I was a little concerned that we’d hit something like the scenario Conor Cunningham described back in 2008, but it looks like the engine was smart enough to figure this out.

So now for the big question:  can we get the equivalent of partition elimination?  The answer is yes.  When I ran the query, the execution plan’s remote query made reference to only one file:  wasbs://csflights@cspolybase.blob.core.windows.net/historical/2006.csv.bz2.  And my Wireshark packet capture only found 79,116 packets of at least 1400 bytes.

flights2006packets

As we saw earlier, that is consistent with reading just a single file.  So how about we look at two years; will we see two files?

SELECT
	year,
	COUNT(1) AS NumberOfFlights
FROM dbo.AllFlights af
WHERE
	year IN (2006, 2007)
GROUP BY
	year
ORDER BY
	year;

The Wireshark capture looks a bit different now:

multiyearflightspackets

And my execution plan now includes two wasbs blocks.  Here’s the remote query XML block which shows that:

<?xml version="1.0" encoding="utf-8"?>
<dsql_query number_nodes="1" number_distributions="8" number_distributions_per_node="8">
  <sql>ExecuteMemo explain query</sql>
  <dsql_operations total_cost="696.59944" total_number_operations="15">
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_40</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [tempdb].[dbo].[TEMP_ID_40] ([year] INT ) WITH(DATA_COMPRESSION=PAGE);</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ExternalRoundRobinMove">
      <operation_cost cost="695.59944" accumulative_cost="695.59944" average_rowsize="4" output_rows="45715" />
      <external_uri>wasbs://csflights@cspolybase.blob.core.windows.net/historical/2006.csv.bz2</external_uri>
      <destination_table>[TEMP_ID_40]</destination_table>
    </dsql_operation>
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_41</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [tempdb].[dbo].[TEMP_ID_41] ([year] INT ) WITH(DATA_COMPRESSION=PAGE);</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">EXEC [tempdb].[sys].[sp_addextendedproperty] @name=N'IS_EXTERNAL_STREAMING_TABLE', @value=N'true', @level0type=N'SCHEMA', @level0name=N'dbo', @level1type=N'TABLE', @level1name=N'TEMP_ID_41'</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">UPDATE STATISTICS [tempdb].[dbo].[TEMP_ID_41] WITH ROWCOUNT = 6024, PAGECOUNT = 2</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE STATISTICS [year] ON [tempdb].[dbo].[TEMP_ID_41] ([year]) WITH STATS_STREAM = 0x010000000100000000000000000000...</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_42</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [tempdb].[dbo].[TEMP_ID_42] ([year] INT NOT NULL, [col] BIGINT ) WITH(DATA_COMPRESSION=PAGE);</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="MULTI">
      <dsql_operation operation_type="SHUFFLE_MOVE">
        <operation_cost cost="0.00576" accumulative_cost="695.6052" average_rowsize="12" output_rows="4" />
        <source_statement>SELECT [T1_1].[year] AS [year],
       [T1_1].[col] AS [col]
FROM   (SELECT   COUNT_BIG(CAST ((1) AS INT)) AS [col],
                 [T3_1].[year] AS [year]
        FROM     [tempdb].[dbo].[TEMP_ID_40] AS T3_1
        WHERE    ([T3_1].[year] = CAST ((2006) AS INT))
        GROUP BY [T3_1].[year]
        UNION ALL
        SELECT   COUNT_BIG(CAST ((1) AS INT)) AS [col],
                 [T3_1].[year] AS [year]
        FROM     [tempdb].[dbo].[TEMP_ID_41] AS T3_1
        WHERE    ([T3_1].[year] = CAST ((2007) AS INT))
        GROUP BY [T3_1].[year]) AS T1_1</source_statement>
        <destination_table>[TEMP_ID_42]</destination_table>
        <shuffle_columns>year;</shuffle_columns>
      </dsql_operation>
      <dsql_operation operation_type="ExternalRoundRobinMove">
        <operation_cost cost="733.274256" accumulative_cost="1428.879456" average_rowsize="4" output_rows="48191" />
        <external_uri>wasbs://csflights@cspolybase.blob.core.windows.net/historical/2007.csv.bz2</external_uri>
        <destination_table>[TEMP_ID_41]</destination_table>
      </dsql_operation>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [tempdb].[dbo].[TEMP_ID_41]</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [tempdb].[dbo].[TEMP_ID_40]</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="STREAMING_RETURN">
      <operation_cost cost="1" accumulative_cost="1429.879456" average_rowsize="8" output_rows="2" />
      <location distribution="AllDistributions" />
      <select>SELECT [T1_1].[year] AS [year],
       [T1_1].[col] AS [col]
FROM   (SELECT CONVERT (INT, [T2_1].[col], 0) AS [col],
               [T2_1].[year] AS [year]
        FROM   (SELECT ISNULL([T3_1].[col], CONVERT (BIGINT, 0, 0)) AS [col],
                       [T3_1].[year] AS [year]
                FROM   (SELECT   SUM([T4_1].[col]) AS [col],
                                 [T4_1].[year] AS [year]
                        FROM     [tempdb].[dbo].[TEMP_ID_42] AS T4_1
                        GROUP BY [T4_1].[year]) AS T3_1) AS T2_1) AS T1_1</select>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [tempdb].[dbo].[TEMP_ID_42]</sql_operation>
      </sql_operations>
    </dsql_operation>
  </dsql_operations>
</dsql_query>

We see wasbs references on line 16 and 76.  What’s interesting is that each individual file goes into its own temp table (TEMP_ID_40 and TEMP_ID_41 for 2006 and 2007, respectively), and then the Polybase engine creates a third temp table (TEMP_ID_42) and inserts the individual aggregations from 40 and 41 into this third temp table.

Conclusion:  “Stretch” Database

Using a view, we were able to create a “partitioned” Polybase experience, similar to what we had in SQL Server 2000.  This form of poor man’s partitioning allows us to segment out data sets and query them independently, something which can be helpful when storing very large amounts of data off-site and only occasionally needing to query it.  The thing to remember, though, is that if you store this in Azure Blob Storage, you will need to pull down the entire table’s worth of data to do any processing.

This leads to a concept I first heard from Ginger Grant:  pseudo-StretchDB.  Instead of paying for what Stretch offers, you get an important subset of the functionality at a much, much lower price.  If you do store the data in Azure Blob Storage, you’re paying pennies per gigabyte per month.  For cold storage, like a scenario in which you need to keep data around to keep the auditors happy but your main application doesn’t use that information, it can work fine.  But if you need to query this data frequently, performance might be a killer.

One thought on “Partitioned Views With Polybase

Leave a comment