Understanding Blob Storage Behavior

This is a continuation of my Polybase series.

In the last chapter of the Polybase series, I looked at creating external tables pointing to files in Azure Blob Storage.  Today, we’re going to take a deeper look at how the Polybase engine interacts with Azure Blob Storage.  To do this is as simple as running a basic query.  The query that I will run is the following:

SELECT TOP(1000)
	f.month,
	f.arrdelay
FROM dbo.Flights2008 f
WHERE
	f.dest = N'CMH'
	AND f.dayofmonth = 14;

Execution Plan Details

The execution plan was pretty simple:

executionplan

As far as the XML goes, here’s the full plan if you’re curious:

<?xml version="1.0" encoding="utf-16"?>
<ShowPlanXML xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" Version="1.5" Build="13.0.1722.0" xmlns="http://schemas.microsoft.com/sqlserver/2004/07/showplan">
  <BatchSequence>
    <Batch>
      <Statements>
        <StmtSimple StatementCompId="1" StatementEstRows="67.2391" StatementId="1" StatementOptmLevel="FULL" StatementOptmEarlyAbortReason="GoodEnoughPlanFound" CardinalityEstimationModelVersion="130" StatementSubTreeCost="0.032413" StatementText="SELECT TOP(1000)
 f.month,
 f.arrdelay
FROM dbo.Flights2008 f
WHERE
 f.dest = N'CMH'
 AND f.dayofmonth = 14" StatementType="SELECT" QueryHash="0x955C4C3E381C844B" QueryPlanHash="0x53368F42C59FAD79" RetrievedFromCache="true" SecurityPolicyApplied="false">
          <StatementSetOptions ANSI_NULLS="true" ANSI_PADDING="true" ANSI_WARNINGS="true" ARITHABORT="true" CONCAT_NULL_YIELDS_NULL="true" NUMERIC_ROUNDABORT="false" QUOTED_IDENTIFIER="true" />
          <QueryPlan DegreeOfParallelism="0" NonParallelPlanReason="NoParallelForPDWCompilation" CachedPlanSize="120" CompileTime="50" CompileCPU="49" CompileMemory="208">
            <Warnings>
              <PlanAffectingConvert ConvertIssue="Seek Plan" Expression="CONVERT_IMPLICIT(nvarchar(100),[f].[dest],0)=N'CMH'" />
            </Warnings>
            <MemoryGrantInfo SerialRequiredMemory="0" SerialDesiredMemory="0" />
            <OptimizerHardwareDependentProperties EstimatedAvailableMemoryGrant="417021" EstimatedPagesCached="104255" EstimatedAvailableDegreeOfParallelism="2" />
            <RelOp AvgRowSize="65" EstimateCPU="0.032413" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="67.2391" LogicalOp="Compute Scalar" NodeId="0" Parallel="false" PhysicalOp="Compute Scalar" EstimatedTotalSubtreeCost="0.032413">
              <OutputList>
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[Flights2008]" Alias="[f]" Column="month" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[Flights2008]" Alias="[f]" Column="arrdelay" />
              </OutputList>
              <ComputeScalar>
                <DefinedValues>
                  <DefinedValue>
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[Flights2008]" Alias="[f]" Column="month" />
                    <ScalarOperator ScalarString="[OOTP].[dbo].[Flights2008].[month] as [f].[month]">
                      <Identifier>
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[Flights2008]" Alias="[f]" Column="month" />
                      </Identifier>
                    </ScalarOperator>
                  </DefinedValue>
                  <DefinedValue>
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[Flights2008]" Alias="[f]" Column="arrdelay" />
                    <ScalarOperator ScalarString="[OOTP].[dbo].[Flights2008].[arrdelay] as [f].[arrdelay]">
                      <Identifier>
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[Flights2008]" Alias="[f]" Column="arrdelay" />
                      </Identifier>
                    </ScalarOperator>
                  </DefinedValue>
                </DefinedValues>
                <RelOp AvgRowSize="65" EstimateCPU="0.032413" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="67.2391" LogicalOp="Remote Query" NodeId="1" Parallel="false" PhysicalOp="Remote Query" EstimatedTotalSubtreeCost="0.032413">
                  <OutputList>
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[Flights2008]" Alias="[f]" Column="month" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[Flights2008]" Alias="[f]" Column="arrdelay" />
                  </OutputList>
                  <RunTimeInformation>
                    <RunTimeCountersPerThread Thread="0" ActualRebinds="1" ActualRewinds="0" ActualRows="1000" Batches="0" ActualEndOfScans="1" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="265523" ActualCPUms="80822" ActualScans="0" ActualLogicalReads="0" ActualPhysicalReads="0" ActualReadAheads="0" ActualLobLogicalReads="0" ActualLobPhysicalReads="0" ActualLobReadAheads="0" />
                  </RunTimeInformation>
                  <RemoteQuery RemoteSource="Polybase_ExternalComputation" RemoteQuery="&lt;?xml version=&quot;1.0&quot; encoding=&quot;utf-8&quot;?&gt;
