What Makes A Data Platform Professional?

Eugene Meidinger hates the broadness of the term “data platform professional”:

During PASS Summit, I wrote a post about the broadening data platform. I talked about the term Data Professional and how I feel it describes the changes going on in the SQL space. Here’s the problem: It’s a terrible guide. It’s a great description, it’s a wonderful attitude, it’s an ambitious goal; but it’s a terrible guide.

Being a data professional means being a jack of all trades. It means being a renaissance man (or woman). It means a career plan that looks like this:

Here’s my summary of Eugene’s argument:

  1. The concept of “data platform” is too broad to be meaningful, because
  2. nobody can gain expertise in the entire data platform.  Therefore,
  3. focus on a particular slice of the platform.

Before I go on to make a counter-argument, let me start by saying that I agree almost whole-heartedly with this summary.  But that never stopped me from arguing before…

So here’s my counter-argument:  the concept of “data platform” is quite broad and nobody will master it all.  Within that group, there are core skills, position-specific core skills, secondary skills, and tertiary skills.  I recommend one of two paths:

  1. The Polyglot:  be great at the core and position-specific core skills, good at secondary skills, and aware of tertiary skills.  Be in the top 25% of your field at one to two skills, ideally one core and one secondary.
  2. The Specialist:  be outstanding (in the top 1-3% of your field) at one to two skills.

With that in mind, let’s flesh this argument out.

Gauss and Mathematics

Carl Friedrich Gauss was the last true polyglot mathematician.  He was able to make significant contributions to pretty much every branch of mathematics at the time, something no mathematician has been able to do since.  The reason for this is not that Gauss was that much smarter than any other person since him, but rather that Gauss himself helped expand the world of mathematics considerably, so a 21st century Gauss would need to know about everything Gauss did plus what his contemporaries did plus what their chronological successors did.  This spider-webbed growth of knowledge makes it impossible for one person to repeat what Gauss did.

Even though nobody can be a “true” polyglot mathematician—in the sense of knowing everything about mathematics at the present time—anymore, it doesn’t mean that “mathematics” is so broad a term as to be meaningless as a career plan.  Instead, it means that we all have to specialize to some increasingly greater extent relative to the entire body of knowledge.

What’s The Right Level Of Specialization?

One of my colleagues, Brian Carrig, was working the SQL Clinic at SQL Saturday Raleigh.  When he lost his original “I can help you with…” badge, he created his own.

This…might not be the right level of specialization.  Brian’s a sharp enough guy that he knows more than the average practitioner on a wide range of topics, but I don’t think the choices are to be Brian or to be replaced by robo-developers; there are few enough people who can reach Brian’s level of skill that if these were the only choices, it’d be a dystopian nightmare for IT practitioners (and I’m not just saying that because I want Brian to provision me some more SQL Server instances).

So there has to be a middle ground between “know everything” and “exit the industry.”  I agree with Eugene that we have to specialize, and here’s what I see, at least based off the current landscape.

Sub-Data Platform Job Categories

To save this from being a 5000-word essay, let’s pick four very broad categories for data platform jobs.  These share some overlap and there are certainly people who don’t fit in any of these roles, so this is not a complete taxonomy.  It should serve as a guide for us, however.

The four broad categories I have in mind are as follows:  database developer, database administrator, Business Intelligence specialist, and data analyst.  Database developers focus on writing and tuning queries and tend to specialize in performance tuning.  Database administrators focus on backup and recovery, dealing with database corruption, and availability; they tend to specialize in process automation.  Business Intelligence specialists build warehouses and migrate data from different systems into warehouses; this is a broad enough term that it’s hard to say what they specialize in, but pick one piece of the puzzle (cubes, warehouse modeling, ETL) and you’ll find people who specialize there.  Finally, data analysts apply concepts of statistical analysis to business problems and come up with explanations or predictions of behavior.

Choosing Your Skills

I see four major categories of skill, but the specific details of what fits into each category will differ based on the role.  Again, this is not intended to be a taxonomy but rather a conceptual description.  We have the following concepts:  core skills, position-specific core skills, secondary skills, and tertiary skills.

Core skills are skills which are common to all data platform professionals.  There are relatively few of them, but they tend to be fundamental to every position.  Think of things such as an understanding of SQL and relatively basic query tuning (which includes figuring out when to tune a query and what information is available on your platform for tuning queries).  But really, when we think of core skills, we’re thinking of position-specific core skills.

As an example of a position-specific core skill, administrators need to know how to back up and restore the databases under their care.  How you do this will differ based on the product, but if you administer a database without knowing how to recover it, you’re running a major risk and have a major skill gap.  So basically, position-specific core skills are the things that you train juniors to do and expect mid-levels to know already.
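To make that concrete, here’s a minimal sketch of the kind of thing I mean; the database name, file paths, and logical file names are all hypothetical:

-- Take a full backup (hypothetical database and path).
BACKUP DATABASE [SalesDB]
TO DISK = N'D:\Backups\SalesDB.bak'
WITH COMPRESSION, CHECKSUM, INIT;

-- Knowing the backup ran isn't enough; prove you can actually restore it.
RESTORE DATABASE [SalesDB_Test]
FROM DISK = N'D:\Backups\SalesDB.bak'
WITH MOVE N'SalesDB' TO N'D:\Data\SalesDB_Test.mdf',    -- logical file names are hypothetical
     MOVE N'SalesDB_log' TO N'D:\Logs\SalesDB_Test.ldf',
     RECOVERY;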

Secondary and tertiary skills are even more nebulous, but I see them as skills which are ever-more-distant from the position-specific core skills.  For a database administrator, the ability to write .NET code is a secondary skill:  relatively few employers or practitioners think of a database administrator as someone who needs to write C# or F# code, but they can see how it’d apply to the position.  A language like R would be a tertiary skill:  a skill which the average practitioner has trouble tying back to day-to-day life.  Most DBAs never think of using R for anything (although I’m trying to change that in my own small way).

Now, skills move over time.  As Eugene points out in his post, I sincerely believe that administrators who don’t understand PowerShell are at a serious disadvantage, and that there will come a time when database administrators entirely lacking in PowerShell scripting will be superfluous.  We’re probably the better part of a decade away from containerization technologies like Docker having the same impact, but that shift is ramping up as well.  On the other side, an example of a technique that differentiated good from great database administrators a long time ago was the ability to lay out files on disk to minimize drive latency.  SANs and later SSDs killed that skill altogether.

I wouldn’t describe these skill shifts as fluid, but rather tectonic; they don’t change overnight but they do significantly alter the landscape when they happen, and you don’t want to be on the wrong side of that tectonic shift.

So What’s The Answer?

In my eyes, the answer is to build your skills along one of two paths:  the polyglot or the specialist.  The polyglot knows a little about a lot but has a few major points of depth.  A polyglot database developer might know a lot about writing PL/SQL and tuning Postgres queries, but also has enough experience to query Lucene, write some basic Informatica scripts, and maintain a Python-based ETL project.  At many companies, a broad slice with good depth in a couple skills and relatively little depth in several skills is enough, and for our polyglot developer, it keeps doors open in case the market for Postgres developers flattens out for a few years or our developer wants to go down a new road.

In contrast to the polyglot, a specialist developer is elite at certain skills and knowingly ignorant of most others.  A specialist SQL Server query tuner is in the top 1-3% of all developers at tuning SQL Server queries and knows all kinds of language and configuration tricks to squeeze percentages off of queries which take milliseconds or even microseconds, but might not know (or care) much about the right way to automate taking backups.  You go to the polyglot to solve general, overarching problems but go to the specialist because you have a particular problem which is beyond the polyglot’s skill level.

In case the parallel isn’t completely clear, this model fits with the model for medical doctors:  you have Primary Care Physicians/General Practitioners (PCPs or GPs) and you have specialists.  The PCP knows how to diagnose issues and sees patients with a wide range of maladies.  Sometimes, the PCP refers a patient to a specialist for further diagnosis or action. As an example, a PCP might stitch up a patient with a nasty gash, but that same PCP won’t rebuild a shattered femur; that PCP will refer the patient to a specialist in that area.

Is This Really The Right Direction?

A couple days before Eugene’s post, I had a discussion with a person about this topic.  She was getting back into development after a couple years doing something a bit different, and one thing she noticed was the expectation of employees being more and more polyglot.  Her argument is that we, as IT professionals, have a lot to do with this, as there’s a bit of a race to the bottom with developers wanting to learn more and willing to spend more and more time learning things.  This makes IT jobs feel like running on a treadmill:  you expend a lot of effort just trying to keep up.  And this shows in how job titles and job role expectations have changed, including the concept of a data scientist (which I’ll touch upon at the end).

I’m not sure I agree with this assessment, but it does seem that more positions require (or at least request) knowledge of a range of skills and technologies, that it’s not enough to be “just” a T-SQL stored procedure developer in most shops.  So to that extent, there seems to be a combination of developers innately moving this direction as well as job roles shifting in this direction.

To the extent that she is correct, there’s a good question as to how sustainable this strategy is, as the platform is expanding ever-further but we don’t have any more hours in the day.  But at the same time, take a step back and this is nothing new:  database developers are already a subset of all developers (much as we are loath to admit this sometimes), so these categories are themselves cases of specialization.  But let’s shelve that for a moment.

Anecdote:  ElasticSearch And Me

It’s time for an anecdote.  A few months ago, I started running a predictive analytics team.  Our first project was to perform predictions of disk growth based on historical data.  No big deal at all, except that all the data was stored in ElasticSearch and our DBA team wanted the results in ElasticSearch as well.  My experience with ElasticSearch prior to this assignment was practically nil, but I went into it eager…at the beginning…

There were days that I wasted just figuring out how to do things that would take me five or ten minutes in SQL Server (particularly around aggregating data).  In that sense, it was a complete waste of time to use ElasticSearch, and throughout that time period I felt like an idiot for struggling so hard to do things that I intuitively knew were extremely simple.  It took a while, but I did muddle through the project, which means that I picked up some ElasticSearch experience.  I’m definitely not good at writing ElasticSearch queries, but I’m better than I was three months ago, and that experience can help me out elsewhere if I need to use ElasticSearch in other projects or even to give me an idea of other ways to store and query data.

This is one of the most common ways that people learn:  they muddle through things because they need to, either because the work requires it or because it’s a challenge or growth opportunity.  If you’re able to take the results of that muddling through and apply it to other areas, you’ve got a leg up on your competition.  And I think it’s easier to form quality systems when you have a few examples—it’s easier to reason accurately from several scenarios rather than just one scenario.

Conclusion

Summarizing a rather long blog post, I do agree with Eugene that “data platform” is a very broad space, and expecting someone to know everything about it would be folly.  But that’s not unique.  “Programmer” is extremely broad as well, but we don’t expect embedded systems developers to write databases (or even write database queries) or design responsive web applications.  Doctors and lawyers specialize to extreme degrees, as do plenty of other professionals, and I see no reason to expect anything different from data platform professionals.  I do believe that unless you are at the extreme right end of the distribution for certain skills (and can thus be a top-end specialist), you want to err in the direction of being broader than deeper, as it reduces the chances of getting caught in a sudden paradigm shift (remember how cool Web Forms was for about 4 years?) and risking your livelihood as a result.

One other point I want to make is that the broadness of this space shows the power of teamwork and complementary skills.  There’s an argument that a good scrum team is made up of a bunch of generalists who can all fill in each other’s roles on demand.  I think that concept’s garbage for several reasons, one of which is that you often need specialists because specialists fix problems that generalists can’t.  So instead of having a team of generalists, you have a team of people with different skills, some of which overlap and some of which complement each other:  you have one or two data specialists, one or two UI specialists, one or two “backbone” specialists (usually .NET or Java developers), one or two QA specialists, etc.  This says to me that there’s less a difference in kind than a difference in degree, even between the polyglot type and the specialist type:  you can be a polyglot with respect to other data professionals (because you’re using several data platform technologies and are working across multiple parts of the stack) while being a specialist with respect to your development team (because you’re the database person).

Coda:  Data Science

One bit at the end of Eugene’s post is that he’s interested in digging into data science.  For a post criticizing a term that nobody can fully live up to, this did elicit a knowing chuckle.  The problem is that the term “data scientist” is a microcosm of the issues with “data platform professional.”  To be a data scientist, you should have development skills (preferably in multiple languages, including but not limited to SQL, R, and Python), a strong background in statistics (ideally having worked through PhD-level courses), and a deep knowledge of the context of data (as in spending years getting to know the domain).  I saw the perfect t-shirt today to describe these people.

(Image: a “Trust me, I’m a unicorn” t-shirt)

There are very few people who have all three skill sets and yet that’s what being a data scientist requires.  It’s the same problem as “data platform professional” but at a slightly smaller scale.

Let’s Build A Portable Hadoop Cluster, Part 1

A little while back, I had a mini-series on building a Hadoop cluster.  Part 1 covered what I bought.  Part 2 introduced the idea of a Dockerized Hadoop cluster.  Part 3 covered installation and setup.

That’s all well and good, but one of my goals is to make this a portable Hadoop cluster so I can present with it.  The Intel NUC does not have a built-in battery like a laptop, so if you pull the plug (hint:  don’t), it’ll just shut itself right off.  When you power it back on, you’ll find that your Hadoop cluster has gone into hiding:

(Image: docker output)

I’ll show how to bring the cluster back to life in the next blog post, but I will say that it takes about 10-15 minutes for everything to come up, and I might not have 10-15 minutes before my talk to set things up.  I’d prefer to be able to attach the NUC to an external battery pack an hour or two before the talk begins and let it coast from there.

I also need a network connection so I can talk to the NUC.  I don’t want to trust that my presentation location will have a good internet connection, and I don’t want my NUC exposed to that network, so I need a miniature router as well.

Here’s what I landed on:

The TP-Link router was already in my bag, so I didn’t buy it specifically for this project.  It’s an alright travel router but is probably the weak link here and if I were buying new, I’d probably go with something a little more modern and powerful.

I did a lot more research on rechargeable power packs, and the BP220 seemed to be the best for the job.  The Intel NUC that I have draws about 17 watts when idling and can spike up to 77 at load (and I’ve even read that it could spike into the 80s when you push it hard).  The BP220 supports that load and provides 223 watt-hours of juice per charge.  That means I could stretch out a battery charge for up to 13 hours (223 / 17), although a more realistic figure would be an average of about 35 watts, so maybe 6-7 hours.  Still, that’s more than I need to get me through a one-hour presentation and two hours of prep.

The battery pack itself is a little heavy, weighing in at a little over 3 pounds—in other words, it’s heavier than my laptop, especially if you pack the power brick as well.  Combined with the NUC, it’s about 7-8 pounds of extra gear, meaning that I’m fine taking it with me to present but wouldn’t want to schlep it around all the time.  That said, it’s also pretty compact.  At 10.6″ long, it fits nicely into my laptop bag, and it and the NUC can share the inside pocket while my laptop fits into the outside pocket.  At that point, I’m essentially carrying two laptops, but I did that for a while anyhow, so no big deal.

Finally, the power strip makes it so that I can plug in these devices along with my laptop.  Power outlets aren’t always conveniently located, and you rarely get more than one or maybe two outlets, so that’s in my bag just in case I do run low on battery power and need to plug everything in.

Brave New World

For the past three years, I’ve worked as a Database Engineer—in other words, as a database developer—at ChannelAdvisor.  2 1/2 years of that time was spent working in the digital marketing space.  Coming into this job, I had worked at small-scale organizations:  the smallest cabinet-level department in Ohio, followed by a relatively small subsidiary of a large insurance company.  Working at ChannelAdvisor helped me build up skills as a database developer, figuring out that things which work well with a hundred thousand rows in a table don’t necessarily work well when that table hits a billion rows.

Well, come January 1st, I will no longer be a Database Engineer.  That’s because I’m going to be the Engineering Manager of a new predictive analytics team.

Wait, Management?

Yeah, this feels a little crazy for me as well.  The me of five years ago would never have wanted to be a manager, and the reasoning would have been the same as for other technical people:  I enjoy being on the front line, doing things rather than filling out paperwork.

Since then, I would not say that my priorities have changed much:  I still want to be on the front line, using technology to solve business problems.  What I get, though, is a force multiplier:  I now have two more people who can help me accomplish great things.

Vision

Something I’ve observed during the last few years of work is that we have a tremendous amount of interesting data at the company, and we throw away even more due to space and budget constraints.  What we have not been so good at was taking full advantage of that data to help customers.  Most of our systems are designed around a single customer’s data.  Obviously, our transactional systems are keyed toward individual customers, rather than aggregating their results.  What’s interesting is that even our warehouses tend to be customer-focused rather than company-focused.

My vision on predictive analytics is to blow out our company’s disk budget.

It is also to take advantage of this data and solve problems for customers, between customers, and for the company as a whole.  We have the data, and it will be my job to put together the tools to collect (in a place which does not harm our transactional processes), process, aggregate, and analyze that data.  Without getting into specifics, I want to close the internal gap between what we could conceivably do and what we can do in practice.

Plan of Action:  Data Engineering

In order to pull off my vision, I’ve got to build up skills on a number of fronts, all at the same time.  There are four major quadrants I need to hit; the good news is that I’m building a team to help me with two of them.  I’m going to start with the Data Engineering Crucible, in which I (hopefully) take people with complementary skills across the following three axes and build up people with strong skills across all three.

Statistics

Doing a few analytics projects has reminded me that I need to re-take some stats courses.  My last statistics course was in graduate school, and that was mostly statistics for economists, meaning lots of regressions.  I’m bringing in an employee who has a pretty strong background in this, and so I plan to lean on that person pretty heavily (and push that person to get better at the same time).

Development

My .NET skills have stagnated the last few years.  That makes sense, as I don’t write as much .NET code as before.  The good news is that by hanging around the .NET user group and working on projects both at work and for presentations, I haven’t forgotten much there.  I also want my other employee to bring a strong development background to the team and help us all get better.

Aside from .NET development (F# for life!), we’ll use other languages too.  I have some experience with R and Python, and that experience is about to grow significantly.  I have a lot of experience with SQL that I will share with the team, as well as some Java/Scala experience and whatever other crazy languages we decide on.

Subject Matter Expertise

I’ve worked on digital marketing for the past 2 1/2 years, but that’s only a part of what the company does.  My job will be to work with my team to train them on DM but also learn more about the rest of the company’s data and processes.

Plan of Action:  Management

Aside from hitting the Data Engineering trifecta, it’s time to ramp up management skills.  I have some ideas, and I’ll probably share more as I do a bit more.  Right now, it involves reading some books and thinking through various plans, like how I want to run meetings or drive sprint work.  After a few months, I hope to have a post up which describes some of this and see how things work out.

Conclusion

Over the past year, I have been itching for a team lead position.  Now that I have it, I’ll be like a dog with a chew toy, though probably a little less destructive.

Reading Polybase Execution Plan Details

A few days ago, I made mention of how the Remote Query in a Polybase execution plan contained interesting details.

(Image: a preview of interesting details)

Today we’re going to look at what we can find in the execution plan XML.  We’ll start with a basic (i.e., non-MapReduce) query and follow up with a MapReduce query and see how they change.  As a note, this is for a single-node Polybase cluster pointing to a single-node Hadoop cluster.  As I move to multi-node queries later in this series, it’ll be interesting to see how this changes, if it does at all.

Basic Queries

My basic query will be incredibly simple:  a select statement against the SecondBasemen table joined to the TopSalaryByAge table.

SELECT TOP(50)
    sb.FirstName,
    sb.LastName,
    sb.Age,
    sb.Throws,
    sb.Bats,
    tsa.TopSalary
FROM dbo.SecondBasemen sb
    INNER JOIN dbo.TopSalaryByAge tsa
        ON sb.Age = tsa.Age
ORDER BY
    sb.LastName DESC;
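
If you want to follow along on your own instance, one easy way to capture the actual plan XML is to wrap the query in SET STATISTICS XML:

SET STATISTICS XML ON;

-- Run the query above; the actual execution plan comes back as an extra XML result set.

SET STATISTICS XML OFF;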

The execution plan XML is as follows:

<?xml version="1.0" encoding="utf-16"?>
<ShowPlanXML xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" Version="1.5" Build="13.0.1722.0" xmlns="http://schemas.microsoft.com/sqlserver/2004/07/showplan">
  <BatchSequence>
    <Batch>
      <Statements>
        <StmtSimple StatementCompId="1" StatementEstRows="50" StatementId="1" StatementOptmLevel="FULL" StatementOptmEarlyAbortReason="GoodEnoughPlanFound" CardinalityEstimationModelVersion="130" StatementSubTreeCost="0.100037" StatementText="SELECT TOP(50)
 sb.FirstName,
 sb.LastName,
 sb.Age,
 sb.Throws,
 sb.Bats,
 tsa.TopSalary
FROM dbo.SecondBasemen sb
 INNER JOIN dbo.TopSalaryByAge tsa
 ON sb.Age = tsa.Age
ORDER BY
 sb.LastName DESC" StatementType="SELECT" QueryHash="0xED18D81F92FC4F53" QueryPlanHash="0x77D2563391A21679" RetrievedFromCache="true" SecurityPolicyApplied="false">
          <StatementSetOptions ANSI_NULLS="true" ANSI_PADDING="true" ANSI_WARNINGS="true" ARITHABORT="true" CONCAT_NULL_YIELDS_NULL="true" NUMERIC_ROUNDABORT="false" QUOTED_IDENTIFIER="true" />
          <QueryPlan DegreeOfParallelism="0" NonParallelPlanReason="NoParallelForPDWCompilation" MemoryGrant="1024" CachedPlanSize="120" CompileTime="86" CompileCPU="83" CompileMemory="216">
            <MemoryGrantInfo SerialRequiredMemory="512" SerialDesiredMemory="560" RequiredMemory="512" DesiredMemory="560" RequestedMemory="1024" GrantWaitTime="0" GrantedMemory="1024" MaxUsedMemory="72" />
            <OptimizerHardwareDependentProperties EstimatedAvailableMemoryGrant="417021" EstimatedPagesCached="104255" EstimatedAvailableDegreeOfParallelism="2" />
            <RelOp AvgRowSize="83" EstimateCPU="5E-06" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="50" LogicalOp="Top" NodeId="0" Parallel="false" PhysicalOp="Top" EstimatedTotalSubtreeCost="0.100037">
              <OutputList>
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="TopSalary" />
              </OutputList>
              <RunTimeInformation>
                <RunTimeCountersPerThread Thread="0" ActualRows="50" Batches="0" ActualEndOfScans="1" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="6334" ActualCPUms="3083" />
              </RunTimeInformation>
              <Top RowCount="false" IsPercent="false" WithTies="false">
                <TopExpression>
                  <ScalarOperator ScalarString="(50)">
                    <Const ConstValue="(50)" />
                  </ScalarOperator>
                </TopExpression>
                <RelOp AvgRowSize="83" EstimateCPU="0.0008151" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="50" LogicalOp="Inner Join" NodeId="1" Parallel="false" PhysicalOp="Nested Loops" EstimatedTotalSubtreeCost="0.100032">
                  <OutputList>
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="TopSalary" />
                  </OutputList>
                  <RunTimeInformation>
                    <RunTimeCountersPerThread Thread="0" ActualRows="50" Batches="0" ActualEndOfScans="0" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="6334" ActualCPUms="3083" />
                  </RunTimeInformation>
                  <NestedLoops Optimized="false">
                    <OuterReferences>
                      <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                    </OuterReferences>
                    <RelOp AvgRowSize="75" EstimateCPU="0.00241422" EstimateIO="0.0112613" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="50" LogicalOp="Sort" NodeId="2" Parallel="false" PhysicalOp="Sort" EstimatedTotalSubtreeCost="0.0886755">
                      <OutputList>
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                      </OutputList>
                      <MemoryFractions Input="1" Output="1" />
                      <RunTimeInformation>
                        <RunTimeCountersPerThread Thread="0" ActualRebinds="1" ActualRewinds="0" ActualRows="50" Batches="0" ActualEndOfScans="0" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="6333" ActualCPUms="3083" ActualScans="0" ActualLogicalReads="0" ActualPhysicalReads="0" ActualReadAheads="0" ActualLobLogicalReads="0" ActualLobPhysicalReads="0" ActualLobReadAheads="0" InputMemoryGrant="1024" OutputMemoryGrant="640" UsedMemoryGrant="72" />
                      </RunTimeInformation>
                      <Sort Distinct="false">
                        <OrderBy>
                          <OrderByColumn Ascending="false">
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                          </OrderByColumn>
                        </OrderBy>
                        <RelOp AvgRowSize="75" EstimateCPU="0.075" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="195" LogicalOp="Compute Scalar" NodeId="3" Parallel="false" PhysicalOp="Compute Scalar" EstimatedTotalSubtreeCost="0.075">
                          <OutputList>
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                          </OutputList>
                          <ComputeScalar>
                            <DefinedValues>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[FirstName] as [sb].[FirstName]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[LastName] as [sb].[LastName]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[Age] as [sb].[Age]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[Throws] as [sb].[Throws]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                              <DefinedValue>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                                <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[Bats] as [sb].[Bats]">
                                  <Identifier>
                                    <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                                  </Identifier>
                                </ScalarOperator>
                              </DefinedValue>
                            </DefinedValues>
                            <RelOp AvgRowSize="75" EstimateCPU="0.075" EstimateIO="0" EstimateRebinds="0" EstimateRewinds="0" EstimatedExecutionMode="Row" EstimateRows="195" LogicalOp="Remote Query" NodeId="4" Parallel="false" PhysicalOp="Remote Query" EstimatedTotalSubtreeCost="0.075">
                              <OutputList>
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="FirstName" />
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="LastName" />
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Throws" />
                                <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Bats" />
                              </OutputList>
                              <RunTimeInformation>
                                <RunTimeCountersPerThread Thread="0" ActualRebinds="1" ActualRewinds="0" ActualRows="777" Batches="0" ActualEndOfScans="1" ActualExecutions="1" ActualExecutionMode="Row" ActualElapsedms="6332" ActualCPUms="3082" ActualScans="0" ActualLogicalReads="0" ActualPhysicalReads="0" ActualReadAheads="0" ActualLobLogicalReads="0" ActualLobPhysicalReads="0" ActualLobReadAheads="0" />
                              </RunTimeInformation>
                              <RemoteQuery RemoteSource="Polybase_ExternalComputation" RemoteQuery="&lt;?xml version=&quot;1.0&quot; encoding=&quot;utf-8&quot;?&gt;
&lt;dsql_query number_nodes=&quot;1&quot; number_distributions=&quot;8&quot; number_distributions_per_node=&quot;8&quot;&gt;
 &lt;sql&gt;ExecuteMemo explain query&lt;/sql&gt;
 &lt;dsql_operations total_cost=&quot;0&quot; total_number_operations=&quot;6&quot;&gt;
 &lt;dsql_operation operation_type=&quot;RND_ID&quot;&gt;
 &lt;identifier&gt;TEMP_ID_40&lt;/identifier&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;CREATE TABLE [tempdb].[dbo].[TEMP_ID_40] ([FirstName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [LastName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [Age] INT, [Throws] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS, [Bats] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;EXEC [tempdb].[sys].[sp_addextendedproperty] @name=N'IS_EXTERNAL_STREAMING_TABLE', @value=N'true', @level0type=N'SCHEMA', @level0name=N'dbo', @level1type=N'TABLE', @level1name=N'TEMP_ID_40'&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;UPDATE STATISTICS [tempdb].[dbo].[TEMP_ID_40] WITH ROWCOUNT = 24, PAGECOUNT = 1&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;MULTI&quot;&gt;
 &lt;dsql_operation operation_type=&quot;STREAMING_RETURN&quot;&gt;
 &lt;operation_cost cost=&quot;1&quot; accumulative_cost=&quot;1&quot; average_rowsize=&quot;114&quot; output_rows=&quot;195&quot; /&gt;
 &lt;location distribution=&quot;AllDistributions&quot; /&gt;
 &lt;select&gt;SELECT [T1_1].[FirstName] AS [FirstName],
 [T1_1].[LastName] AS [LastName],
 [T1_1].[Age] AS [Age],
 [T1_1].[Throws] AS [Throws],
 [T1_1].[Bats] AS [Bats]
FROM [tempdb].[dbo].[TEMP_ID_40] AS T1_1&lt;/select&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ExternalRoundRobinMove&quot;&gt;
 &lt;operation_cost cost=&quot;0.80028&quot; accumulative_cost=&quot;1.80028&quot; average_rowsize=&quot;114&quot; output_rows=&quot;195&quot; /&gt;
 &lt;external_uri&gt;hdfs://sandbox.hortonworks.com:8020/tmp/ootp/secondbasemen.csv&lt;/external_uri&gt;
 &lt;destination_table&gt;[TEMP_ID_40]&lt;/destination_table&gt;
 &lt;/dsql_operation&gt;
 &lt;/dsql_operation&gt;
 &lt;dsql_operation operation_type=&quot;ON&quot;&gt;
 &lt;location permanent=&quot;false&quot; distribution=&quot;AllDistributions&quot; /&gt;
 &lt;sql_operations&gt;
 &lt;sql_operation type=&quot;statement&quot;&gt;DROP TABLE [tempdb].[dbo].[TEMP_ID_40]&lt;/sql_operation&gt;
 &lt;/sql_operations&gt;
 &lt;/dsql_operation&gt;
 &lt;/dsql_operations&gt;
&lt;/dsql_query&gt;" />
                            </RelOp>
                          </ComputeScalar>
                        </RelOp>
                      </Sort>
                    </RelOp>
                    <RelOp AvgRowSize="15" EstimateCPU="0.0001581" EstimateIO="0.003125" EstimateRebinds="46.6667" EstimateRewinds="3.33333" EstimatedExecutionMode="Row" EstimateRows="1" LogicalOp="Clustered Index Seek" NodeId="16" Parallel="false" PhysicalOp="Clustered Index Seek" EstimatedTotalSubtreeCost="0.0111881" TableCardinality="22">
                      <OutputList>
                        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="TopSalary" />
                      </OutputList>
                      <RunTimeInformation>
                        <RunTimeCountersPerThread Thread="0" ActualRows="50" ActualRowsRead="50" Batches="0" ActualEndOfScans="0" ActualExecutions="50" ActualExecutionMode="Row" ActualElapsedms="0" ActualCPUms="0" ActualScans="0" ActualLogicalReads="100" ActualPhysicalReads="1" ActualReadAheads="0" ActualLobLogicalReads="0" ActualLobPhysicalReads="0" ActualLobReadAheads="0" />
                      </RunTimeInformation>
                      <IndexScan Ordered="true" ScanDirection="FORWARD" ForcedIndex="false" ForceSeek="false" ForceScan="false" NoExpandHint="false" Storage="RowStore">
                        <DefinedValues>
                          <DefinedValue>
                            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="TopSalary" />
                          </DefinedValue>
                        </DefinedValues>
                        <Object Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Index="[PK_TopSalaryByAge]" Alias="[tsa]" IndexKind="Clustered" Storage="RowStore" />
                        <SeekPredicates>
                          <SeekPredicateNew>
                            <SeekKeys>
                              <Prefix ScanType="EQ">
                                <RangeColumns>
                                  <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[TopSalaryByAge]" Alias="[tsa]" Column="Age" />
                                </RangeColumns>
                                <RangeExpressions>
                                  <ScalarOperator ScalarString="[OOTP].[dbo].[SecondBasemen].[Age] as [sb].[Age]">
                                    <Identifier>
                                      <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Alias="[sb]" Column="Age" />
                                    </Identifier>
                                  </ScalarOperator>
                                </RangeExpressions>
                              </Prefix>
                            </SeekKeys>
                          </SeekPredicateNew>
                        </SeekPredicates>
                      </IndexScan>
                    </RelOp>
                  </NestedLoops>
                </RelOp>
              </Top>
            </RelOp>
          </QueryPlan>
        </StmtSimple>
      </Statements>
    </Batch>
  </BatchSequence>
</ShowPlanXML>

Even for a simple query, I’m not going to expect you to read 174 lines of XML; I’m not a sadist, after all…

What follows is a look at significant lines and my commentary.

Line 8:
<QueryPlan DegreeOfParallelism="0" NonParallelPlanReason="NoParallelForPDWCompilation" MemoryGrant="1024" CachedPlanSize="120" CompileTime="86" CompileCPU="83" CompileMemory="216">

For Polybase plans, our database engine operations cannot go parallel.  There’s a joke in there somewhere.

Lines 71-126 describe the Compute Scalar and Remote Query together.

(Image: the Compute Scalar and Remote Query operators)

Inside the Compute Scalar XML, we have the remote query.
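
As an aside, you don’t need to copy and paste that attribute by hand:  a little XQuery against the plan cache will pull it out for you.  Here’s a minimal sketch, assuming the plan is still in cache and filtering on the table name:

-- Pull the Polybase remote query plan out of any cached plan which touches SecondBasemen.
WITH XMLNAMESPACES (DEFAULT 'http://schemas.microsoft.com/sqlserver/2004/07/showplan')
SELECT
    qp.query_plan.value('(//RemoteQuery/@RemoteQuery)[1]', 'NVARCHAR(MAX)') AS RemoteQueryPlan
FROM sys.dm_exec_cached_plans cp
    CROSS APPLY sys.dm_exec_query_plan(cp.plan_handle) qp
    CROSS APPLY sys.dm_exec_sql_text(cp.plan_handle) st
WHERE
    st.text LIKE N'%SecondBasemen%'
    AND qp.query_plan.exist('//RemoteQuery') = 1;

The nice part is that value() hands the attribute back already decoded, so you get readable dsql XML rather than a wall of &quot; entities.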

The Remote Query

Let’s take that encoded XML in the remote query and decode it so that we can see the innards for ourselves:

<?xml version="1.0" encoding="utf-8"?>
<dsql_query number_nodes="1" number_distributions="8" number_distributions_per_node="8">
  <sql>ExecuteMemo explain query</sql>
  <dsql_operations total_cost="0" total_number_operations="6">
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_40</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [tempdb].[dbo].[TEMP_ID_40] ([FirstName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [LastName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [Age] INT, [Throws] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS, [Bats] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">EXEC [tempdb].[sys].[sp_addextendedproperty] @name=N'IS_EXTERNAL_STREAMING_TABLE', @value=N'true', @level0type=N'SCHEMA', @level0name=N'dbo', @level1type=N'TABLE', @level1name=N'TEMP_ID_40'</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">UPDATE STATISTICS [tempdb].[dbo].[TEMP_ID_40] WITH ROWCOUNT = 24, PAGECOUNT = 1</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="MULTI">
      <dsql_operation operation_type="STREAMING_RETURN">
        <operation_cost cost="1" accumulative_cost="1" average_rowsize="114" output_rows="195" />
        <location distribution="AllDistributions" />
        <select>SELECT [T1_1].[FirstName] AS [FirstName],
       [T1_1].[LastName] AS [LastName],
       [T1_1].[Age] AS [Age],
       [T1_1].[Throws] AS [Throws],
       [T1_1].[Bats] AS [Bats]
FROM   [tempdb].[dbo].[TEMP_ID_40] AS T1_1</select>
      </dsql_operation>
      <dsql_operation operation_type="ExternalRoundRobinMove">
        <operation_cost cost="0.80028" accumulative_cost="1.80028" average_rowsize="114" output_rows="195" />
        <external_uri>hdfs://sandbox.hortonworks.com:8020/tmp/ootp/secondbasemen.csv</external_uri>
        <destination_table>[TEMP_ID_40]</destination_table>
      </dsql_operation>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [tempdb].[dbo].[TEMP_ID_40]</sql_operation>
      </sql_operations>
    </dsql_operation>
  </dsql_operations>
</dsql_query>

Yay, more XML!  Again, I’m no sadist, so let’s focus on the interesting bits inside this block.

Line 2:
<dsql_query number_nodes="1" number_distributions="8" number_distributions_per_node="8">

We have one Polybase node in our scaleout cluster.  Azure SQL Data Warehouse has a similar output; this Grant Fritchey post shows an example with number_distributions=”60″ instead of 8.  For more information on how this number_distributions matters, check out my post from June 27th on the topic.
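
If you’re curious how those eight distributions carve up the work against the underlying file, sys.dm_exec_external_work is worth a peek.  Here’s a minimal sketch; the execution ID is hypothetical, and the exact column list may vary a bit by version:

SELECT
    w.execution_id,
    w.step_index,
    w.type,
    w.input_name,
    w.read_location,
    w.bytes_processed,
    w.length,
    w.status
FROM sys.dm_exec_external_work w
WHERE w.execution_id = 'QID1234'  -- hypothetical ID; get the real one from sys.dm_exec_distributed_requests
ORDER BY
    w.step_index,
    w.read_location;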

Line 4:
<dsql_operations total_cost="0" total_number_operations="6">

There are going to be six operations in this query plan.  They start on lines 5, 8, 14, 20, 26, and 43, respectively.

Line 5:
<dsql_operation operation_type="RND_ID"><identifier>TEMP_ID_40</identifier></dsql_operation>

This looks like the operation which generates the name for the temp table.

Line 8 looks to be the operation which generates the create statement for the temp table.  Line 14 then builds extended properties.  Line 20 sets statistics on the temp table.  Line 26 creates the SELECT statement to retrieve data from the temp table.  Line 37 performs the Round Robin retrieval from Hadoop into TEMP_ID_40.  Line 43 drops the table.  Hey, wait, this looks familiar!

(Image: non-MapReduce work results from sys.dm_exec_distributed_request_steps)

Yep, these are the same details you get out of sys.dm_exec_distributed_request_steps, though I’m reusing the image here so the table IDs won’t be quite the same.  Look carefully at the image and you’ll see eight steps, but the explain plan XML above only shows six items.  It appears that step index 4 does not show up in the explain plan.  Step 6 in the request steps DMV selects data from TEMP_ID_## and calls the result T1_1; this is what shows up on line 27 of the explain plan, with an operation whose type is STREAMING_RETURN.  Step 5 in the request steps DMV selects from SecondBasemen as T1_1, so I think that matches the ExternalRoundRobinMove operation in line 37 of the explain plan.
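
If you want to line these up yourself, here’s a minimal sketch of the DMV side; QID1234 is a placeholder, so grab a real execution ID from the first query:

-- Recent Polybase queries and their execution IDs.
SELECT TOP(10)
    r.execution_id,
    r.status,
    r.total_elapsed_time
FROM sys.dm_exec_distributed_requests r
ORDER BY r.end_time DESC;

-- The distributed steps for one of those queries.
SELECT
    s.execution_id,
    s.step_index,
    s.operation_type,
    s.location_type,
    s.status,
    s.row_count,
    s.command
FROM sys.dm_exec_distributed_request_steps s
WHERE s.execution_id = 'QID1234'
ORDER BY s.step_index;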

MapReduce Queries

I’m just going to add an OPTION(FORCE EXTERNALPUSHDOWN) hint to my query.  That way, we have a plan which is as close to the original plan as we can get while still doing a MapReduce job.
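
For reference, that’s the same query as before with the hint bolted on:

SELECT TOP(50)
    sb.FirstName,
    sb.LastName,
    sb.Age,
    sb.Throws,
    sb.Bats,
    tsa.TopSalary
FROM dbo.SecondBasemen sb
    INNER JOIN dbo.TopSalaryByAge tsa
        ON sb.Age = tsa.Age
ORDER BY
    sb.LastName DESC
OPTION(FORCE EXTERNALPUSHDOWN);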

This execution plan is also 174 lines long, so I won’t do a straight copy-paste like I did for the first plan.  The graphical shape of the plan is the same, and everything aside from the remote query looks to be the same.  The remote query’s explain plan, not surprisingly, is different, so here it is:

<?xml version="1.0" encoding="utf-8"?>
<dsql_query number_nodes="1" number_distributions="8" number_distributions_per_node="8">
  <sql>ExecuteMemo explain query</sql>
  <dsql_operations total_cost="257703.37296" total_number_operations="8">
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_56</identifier>
    </dsql_operation>
    <dsql_operation operation_type="HadoopOperation">
      <operation_cost cost="257703.37296" accumulative_cost="257703.37296" average_rowsize="114" output_rows="195" />
      <Config>
  <JobTrackerName>sandbox.hortonworks.com</JobTrackerName>
  <JobTrackerPort>8050</JobTrackerPort>
  <Plan>
    <RelOp NodeId="6" LogicalOp="Materialize">
      <OutputList>
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="FirstName" />
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="LastName" />
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Age" />
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Throws" />
        <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Bats" />
      </OutputList>
      <Children>
        <RelOp NodeId="1" LogicalOp="Scan">
          <OutputList>
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="FirstName" />
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="LastName" />
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Age" />
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Throws" />
            <ColumnReference Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]" Column="Bats" />
          </OutputList>
          <Object Database="[OOTP]" Schema="[dbo]" Table="[SecondBasemen]">
            <Uri>hdfs://sandbox.hortonworks.com:8020/tmp/ootp/secondbasemen.csv</Uri>
            <FileFormat Type="DelimitedText">
              <Options>
                <FieldTerminator>,</FieldTerminator>
                <UseTypeDefault>True</UseTypeDefault>
                <Encoding>UTF8</Encoding>
              </Options>
            </FileFormat>
          </Object>
          <Children />
        </RelOp>
      </Children>
    </RelOp>
  </Plan>
</Config>
    </dsql_operation>
    <dsql_operation operation_type="RND_ID">
      <identifier>TEMP_ID_57</identifier>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">CREATE TABLE [tempdb].[dbo].[TEMP_ID_57] ([FirstName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [LastName] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [Age] INT, [Throws] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS, [Bats] VARCHAR(5) COLLATE SQL_Latin1_General_CP1_CI_AS ) WITH(DATA_COMPRESSION=PAGE);</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">EXEC [tempdb].[sys].[sp_addextendedproperty] @name=N'IS_EXTERNAL_STREAMING_TABLE', @value=N'true', @level0type=N'SCHEMA', @level0name=N'dbo', @level1type=N'TABLE', @level1name=N'TEMP_ID_57'</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">UPDATE STATISTICS [tempdb].[dbo].[TEMP_ID_57] WITH ROWCOUNT = 24, PAGECOUNT = 1</sql_operation>
      </sql_operations>
    </dsql_operation>
    <dsql_operation operation_type="MULTI">
      <dsql_operation operation_type="STREAMING_RETURN">
        <operation_cost cost="1" accumulative_cost="257704.37296" average_rowsize="114" output_rows="195" />
        <location distribution="AllDistributions" />
        <select>SELECT [T1_1].[FirstName] AS [FirstName],
       [T1_1].[LastName] AS [LastName],
       [T1_1].[Age] AS [Age],
       [T1_1].[Throws] AS [Throws],
       [T1_1].[Bats] AS [Bats]
FROM   [tempdb].[dbo].[TEMP_ID_57] AS T1_1</select>
      </dsql_operation>
      <dsql_operation operation_type="ExternalRoundRobinMove">
        <operation_cost cost="0.80028" accumulative_cost="257705.17324" average_rowsize="114" output_rows="195" />
        <external_uri>hdfs://sandbox.hortonworks.com:8020/TEMP_ID_56_OUTPUT_DIR</external_uri>
        <destination_table>[TEMP_ID_57]</destination_table>
      </dsql_operation>
    </dsql_operation>
    <dsql_operation operation_type="ON">
      <location permanent="false" distribution="AllDistributions" />
      <sql_operations>
        <sql_operation type="statement">DROP TABLE [tempdb].[dbo].[TEMP_ID_57]</sql_operation>
      </sql_operations>
    </dsql_operation>
  </dsql_operations>
</dsql_query>

The first difference between the MapReduce query and the basic query is that the MapReduce query has a cost and does 8 operations, whereas the basic query had a cost value of 0 and performed 6 operations.

We also get to see a HadoopOperation, which includes JobTracker details as well as details on the file format of the second basemen file in HDFS.  What interests me here are the two LogicalOp operators on lines 14 and 23, respectively:  Materialize (the set of columns on OOTP.dbo.SecondBasemen) and Scan (the secondbasemen.csv file).

There’s another difference down in lines 81-82.  Here, instead of reading secondbasemen.csv directly like the basic query did, we’re reading the MapReduce job’s output directory, labeled as TEMP_ID_56_OUTPUT_DIR in my example above.

Conclusion

When you combine this post with the posts on digging into basic and MapReduce queries, I think the picture starts getting clearer and clearer.  Hopefully at this point the Polybase engine is looking less like a black box and more like a “normal” data mover service with several inspection points available to us.

Polybase MapReduce Container Size

If you like this content, check out the entire Polybase series.

Whenever you create a MapReduce job via YARN, YARN will create a series of containers, where containers are isolated bubbles of memory and CPU power.  This blog post will go over an issue that I ran into with my HDP 2.4 VM concerning YARN containers and Polybase.

Memory Starvation In A Nutshell

Originally, I assigned 8 GB of RAM to my HDP 2.4 sandbox.  This was enough for basic work like running Hive queries, and so I figured it was enough to run a MapReduce job initiated by Polybase.  It turns out that I was wrong.

Configuring YARN Containers

There are a couple of relevant configuration settings in Ambari (for the sandbox, the default Ambari page is http://sandbox.hortonworks.com:8080).  From there, if you select the YARN menu item and go to the Configs tab, you can see the two container configuration settings.

[Image: the YARN container size settings in Ambari]

On my machine, I have the minimum container size set to 512 MB and the maximum container size set to 1536 MB.  The minimum and maximum number of CPU VCores are set as well, ranging from 1 to 4.
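Under the covers, I believe those Ambari settings map to the scheduler allocation properties in yarn-site.xml.  Here’s a sketch of the relevant properties using my values from above (double-check the property names against your own cluster’s config):

<!-- yarn-site.xml: container sizing (values match my sandbox settings above) -->
<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <value>512</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>1536</value>
</property>
<property>
  <name>yarn.scheduler.minimum-allocation-vcores</name>
  <value>1</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-vcores</name>
  <value>4</value>
</property>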

Setting The Stage

Back at PASS Summit, I had a problem getting Polybase to work.  Getting the SQL Server settings correct was critical, but even after that, I still had a problem:  my jobs would just run endlessly.  Eventually I had to kill the jobs, even though I’d let them run for upwards of 30 minutes.  Considering that the job was just pulling 777 rows from the SecondBasemen table, this seemed…excessive.  Also, after the first 20-30 seconds, it looked like nothing was happening, based on the logs.

[Image: long-running Polybase MapReduce jobs]

On the Friday of PASS Summit, I had a chance to sit down with Bill Preachuk (t) and Scott Shaw (b | t) of Hortonworks, and they helped me diagnose my issue.  By focusing on what was happening while the MapReduce job ran, they figured out that my laptop was not creating the necessary number of YARN containers because the service was running out of memory, and so my MapReduce jobs would just hang.

Our solution was simple:  scale down the container min and max size to 512 MB, as that would guarantee that I could create at least 3 containers—which is what the Polybase engine wanted.

Now It Breaks For Real

Once we did that and I restarted all of the services, I ended up getting an interesting error message from SQL Server:

Msg 7320, Level 16, State 110, Line 2
Cannot execute the query “Remote Query” against OLE DB provider “SQLNCLI11” for linked server “(null)”. EXTERNAL TABLE access failed due to internal error: ‘Java exception raised on call to JobSubmitter_SubmitJob: Error [org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=512

The error message is pretty clear:  the Polybase service wants to create containers that are 1536 MB in size, but the maximum size I’m allowing is 512 MB.  Therefore, the Polybase MapReduce operation fails.

Let’s Change Some Config Files!

To get around that, I looked up the proper way to set the map and reduce memory sizes for a MapReduce job:  by changing mapred-site.xml.  Therefore, I went back to the Polybase folder on my SQL Server installation and added the following to my mapred-site.xml:

<property>
  <name>mapreduce.map.memory.mb</name>
  <value>512</value>
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>512</value>
</property>

After restarting the Polybase service, I ran the query again and got the same error.  It seems changing that setting did not work.

Conclusions

The way I got around this problem was to increase the HDP sandbox VM to have 12 GB of RAM allocated to it.  That’s a large percentage of my total memory allocation on the laptop, but at least once I set the container size back to allowing 1536 MB of RAM, my Polybase MapReduce jobs ran.

My conjecture is that the 1536 MB size might be hard-coded, but it’s just that:  conjecture.  I have no proof either way.

One last thing I should note is that the Polybase service wants to create three containers, so you need to have enough memory available to YARN to allocate three 1536 MB containers, or 4608 MB of RAM.
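As a corollary, the total amount of memory YARN can hand out to containers on the node needs to be at least that large.  As far as I know, that total is governed by yarn.nodemanager.resource.memory-mb in yarn-site.xml (Ambari exposes it as the memory allocated for all YARN containers on a node), so a floor for this setup would look something like:

<!-- yarn-site.xml: total memory available to YARN containers on the node -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>4608</value>
</property>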

Forcing External Pushdown

In the last post, we dug into how the MapReduce process works.  Today, I want to spend a little bit of time talking about forcing MapReduce operations and disabling external pushdown.

What Can We Force?

Before we get into the mechanics behind forcing pushdown, it’s important to note that predicate pushdown is somewhat limited in terms of what information we can pass to the MapReduce engine.

For example, let’s say that I have a set of flight details in Hadoop and a set of airports in SQL Server.  Suppose I want to run the following query as quickly as possible:

SELECT
    *
FROM dbo.Flights f
    INNER JOIN dbo.Airports ar
        ON f.dest = ar.IATA
WHERE
    ar.state = 'OH'
    AND ar.city = 'Columbus';

Ideally, the engine would start with the Airports table, filter by state and city, and get back the 3 records in that table associated with Columbus airports.  Then, knowing the IATA values for those 3 airports, the Polybase engine would feed them into the remote query predicate and we’d get back the fewest rows possible.  There’s just one catch:

The Polybase engine does not work that way!

Unfortunately, my ideal scenario isn’t one that Polybase can give me (at least today; we’ll see if that changes in the future).  The only predicates the Polybase engine can push are explicit ones, so we can’t feed a set of results from one part of a query plan into Hadoop.  If you did want to do something like that, I think the easiest solution would be to generate dynamic SQL and build an IN clause with the IATA/dest codes, something like the sketch below.
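Here’s a rough, untested sketch of that workaround, using the tables and columns from the example above; in a real implementation you’d want to handle quoting and empty result sets more carefully:

-- Build an IN list of IATA codes on the SQL Server side, then send an
-- explicit predicate against dbo.Flights via dynamic SQL.
DECLARE
    @codes NVARCHAR(MAX),
    @sql NVARCHAR(MAX);

-- Concatenate the matching IATA codes into a quoted, comma-separated list.
SELECT @codes = STUFF((
    SELECT N', ''' + ar.IATA + N''''
    FROM dbo.Airports ar
    WHERE ar.state = 'OH'
        AND ar.city = 'Columbus'
    FOR XML PATH('')), 1, 2, N'');

-- The IN clause is now an explicit predicate, which Polybase can push.
SET @sql = N'SELECT * FROM dbo.Flights f WHERE f.dest IN (' + @codes + N');';
EXEC sp_executesql @sql;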

So with that in mind, let’s talk more about forcing and preventing pushdown.

Forcing Predicate Pushdown

As a reminder, there are a few prerequisites for predicate pushdown.  First, we need to hit a Hadoop cluster; we can’t use predicate pushdown on other systems like Azure Blob Storage.  Second, we need to have a resource manager link set up in our external data source (there’s a sketch of that definition after the query below).  Third, we need to make sure that everything is configured correctly on the Polybase side.  Once you have those items in place, it’s possible to use the FORCE EXTERNALPUSHDOWN command like so:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC
OPTION(FORCE EXTERNALPUSHDOWN);
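For reference, the resource manager link from that second prerequisite lives on the external data source definition.  Here’s a minimal sketch; the data source name is made up, but the addresses match the sandbox I’ve been using throughout this series (8020 for the name node, 8050 for the YARN resource manager):

-- A Hadoop external data source with a resource manager link,
-- which is what makes predicate pushdown possible.
CREATE EXTERNAL DATA SOURCE [HDP] WITH
(
    TYPE = HADOOP,
    LOCATION = N'hdfs://sandbox.hortonworks.com:8020',
    RESOURCE_MANAGER_LOCATION = N'sandbox.hortonworks.com:8050'
);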

You might want to force external pushdown when you don’t have good external table stats but do have some extra knowledge the optimizer doesn’t have regarding the data set.  For example, you might know that you’re asking for a tiny fraction of the total data set to get returned but the optimizer may not be aware of this, and so it pulls the entire data set over to SQL Server for the operation.

Be wary of using this operation, however; it will spin up a MapReduce job, and that can take 15-30 seconds to start up.  If you can get your entire data set over the wire in 10 seconds, there’s little value in shunting the work off to your Hadoop cluster.

Disabling External Pushdown

The opposite of FORCE EXTERNALPUSHDOWN is DISABLE EXTERNALPUSHDOWN.  On Azure Blob Storage and other external sources, this is a no-op.  This is also a no-op if you have a Hadoop cluster but no resource manager set up.

With everything set up, this hint ensures that you do not push work down to your Hadoop cluster.  This might be a good thing if you consistently see faster times by just pulling the entire data set over the wire and performing any filters or operations in SQL Server.  Note that even with this option set, performance is still going to be a good bit better than using Hive over a linked server.
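The syntax mirrors its counterpart; taking the same query from above and swapping the hint gives us:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC
OPTION(DISABLE EXTERNALPUSHDOWN);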

Pushdown-Related Errors

Let’s say that you want to run OPTION(FORCE EXTERNALPUSHDOWN) and you get an error as soon as the command starts:

Msg 7320, Level 16, State 110, Line 1
Cannot execute the query “Remote Query” against OLE DB provider “SQLNCLI11” for linked server “(null)”. Query processor could not produce a query plan because of the hints defined in this query. Resubmit the query without specifying any hints.

If you’re trying to perform this operation against a Hadoop cluster, you’ll want to re-read the configuration settings blog post.  There are several files which need to change, and if you don’t get all of them right, you’ll end up with strange-looking error messages.

Running MapReduce Polybase Queries

Previously, we looked at basic queries which do not perform MapReduce operations.  Today’s post is going to look at queries which do perform MapReduce operations, and we’ll see how they differ.

MapReduce Queries

In order for us to be able to perform a MapReduce operation, we need the external data source to be set up with a resource manager.  In addition, one of the following two circumstances must be met:

  1. Based on external table statistics, our query must be set up in such a way that it makes sense to distribute the load across the Hadoop data nodes, and the performance difference must be great enough to make it worth waiting 15-30 seconds for a MapReduce job to spin up (more on these statistics in a moment), OR
  2. We force a MapReduce job by using the OPTION(FORCE EXTERNALPUSHDOWN) query hint.

For the purposes of today’s post, I’m going to focus on the latter.  Regarding condition #1, my hunch is that there’s some kind of in-built cost optimizer the Polybase engine uses to determine whether to spin up a MapReduce job.  If this is in fact the case, it’s important to note that there is no “cost threshold for MapReduce” option that we can set, so if you’re looking for ways to fine-tune MapReduce operations, you may have to be liberal with the query hints to enable and disable external pushdown.
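One side note on condition #1:  external table statistics only exist if you create them, and as I understand it they don’t update automatically, so you re-create them when the underlying data changes significantly.  A minimal sketch against the SecondBasemen table (the statistics name and column choice here are just for illustration):

-- Statistics on an external table come from a scan of the external data.
CREATE STATISTICS [st_SecondBasemen_Age]
    ON dbo.SecondBasemen (Age)
    WITH FULLSCAN;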

Our MapReduce query is pretty simple:

SELECT TOP(50)
	sb.FirstName,
	sb.LastName,
	sb.Age,
	a.AgeDesc,
	sb.Throws,
	sb.Bats,
	tsa.TopSalary
FROM dbo.SecondBasemen sb
	CROSS APPLY (SELECT CASE WHEN Age > 30 THEN 'Old' ELSE 'Not Old' END AS AgeDesc) a
	INNER JOIN dbo.TopSalaryByAge tsa
		ON sb.Age = tsa.Age
ORDER BY
	sb.LastName DESC
OPTION(FORCE EXTERNALPUSHDOWN);

The only thing I’ve done here compared to the basic query is to add a query hint to force external pushdown.  Let’s see how this changes things.

Step 1:  Communicating with Hadoop

Just like with the basic query, we can use Wireshark to see how the Polybase engine interacts with our Hadoop cluster.  This time around, there’s a lot more going on over the wire than in the basic query scenario; there are ten separate TCP streams which the Polybase engine initiated, and we’re going to take at least a quick look at each one.

[Image: a quick look at the Wireshark packet capture]

Note that my names for these streams come from the order in which they show up in Wireshark.  They might have specific internal names, but I don’t know what they are.

Stream 0

Stream 0 appears to act as a coordinator, setting everything up and building the framework for the rest of the network operations to succeed.  This first stream has three major sections.  The first section covers basic tasks, like making sure the file we are looking for exists:

[Image: Stream 0 verifying that the file exists]

Once we know it exists, we want to create a job.  Eventually, that job will complete (we hope).

[Image: Stream 0 creating the MapReduce job]

Then, after everything completes, we get file info for the staging work:

[Image: Stream 0 getting file info for the staging work]

Stream 0 runs through the entire operation, with the earliest packet being packet #7 and the last packet being #20118 of approximately 20286.  We communicate with the Hadoop name node on port 8020 for this stream.

Stream 1

The next stream is Stream 1.  This stream does two things.  The first thing, which you can see at the beginning of the stream text below, is to prep the MapReduce job, including running /bin/bash to set the CLASSPATH and LD_LIBRARY_PATH.

[Image: Stream 1 prepping the MapReduce job]

After doing that, it regularly calls YARN’s getApplicationReport method.  When it gets word that the MapReduce job is complete, it finishes its work.

[Image: Stream 1 polling getApplicationReport until the job completes]

Stream 1 starts with packet #23 and ends with packet #20286.  But there’s a big gap between packets; stream 1 is responsible for packets 23-30, 128, and then most of 19989 through 20110.  This conversation takes place on port 8050 of the name node; remember that 8050 is the resource manager port for YARN.

Stream 2

Stream 2 is the single largest stream, responsible for 18,864 of the ~20,286 packets.  We can see exactly why by looking at the beginning and end sections of the stream.  First the beginning:

[Image: the beginning of Stream 2]

And now the end:

[Image: the end of Stream 2]

Stream 2 sends along 27 MB worth of data.  It’s packaging everything Polybase needs to perform operations, including JAR files and any internal conversion work that the Polybase engine might need to translate Hadoop results into SQL Server results.  For example, the sqlsort at the bottom is a DLL that performs ordering using SQL Server-style collations, as per the Polybase academic paper.

Stream 2 is responsible for almost every packet from 43 through 19811; only 479 packets in that range belonged to some other stream (19811 – 43 – 18864 – 425 = 479).  We send all of this data to the data node via port 50010.

Stream 3

After the behemoth known as Stream 2, the rest of these are pretty small.

[Image: Stream 3 performing a block write]

This stream appears to perform a block write operation.  It is responsible for most of the packets from 19830 to 19846.  This conversation takes place over the data node’s port 50010.

Stream 4

This stream follows up on the work Stream 3 did.

[Image: Stream 4 writing metadata]

It appears to be writing metadata, and is responsible for most of the packets from 19853 through 19866.  This communication is happening over the data node’s port 50010.

Stream 5

After two tiny streams, we get back to a slightly larger stream.

[Image: Stream 5 passing along the YARN job XML]

As you can guess from the contents, this stream creates the YARN XML job and passes it over the wire.  The stream starts with packet 19875 and ends at 19968, using 83 packets to transmit the payload.  We connect to port 50010 to push the YARN config file to the data node.

Stream 6

Our next stream is of the “child on a long road trip” variety.

[Image: Stream 6 looping on getTaskAttemptCompletionEvents and getJobReport]

Basically this stream calls two methods as part of a loop:  getTaskAttemptCompletionEvents and getJobReport.  It looks like there might be a periodic call of getCounters, or that call may be triggered by something.  In any event, the loop continues until getTaskAttemptCompletionEvents gets back a result; in my case, the result pointed at http://sandbox.hortonworks.com:13562.  Of note is that port 13562 is the default MapReduce shuffle port.

Stream 6 was responsible for 80 of the packets from 20103 through 20209.  One interesting thing to note is that the Hadoop-side port was 48311, which I believe is just a dynamic port and, unlike many of the other ports we’re looking at, does not signify anything in particular.

Stream 7

This is another loop-until-done stream, but it ended pretty quickly.

[Image: Stream 7 looping on file info and listing calls]

This stream looped on getting file info and listing information by connecting to the Hadoop name node on port 8020.  It took packets 20210 through 20224, as well as a few wrap-up packets at the end.

Stream 8

Stream 8 retrieved information on the finished MapReduce output.

[Image: Stream 8 retrieving information on the MapReduce output]

It first looks for a file called part-m-00000.ppax.  The PPAX format is a proprietary data format Polybase uses (the link is a Powerpoint deck that the current Polybase PM presented at SQL Saturday Boston).

Stream 8 is a TCP connection to the name node on port 8020 and stretches from 20225-20234 and a few packets after that as well.

Stream 9

The final Polybase stream actually receives the data in PPAX format.

[Image: Stream 9 receiving the data in PPAX format]

We connect to the Hadoop data node on port 50010 and retrieve the data file.  This takes 34 packets and stretches from 20235 through 20280.

Step 2:  MapReduce Logs

For MapReduce logs, you’ll want to go to port 8088 of the name node, so for example, http://sandbox.hortonworks.com:8088/cluster.  On this page, we can see the various “applications” that have been created.

[Image: the YARN applications list]

Clicking on one of these brings up details on the application, and we can see logs for each attempt.

[Image: application attempt details with links to logs]

If you click the Logs link, you’ll get to see the various logs, starting in short form and with the option to drill in further.

[Image: the stderr, stdout, and syslog logs]

In this example, we can see that the stderr and stdout logs didn’t have much, but the syslog log had more than 4K, so if we want to see the full log, we can click a link.

Step 3:  The Rest Of The Story

Once we have the file available, it’s time for Polybase to wrap up the story.  We can see this in the DMV outputs:

[Image: the Polybase DMV output for the MapReduce query]

We can see that the dm_exec_external_work DMV output looks slightly different, but we still get a file split.  This time, though, instead of splitting the CSV file directly, we split the MapReduce output file /user/pdw_user/TEMP_DIR/5130041d4035420b80f88096f197d4b2/Output/part-m-00000.ppax.  The operations to load the table are pretty similar to before:  we create a temp table, build statistics on it, and then import the data using a round-robin approach.
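If you want to poke at this on your own instance, here’s a quick sketch of a query against that DMV; I’ve trimmed the column list down to the ones I find most useful, and sys.dm_exec_distributed_requests can tie the external work back to the originating request.

-- Recent external work items: which inputs were split and read, and their status.
SELECT TOP(20)
    w.execution_id,
    w.step_index,
    w.input_name,
    w.read_location,
    w.bytes_processed,
    w.status
FROM sys.dm_exec_external_work w
ORDER BY
    w.execution_id DESC,
    w.step_index;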

Conclusion

Figuring out how the Polybase engine performs MapReduce operations has been interesting.  For this simple single-Hadoop-node, single-Polybase-node, single-file setup, we saw ten separate TCP streams, and seeing how they work in conjunction with the DMVs and MapReduce logs gives you a fuller understanding of the Polybase MapReduce process.