Let’s Build A Portable Hadoop Cluster, Part 1

A little while back, I had a mini-series on building a Hadoop cluster.  Part 1 covered what I bought.  Part 2 introduced the idea of a Dockerized Hadoop cluster.  Part 3 covered installation and setup.

That’s all well and good, but one of my goals is to make this a portable Hadoop cluster so I can present with it.  The Intel NUC does not have a built-in battery like a laptop, so if you pull the plug (hint:  don’t), it’ll just shut itself right off.  When you power it back on, you’ll find that your Hadoop cluster has gone into hiding:

docker

I’ll show how to bring the cluster back to life in the next blog post, but I will say that it takes about 10-15 minutes for everything to come up, and I might not have 10-15 minutes before my talk to set things up.  I’d prefer to be able to attach the NUC to an external battery pack an hour or two before the talk begins and let it coast from there.

I also need a network connection so I can talk to the NUC.  I don’t want to trust that my presentation location will have a good internet connection, and I don’t want my NUC exposed to the network, so I need a miniature router as well.

Here’s what I landed on:  a TP-Link travel router, a BP220 rechargeable battery pack, and a compact power strip.

The TP-Link router was already in my bag, so I didn’t buy it specifically for this project.  It’s an alright travel router, but it’s probably the weak link here; if I were buying new, I’d go with something a little more modern and powerful.

I did a lot more research on rechargeable power packs, and the BP220 seemed to be the best for the job.  The Intel NUC that I have draws about 17 watts when idling and can spike up to 77 watts under load (and I’ve even read that it could spike into the 80s when you push it hard).  The BP220 supports that load and provides 223 watt-hours of juice per charge.  That means I could stretch out a battery charge for up to 13 hours (223 / 17), although a more realistic figure would be an average of about 35 watts, so maybe 6-7 hours.  Still, that’s more than I need to get me through a one-hour presentation and two hours of prep.

The battery pack itself is a little heavy, weighing in at a little over 3 pounds—in other words, it’s heavier than my laptop, especially if you pack the power brick as well.  Combined with the NUC, it’s about 7-8 pounds of extra gear, meaning that I’m fine taking it with me to present but wouldn’t want to schlep it around all the time.  That said, it’s also pretty compact.  At 10.6″ long, it fits nicely into my laptop bag, and it and the NUC can share the inside pocket while my laptop fits into the outside pocket.  At that point, I’m essentially carrying two laptops, but I did that for a while anyhow, so no big deal.

Finally, the power strip makes it so that I can plug in these devices along with my laptop.  Power outlets aren’t always conveniently located, and you rarely get more than one or maybe two outlets, so that’s in my bag just in case I do run low on battery power and need to plug everything in.

Brave New World

For the past three years, I’ve worked as a Database Engineer—in other words, as a database developer—at ChannelAdvisor.  2 1/2 years of that time was spent working in the digital marketing space.  Coming into this job, I had worked at small-scale organizations:  the smallest cabinet-level department in Ohio, followed by a relatively small subsidiary of a large insurance company.  Working at ChannelAdvisor helped me build up skills as a database developer, figuring out that things which work well with a hundred thousand rows in a table don’t necessarily work well when that table hits a billion rows.

Well, come January 1st, I will no longer be a Database Engineer.  That’s because I’m going to be the Engineering Manager of a new predictive analytics team.

Wait, Management?

Yeah, this feels a little crazy for me as well.  The me of five years ago would never have wanted to be a manager, and the reasoning would have been the same as for other technical people:  I enjoy being on the front line, doing things rather than filling out paperwork.

Since then, I would not say that my priorities have changed much:  I still want to be on the front line, using technology to solve business problems.  What I get, though, is a force multiplier:  I now have two more people who can help me accomplish great things.

Vision

Something I’ve observed during the last few years of work is that we have a tremendous amount of interesting data at the company, and we throw away even more due to space and budget constraints.  What we have not been so good at is taking full advantage of that data to help customers.  Most of our systems are designed around a single customer’s data.  Obviously, our transactional systems are keyed toward individual customers, rather than aggregating their results.  What’s interesting is that even our warehouses tend to be customer-focused rather than company-focused.

My vision on predictive analytics is to blow out our company’s disk budget.

It is also to take advantage of this data and solve problems for customers, between customers, and for the company as a whole.  We have the data, and it will be my job to put together the tools to collect (in a place which does not harm our transactional processes), process, aggregate, and analyze the data.  Without getting into specifics, I want to close the internal gap between what we could conceivably do and what we can do in practice.

Plan of Action:  Data Engineering

In order to pull off my vision, I’ve got to build up skills on a number of fronts, all at the same time.  There are four major quadrants I need to hit; the good news is that I’m building a team to help me with two of them.  I’m going to start with the Data Engineering Crucible, in which I (hopefully) take people with complementary skills across the following three axes and build up people with strong skills across all three.

Statistics

Doing a few analytics projects has reminded me that I need to re-take some stats courses.  My last statistics course was in graduate school, and that was mostly statistics for economists, meaning lots of regressions.  I’m bringing in an employee who has a pretty strong background in this, and so I plan to lean on that person pretty heavily (and push that person to get better at the same time).

Development

My .NET skills have stagnated the last few years.  That makes sense, as I don’t write as much .NET code as before.  The good news is that by hanging around the .NET user group and working on projects both at work and for presentations, I haven’t forgotten much there.  I’m also bringing in an employee with a strong development background to help the team get better.

Aside from .NET development (F# for life!), we’ll use other languages too.  I have some experience with R and Python, and that experience is about to grow significantly.  I have a lot of experience with SQL that I will share with the team, as well as some Java/Scala experience and whatever other crazy languages we decide on.

Subject Matter Expertise

I’ve worked on digital marketing for the past 2 1/2 years, but that’s only a part of what the company does.  My job will be to work with my team to train them on DM but also learn more about the rest of the company’s data and processes.

Plan of Action:  Management

Aside from hitting the Data Engineering trifecta, it’s time to ramp up management skills.  I have some ideas, and I’ll probably share more as I get further along.  Right now, it involves reading some books and thinking through various plans, like how I want to run meetings or drive sprint work.  After a few months, I hope to have a post up which describes some of this and how things are working out.

Conclusion

Over the past year, I have been itching for a team lead position.  Now that I have it, I’ll be like a dog with a chew toy, though probably a little less destructive.

Reading Polybase Execution Plan Details

A few days ago, I made mention of how the Remote Query in a Polybase execution plan contained interesting details.

A preview of interesting details

Today we’re going to look at what we can find in the execution plan XML.  We’ll start with a basic (i.e., non-MapReduce) query and follow up with a MapReduce query and see how they change.  As a note, this is for a single-node Polybase cluster pointing to a single-node Hadoop cluster.  As I move to multi-node queries later in this series, it’ll be interesting to see how this changes, if it does at all.

Basic Queries

My basic query will be incredibly simple:  a select statement against the SecondBasemen table joined to the TopSalaryByAge table.

SELECT TOP(50)
    sb.FirstName,
    sb.LastName,
    sb.Age,
    sb.Throws,
    sb.Bats,
    tsa.TopSalary
FROM dbo.SecondBasemen sb
    INNER JOIN dbo.TopSalaryByAge tsa
        ON sb.Age = tsa.Age
ORDER BY
    sb.LastName DESC;
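
If you want to capture this plan XML for your own queries, wrapping the statement in SET STATISTICS XML ON or pulling the cached plan afterward both work.  Here’s a minimal sketch of each approach; the LIKE filter is just a convenient way to find my query and nothing Polybase-specific:

-- Option 1:  return the actual execution plan as XML along with the results.
SET STATISTICS XML ON;
-- ...run the query from above here...
SET STATISTICS XML OFF;

-- Option 2:  pull the cached plan afterward.
SELECT
    st.text,
    qp.query_plan
FROM sys.dm_exec_query_stats qs
    CROSS APPLY sys.dm_exec_sql_text(qs.sql_handle) st
    CROSS APPLY sys.dm_exec_query_plan(qs.plan_handle) qp
WHERE
    st.text LIKE '%SecondBasemen%';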

The execution plan XML is as follows:

<?xml version="1.0" encoding="utf-16"?>
<ShowPlanXML xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" Version="1.5" Build="13.0.1722.0" xmlns="http://schemas.microsoft.com/sqlserver/2004/07/showplan">
  <BatchSequence>
    <Batch>
      <Statements>
        <StmtSimple StatementCompId="1" StatementEstRows="50" StatementId="1" StatementOptmLevel="FULL" StatementOptmEarlyAbortReason="GoodEnoughPlanFound" CardinalityEstimationModelVersion="130" StatementSubTreeCost="0.100037" StatementText="SELECT TOP(50)
 sb.FirstName,
 sb.LastName,
 sb.Age,
 sb.Throws,
 sb.Bats,
 tsa.TopSalary
FROM dbo.SecondBasemen sb
 INNER JOIN dbo.TopSalaryByAge tsa
 ON sb.Age = tsa.Age
ORDER BY
 sb.LastName DESC" StatementType="SELECT" QueryHash="0xED18D81F92FC4F53" QueryPlanHash="0x77D2563391A21679" RetrievedFromCache="true" SecurityPolicyApplied="false">
          <StatementSetOptions ANSI_NULLS="true" ANSI_PADDING="true" ANSI_WARNINGS="true" ARITHABORT="true" CONCAT_NULL_YIELDS_NULL="true" NUMERIC_ROUNDABORT="false" QUOTED_IDENTIFIER="true" />
          <QueryPlan DegreeOfParallelism="0" NonParallelPlanReason="NoParallelForPDWCompilation" MemoryGrant="1024" CachedPlanSize="120" CompileTime="86" CompileCPU="83" CompileMemory="216">
            <MemoryGrantInfo SerialRequiredMemory="512" SerialDesiredMemory="560" RequiredMemory="512" DesiredMemory="560" RequestedMemory="1024" GrantWaitTime="0" GrantedMemory="1024" MaxUsedMemory="72" />
            <OptimizerHardwareDependentProperties EstimatedAvailableMemoryGrant="417021" EstimatedPagesCached="104255" EstimatedAvailableDegreeOfParallelism="2" />
            <RelOp AvgRowSize="83" EstimateCPU="5E-06" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="50" LogicalOp="Top" NodeId="0" Parallel="false" PhysicalOp="Top" EstimatedTotalSubtreeCost="0.100037">
              <OutputList>
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="TopSalary" />
              </OutputList>
              <RunTimeInformation>
                <RunTimeCountersPerThread Thread="0" ActualRows="50" Batches="0" ActualEndOfScans="1" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="6334" ActualCPUms="3083" />
              </RunTimeInformation>
              <Top RowCount="false" IsPercent="false" WithTies="false">
                <TopExpression>
                  <ScalarOperator ScalarString="(50)">
                    <Const ConstValue="(50)" />
                  </ScalarOperator>
                </TopExpression>
                <RelOp AvgRowSize="83" EstimateCPU="0.0008151" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="50" LogicalOp="Inner Join" NodeId="1" Parallel="false" PhysicalOp="Nested Loops" EstimatedTotalSubtreeCost="0.100032">
                  <OutputList>
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="TopSalary" />
                  </OutputList>
                  <RunTimeInformation>
                    <RunTimeCountersPerThread Thread="0" ActualRows="50" Batches="0" ActualEndOfScans="0" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="6334" ActualCPUms="3083" />
                  </RunTimeInformation>
                  <NestedLoops Optimized="false">
                    <OuterReferences>
                      <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                    </OuterReferences>
                    <RelOp AvgRowSize="75" EstimateCPU="0.00241422" EstimateIO="0.0112613" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="50" LogicalOp="Sort" NodeId="2" Parallel="false" PhysicalOp="Sort" EstimatedTotalSubtreeCost="0.0886755">
                      <OutputList>
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                      </OutputList>
                      <MemoryFractions Input="1" Output="1" />
                      <RunTimeInformation>
                        <RunTimeCountersPerThread Thread="0" ActualRebinds="1" ActualRewinds="0" ActualRows="50" Batches="0" ActualEndOfScans="0" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="6333" ActualCPUms="3083" ActualScans="0" ActualLogicalReads="0" ActualPhysicalReads="0" ActualReadAheads="0" ActualLobLogicalReads="0" ActualLobPhysicalReads="0" ActualLobReadAheads="0" InputMemoryGrant="1024" OutputMemoryGrant="640" UsedMemoryGrant="72" />
                      </RunTimeInformation>
                      <Sort Distinct="false">
                        <OrderBy>
                          <OrderByColumn Ascending="false">
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                          </OrderByColumn>
                        </OrderBy>
                        <RelOp AvgRowSize="75" EstimateCPU="0.075" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="195" LogicalOp="Compute Scalar" NodeId="3" Parallel="false" PhysicalOp="Compute Scalar" EstimatedTotalSubtreeCost="0.075">
                          <OutputList>
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                          </OutputList>
                          <ComputeScalar>
                            <DefinedValues>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[FirstName] as [sb].[FirstName]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[LastName] as [sb].[LastName]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[Age] as [sb].[Age]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[Throws] as [sb].[Throws]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[Bats] as [sb].[Bats]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                            </DefinedValues>
                            <RelOp AvgRowSize="75" EstimateCPU="0.075" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="195" LogicalOp="Remote Query" NodeId="4" Parallel="false" PhysicalOp="Remote Query" EstimatedTotalSubtreeCost="0.075">
                              <OutputList>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                              </OutputList>
                              <RunTimeInformation>
                                <RunTimeCountersPerThread Thread="0" ActualRebinds="1" ActualRewinds="0" ActualRows="777" Batches="0" ActualEndOfScans="1" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="6332" ActualCPUms="3082" ActualScans="0" ActualLogicalReads="0" ActualPhysicalReads="0" ActualReadAheads="0" ActualLobLogicalReads="0" ActualLobPhysicalReads="0" ActualLobReadAheads="0" />
                              </RunTimeInformation>
                              <RemoteQuery RemoteSource="Polybase_ExternalComputation" RemoteQuery="&lt;?xml version=&quot;1.0&quot; encoding=&quot;utf-8&quot;?&gt;
&lt;dsql_query number_nodes=&quot;1&quot; number_distributions=&quot;8&quot; number_distributions_per_node=&quot;8&quot;&gt;
 &lt;sql&gt;ExecuteMemo explain query&lt;/sql&gt;
 &lt;dsql_operations total_cost=&quot;0&quot; total_number_operations=&quot;6&quot;&gt;
 &lt;dsql_operation operation_type=&quot;RND_ID&quot;&gt;
 &lt;identifier&gt;TEMP_ID_40&lt;/identifier&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;CREATE TABLE [tempdb].[dbo].[TEMP_ID_40] ([FirstName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [LastName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [Age] INT, [Throws] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS, [Bats] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;EXEC [tempdb].[sys].[sp_addextendedproperty] @name=N'IS_EXTERNAL_STREAMING_TABLE', @value=N'true', @level0type=N'SCHEMA', @level0name=N'dbo', @level1type=N'TABLE', @level1name=N'TEMP_ID_40'&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;UPDATE STATISTICS [tempdb].[dbo].[TEMP_ID_40] WITH ROWCOUNT = 24, PAGECOUNT = 1&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;MULTI&quot;&gt;
 &lt;dsql_operation operation_type=&quot;STREAMING_RETURN&quot;&gt;
 &lt;operation_cost cost=&quot;1&quot; accumulative_cost=&quot;1&quot; average_rowsize=&quot;114&quot; output_rows=&quot;195&quot; /&gt;
 &lt;location distribution=&quot;AllDistributions&quot; /&gt;
 &lt;select&gt;SELECT [T1_1].[FirstName] AS [FirstName],
 [T1_1].[LastName] AS [LastName],
 [T1_1].[Age] AS [Age],
 [T1_1].[Throws] AS [Throws],
 [T1_1].[Bats] AS [Bats]
FROM [tempdb].[dbo].[TEMP_ID_40] AS T1_1&lt;/select&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ExternalRoundRobinMove&quot;&gt;
 &lt;operation_cost cost=&quot;0.80028&quot; accumulative_cost=&quot;1.80028&quot; average_rowsize=&quot;114&quot; output_rows=&quot;195&quot; /&gt;
 &lt;external_uri&gt;hdfs://sandbox.hortonworks.com:8020/tmp/ootp/secondbasemen.csv&lt;/external_uri&gt;
 &lt;destination_table&gt;[TEMP_ID_40]&lt;/destination_table&gt;
 &lt;/dsql_operation&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;DROP TABLE [tempdb].[dbo].[TEMP_ID_40]&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;/dsql_operations&gt;
&lt;/dsql_query&gt;" />
                            </RelOp>
                          </ComputeScalar>
                        </RelOp>
                      </Sort>
                    </RelOp>
                    <RelOp AvgRowSize="15" EstimateCPU="0.0001581" EstimateIO="0.003125" EstimateRebinds="46.6667" EstimateRewinds="3.33333" EstimatedExecutionMode="Row" EstimateRows="1" LogicalOp="Clustered Index Seek" NodeId="16" Parallel="false" PhysicalOp="Clustered Index Seek" EstimatedTotalSubtreeCost="0.0111881" TableCardinality="22">
                      <OutputList>
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="TopSalary" />
                      </OutputList>
                      <RunTimeInformation>
                        <RunTimeCountersPerThread Thread="0" ActualRows="50" ActualRowsRead="50" Batches="0" ActualEndOfScans="0" ActualExecutions="50" ActualExecutionMode="Row" ActualElapsedms="0" ActualCPUms="0" ActualScans="0" ActualLogicalReads="100" ActualPhysicalReads="1" ActualReadAheads="0" ActualLobLogicalReads="0" ActualLobPhysicalReads="0" ActualLobReadAheads="0" />
                      </RunTimeInformation>
                      <IndexScan Ordered="true" ScanDirection="FORWARD" ForcedIndex="false" ForceSeek="false" ForceScan="false" NoExpandHint="false" Storage="RowStore">
                        <DefinedValues>
                          <DefinedValue>
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="TopSalary" />
                          </DefinedValue>
                        </DefinedValues>
                        <Object Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Index="[PK_TopSalaryByAge]" Alias="[tsa]" IndexKind="Clustered" Storage="RowStore" />
                        <SeekPredicates>
                          <SeekPredicateNew>
                            <SeekKeys>
                              <Prefix ScanType="EQ">
                                <RangeColumns>
                                  <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="Age" />
                                </RangeColumns>
                                <RangeExpressions>
                                  <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[Age] as [sb].[Age]">
                                    <Identifier>
                                      <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                                    </Identifier>
                                  </ScalarOperator>
                                </RangeExpressions>
                              </Prefix>
                            </SeekKeys>
                          </SeekPredicateNew>
                        </SeekPredicates>
                      </IndexScan>
                    </RelOp>
                  </NestedLoops>
                </RelOp>
              </Top>
            </RelOp>
          </QueryPlan>
        </StmtSimple>
      </Statements>
    </Batch>
  </BatchSequence>
</ShowPlanXML>

Even for a simple query, I’m not going to expect you to read 174 lines of XML; I’m not a sadist, after all…

What follows is a look at significant lines and my commentary.

Line 8:
<QueryPlan DegreeOfParallelism="0" NonParallelPlanReason="NoParallelForPDWCompilation" MemoryGrant="1024" CachedPlanSize="120" CompileTime="86" CompileCPU="83" CompileMemory="216">

For Polybase plans, our database engine operations cannot go parallel.  There’s a joke in there somewhere.

Lines 71-126 describe the Compute Scalar and Remote Query together.

computescalar

Inside the Compute Scalar XML, we have the remote query.

The Remote Query

Let’s take that encoded XML in the remote query and decode it so that we can see the innards for ourselves:

<?xml version="1.0" encoding="utf-8"?>
<dsql_query number_nodes="1" number_distributions="8" number_distributions_per_node="8">
  <sql>ExecuteMemo explain query</sql>
  <dsql_operations total_cost="0" total_number_operations="6">
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_40</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [tempdb].[dbo].[TEMP_ID_40] ([FirstName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [LastName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [Age] INT, [Throws] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS, [Bats] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">EXEC [tempdb].[sys].[sp_addextendedproperty] @name=N'IS_EXTERNAL_STREAMING_TABLE', @value=N'true', @level0type=N'SCHEMA', @level0name=N'dbo', @level1type=N'TABLE', @level1name=N'TEMP_ID_40'</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">UPDATE STATISTICS [tempdb].[dbo].[TEMP_ID_40] WITH ROWCOUNT = 24, PAGECOUNT = 1</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="MULTI">
      <dsql_operation operation_type="STREAMING_RETURN">
        <operation_cost cost="1" accumulative_cost="1" average_rowsize="114" output_rows="195" />
        <location distribution="AllDistributions" />
        <select>SELECT [T1_1].[FirstName] AS [FirstName],
       [T1_1].[LastName] AS [LastName],
       [T1_1].[Age] AS [Age],
       [T1_1].[Throws] AS [Throws],
       [T1_1].[Bats] AS [Bats]
FROM   [tempdb].[dbo].[TEMP_ID_40] AS T1_1</select>
      </dsql_operation>
      <dsql_operation operation_type="ExternalRoundRobinMove">
        <operation_cost cost="0.80028" accumulative_cost="1.80028" average_rowsize="114" output_rows="195" />
        <external_uri>hdfs://sandbox.hortonworks.com:8020/tmp/ootp/secondbasemen.csv</external_uri>
        <destination_table>[TEMP_ID_40]</destination_table>
      </dsql_operation>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [tempdb].[dbo].[TEMP_ID_40]</sql_operation>
      </sql_operations>
    </dsql_operation>
  </dsql_operations>
</dsql_query>

Yay, more XML!  Again, I’m no sadist, so let’s focus on the interesting bits inside this block.

Line 2:
<dsql_query number_nodes="1" number_distributions="8" number_distributions_per_node="8">

We have one Polybase node in our scale-out cluster.  Azure SQL Data Warehouse has a similar output; this Grant Fritchey post shows an example with number_distributions="60" instead of 8.  For more information on how number_distributions matters, check out my post from June 27th on the topic.

Line 4:
<dsql_operations total_cost="0" total_number_operations="6">

There are going to be six operations in this query plan.  They start on lines 5, 8, 14, 20, 26, and 43, respectively.

Line 5:
<dsql_operation operation_type="RND_ID"><identifier>TEMP_ID_40</identifier></dsql_operation>

This looks like the operation which generates the name for the temp table.

Line 8 looks to be the operation which generates the create statement for the temp table.  Line 14 then builds extended properties.  Line 20 creates external stats.  Line 26 creates the SELECT statement to retrieve data from the temp table.  Line 37 performs the Round Robin retrieval from Hadoop into TMP_ID_40.  Line 43 drops the table.  Hey, wait, this looks familiar!

nonmapreduceworkresults

Yep, these are the same details you get out of sys.dm_exec_distributed_request_steps, though I’m reusing the image here so the table IDs won’t be quite the same.  Look carefully at the image and you’ll see eight steps, but the explain plan XML above only shows six items.  It appears that step index 4 does not show up in the explain plan.  Step 6 in the request steps DMV selects data from TEMP_ID_## and calls the result T1_1; this is what shows up on line 27 of the explain plan, with an operation whose type is STREAMING_RETURN.  Step 5 in the request steps DMV selects from SecondBasemen as T1_1, so I think that matches the ExternalRoundRobinMove operation in line 37 of the explain plan.

MapReduce Queries

I’m just going to add an OPTION(FORCE EXTERNALPUSHDOWN) hint to my query.  That way, we have a plan which is as close to the original plan as we can get while still doing a MapReduce job.

This execution plan is also 174 lines long, so I won’t do a straight copy-paste like I did with the first plan.  The graphical shape of the plan is the same, and everything aside from the remote query looks to be the same.  The remote query’s explain plan, not surprisingly, is different, so here it is:

<?xml version="1.0" encoding="utf-8"?>
<dsql_query number_nodes="1" number_distributions="8" number_distributions_per_node="8">
  <sql>ExecuteMemo explain query</sql>
  <dsql_operations total_cost="257703.37296" total_number_operations="8">
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_56</identifier>
    </dsql_operation>
    <dsql_operation operation_type="HadoopOperation">
      <operation_cost cost="257703.37296" accumulative_cost="257703.37296" average_rowsize="114" output_rows="195" />
      <Config>
  <JobTrackerName>sandbox.hortonworks.com</JobTrackerName>
  <JobTrackerPort>8050</JobTrackerPort>
  <Plan>
    <RelOp NodeId="6" LogicalOp="Materialize">
      <OutputList>
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="FirstName" />
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="LastName" />
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Age" />
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Throws" />
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Bats" />
      </OutputList>
      <Children>
        <RelOp NodeId="1" LogicalOp="Scan">
          <OutputList>
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="FirstName" />
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="LastName" />
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Age" />
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Throws" />
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Bats" />
          </OutputList>
          <Object Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]">
            <Uri>hdfs://sandbox.hortonworks.com:8020/tmp/ootp/secondbasemen.csv</Uri>
            <FileFormat Type="DelimitedText">
              <Options>
                <FieldTerminator>,</FieldTerminator>
                <UseTypeDefault>True</UseTypeDefault>
                <Encoding>UTF8</Encoding>
              </Options>
            </FileFormat>
          </Object>
          <Children />
        </RelOp>
      </Children>
    </RelOp>
  </Plan>
</Config>
    </dsql_operation>
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_57</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [tempdb].[dbo].[TEMP_ID_57] ([FirstName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [LastName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [Age] INT, [Throws] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS, [Bats] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">EXEC [tempdb].[sys].[sp_addextendedproperty] @name=N'IS_EXTERNAL_STREAMING_TABLE', @value=N'true', @level0type=N'SCHEMA', @level0name=N'dbo', @level1type=N'TABLE', @level1name=N'TEMP_ID_57'</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">UPDATE STATISTICS [tempdb].[dbo].[TEMP_ID_57] WITH ROWCOUNT = 24, PAGECOUNT = 1</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="MULTI">
      <dsql_operation operation_type="STREAMING_RETURN">
        <operation_cost cost="1" accumulative_cost="257704.37296" average_rowsize="114" output_rows="195" />
        <location distribution="AllDistributions" />
        <select>SELECT [T1_1].[FirstName] AS [FirstName],
       [T1_1].[LastName] AS [LastName],
       [T1_1].[Age] AS [Age],
       [T1_1].[Throws] AS [Throws],
       [T1_1].[Bats] AS [Bats]
FROM   [tempdb].[dbo].[TEMP_ID_57] AS T1_1</select>
      </dsql_operation>
      <dsql_operation operation_type="ExternalRoundRobinMove">
        <operation_cost cost="0.80028" accumulative_cost="257705.17324" average_rowsize="114" output_rows="195" />
        <external_uri>hdfs://sandbox.hortonworks.com:8020/TEMP_ID_56_OUTPUT_DIR</external_uri>
        <destination_table>[TEMP_ID_57]</destination_table>
      </dsql_operation>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [tempdb].[dbo].[TEMP_ID_57]</sql_operation>
      </sql_operations>
    </dsql_operation>
  </dsql_operations>
</dsql_query>

The first difference between the MapReduce query and the basic query is that the MapReduce query has a cost and does 8 operations, whereas the basic query had a cost value of 0 and performed 6 operations.

We also get to see a HadoopOperation, which includes JobTracker details as well as details on the file format of the second basemen file in HDFS.  What interests me here are the two LogicalOp operators on lines 14 and 23, respectively:  Materialize (the set of columns on OOTP.dbo.SecondBasemen) and Scan (the secondbasemen.csv file).

There’s another difference down in lines 81-82.  Here, instead of reading secondbasemen.csv like the basic query did, we’re reading the results from the MapReduce job’s output directory, labeled as TEMP_ID_56_OUTPUT_DIR in my example above.

Conclusion

When you combine this post with the posts on digging into basic and MapReduce queries, I think the picture starts getting clearer and clearer.  Hopefully at this point the Polybase engine is looking less like a black box and more like a “normal” data mover service with several inspection points available to us.

Polybase MapReduce Container Size

If you like this content, check out the entire Polybase series.

Whenever you create a MapReduce job via YARN, YARN will create a series of containers, where containers are isolated bubbles of memory and CPU power.  This blog post will go over an issue that I ran into with my HDP 2.4 VM concerning YARN containers and Polybase.

Memory Starvation In A Nutshell

Originally, I assigned 8 GB of RAM to my HDP 2.4 sandbox.  This was enough for basic work like running Hive queries, and so I figured it was enough to run a MapReduce job initiated by Polybase.  It turns out that I was wrong.

Configuring YARN Containers

There are a couple of configuration settings in Ambari (for the sandbox, the default Ambari page is http://sandbox.hortonworks.com:8080).  From there, if you select the YARN menu item and go to the Configs tab, you can see the two Container configuration settings.

containersettings

On my machine, I have the minimum container size set at 512 MB and the maximum container size at 1536 MB.  I also have the minimum and maximum number of CPU VCores set, ranging from 1 to 4.

Setting The Stage

Back at PASS Summit, I had a problem getting Polybase to work.  Getting the SQL Server settings correct was critical, but even after that, I still had a problem:  my jobs would just run endlessly.  Eventually I had to kill the jobs, even though I’d let them run for upwards of 30 minutes.  Considering that the job was just pulling 777 rows from the SecondBasemen table, this seemed…excessive…  Also, after the first 20-30 seconds, it seemed like nothing was happening, based on the logs.

longrunningjobs

On the Friday of PASS Summit, I had a chance to sit down with Bill Preachuk and Scott Shaw of Hortonworks, and they helped me diagnose my issues.  By focusing on what was happening while the MapReduce job ran, they figured out that my laptop was not creating the necessary number of YARN containers because the service was running out of memory, and so my MapReduce jobs would just hang.

Our solution was simple:  scale down the container min and max size to 512 MB, as that would guarantee that I could create at least 3 containers—which is what the Polybase engine wanted.

Now It Breaks For Real

Once we did that and I restarted all of the services, I ended up getting an interesting error message from SQL Server:

Msg 7320, Level 16, State 110, Line 2
Cannot execute the query "Remote Query" against OLE DB provider "SQLNCLI11" for linked server "(null)". EXTERNAL TABLE access failed due to internal error: 'Java exception raised on call to JobSubmitter_SubmitJob: Error [org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=512

The error message is pretty clear:  the Polybase service wants to create containers that are 1536 MB in size, but the maximum size I’m allowing is 512 MB.  Therefore, the Polybase MapReduce operation fails.

Let’s Change Some Config Files!

To get around that, I looked up the proper way to set the map and reduce memory sizes for a MapReduce job:  by changing mapred-site.xml.  Therefore, I went back to the Polybase folder on my SQL Server installation and added the following to my mapred-site.xml:

<property>
  <name>mapreduce.map.memory.mb</name>
  <value>512</value>
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>512</value>
</property>

After restarting the Polybase service, I ran the query again and got the same error.  It seems changing that setting did not work.

Conclusions

The way I got around this problem was to increase the HDP sandbox VM to have 12 GB of RAM allocated to it.  That’s a large percentage of my total memory allocation on the laptop, but at least once I set the container size back to allowing 1536 MB of RAM, my Polybase MapReduce jobs ran.

My conjecture is that the 1536 MB size might be hard-coded, but it’s just that:  conjecture.  I have no proof either way.

One last thing I should note is that the Polybase service wants to create three containers, so you need to have enough memory available to YARN to allocate three 1536 MB containers, or 4608 MB of RAM.

Forcing External Pushdown

In the last post, we dug into how the MapReduce process works.  Today, I want to spend a little bit of time talking about forcing MapReduce operations and disabling external pushdown.

What Can We Force?

Before we get into the mechanics behind forcing pushdown, it’s important to note that predicate pushdown is somewhat limited in the sense of what information we can pass to the MapReduce engine.

For example, let’s say that I have a set of flight details in Hadoop and a set of airports in SQL Server.  Suppose I want to run the following query as quickly as possible:

SELECT
    *
FROM dbo.Flights f
    INNER JOIN dbo.Airports ar
        ON f.dest = ar.IATA
WHERE
    ar.state = 'OH'
    AND ar.city = 'Columbus';

Ideally, the engine would start with the Airports table, filtering by state and city and getting back the 3 records in that table which are associated with Columbus airports.  Then, knowing the IATA values for those 3 airports, the Polybase engine would feed those into the remote query predicate and we’d pull back as few rows as possible.  There’s just one catch:

The Polybase engine does not work that way!

Unfortunately, my ideal scenario isn’t one that Polybase can give me (at least today; we’ll see if they can in the future).  The only predicates which the Polybase engine can push are those which are explicit, so we can’t feed a set of results from one part of a query plan into Hadoop.  If you did want to do something like that, I think the easiest solution would be to generate dynamic SQL and build an IN clause with the IATA/dest codes.
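
Here’s a rough sketch of that dynamic SQL workaround, using the hypothetical Flights and Airports tables from above.  Whether the resulting IN list of literals actually gets pushed down still depends on the normal pushdown rules, so treat it as an idea rather than a guarantee:

-- Build an explicit IN list of IATA codes on the SQL Server side.
DECLARE @IATACodes NVARCHAR(MAX) = STUFF((
    SELECT N', ''' + ar.IATA + N''''
    FROM dbo.Airports ar
    WHERE
        ar.state = 'OH'
        AND ar.city = 'Columbus'
    FOR XML PATH('')
), 1, 2, N'');

DECLARE @sql NVARCHAR(MAX) = N'
SELECT
    f.*
FROM dbo.Flights f
WHERE
    f.dest IN (' + @IATACodes + N');';

EXEC sp_executesql @sql;

From there, you would join the (hopefully much smaller) result set back to dbo.Airports as usual.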

So with that in mind, let’s talk more about forcing and preventing pushdown.

Forcing Predicate Pushdown

As a reminder, in order to allow predicate pushdown to occur, we need to hit a Hadoop cluster; we can’t use predicate pushdown on other systems like Azure Blob Storage.  Second, we need to have a resource manager link set up in our external data source.  Third, we need to make sure that everything is configured correctly on the Polybase side.  But once you have those items in place, it’s possible to use the FORCE EXTERNALPUSHDOWN command like so:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC
OPTION(FORCE EXTERNALPUSHDOWN);

You might want to force external pushdown when you don’t have good external table stats but do have some extra knowledge the optimizer doesn’t have regarding the data set.  For example, you might know that you’re asking for a tiny fraction of the total data set to get returned but the optimizer may not be aware of this, and so it pulls the entire data set over to SQL Server for the operation.

Be wary of using this operation, however; it will spin up a MapReduce job, and that can take 15-30 seconds to start up.  If you can get your entire data set over the wire in 10 seconds, there’s little value in shunting the work off to your Hadoop cluster.
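
One alternative to forcing the issue is giving the optimizer better information in the first place by creating statistics on the external table.  A minimal sketch against my SecondBasemen table might look like the following; note that building statistics on an external table means scanning the underlying data, so it isn’t free:

CREATE STATISTICS s_SecondBasemen_Age
    ON dbo.SecondBasemen (Age)
    WITH FULLSCAN;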

Disabling External Pushdown

The opposite of FORCE EXTERNALPUSHDOWN is DISABLE EXTERNALPUSHDOWN.  On Azure Blob Storage and other external sources, this is a no-op.  This is also a no-op if you have a Hadoop cluster but no resource manager set up.

With everything set up, it will ensure that you do not push down to your Hadoop cluster.  This might be a good thing if you consistently see faster times by just pulling the entire data set over the wire and performing any filters or operations in SQL Server.  Note that even with this option set, performance is still going to be a good bit better than using Hive over a linked server.
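
Here’s what the hint looks like in practice, applied to a trimmed-down version of the second basemen query from earlier:

SELECT TOP(50)
    sb.FirstName,
    sb.LastName,
    sb.Age,
    sb.Throws,
    sb.Bats,
    tsa.TopSalary
FROM dbo.SecondBasemen sb
    INNER JOIN dbo.TopSalaryByAge tsa
        ON sb.Age = tsa.Age
ORDER BY
    sb.LastName DESC
OPTION(DISABLE EXTERNALPUSHDOWN);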

Pushdown-Related Errors

Let’s say that you want to run OPTION(FORCE EXTERNALPUSHDOWN) and you get an error as soon as the command starts:

Msg 7320, Level 16, State 110, Line 1
Cannot execute the query "Remote Query" against OLE DB provider "SQLNCLI11" for linked server "(null)". Query processor could not produce a query plan because of the hints defined in this query. Resubmit the query without specifying any hints.

If you’re trying to perform this operation against a Hadoop cluster, you’ll want to re-read the configuration settings blog post.  There are several files which need to change and if you don’t get all of them, you’ll end up with strange-looking error messages.

Running MapReduce Polybase Queries

Previously, we looked at basic queries which do not perform MapReduce operations.  Today’s post is going to look at queries which do perform MapReduce operations, and we’ll see how they differ.

MapReduce Queries

In order for us to be able to perform a MapReduce operation, we need the external data source to be set up with a resource manager.  In addition, one of the following two circumstances must be met:

  1. Based on external table statistics, our query must be set up in such a way that it makes sense to distribute the load across the Hadoop data nodes and have the difference be great enough to make it worth waiting 15-30 seconds for a MapReduce job to spin up, OR
  2. We force a MapReduce job by using the OPTION(FORCE EXTERNALPUSHDOWN) query hint.
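
As for the resource manager requirement, that link lives on the external data source definition.  Here’s a rough sketch based on my sandbox; the data source name is mine, and your hostname and ports will differ (8020 is the name node and 8050 is the YARN resource manager port on HDP):

CREATE EXTERNAL DATA SOURCE [HDP] WITH
(
    TYPE = HADOOP,
    LOCATION = 'hdfs://sandbox.hortonworks.com:8020',
    RESOURCE_MANAGER_LOCATION = 'sandbox.hortonworks.com:8050'
);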

For the purposes of today’s post, I’m going to focus on the latter.  Regarding condition #1, my hunch is that there’s some kind of in-built cost optimizer the Polybase engine uses to determine whether to spin up a MapReduce job.  If this is in fact the case, it’s important to note that there is no “cost threshold for MapReduce” option that we can set, so if you’re looking for ways to fine-tune MapReduce operations, you may have to be liberal with the query hints to enable and disable external pushdown.

Our MapReduce query is pretty simple:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC
OPTION(FORCE EXTERNALPUSHDOWN);

The only thing I’ve done here compared to the basic query is to add a query hint to force external pushdown.  Let’s see how this changes things.

Step 1:  Communicating with Hadoop

Just like with the basic query, we can use Wireshark to see how the Polybase engine interacts with our Hadoop cluster.  This time around, there’s a lot more going on over the wire than in the basic query scenario; there are ten separate TCP streams which the Polybase engine initiated, and we’re going to take at least a quick look at each one.

mapreducepacketcapture

A quick look at the Wireshark packet capture.

Note that my names for these streams come from the order in which they show up in Wireshark.  They might have specific internal names, but I’m not going to know what they are.

Stream 0

Stream 0 appears to act as a coordinator, setting everything up and building the framework for the rest of the network operations to succeed.  This first stream has three major sections.  The first section is around basic tasks, like making sure the file we are looking for exists:

mapreducesetup

Once we know it exists, we want to create a job.  Eventually, that job will complete (we hope).

mapreducecreate

Then, after everything completes, we get file info for the staging work:

mapreducegetfileinfo

Stream 0 runs through the entire operation, with the earliest packet being packet #7 and the last packet being #20118 of approximately 20286.  We communicate with the Hadoop name node on port 8020 for this stream.

Stream 1

The next stream is Stream 1.  This stream does two things.  The first thing, which you can see at the beginning of the stream text below, is to prep the MapReduce job, including running /bin/bash to set the CLASSPATH and LD_LIBRARY_PATH.

mapreducestream1

After doing that, it regularly calls YARN’s getApplicationReport method.  When it gets word that the MapReduce job is complete, it finishes its work.

mapreducestream1end

Stream 1 starts with packet #23 and ends with packet #20286.  But there’s a big gap between packets; stream 1 is responsible for packets 23-30, 128, and then most of 19989 through 20110.  This conversation takes place on port 8050 of the name node; remember that 8050 is the resource manager port for YARN.

Stream 2

Stream 2 is the single largest stream, responsible for 18,864 of the ~20,286 packets.  We can see exactly why by looking at the beginning and end sections of the stream.  First the beginning:

mapreducestream2start

And now the end:

mapreducestream2end

Stream 2 sends along 27 MB worth of data.  It’s packaging everything Polybase needs to perform operations, including JAR files and any internal conversion work that the Polybase engine might need to translate Hadoop results into SQL Server results.  For example, the sqlsort at the bottom is a DLL that performs ordering using SQL Server-style collations, as per the Polybase academic paper.

Stream 2 is responsible for almost every packet from 43 through 19811; only 479 packets in that range belonged to some other stream (19811 – 43 – 18864 – 425 = 479).  We send all of this data to the data node via port 50010.

Stream 3

After the behemoth known as Stream 2, the rest of these are pretty small.

mapreducestream3

This stream appears to perform a block write operation.  It is responsible for most of the packets from 19830 to 19846.  This conversation takes place over the data node’s port 50010.

Stream 4

This stream follows up on the work Stream 3 did.

mapreducestream4

It appears to be writing metadata, and is responsible for most of the packets from 19853 through 19866.  This communication is happening over the data node’s port 50010.

Stream 5

After two tiny streams, we get back to a slightly larger stream.

mapreducestream5

As you can guess from the contents, this stream creates the YARN XML job and passes it over the wire.  The stream starts with packet 19875 and ends at 19968, using 83 packets to transmit the payload.  We connect to port 50010 to push the YARN config file to the data node.

Stream 6

Our next stream is of the “child on a long road trip” variety.

mapreducestream6

Basically this stream calls two methods as part of a loop:  getTaskAttemptCompletionEvents and getJobReport.  It looks like there might be a periodic call of getCounters, or this may be triggered by something.  In any event, the loop continues until getTaskAttemptCompletionEvents gets back a result.  In my case, the result pointed back at http://sandbox.hortonworks.com:13562.  Of note is that port 13562 is the default MapReduce shuffle port.

Stream 6 was responsible for 80 of the packets from 20103 through 20209.  One interesting thing to note is that the Hadoop-side port was 48311, which I believe is just a dynamic port and does not signify anything like many of the other ports we’re looking at.

Stream 7

This is another loop-until-done stream, but it ended pretty quickly.

mapreducestream7

This stream looped, getting file info and listing information by connecting to the Hadoop name node on port 8020.  It took packets 20210 through 20224, as well as a few wrap-up packets at the end.

Stream 8

Stream 8 retrieved information on the finished MapReduced data.

mapreducestream8

It first looks for a file called part-m-00000.ppax.  The PPAX format is a proprietary data format Polybase uses (the current Polybase PM covered it in a Powerpoint deck presented at SQL Saturday Boston).

Stream 8 is a TCP connection to the name node on port 8020 and stretches from 20225-20234 and a few packets after that as well.

Stream 9

The final Polybase stream actually receives the data in PPAX format.

mapreducestream9

We connect to the Hadoop data node on port 50010 and retrieve the data file.  This takes 34 packets and stretches from 20235 through 20280.

Step 2:  MapReduce Logs

For MapReduce logs, you’ll want to go to port 8088 of the name node, so for example, http://sandbox.hortonworks.com:8088/cluster.  On this link, we can see the various “applications” that have been created.

mapreducelogapplication

Clicking on one of these brings up details on the application, and we can see logs for each attempt.

mapreduceloglogs

If you click the Logs link, you’ll get to see the various logs, starting in short form and with the option to drill in further.

mapreducelogsyslog

In this example, we can see that the stderr and stdout logs didn’t have much, but the syslog log had more than 4K, so if we want to see the full log, we can click a link.

Step 3:  The Rest Of The Story

Once we have the file available, it’s time for Polybase to wrap up the story.  We can see this in the DMV outputs:

mapreducedmv

We can see that the dm_exec_external_work DMV output looks slightly different, but it still shows a file split.  This time, though, instead of splitting the CSV file directly, we split the MapReduced file /user/pdw_user/TEMP_DIR/5130041d4035420b80f88096f197d4b2/Output/part-m-00000.ppax.  The operations to load the table are pretty similar to before:  we create a temp table, build statistics on the table, and then import the data using a round robin approach.

Conclusion

Figuring out how the Polybase engine performs MapReduce operations has been interesting.  For this simple single-Hadoop-node, single-Polybase-node, single-file setup, we saw ten separate TCP streams, and understanding how they work in conjunction with DMV and MapReduce logs can give you a fuller understanding of the Polybase MapReduce process.

Running Basic Polybase Queries On Hadoop

Following the Polybase series, we have created an external table and we can now use it like any other table.  For example:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC;

In this query, dbo.SecondBasemen is an external table and TopSalaryByAge is a table in SQL Server.  I haven’t found any T-SQL syntax which does not work, and although I haven’t tried everything, I’ve tried a few wacky ideas.
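
For reference, the external table definition behind dbo.SecondBasemen looks roughly like the sketch below.  The column types line up with the temp tables the Polybase engine creates, and the location matches the file we’ll see in the packet capture; the data source and file format names are placeholders for whatever you created when setting things up:

CREATE EXTERNAL TABLE dbo.SecondBasemen
(
    FirstName VARCHAR(50),
    LastName VARCHAR(50),
    Age INT,
    Throws VARCHAR(5),
    Bats VARCHAR(5)
)
WITH
(
    LOCATION = N'/tmp/secondbasemen.csv',
    DATA_SOURCE = HDP,            -- placeholder external data source name
    FILE_FORMAT = CsvFileFormat,  -- placeholder delimited-text file format
    REJECT_TYPE = VALUE,
    REJECT_VALUE = 5
);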

With that in mind, let’s dig a little deeper into what’s actually happening when we kick off basic queries.

The Basic Query

For the purposes of discussion, I’m going to label “basic” queries as any query which does not generate MapReduce jobs.  What happens is that all of the data is streamed over the wire to our SQL Server instance and stored in a temp table (or multiple temp tables if you have a scale-out Polybase cluster).  The database engine then continues along its merry way, performing any joins or other T-SQL-specific operations.  Let’s look at an example in action by running the query in my intro section.

We’re going to look at a couple of sources to see what’s happening with our request:  a Wireshark packet capture and DMV outputs.

Step 1:  Communicating With Hadoop

First, let’s check out the Wireshark packet capture:

basicpacketcapture

We have three TCP streams running here, two from my port 49423 to the Hadoop node’s 8020, and one from my port 49439 to the Hadoop node’s 50010.  The first TCP stream connects to the name node and asks for details regarding a file located at /tmp/secondbasemen.csv.

nonmapreducefileinfo

The name node responds with some basic information which confirms that the file actually exists (and, incidentally, that the name node is up and running).  Then, the Polybase engine requests information on who owns copies of that file, specifically by calling the getBlockLocations method:

nonmapreducegetblocklocations

The name node returns information on which machines in the Hadoop cluster own which blocks (with block names starting with BP).  The Polybase engine also wants the server defaults to know of any particular client protocol oddities, and the name node returns its results.

The final stream is where the Polybase engine connects directly to the data node on port 50010 and requests the block contents.

nonmapreducetcpstream

In this case, it runs a non-MapReduce job, meaning it just wants the data node to stream its block to the Polybase engine.  The data node complies, sending our data in plaintext to our SQL Server instance.

Step 2:  Putting It Together

Once the Polybase engine has retrieved the data, it feeds that data into a temp table and continues the operation.  We can see this clearly from a pair of DMVs:

nonmapreduceworkresults

The dm_exec_external_work DMV tells us which execution we care about; in this case, I ended up running the same query twice, but I decided to look at the first run of it.  Then, I can get step information from dm_exec_distributed_request_steps.  This shows that we created a table in tempdb called TEMP_ID_14 and streamed results into it.  The engine also created some statistics (though I’m not quite sure where it got the 24 rows from), and then we perform a round-robin query.  Each Polybase compute node queries its temp table and streams the data back to the head node.  Even though our current setup only has one compute node, the operation is the same as if we had a dozen Polybase compute nodes.
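
If you want to poke at these DMVs yourself rather than squint at my screenshot, something like the following works.  I’ve trimmed the column lists to the interesting bits, so check the DMV definitions on your instance for the full set:

-- Find the most recent external work item, then grab its request steps.
SELECT TOP(1)
    w.execution_id,
    w.input_name,
    w.read_location,
    w.start_time
FROM sys.dm_exec_external_work w
ORDER BY
    w.start_time DESC;

SELECT
    s.step_index,
    s.operation_type,
    s.location_type,
    s.row_count,
    s.command
FROM sys.dm_exec_distributed_request_steps s
WHERE
    s.execution_id = 'QID1234'  -- substitute the execution_id from the first query
ORDER BY
    s.step_index;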

From there, we’ve satisfied the Remote Query operation in the execution plan, and the rest is just like any other SQL Server query:

nonmapreduceexecutionplan

Note that there are some interesting Remote Query elements in the execution plan, and I plan to look at them in more detail later in the series.  For now, though, this should suffice.

Conclusion

Today’s post covers how to figure out what’s going on with non-MapReduce queries in Polybase.  I originally intended to cover MapReduce queries as well, but I realized that there’s a lot more going on there and that it deserves its own blog post, which will appear tomorrow.  Stay tuned!