&lt;dsql_query number_nodes=&quot;1&quot; number_distributions=&quot;8&quot; number_distributions_per_node=&quot;8&quot;&gt;
 &lt;sql&gt;ExecuteMemo explain query&lt;/sql&gt;
 &lt;dsql_operations total_cost=&quot;1&quot; total_number_operations=&quot;10&quot;&gt;
 &lt;dsql_operation operation_type=&quot;RND_ID&quot;&gt;
 &lt;identifier&gt;TEMP_ID_23&lt;/identifier&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;CREATE TABLE [tempdb].[dbo].[TEMP_ID_23] ([month] INT, [dayofmonth] INT, [arrdelay] VARCHAR(100) COLLATE SQL_Latin1_General_CP1_CI_AS, [dest] VARCHAR(100) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;EXEC [tempdb].[sys].[sp_addextendedproperty] @name=N'IS_EXTERNAL_STREAMING_TABLE', @value=N'true', @level0type=N'SCHEMA', @level0name=N'dbo', @level1type=N'TABLE', @level1name=N'TEMP_ID_23'&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;UPDATE STATISTICS [tempdb].[dbo].[TEMP_ID_23] WITH ROWCOUNT = 5651, PAGECOUNT = 143&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;RND_ID&quot;&gt;
 &lt;identifier&gt;TEMP_ID_24&lt;/identifier&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;Control&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;CREATE TABLE [tempdb].[dbo].[TEMP_ID_24] ([month] INT, [arrdelay] VARCHAR(100) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;MULTI&quot;&gt;
 &lt;dsql_operation operation_type=&quot;PARTITION_MOVE&quot;&gt;
 &lt;operation_cost cost=&quot;1.678287936&quot; accumulative_cost=&quot;1.678287936&quot; average_rowsize=&quot;104&quot; output_rows=&quot;67.2391&quot; /&gt;
 &lt;location distribution=&quot;AllDistributions&quot; /&gt;
 &lt;source_statement&gt;SELECT [T1_1].[month] AS [month],
 [T1_1].[arrdelay] AS [arrdelay]
FROM (SELECT TOP (CAST ((1000) AS BIGINT)) [T2_1].[month] AS [month],
 [T2_1].[arrdelay] AS [arrdelay]
 FROM (SELECT CONVERT (NVARCHAR (100), [T3_1].[dest], 0) COLLATE SQL_Latin1_General_CP1_CI_AS AS [col],
 [T3_1].[month] AS [month],
 [T3_1].[arrdelay] AS [arrdelay]
 FROM [tempdb].[dbo].[TEMP_ID_23] AS T3_1
 WHERE ([T3_1].[dayofmonth] = CAST ((14) AS INT))) AS T2_1
 WHERE ([T2_1].[col] = CAST (N'CMH' COLLATE SQL_Latin1_General_CP1_CI_AS AS NVARCHAR (3)) COLLATE SQL_Latin1_General_CP1_CI_AS)) AS T1_1&lt;/source_statement&gt;
 &lt;destination&gt;Control&lt;/destination&gt;
 &lt;destination_table&gt;[TEMP_ID_24]&lt;/destination_table&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ExternalRoundRobinMove&quot;&gt;
 &lt;operation_cost cost=&quot;964.621896&quot; accumulative_cost=&quot;966.300183936&quot; average_rowsize=&quot;208&quot; output_rows=&quot;45211&quot; /&gt;
 &lt;external_uri&gt;wasbs://csflights@cspolybase.blob.core.windows.net/historical/2008.csv.bz2&lt;/external_uri&gt;
 &lt;destination_table&gt;[TEMP_ID_23]&lt;/destination_table&gt;
 &lt;/dsql_operation&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;DROP TABLE [tempdb].[dbo].[TEMP_ID_23]&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;STREAMING_RETURN&quot;&gt;
 &lt;operation_cost cost=&quot;1&quot; accumulative_cost=&quot;967.300183936&quot; average_rowsize=&quot;104&quot; output_rows=&quot;67.2391&quot; /&gt;
 &lt;location distribution=&quot;Control&quot; /&gt;
 &lt;select&gt;SELECT [T1_1].[month] AS [month],
 [T1_1].[arrdelay] AS [arrdelay]
FROM (SELECT TOP (CAST ((1000) AS BIGINT)) [T2_1].[month] AS [month],
 [T2_1].[arrdelay] AS [arrdelay]
 FROM [tempdb].[dbo].[TEMP_ID_24] AS T2_1) AS T1_1&lt;/select&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;Control&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;DROP TABLE [tempdb].[dbo].[TEMP_ID_24]&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;/dsql_operations&gt;
&lt;/dsql_query&gt;" />
                </RelOp>
              </ComputeScalar>
            </RelOp>
          </QueryPlan>
        </StmtSimple>
      </Statements>
    </Batch>
  </BatchSequence>
</ShowPlanXML>

And here is the decoded remote query:

<?xml version="1.0" encoding="utf-8"?>
<dsql_query number_nodes="1" number_distributions="8" number_distributions_per_node="8">
  <sql>ExecuteMemo explain query</sql>
  <dsql_operations total_cost="1" total_number_operations="10">
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_23</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [tempdb].[dbo].[TEMP_ID_23] ([month] INT, [dayofmonth] INT, [arrdelay] VARCHAR(100) COLLATE SQL_Latin1_General_CP1_CI_AS, [dest] VARCHAR(100) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">EXEC [tempdb].[sys].[sp_addextendedproperty] @name=N'IS_EXTERNAL_STREAMING_TABLE', @value=N'true', @level0type=N'SCHEMA', @level0name=N'dbo', @level1type=N'TABLE', @level1name=N'TEMP_ID_23'</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">UPDATE STATISTICS [tempdb].[dbo].[TEMP_ID_23] WITH ROWCOUNT = 5651, PAGECOUNT = 143</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_24</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="Control" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [tempdb].[dbo].[TEMP_ID_24] ([month] INT, [arrdelay] VARCHAR(100) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="MULTI">
      <dsql_operation operation_type="PARTITION_MOVE">
        <operation_cost cost="1.678287936" accumulative_cost="1.678287936" average_rowsize="104" output_rows="67.2391" />
        <location distribution="AllDistributions" />
        <source_statement>SELECT [T1_1].[month] AS [month],
       [T1_1].[arrdelay] AS [arrdelay]
FROM   (SELECT TOP (CAST ((1000) AS BIGINT)) [T2_1].[month] AS [month],
                                             [T2_1].[arrdelay] AS [arrdelay]
        FROM   (SELECT CONVERT (NVARCHAR (100), [T3_1].[dest], 0) COLLATE SQL_Latin1_General_CP1_CI_AS AS [col],
                       [T3_1].[month] AS [month],
                       [T3_1].[arrdelay] AS [arrdelay]
                FROM   [tempdb].[dbo].[TEMP_ID_23] AS T3_1
                WHERE  ([T3_1].[dayofmonth] = CAST ((14) AS INT))) AS T2_1
        WHERE  ([T2_1].[col] = CAST (N'CMH' COLLATE SQL_Latin1_General_CP1_CI_AS AS NVARCHAR (3)) COLLATE SQL_Latin1_General_CP1_CI_AS)) AS T1_1</source_statement>
        <destination>Control</destination>
        <destination_table>[TEMP_ID_24]</destination_table>
      </dsql_operation>
      <dsql_operation operation_type="ExternalRoundRobinMove">
        <operation_cost cost="964.621896" accumulative_cost="966.300183936" average_rowsize="208" output_rows="45211" />
        <external_uri>wasbs://csflights@cspolybase.blob.core.windows.net/historical/2008.csv.bz2</external_uri>
        <destination_table>[TEMP_ID_23]</destination_table>
      </dsql_operation>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [tempdb].[dbo].[TEMP_ID_23]</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="STREAMING_RETURN">
      <operation_cost cost="1" accumulative_cost="967.300183936" average_rowsize="104" output_rows="67.2391" />
      <location distribution="Control" />
      <select>SELECT [T1_1].[month] AS [month],
       [T1_1].[arrdelay] AS [arrdelay]
FROM   (SELECT TOP (CAST ((1000) AS BIGINT)) [T2_1].[month] AS [month],
                                             [T2_1].[arrdelay] AS [arrdelay]
        FROM   [tempdb].[dbo].[TEMP_ID_24] AS T2_1) AS T1_1</select>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="Control" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [tempdb].[dbo].[TEMP_ID_24]</sql_operation>
      </sql_operations>
    </dsql_operation>
  </dsql_operations>
</dsql_query>

A couple of interesting details.  First, this plan looks pretty similar to the Hadoop plan when I did not induce a MapReduce job.  We can see 8 distributions per node, just like in on-prem Hadoop.  Second, as expected, our external URI points to the WASBS (secure WASB) blob location.  Given that I’m using the WASBS protocol, I’m expecting the connection to pull this data down to use SSL/TLS.  And there’s one easy way to find out.

Wireshark Details

While running this query, I also pulled a packet capture.  After figuring out the IP address (which wasn’t hard, given that there were about 93,000 packets in my packet capture associated with it), I was able to get the following details.

Overall Picture

packetcapture

We can see in the image above that Polybase uses TLS v 1.2 to communicate with Azure Blob Storage when you access Blob Storage using the WASBS protocol.  The connection is encrypted, and so we don’t get much useful information out of it unless you set up Wireshark to deal with encrypted data.  For what we want to do, that’s not neccessary.

The Download

What I do want to see is whether my statement that the Polybase engine needs to download the entire file is correct.  To do that, I can filter on the TCP stream associated with my file download and look for any with a length more than 1000 bytes.

downloadsize

In this case, all of those packets were 1514 bytes, so it’s an easy multiplication problem to see that we downloaded approximately 113 MB.  The 2008.csv.bz2 file itself is 108 MB, so factoring in TCP packet overhead and that there were additional, smaller packets in the stream, I think that’s enough to show that we did in fact download the entire file.  Just like in the Hadoop scenario without MapReduce, the Polybase engine needs to take all of the data and load it into a temp table (or set of temp tables if you’re using a Polybase scale-out cluster) before it can pull out the relevant rows based on our query.

Conclusion

Looking at the execution plan and packet capture, it’s pretty clear that using Polybase with Azure Blob Storage behaves similarly to Polybase with Hadoop as long as you don’t use MapReduce.

Advertisements

2 thoughts on “Understanding Blob Storage Behavior

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s