Reading Polybase Execution Plan Details

A few days ago, I mentioned that the Remote Query operator in a Polybase execution plan contains some interesting details.

A preview of interesting details

Today we’re going to look at what we can find in the execution plan XML.  We’ll start with a basic (i.e., non-MapReduce) query and follow up with a MapReduce query and see how they change.  As a note, this is for a single-node Polybase cluster pointing to a single-node Hadoop cluster.  As I move to multi-node queries later in this series, it’ll be interesting to see how this changes, if it does at all.

Basic Queries

My basic query will be incredibly simple:  a SELECT statement against the SecondBasemen external table joined to the TopSalaryByAge table.

SELECT TOP(50)
    sb.FirstName,
    sb.LastName,
    sb.Age,
    sb.Throws,
    sb.Bats,
    tsa.TopSalary
FROM dbo.SecondBasemen sb
    INNER JOIN dbo.TopSalaryByAge tsa
        ON sb.Age = tsa.Age
ORDER BY
    sb.LastName DESC;

The execution plan XML is as follows:

<?xml version="1.0" encoding="utf-16"?>
<ShowPlanXML xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" Version="1.5" Build="13.0.1722.0" xmlns="http://schemas.microsoft.com/sqlserver/2004/07/showplan">
  <BatchSequence>
    <Batch>
      <Statements>
        <StmtSimple StatementCompId="1" StatementEstRows="50" StatementId="1" StatementOptmLevel="FULL" StatementOptmEarlyAbortReason="GoodEnoughPlanFound" CardinalityEstimationModelVersion="130" StatementSubTreeCost="0.100037" StatementText="SELECT TOP(50)
 sb.FirstName,
 sb.LastName,
 sb.Age,
 sb.Throws,
 sb.Bats,
 tsa.TopSalary
FROM dbo.SecondBasemen sb
 INNER JOIN dbo.TopSalaryByAge tsa
 ON sb.Age = tsa.Age
ORDER BY
 sb.LastName DESC" StatementType="SELECT" QueryHash="0xED18D81F92FC4F53" QueryPlanHash="0x77D2563391A21679" RetrievedFromCache="true" SecurityPolicyApplied="false">
          <StatementSetOptions ANSI_NULLS="true" ANSI_PADDING="true" ANSI_WARNINGS="true" ARITHABORT="true" CONCAT_NULL_YIELDS_NULL="true" NUMERIC_ROUNDABORT="false" QUOTED_IDENTIFIER="true" />
          <QueryPlan DegreeOfParallelism="0" NonParallelPlanReason="NoParallelForPDWCompilation" MemoryGrant="1024" CachedPlanSize="120" CompileTime="86" CompileCPU="83" CompileMemory="216">
            <MemoryGrantInfo SerialRequiredMemory="512" SerialDesiredMemory="560" RequiredMemory="512" DesiredMemory="560" RequestedMemory="1024" GrantWaitTime="0" GrantedMemory="1024" MaxUsedMemory="72" />
            <OptimizerHardwareDependentProperties EstimatedAvailableMemoryGrant="417021" EstimatedPagesCached="104255" EstimatedAvailableDegreeOfParallelism="2" />
            <RelOp AvgRowSize="83" EstimateCPU="5E-06" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="50" LogicalOp="Top" NodeId="0" Parallel="false" PhysicalOp="Top" EstimatedTotalSubtreeCost="0.100037">
              <OutputList>
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="TopSalary" />
              </OutputList>
              <RunTimeInformation>
                <RunTimeCountersPerThread Thread="0" ActualRows="50" Batches="0" ActualEndOfScans="1" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="6334" ActualCPUms="3083" />
              </RunTimeInformation>
              <Top RowCount="false" IsPercent="false" WithTies="false">
                <TopExpression>
                  <ScalarOperator ScalarString="(50)">
                    <Const ConstValue="(50)" />
                  </ScalarOperator>
                </TopExpression>
                <RelOp AvgRowSize="83" EstimateCPU="0.0008151" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="50" LogicalOp="Inner Join" NodeId="1" Parallel="false" PhysicalOp="Nested Loops" EstimatedTotalSubtreeCost="0.100032">
                  <OutputList>
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="TopSalary" />
                  </OutputList>
                  <RunTimeInformation>
                    <RunTimeCountersPerThread Thread="0" ActualRows="50" Batches="0" ActualEndOfScans="0" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="6334" ActualCPUms="3083" />
                  </RunTimeInformation>
                  <NestedLoops Optimized="false">
                    <OuterReferences>
                      <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                    </OuterReferences>
                    <RelOp AvgRowSize="75" EstimateCPU="0.00241422" EstimateIO="0.0112613" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="50" LogicalOp="Sort" NodeId="2" Parallel="false" PhysicalOp="Sort" EstimatedTotalSubtreeCost="0.0886755">
                      <OutputList>
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                      </OutputList>
                      <MemoryFractions Input="1" Output="1" />
                      <RunTimeInformation>
                        <RunTimeCountersPerThread Thread="0" ActualRebinds="1" ActualRewinds="0" ActualRows="50" Batches="0" ActualEndOfScans="0" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="6333" ActualCPUms="3083" ActualScans="0" ActualLogicalReads="0" ActualPhysicalReads="0" ActualReadAheads="0" ActualLobLogicalReads="0" ActualLobPhysicalReads="0" ActualLobReadAheads="0" InputMemoryGrant="1024" OutputMemoryGrant="640" UsedMemoryGrant="72" />
                      </RunTimeInformation>
                      <Sort Distinct="false">
                        <OrderBy>
                          <OrderByColumn Ascending="false">
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                          </OrderByColumn>
                        </OrderBy>
                        <RelOp AvgRowSize="75" EstimateCPU="0.075" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="195" LogicalOp="Compute Scalar" NodeId="3" Parallel="false" PhysicalOp="Compute Scalar" EstimatedTotalSubtreeCost="0.075">
                          <OutputList>
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                          </OutputList>
                          <ComputeScalar>
                            <DefinedValues>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[FirstName] as [sb].[FirstName]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[LastName] as [sb].[LastName]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[Age] as [sb].[Age]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[Throws] as [sb].[Throws]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[Bats] as [sb].[Bats]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                            </DefinedValues>
                            <RelOp AvgRowSize="75" EstimateCPU="0.075" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="195" LogicalOp="Remote Query" NodeId="4" Parallel="false" PhysicalOp="Remote Query" EstimatedTotalSubtreeCost="0.075">
                              <OutputList>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                              </OutputList>
                              <RunTimeInformation>
                                <RunTimeCountersPerThread Thread="0" ActualRebinds="1" ActualRewinds="0" ActualRows="777" Batches="0" ActualEndOfScans="1" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="6332" ActualCPUms="3082" ActualScans="0" ActualLogicalReads="0" ActualPhysicalReads="0" ActualReadAheads="0" ActualLobLogicalReads="0" ActualLobPhysicalReads="0" ActualLobReadAheads="0" />
                              </RunTimeInformation>
                              <RemoteQuery RemoteSource="Polybase_ExternalComputation" RemoteQuery="&lt;?xml version=&quot;1.0&quot; encoding=&quot;utf-8&quot;?&gt;
&lt;dsql_query number_nodes=&quot;1&quot; number_distributions=&quot;8&quot; number_distributions_per_node=&quot;8&quot;&gt;
 &lt;sql&gt;ExecuteMemo explain query&lt;/sql&gt;
 &lt;dsql_operations total_cost=&quot;0&quot; total_number_operations=&quot;6&quot;&gt;
 &lt;dsql_operation operation_type=&quot;RND_ID&quot;&gt;
 &lt;identifier&gt;TEMP_ID_40&lt;/identifier&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;CREATE TABLE [tempdb].[dbo].[TEMP_ID_40] ([FirstName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [LastName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [Age] INT, [Throws] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS, [Bats] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;EXEC [tempdb].[sys].[sp_addextendedproperty] @name=N'IS_EXTERNAL_STREAMING_TABLE', @value=N'true', @level0type=N'SCHEMA', @level0name=N'dbo', @level1type=N'TABLE', @level1name=N'TEMP_ID_40'&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;UPDATE STATISTICS [tempdb].[dbo].[TEMP_ID_40] WITH ROWCOUNT = 24, PAGECOUNT = 1&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;MULTI&quot;&gt;
 &lt;dsql_operation operation_type=&quot;STREAMING_RETURN&quot;&gt;
 &lt;operation_cost cost=&quot;1&quot; accumulative_cost=&quot;1&quot; average_rowsize=&quot;114&quot; output_rows=&quot;195&quot; /&gt;
 &lt;location distribution=&quot;AllDistributions&quot; /&gt;
 &lt;select&gt;SELECT [T1_1].[FirstName] AS [FirstName],
 [T1_1].[LastName] AS [LastName],
 [T1_1].[Age] AS [Age],
 [T1_1].[Throws] AS [Throws],
 [T1_1].[Bats] AS [Bats]
FROM [tempdb].[dbo].[TEMP_ID_40] AS T1_1&lt;/select&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ExternalRoundRobinMove&quot;&gt;
 &lt;operation_cost cost=&quot;0.80028&quot; accumulative_cost=&quot;1.80028&quot; average_rowsize=&quot;114&quot; output_rows=&quot;195&quot; /&gt;
 &lt;external_uri&gt;hdfs://sandbox.hortonworks.com:8020/tmp/ootp/secondbasemen.csv&lt;/external_uri&gt;
 &lt;destination_table&gt;[TEMP_ID_40]&lt;/destination_table&gt;
 &lt;/dsql_operation&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;DROP TABLE [tempdb].[dbo].[TEMP_ID_40]&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;/dsql_operations&gt;
&lt;/dsql_query&gt;" />
                            </RelOp>
                          </ComputeScalar>
                        </RelOp>
                      </Sort>
                    </RelOp>
                    <RelOp AvgRowSize="15" EstimateCPU="0.0001581" EstimateIO="0.003125" EstimateRebinds="46.6667" EstimateRewinds="3.33333" EstimatedExecutionMode="Row" EstimateRows="1" LogicalOp="Clustered Index Seek" NodeId="16" Parallel="false" PhysicalOp="Clustered Index Seek" EstimatedTotalSubtreeCost="0.0111881" TableCardinality="22">
                      <OutputList>
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="TopSalary" />
                      </OutputList>
                      <RunTimeInformation>
                        <RunTimeCountersPerThread Thread="0" ActualRows="50" ActualRowsRead="50" Batches="0" ActualEndOfScans="0" ActualExecutions="50" ActualExecutionMode="Row" ActualElapsedms="0" ActualCPUms="0" ActualScans="0" ActualLogicalReads="100" ActualPhysicalReads="1" ActualReadAheads="0" ActualLobLogicalReads="0" ActualLobPhysicalReads="0" ActualLobReadAheads="0" />
                      </RunTimeInformation>
                      <IndexScan Ordered="true" ScanDirection="FORWARD" ForcedIndex="false" ForceSeek="false" ForceScan="false" NoExpandHint="false" Storage="RowStore">
                        <DefinedValues>
                          <DefinedValue>
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="TopSalary" />
                          </DefinedValue>
                        </DefinedValues>
                        <Object Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Index="[PK_TopSalaryByAge]" Alias="[tsa]" IndexKind="Clustered" Storage="RowStore" />
                        <SeekPredicates>
                          <SeekPredicateNew>
                            <SeekKeys>
                              <Prefix ScanType="EQ">
                                <RangeColumns>
                                  <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="Age" />
                                </RangeColumns>
                                <RangeExpressions>
                                  <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[Age] as [sb].[Age]">
                                    <Identifier>
                                      <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                                    </Identifier>
                                  </ScalarOperator>
                                </RangeExpressions>
                              </Prefix>
                            </SeekKeys>
                          </SeekPredicateNew>
                        </SeekPredicates>
                      </IndexScan>
                    </RelOp>
                  </NestedLoops>
                </RelOp>
              </Top>
            </RelOp>
          </QueryPlan>
        </StmtSimple>
      </Statements>
    </Batch>
  </BatchSequence>
</ShowPlanXML>

Even for a simple query, I’m not going to expect you to read 174 lines of XML; I’m not a sadist, after all…

What follows is a look at significant lines and my commentary.

Line 8:
<QueryPlan DegreeOfParallelism="0" NonParallelPlanReason="NoParallelForPDWCompilation" MemoryGrant="1024" CachedPlanSize="120" CompileTime="86" CompileCPU="83" CompileMemory="216">

For Polybase plans, our database engine operations cannot go parallel.  There’s a joke in there somewhere.

Lines 71-126 describe the Compute Scalar and Remote Query together.

computescalar

Inside the Compute Scalar XML, we have the remote query.

The Remote Query

Let’s take that encoded XML in the remote query and decode it so that we can see the innards for ourselves:

<?xml version="1.0" encoding="utf-8"?>
<dsql_query number_nodes="1" number_distributions="8" number_distributions_per_node="8">
  <sql>ExecuteMemo explain query</sql>
  <dsql_operations total_cost="0" total_number_operations="6">
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_40</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [tempdb].[dbo].[TEMP_ID_40] ([FirstName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [LastName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [Age] INT, [Throws] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS, [Bats] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">EXEC [tempdb].[sys].[sp_addextendedproperty] @name=N'IS_EXTERNAL_STREAMING_TABLE', @value=N'true', @level0type=N'SCHEMA', @level0name=N'dbo', @level1type=N'TABLE', @level1name=N'TEMP_ID_40'</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">UPDATE STATISTICS [tempdb].[dbo].[TEMP_ID_40] WITH ROWCOUNT = 24, PAGECOUNT = 1</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="MULTI">
      <dsql_operation operation_type="STREAMING_RETURN">
        <operation_cost cost="1" accumulative_cost="1" average_rowsize="114" output_rows="195" />
        <location distribution="AllDistributions" />
        <select>SELECT [T1_1].[FirstName] AS [FirstName],
       [T1_1].[LastName] AS [LastName],
       [T1_1].[Age] AS [Age],
       [T1_1].[Throws] AS [Throws],
       [T1_1].[Bats] AS [Bats]
FROM   [tempdb].[dbo].[TEMP_ID_40] AS T1_1</select>
      </dsql_operation>
      <dsql_operation operation_type="ExternalRoundRobinMove">
        <operation_cost cost="0.80028" accumulative_cost="1.80028" average_rowsize="114" output_rows="195" />
        <external_uri>hdfs://sandbox.hortonworks.com:8020/tmp/ootp/secondbasemen.csv</external_uri>
        <destination_table>[TEMP_ID_40]</destination_table>
      </dsql_operation>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [tempdb].[dbo].[TEMP_ID_40]</sql_operation>
      </sql_operations>
    </dsql_operation>
  </dsql_operations>
</dsql_query>

Yay, more XML!  Again, I’m no sadist, so let’s focus on the interesting bits inside this block.
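
One quick aside before we dig in:  you don't have to decode that attribute by hand.  Here's a rough sketch (one of several ways to do this) that pulls the remote query out of the plan cache and lets the XML data type handle the unescaping; the LIKE filter is just a lazy way to find the right plan, so adjust it to taste.

WITH XMLNAMESPACES (DEFAULT 'http://schemas.microsoft.com/sqlserver/2004/07/showplan')
SELECT
    -- The RemoteQuery attribute holds the escaped dsql plan; converting it to XML unescapes it.
    CONVERT(XML, qp.query_plan.value('(//RemoteQuery/@RemoteQuery)[1]', 'NVARCHAR(MAX)')) AS RemoteQueryPlan
FROM sys.dm_exec_cached_plans cp
    CROSS APPLY sys.dm_exec_query_plan(cp.plan_handle) qp
    CROSS APPLY sys.dm_exec_sql_text(cp.plan_handle) st
WHERE st.text LIKE N'%SecondBasemen%'
    AND st.text NOT LIKE N'%dm_exec_cached_plans%';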

Line 2:
<dsql_query number_nodes="1" number_distributions="8" number_distributions_per_node="8">

We have one Polybase node in our scale-out cluster.  Azure SQL Data Warehouse has a similar output; this Grant Fritchey post shows an example with number_distributions="60" instead of 8.  For more information on how number_distributions matters, check out my post from June 27th on the topic.

Line 4:
<dsql_operations total_cost="0" total_number_operations="6">

There are going to be six operations in this query plan.  They start on lines 5, 8, 14, 20, 26, and 43, respectively.

Line 5:
<dsql_operation operation_type="RND_ID"><identifier>TEMP_ID_40</identifier></dsql_operation>

This looks like the operation which generates the name for the temp table.

Line 8 looks to be the operation which generates the create statement for the temp table.  Line 14 then builds extended properties.  Line 20 creates external stats.  Line 26 creates the SELECT statement to retrieve data from the temp table.  Line 37 performs the Round Robin retrieval from Hadoop into TEMP_ID_40.  Line 43 drops the table.  Hey, wait, this looks familiar!

nonmapreduceworkresults

Yep, these are the same details you get out of sys.dm_exec_distributed_request_steps, though I’m reusing the image here so the table IDs won’t be quite the same.  Look carefully at the image and you’ll see eight steps, but the explain plan XML above only shows six items.  It appears that step index 4 does not show up in the explain plan.  Step 6 in the request steps DMV selects data from TEMP_ID_## and calls the result T1_1; this is what shows up on line 27 of the explain plan, with an operation whose type is STREAMING_RETURN.  Step 5 in the request steps DMV selects from SecondBasemen as T1_1, so I think that matches the ExternalRoundRobinMove operation in line 37 of the explain plan.
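
If you want to line those DMV steps up against the explain plan yourself, here's a sketch of the query I use; the subquery just grabs the most recent distributed request, so run it shortly after the Polybase query finishes.

SELECT
    s.step_index,
    s.operation_type,
    s.location_type,
    s.[status],
    s.command
FROM sys.dm_exec_distributed_request_steps s
WHERE s.execution_id =
(
    -- Grab the latest distributed request; filter on a specific execution_id if you know it.
    SELECT TOP(1) r.execution_id
    FROM sys.dm_exec_distributed_requests r
    ORDER BY r.end_time DESC
)
ORDER BY s.step_index;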

MapReduce Queries

I’m just going to add an OPTION(FORCE EXTERNALPUSHDOWN) hint to my query.  That way, we have a plan which is as close to the original plan as we can get while still doing a MapReduce job.

This execution plan is also 174 lines long, so I won't do a straight copy-paste like I did for the first plan.  The graphical shape of the plan is the same, and everything aside from the remote query looks to be the same.  The remote query's explain plan, not surprisingly, is different, so here it is:

<?xml version="1.0" encoding="utf-8"?>
<dsql_query number_nodes="1" number_distributions="8" number_distributions_per_node="8">
  <sql>ExecuteMemo explain query</sql>
  <dsql_operations total_cost="257703.37296" total_number_operations="8">
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_56</identifier>
    </dsql_operation>
    <dsql_operation operation_type="HadoopOperation">
      <operation_cost cost="257703.37296" accumulative_cost="257703.37296" average_rowsize="114" output_rows="195" />
      <Config>
  <JobTrackerName>sandbox.hortonworks.com</JobTrackerName>
  <JobTrackerPort>8050</JobTrackerPort>
  <Plan>
    <RelOp NodeId="6" LogicalOp="Materialize">
      <OutputList>
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="FirstName" />
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="LastName" />
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Age" />
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Throws" />
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Bats" />
      </OutputList>
      <Children>
        <RelOp NodeId="1" LogicalOp="Scan">
          <OutputList>
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="FirstName" />
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="LastName" />
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Age" />
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Throws" />
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Bats" />
          </OutputList>
          <Object Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]">
            <Uri>hdfs://sandbox.hortonworks.com:8020/tmp/ootp/secondbasemen.csv</Uri>
            <FileFormat Type="DelimitedText">
              <Options>
                <FieldTerminator>,</FieldTerminator>
                <UseTypeDefault>True</UseTypeDefault>
                <Encoding>UTF8</Encoding>
              </Options>
            </FileFormat>
          </Object>
          <Children />
        </RelOp>
      </Children>
    </RelOp>
  </Plan>
</Config>
    </dsql_operation>
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_57</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [tempdb].[dbo].[TEMP_ID_57] ([FirstName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [LastName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [Age] INT, [Throws] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS, [Bats] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">EXEC [tempdb].[sys].[sp_addextendedproperty] @name=N'IS_EXTERNAL_STREAMING_TABLE', @value=N'true', @level0type=N'SCHEMA', @level0name=N'dbo', @level1type=N'TABLE', @level1name=N'TEMP_ID_57'</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">UPDATE STATISTICS [tempdb].[dbo].[TEMP_ID_57] WITH ROWCOUNT = 24, PAGECOUNT = 1</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="MULTI">
      <dsql_operation operation_type="STREAMING_RETURN">
        <operation_cost cost="1" accumulative_cost="257704.37296" average_rowsize="114" output_rows="195" />
        <location distribution="AllDistributions" />
        <select>SELECT [T1_1].[FirstName] AS [FirstName],
       [T1_1].[LastName] AS [LastName],
       [T1_1].[Age] AS [Age],
       [T1_1].[Throws] AS [Throws],
       [T1_1].[Bats] AS [Bats]
FROM   [tempdb].[dbo].[TEMP_ID_57] AS T1_1</select>
      </dsql_operation>
      <dsql_operation operation_type="ExternalRoundRobinMove">
        <operation_cost cost="0.80028" accumulative_cost="257705.17324" average_rowsize="114" output_rows="195" />
        <external_uri>hdfs://sandbox.hortonworks.com:8020/TEMP_ID_56_OUTPUT_DIR</external_uri>
        <destination_table>[TEMP_ID_57]</destination_table>
      </dsql_operation>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [tempdb].[dbo].[TEMP_ID_57]</sql_operation>
      </sql_operations>
    </dsql_operation>
  </dsql_operations>
</dsql_query>

The first difference between the MapReduce query and the basic query is that the MapReduce query has a cost and performs 8 operations, whereas the basic query had a cost of 0 and performed 6 operations.

We also get to see a HadoopOperation, which includes JobTracker details as well as details on the file format of the second basemen file in HDFS.  What interests me here are the two LogicalOp operators on lines 14 and 23, respectively:  Materialize (the set of columns on OOTP.dbo.SecondBasemen) and Scan (the secondbasemen.csv file).

There’s another difference down in lines 81-82.  Here, instead of querying secondbasemen.csv like the basic query did, we’re reading from the MapReduce job’s output directory, labeled TEMP_ID_56_OUTPUT_DIR in my example above.

Conclusion

When you combine this post with the posts on digging into basic and MapReduce queries, I think the picture starts getting clearer and clearer.  Hopefully at this point the Polybase engine is looking less like a black box and more like a “normal” data mover service with several inspection points available to us.

Polybase MapReduce Container Size

If you like this content, check out the entire Polybase series.

Whenever you create a MapReduce job via YARN, YARN will create a series of containers, where containers are isolated bubbles of memory and CPU power.  This blog post will go over an issue that I ran into with my HDP 2.4 VM concerning YARN containers and Polybase.

Memory Starvation In A Nutshell

Originally, I assigned 8 GB of RAM to my HDP 2.4 sandbox.  This was enough for basic work like running Hive queries, and so I figured it was enough to run a MapReduce job initiated by Polybase.  It turns out that I was wrong.

Configuring YARN Containers

There are a couple of configuration settings in Ambari (for the sandbox, the default Ambari page is http://sandbox.hortonworks.com:8080).  From there, if you select the YARN menu item and go to the Configs tab, you can see the two Container configuration settings.

containersettings

On my machine, I have the minimum container size marked at 512 MB and maximum container size at 1536 MB.  I also have the minimum and maximum number of CPU VCores set as well, ranging from 1 to 4.
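
As far as I can tell, those Ambari sliders map to the standard YARN scheduler properties, so the equivalent yarn-site.xml entries for my configuration would look something like the following (treat the slider-to-property mapping as my assumption rather than gospel):

<!-- yarn-site.xml equivalents of the Ambari container settings shown above. -->
<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <value>512</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>1536</value>
</property>
<property>
  <name>yarn.scheduler.minimum-allocation-vcores</name>
  <value>1</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-vcores</name>
  <value>4</value>
</property>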

Setting The Stage

Back at PASS Summit, I had a problem getting Polybase to work.  Getting the SQL Server settings correct was critical, but even after that, I still had a problem:  my jobs would just run endlessly.  Eventually I had to kill the jobs, even though I’d let them run for upwards of 30 minutes.  Considering that the job was just pulling 777 rows from the SecondBasemen table, this seemed…excessive…  Also, after the first 20-30 seconds, it seemed like nothing was happening, based on the logs.

longrunningjobs

On the Friday of PASS Summit, I had a chance to sit down with Bill Preachuk (t) and Scott Shaw (b | t) of Hortonworks and they helped me diagnose my issues.  By focusing down on what was happening while the MapReduce job ran, they figured out that my laptop was not creating the necessary number of YARN containers due to the service running out of memory, and so my MapReduce jobs would just hang.

Our solution was simple:  scale down the container min and max size to 512 MB, as that would guarantee that I could create at least 3 containers—which is what the Polybase engine wanted.

Now It Breaks For Real

Once we did that and I restarted all of the services, I ended up getting an interesting error message from SQL Server:

Msg 7320, Level 16, State 110, Line 2
Cannot execute the query "Remote Query" against OLE DB provider "SQLNCLI11" for linked server "(null)". EXTERNAL TABLE access failed due to internal error: 'Java exception raised on call to JobSubmitter_SubmitJob: Error [org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=512

The error message is pretty clear:  the Polybase service wants to create containers that are 1536 MB in size, but the maximum size I’m allowing is 512 MB.  Therefore, the Polybase MapReduce operation fails.

Let’s Change Some Config Files!

To get around that, I looked up the proper way to set the map and reduce memory sizes for a MapReduce job:  by changing mapred-site.xml.  Therefore, I went back to the Polybase folder on my SQL Server installation and added the following to my mapred-site.xml:

<property>
  <name>mapreduce.map.memory.mb</name>
  <value>512</value>
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>512</value>
</property>

After restarting the Polybase service, I ran the query again and got the same error.  It seems changing that setting did not work.

Conclusions

The way I got around this problem was to increase the HDP sandbox VM to have 12 GB of RAM allocated to it.  That’s a large percentage of my total memory allocation on the laptop, but at least once I set the container size back to allowing 1536 MB of RAM, my Polybase MapReduce jobs ran.

My conjecture is that the 1536 MB size might be hard-coded, but it’s just that:  conjecture.  I have no proof either way.

One last thing I should note is that the Polybase service wants to create three containers, so you need to have enough memory available to YARN to allocate three 1536 MB containers, or 4608 MB of RAM.

Forcing External Pushdown

In the last post, we dug into how the MapReduce process works.  Today, I want to spend a little bit of time talking about forcing MapReduce operations and disabling external pushdown.

What Can We Force?

Before we get into the mechanics behind forcing pushdown, it’s important to note that predicate pushdown is somewhat limited in the sense of what information we can pass to the MapReduce engine.

For example, let’s say that I have a set of flight details in Hadoop and a set of airports in SQL Server.  Suppose I want to run the following query as quickly as possible:

SELECT
    *
FROM dbo.Flights f
    INNER JOIN dbo.Airports ar
        ON f.dest = ar.IATA
WHERE
    ar.state = 'OH'
    AND ar.city = 'Columbus';

Ideally, the engine would start with the Airports table, filtering by state and city and getting back the 3 records in that table which are associated with Columbus airports.  Then, knowing the IATA values for those 3 airports, the Polybase engine would feed those into the remote query predicate and we’d get back the fewest number of rows.  There’s just one catch:

The Polybase engine does not work that way!

Unfortunately, my ideal scenario isn’t one that Polybase can give me (at least today; we’ll see if they can in the future).  The only predicates which the Polybase engine can push are those which are explicit, so we can’t feed a set of results from one part of a query plan into Hadoop.  If you did want to do something like that, I think the easiest solution would be to generate dynamic SQL and build an IN clause with the IATA/dest codes.
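
To make that workaround concrete, here's a rough sketch; the table and column names come from the query above, and the rest is purely illustrative:

-- Build the IN list on the SQL Server side first, then embed it as a literal predicate
-- so the Polybase engine has something explicit that it can push down to Hadoop.
-- This sketch assumes at least one matching airport; otherwise @sql ends up NULL.
DECLARE @InList NVARCHAR(MAX) = N'';

SELECT @InList = @InList + N', ''' + ar.IATA + N''''
FROM dbo.Airports ar
WHERE
    ar.state = 'OH'
    AND ar.city = 'Columbus';

SET @InList = STUFF(@InList, 1, 2, N'');

DECLARE @sql NVARCHAR(MAX) = N'
SELECT *
FROM dbo.Flights f
WHERE f.dest IN (' + @InList + N');';

EXEC sp_executesql @sql;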

So with that in mind, let’s talk more about forcing and preventing pushdown.

Forcing Predicate Pushdown

As a reminder, in order to allow predicate pushdown to occur, we need to hit a Hadoop cluster; we can’t use predicate pushdown on other systems like Azure Blob Storage.  Second, we need to have a resource manager link set up in our external data source.  Third, we need to make sure that everything is configured correctly on the Polybase side.  But once you have those items in place, it’s possible to use the FORCE EXTERNALPUSHDOWN command like so:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC
OPTION(FORCE EXTERNALPUSHDOWN);
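
As a refresher on that second requirement, the resource manager link lives on the external data source definition itself.  Here's a minimal sketch; the data source name is made up, but the host and ports match the Hortonworks sandbox I've been using throughout this series:

CREATE EXTERNAL DATA SOURCE [HDP] WITH
(
    TYPE = HADOOP,
    LOCATION = N'hdfs://sandbox.hortonworks.com:8020',
    -- 8050 is the sandbox's YARN resource manager port.  Without this setting,
    -- predicate pushdown (and therefore FORCE EXTERNALPUSHDOWN) is off the table.
    RESOURCE_MANAGER_LOCATION = N'sandbox.hortonworks.com:8050'
);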

You might want to force external pushdown when you don’t have good external table stats but do have some extra knowledge the optimizer doesn’t have regarding the data set.  For example, you might know that you’re asking for a tiny fraction of the total data set to get returned but the optimizer may not be aware of this, and so it pulls the entire data set over to SQL Server for the operation.

Be wary of using this operation, however; it will spin up a MapReduce job, and that can take 15-30 seconds to start up.  If you can get your entire data set over the wire in 10 seconds, there’s little value in shunting the work off to your Hadoop cluster.

Disabling External Pushdown

The opposite of FORCE EXTERNALPUSHDOWN is DISABLE EXTERNALPUSHDOWN.  On Azure Blob Storage and other external sources, this is a no-op.  This is also a no-op if you have a Hadoop cluster but no resource manager set up.

With everything set up, it will ensure that you do not push down to your Hadoop cluster.  This might be a good thing if you consistently see faster times by just pulling the entire data set over the wire and performing any filters or operations in SQL Server.  Note that even with this option set, performance is still going to be a good bit better than using Hive over a linked server.
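
For completeness, the hint looks just like its counterpart; here's the query from the previous section with the pushdown-disabling hint instead:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC
OPTION(DISABLE EXTERNALPUSHDOWN);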

Pushdown-Related Errors

Let’s say that you want to run OPTION(FORCE EXTERNALPUSHDOWN) and you get an error as soon as the command starts:

Msg 7320, Level 16, State 110, Line 1
Cannot execute the query "Remote Query" against OLE DB provider "SQLNCLI11" for linked server "(null)". Query processor could not produce a query plan because of the hints defined in this query. Resubmit the query without specifying any hints.

If you’re trying to perform this operation against a Hadoop cluster, you’ll want to re-read the configuration settings blog post.  There are several files which need to change and if you don’t get all of them, you’ll end up with strange-looking error messages.

Running MapReduce Polybase Queries

Previously, we looked at basic queries which do not perform MapReduce operations.  Today’s post looks at queries which do perform MapReduce operations, and we’ll see how they differ.

MapReduce Queries

In order for us to be able to perform a MapReduce operation, we need the external data source to be set up with a resource manager.  In addition, one of the following two circumstances must be met:

  1. Based on external table statistics, our query must be set up in such a way that it makes sense to distribute the load across the Hadoop data nodes and have the difference be great enough to make it worth waiting 15-30 seconds for a MapReduce job to spin up, OR
  2. We force a MapReduce job by using the OPTION(FORCE EXTERNALPUSHDOWN) query hint.

For the purposes of today’s post, I’m going to focus on the latter.  Regarding condition #1, my hunch is that there’s some kind of in-built cost optimizer the Polybase engine uses to determine whether to spin up a MapReduce job.  If this is in fact the case, it’s important to note that there is no “cost threshold for MapReduce” option that we can set, so if you’re looking for ways to fine-tune MapReduce operations, you may have to be liberal with the query hints to enable and disable external pushdown.

Our MapReduce query is pretty simple:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC
OPTION(FORCE EXTERNALPUSHDOWN);

The only thing I’ve done here compared to the basic query is to add a query hint to force external pushdown.  Let’s see how this changes things.

Step 1:  Communicating with Hadoop

Just like with the basic query, we can use Wireshark to see how the Polybase engine interacts with our Hadoop cluster.  This time around, there’s a lot more going on over the wire than in the basic query scenario; there are ten separate TCP streams which the Polybase engine initiated, and we’re going to take at least a quick look at each one.

mapreducepacketcapture

A quick look at the Wireshark packet capture.

Note that my names for these streams come from the order in which they show up in Wireshark.  They might have specific internal names, but I don’t know what they are.

Stream 0

Stream 0 appears to act as a coordinator, setting everything up and building the framework for the rest of the network operations to succeed.  This first stream has three major sections.  The first section is around basic tasks, like making sure the file we are looking for exists:

mapreducesetup

Once we know it exists, we want to create a job.  Eventually, that job will complete (we hope).

mapreducecreate

Then, after everything completes, we get file info for the staging work:

mapreducegetfileinfo

Stream 0 runs through the entire operation, with the earliest packet being packet #7 and the last packet being #20118 of approximately 20286.  We communicate with the Hadoop name node on port 8020 for this stream.

Stream 1

The next stream is Stream 1.  This stream does two things.  The first thing, which you can see at the beginning of the stream text below, is to prep the MapReduce job, including running /bin/bash to set the CLASSPATH and LD_LIBRARY_PATH.

mapreducestream1

After doing that, it regularly calls YARN’s getApplicationReport method.  When it gets word that the MapReduce job is complete, it finishes its work.

mapreducestream1end

Stream 1 starts with packet #23 and ends with packet #20286.  But there’s a big gap between packets; stream 1 is responsible for packets 23-30, 128, and then most of 19989 through 20110.  This conversation takes place on port 8050 of the name node; remember that 8050 is the resource manager port for YARN.

Stream 2

Stream 2 is the single largest stream, responsible for 18,864 of the ~20,286 packets.  We can see exactly why by looking at the beginning and end sections of the stream.  First the beginning:

mapreducestream2start

And now the end:

mapreducestream2end

Stream 2 sends along 27 MB worth of data.  It’s packaging everything Polybase needs to perform operations, including JAR files and any internal conversion work that the Polybase engine might need to translate Hadoop results into SQL Server results.  For example, the sqlsort at the bottom is a DLL that performs ordering using SQL Server-style collations, as per the Polybase academic paper.

Stream 2 is responsible for almost every packet from 43 through 19811; only 479 packets in that range belonged to some other stream (19811 – 43 – 18864 – 425 = 479).  We send all of this data to the data node via port 50010.

Stream 3

After the behemoth known as Stream 2, the rest of these are pretty small.

mapreducestream3

This stream appears to perform a block write operation.  It is responsible for most of the packets from 19830 to 19846.  This conversation takes place over the data node’s port 50010.

Stream 4

This stream follows up on the work Stream 3 did.

mapreducestream4

It appears to be writing metadata, and is responsible for most of the packets from 19853 through 19866.  This communication is happening over the data node’s port 50010.

Stream 5

After two tiny streams, we get back to a slightly larger stream.

mapreducestream5

As you can guess from the contents, this stream creates the YARN XML job and passes it over the wire.  The stream starts with packet 19875 and ends at 19968, using 83 packets to transmit the payload.  We connect to port 50010 to push the YARN config file to the data node.

Stream 6

Our next stream is of the “child on a long road trip” variety.

mapreducestream6

Basically this stream calls two methods as part of a loop:  getTaskAttemptCompletionEvents and getJobReport.  It looks like there might be a periodic call of getCounters, or this may be triggered by something.  In any event, the loop continues until getTaskAttemptCompletionEvents gets back a result; in my case, the result fired back a reference to http://sandbox.hortonworks.com:13562.  Of note is that port 13562 is the default MapReduce shuffle port.

Stream 6 was responsible for 80 of the packets from 20103 through 20209.  One interesting thing to note is that the Hadoop-side port was 48311, which I believe is just a dynamic port and does not signify anything in particular, unlike many of the other ports we’re looking at.

Stream 7

This is another loop-until-done stream, but it ended pretty quickly.

mapreducestream7

This looped for getting file info and listing information by connecting to the Hadoop name node on port 8020.  It took packets 20210 through 20224, as well as a few wrap-up packets at the end.

Stream 8

Stream 8 retrieved information on the finished MapReduced data.

mapreducestream8

It first looks for a file called part-m-00000.ppax.  The PPAX format is a proprietary data format Polybase uses (the link is a Powerpoint deck that the current Polybase PM presented at SQL Saturday Boston).

Stream 8 is a TCP connection to the name node on port 8020 and stretches from 20225-20234 and a few packets after that as well.

Stream 9

The final Polybase stream actually receives the data in PPAX format.

mapreducestream9

We connect to the Hadoop data node on port 50010 and retrieve the data file.  This takes 34 packets and stretches from 20235 through 20280.

Step 2:  MapReduce Logs

For MapReduce logs, you’ll want to go to port 8088 of the name node, so for example, http://sandbox.hortonworks.com:8088/cluster.  On this link, we can see the various “applications” that have been created.

mapreducelogapplication

Clicking on one of these brings up details on the application, and we can see logs for each attempt.

mapreduceloglogs

If you click the Logs link, you’ll get to see the various logs, starting in short form and with the option to drill in further.

mapreducelogsyslog

In this example, we can see that the stderr and stdout logs didn’t have much, but the syslog log had more than 4K, so if we want to see the full log, we can click a link.

Step 3:  The Rest Of The Story

Once we have the file available, it’s time for Polybase to wrap up the story.  We can see this in the DMV outputs:

mapreducedmv

We can see that the dm_exec_external_work DMV looks slightly different, but still performs a file split.  But this time, instead of splitting the CSV file directly, we split the MapReduced file /user/pdw_user/TEMP_DIR/5130041d4035420b80f88096f197d4b2/Output/part-m-00000.ppax.  The operations to load the table are pretty similar to before, though:  we create a temp table, build statistics on the table, and then import the data using a round robin approach.

Conclusion

Figuring out how the Polybase engine performs MapReduce operations has been interesting.  For this simple single-Hadoop-node, single-Polybase-node, single-file setup, we saw ten separate TCP streams, and understanding how they work in conjunction with DMV and MapReduce logs can give you a fuller understanding of the Polybase MapReduce process.

Running Basic Polybase Queries On Hadoop

Following the Polybase series, we have created an external table and we can now use it like any other table.  For example:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC;

In this query, dbo.SecondBasemen is an external table and TopSalaryByAge is a table in SQL Server.  I haven’t found any T-SQL syntax which does not work, and although I haven’t tried everything, I’ve tried a few wacky ideas.

With that in mind, let’s dig a little deeper into what’s actually happening when we kick off basic queries.

The Basic Query

For the purposes of discussion, I’m going to label “basic” queries as any query which does not generate MapReduce jobs.  What happens is that all of the data is streamed over the wire to our SQL Server instance and stored in a temp table (or multiple temp tables if you have a scale-out Polybase cluster).  The database engine then continues along its merry way, performing any joins or other T-SQL-specific operations.  Let’s look at an example in action by running the query in my intro section.
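
If you're quick enough on the trigger, you can even catch that temp table in the act.  Here's a small sketch to run in a second session while the Polybase query executes:

-- The streaming tables show up in tempdb with names like TEMP_ID_40 and disappear
-- as soon as the distributed request wraps up.
SELECT
    t.name,
    t.create_date
FROM tempdb.sys.tables t
WHERE t.name LIKE N'TEMP\_ID\_%' ESCAPE N'\';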

We’re going to look at a couple of sources to see what’s happening with our request:  a Wireshark packet capture and DMV outputs.

Step 1:  Communicating With Hadoop

First, let’s check out the Wireshark packet capture:

basicpacketcapture

We have three TCP streams running here, two from my port 49423 to the Hadoop node’s 8020, and one from my port 49439 to the Hadoop node’s 50010.  The first TCP stream connects to the name node and asks for details regarding a file located at /tmp/secondbasemen.csv.

nonmapreducefileinfo

The name node responds with some basic information which confirms that the file actually exists (and, incidentally, that the name node is up and running).  Then, the Polybase engine requests information on who owns copies of that file, specifically by calling the getBlockLocations method:

nonmapreducegetblocklocations

The name node returns information on which machines in the Hadoop cluster own which blocks (with block names starting with BP).  The Polybase engine also asks for the server defaults so that it knows about any particular client protocol oddities, and the name node returns its results.

The final stream is where the Polybase engine connects directly to the data node on port 50010 and requests the block contents.

nonmapreducetcpstream

In this case, it runs a non-MapReduce job, meaning it just wants the data node to stream its block to the Polybase engine.  The data node complies, sending our data in plaintext to our SQL Server instance.

Step 2:  Putting It Together

Once the Polybase engine has retrieved the data, it feeds that data into a temp table and continues the operation.  We can see this clearly from a pair of DMVs:

nonmapreduceworkresults

The dm_exec_external_work DMV tells us which execution we care about; in this case, I ended up running the same query twice, but I decided to look at the first run of it.  Then, I can get step information from dm_exec_distributed_request_steps.  This shows that we created a table in tempdb called TEMP_ID_14 and streamed results into it.  The engine also created some statistics (though I’m not quite sure where it got the 24 rows from), and then we perform a round-robin query.  Each Polybase compute node queries its temp table and streams the data back to the head node.  Even though our current setup only has one compute node, the operation is the same as if we had a dozen Polybase compute nodes.
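
For reference, here's roughly how I pull the work-item half of that output; it's a sketch, so adjust the column list to taste, and feed the execution_id it returns into sys.dm_exec_distributed_request_steps to get the step list.

-- Most recent external work items; input_name and read_location show the file split.
SELECT TOP(10)
    w.execution_id,
    w.step_index,
    w.[type],
    w.input_name,
    w.read_location,
    w.[status]
FROM sys.dm_exec_external_work w
ORDER BY w.start_time DESC;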

From there, we’ve satisfied the Remote Query operation in the execution plan, and the rest is just like any other SQL Server query:

nonmapreduceexecutionplan

Note that there are some interesting Remote Query elements in the execution plan, and I plan to look at them in more detail later in the series.  For now, though, this should suffice.

Conclusion

Today’s post covers how to figure out what’s going on with non-MapReduce queries in Polybase.  I originally intended to cover MapReduce queries as well, but I realized that there’s a lot more going on and that it deserves its own blog post, which will appear tomorrow.  Stay tuned!

Let’s Build A Hadoop Cluster, Part 3

Last time around, we installed Ubuntu and Docker on our Hadoop cluster-to-be.  Now we strike and install Hadoop.

Caochong

My project of choice for installing a Hadoop cluster using Docker is Weiqing Yang’s caochong.  It’s pretty easy to install, so let’s get started.  I’m going to assume that you have a user account and have pulled caochong into a folder called caochong.

Making Changes

After grabbing the code, I’m going to go into caochong/from-ambari and edit the run.sh file.  The reason is that by default, the installation does not forward any ports to the outside world.  That’s fine if you’re testing Hadoop on your own machine (which is, admittedly, the point of this Docker project), but in our case, we’re doing some basic remote development, so I want to expose a series of ports and set privileged=true.

caochongchanges

In case you want to follow along at home or my default vim coloring scheme is hard to read, here’s the section I changed:

# launch containers
master_id=$(docker run -d --net caochong -p $PORT:8080 -p 6080:6080 -p 9090:9090 -p 9000:9000 -p 2181:2181 -p 8000:8000 -p 8020:8020 -p 42111:42111 -p 10500:10500 -p 16030:16030 -p 8042:8042 -p 8040:8040 -p 2100:2100 -p 4200:4200 -p 4040:4040 -p 8050:8050 -p 9996:9996 -p 9995:9995 -p 8088:8088 -p 8886:8886 -p 8889:8889 -p 8443:8443 -p 8744:8744 -p 8888:8888 -p 8188:8188 -p 8983:8983 -p 1000:1000 -p 1100:1100 -p 11000:11000 -p 10001:10001 -p 15000:15000 -p 10000:10000 -p 8993:8993 -p 1988:1988 -p 5007:5007 -p 50070:50070 -p 19888:19888 -p 16010:16010 -p 50111:50111 -p 50075:50075 -p 18080:18080 -p 60000:60000 -p 8090:8090 -p 8091:8091 -p 8005:8005 -p 8086:8086 -p 8082:8082 -p 60080:60080 -p 8765:8765 -p 5011:5011 -p 6001:6001 -p 6003:6003 -p 6008:6008 -p 1220:1220 -p 21000:21000 -p 6188:6188 -p 2222:22 -p 50010:50010 -p 6667:6667 -p 3000:3000 --privileged=true --name $NODE_NAME_PREFIX-0 caochong-ambari)
echo ${master_id:0:12} > hosts
for i in $(seq $((N-1)));
do
    container_id=$(docker run -d --net caochong --privileged=true --name $NODE_NAME_PREFIX-$i caochong-ambari)
    echo ${container_id:0:12} >> hosts
done

There’s a bit of downside risk to doing this:  I am forwarding all of these ports from my machine onto the name node’s Docker container.  This means I have to install all of the Hadoop services on the name node rather than splitting them across the various nodes (which is generally the smarter idea with a real cluster).

Anyhow, once that’s done, run ./run.sh --nodes=5 --port=8080 and you’ll get a lot of messages, one of which includes something like the following:

[Image: Hostnames.png]

The box shows the five nodes that I’ve created.  Specifically, it gives us the Docker container names.  But which of those is the name node?

With caochong, you can tell which is the name node because it will have the name caochong-ambari-0 by default.  You can find it by running docker ps while the containers are running.
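
For example, here’s a quick way to list the running containers, filtering on the default name prefix:

docker ps --filter "name=caochong-ambari"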

[Image: HostNode.png]

Once we know the primary node, we’re good to go.  We’ll need to copy all of those hostnames that get created and keep note of which one’s the name node when we install Hadoop via Ambari.  If you’ve forgotten those names and have closed the install window, don’t fret:  you can get those host names in a file named caochong/from-ambari/hosts.
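
For example, assuming you cloned caochong into your home directory:

cat ~/caochong/from-ambari/hosts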

Installing Hadoop Via Ambari

Once this is set up, we can connect to Ambari to finish the installation.  We have port forwarding set up, so from a laptop or other device, you can point a web browser at port 8080 on your NUC device’s IP address and you’ll get a screen which looks like this:

[Image: 0_ambariwizard]

We don’t have a cluster yet, so we’ll need to click the “Launch Install Wizard” button.  This will prompt us for a cluster name:

[Image: 1a_namecluster]

The next step is to figure out which version of the Hortonworks Data Platform we want to install:

[Image: 1b_repos]

The underlying Linux installation is Ubuntu 14, so we’ll select that box.  Note that I tried to trick the installer into installing HDP 2.5 by putting in the public repo information for 2.5, but it ended up still installing 2.4.  There might be some trick that I missed that gets it to work, though.

After selecting the repo, you get to list the nodes.  This is the set that you’ll copy and paste from the hosts list:

[Image: 2_installoptions]

You can safely ignore any warnings you get about not using fully-qualified domain names; within the Docker virtual network that caochong sets up, these are all accessible names.  After entering the hosts, you’ll want to copy and paste the SSH private key which gets generated.  That’s in caochong/from-ambari/id_rsa.  Copy the contents of that file into the SSH private key box and you can register and confirm the nodes.
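
If you’re working over SSH rather than at the console, it’s easy enough to dump the key to the terminal and copy it from there (again assuming the repository lives in your home directory):

cat ~/caochong/from-ambari/id_rsa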

[Image: 3_confirmhosts]

This can take a couple minutes but eventually all of the bars should go green and you’ll be able to click the Next button to go to the next page, where you get to select the services you want to install on this cluster.

[Image: 4a_chooseservices]

I selected pretty much all of the services, although if you’re testing production clusters, you might want to match whatever’s in your production cluster.  Going to the next step, you get the chance to set master nodes for the various services.

[Image: 5_assignmasters]

Notice how 3bd shows up for pretty much all of these services.  This is not what you’d want to do in a real production environment, but because we want to use Docker and easily pass ports through, it’s the simplest way for me to set this up.  If you knew beforehand which node would host which service, you could modify the run.sh shell script that we discussed earlier and open those specific ports.

After assigning masters, we next have to define which nodes host the slave and client components for each service.

[Image: 6_assignclients]

We want each node to be a data node, and from there, I spread out the load across the five.  That way, if one node goes down (e.g., if I’m testing a secondary node failure), there’s at least one other node which can take up the slack.

Once we’ve assigned nodes, it’s time to modify any configurations.  The installer is pretty helpful about what you need to modify, specifically passwords.

[Image: 7_customizeservices]

Each one of the numbered notes above is a password or secret key which needs to be set.  Fill those out and you can go to the next step, which is reviewing all of your changes.

[Image: 8_review]

Assuming you don’t want to make any changes, hit the Deploy button and you’ll get to watch the installer run.

[Image: 9_install]

This installer will take quite some time.  I didn’t clock the installation, but I wouldn’t be shocked if it took 45 minutes or so for everything to install.  But once it’s finished, you officially have a Hadoop cluster of your own.

Conclusion

In this third part of the Hadoop on the Go miniseries, we created a five-node cluster.  There are a couple more blog posts around administration that I want to get to, particularly around rebooting the cluster and quickly rebuilding the cluster (something I think will come to me as I become more familiar with Docker).

Let’s Build A Hadoop Cluster, Part 2

In part 1 of this series, we bought some hardware.  After patiently(?) waiting for it to arrive, we have the hardware in hand and Ubuntu installed, so let’s keep going.

Docker?  I hardly even know her!

Hadoop on Docker is a relatively new thing.  Thanks to Randy Gelhausen’s work, the Hortonworks Data Platform 2.5 sandbox now uses Docker.  That has negative consequences for those of us who want to use Polybase with the sandbox, but for this series, I’m going to forgo the benefits of Polybase to get a functional five-node cluster on the go.

Before I go on, a couple notes about Docker and Hadoop.  First, why would I want to do this?  Let’s think about the various ways we can install Hadoop:

  1. As a one-node, standalone server.  This is probably the worst scenario because it’s the least realistic.  The benefit of Hadoop is its linear scalability, so you’re never going to run a production system with a single, standalone node.
  2. As a one-node cluster.  This is what the various sandboxes do, and they’re useful for getting your feet wet.  Because they’re set up like multi-node clusters (just with a single node), you get to think about things like service placement and can write code the way you would in production.  Sandboxes are fine, but they’re not a great way of showing off Hadoop.
  3. As multiple VMs using VirtualBox or VMware.  This works, but virtual machines burn through resources.  Even with 32 GB of RAM, putting three or four VMs together will burn 8-12 GB of RAM just for the operating systems, and each VM needs its own full operating system install on disk.
  4. As multiple containerized nodes.  In other words, put together several nodes in Docker.  Each node has a much lower resource overhead than a virtual machine, so I can easily fit five nodes.  I also need just one copy of the Ubuntu image, with small marginal disk additions for each of the five nodes.

With these in mind, and because I want to show off Hadoop as Hadoop and not a glorified single-node system, I’m going to use Docker.

Installing Docker

After setting up Ubuntu, the first step is to install Docker.  It’s really easy to do on Ubuntu and probably takes 10 minutes.  If you want to learn more about Docker, Pluralsight has a Container Management path which is extremely helpful.
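
As a rough sketch of what that looks like (package names may have changed since this was written; the official Docker documentation has the canonical, repository-based instructions):

sudo apt-get update
sudo apt-get install -y docker.io
# Let your user run docker without sudo; log out and back in for this to take effect.
sudo usermod -aG docker $USER
# Quick smoke test.
sudo docker run hello-world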

Setting Up A Cluster

There are a few guides that you can follow for setting up a multi-node Hadoop cluster with Docker.  Here are a few I didn’t use, but which you might find helpful:

I used Kiwen Lau’s blog post first to understand how to do it, but I wanted to put together a Hortonworks Data Platform installation instead.  In Friday’s post, I’ll show you the project I used to set up an HDP cluster in Docker.