Today’s PolyBase Revealed post covers another thing I’ve been waiting for in PolyBase for a long time: integration with Apache Spark.
MapReduce is Slow
A big reason I’m interested in integrating PolyBase with Apache Spark is that the current techniques for integrating with Hadoop—either streaming all of the data over from HDFS into SQL Server or running a MapReduce job against your Hadoop cluster—are slow. Six years ago, when Parallel Data Warehouse was new, this wasn’t such a big deal. But we’ve seen huge improvements in processing speeds on Hadoop clusters since then and being stuck in the slow lane is painful.
Apache Spark can be 3-30x faster than an equivalent MapReduce job, and I’d like to tap into that.
The Process
There are a few steps we need to follow to get Spark working. First, you need a Spark cluster. I’m using Hortonworks Data Platform 3.0.
Grab an ODBC Driver
You will need an appropriate ODBC driver for your Spark cluster. In my case, I’m using the ODBC driver for Spark SQL from the Hortonworks Data Platform 3.0 downloads page. If you’re using a different Spark distribution, you will probably need a different ODBC driver.
Create External Resources
Next up, we will need to create a database-scoped credential to authenticate against our Spark cluster. Here’s some sample code:
IF NOT EXISTS
(
    SELECT 1
    FROM sys.database_scoped_credentials dsc
    WHERE
        dsc.name = N'SparkCredential'
)
BEGIN
    CREATE DATABASE SCOPED CREDENTIAL SparkCredential
    WITH IDENTITY = '<Your User>', SECRET = '<Your PWD>';
END
GO
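One prerequisite worth calling out: CREATE DATABASE SCOPED CREDENTIAL requires that the database already have a master key. If yours doesn’t, a minimal sketch along these lines (the password is a placeholder) takes care of it first:

IF NOT EXISTS
(
    SELECT 1
    FROM sys.symmetric_keys sk
    WHERE
        sk.name = N'##MS_DatabaseMasterKey##'
)
BEGIN
    -- A database scoped credential needs a database master key to exist first.
    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<Strong Password Here>';
END
GO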
After we have the credential in place, let’s create an external data source. It’s going to hit my server, which is on a host named clusterino and is running Apache Thrift on port 10016.
IF NOT EXISTS
(
    SELECT 1
    FROM sys.external_data_sources ds
    WHERE
        ds.name = N'ClusterinoSpark'
)
BEGIN
    CREATE EXTERNAL DATA SOURCE ClusterinoSpark WITH
    (
        LOCATION = 'odbc://clusterino:10016',
        CONNECTION_OPTIONS = 'Driver={Hortonworks Spark ODBC Driver}; Host = clusterino; Port = 10016; Database = default; ServerNode = clusterino:10016',
        CREDENTIAL = SparkCredential,
        PUSHDOWN = ON
    );
END
GO
Notice that I need to specify the driver name; if yours is different, change that label. I also need to specify a ServerNode even though I provide the host and port in separate fields. It appears that Spark uses the Host and Port connection string settings, whereas PolyBase focuses on ServerNode.
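If you want to double-check that the data source picked up the settings you expect, the catalog view makes that easy. This is just a sanity-check query, assuming your build exposes the connection_options and pushdown columns:

-- Confirm the external data source exists and shows the expected options.
SELECT
    ds.name,
    ds.location,
    ds.connection_options,
    ds.pushdown
FROM sys.external_data_sources ds
WHERE
    ds.name = N'ClusterinoSpark';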
Considerations of Note and Import
Your Spark Schema Matters
For my first demonstration, I’m going to take some North Carolina population data and load it into Spark in this way:
val ncPop = spark.read.format("CSV").option("header","true").load("hdfs://clusterino/PolyBaseData/NorthCarolinaPopulation.csv")
ncPop.write.format("orc").saveAsTable("NorthCarolinaPopulation")
That data was in HDFS and now it’s in ORC format as a table called NorthCarolinaPopulation. Spark inferred the data types when I created the ncPop DataFrame, and my external table definition needs to match what Spark inferred.
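If you’re curious what Spark actually decided, you can ask it directly. Because this was a plain CSV read without the inferSchema option, every column comes back as a string:

// Inspect the inferred schema; a bare CSV read treats every column as a string.
ncPop.printSchema()
spark.sql("DESCRIBE TABLE NorthCarolinaPopulation").show(false)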
When creating an external table, I have to specify strings for everything because that’s the default:
CREATE EXTERNAL TABLE dbo.NorthCarolinaPopulationSpark
(
    SUMLEV NVARCHAR(255),
    COUNTY NVARCHAR(255),
    PLACE NVARCHAR(255),
    PRIMGEO_FLAG NVARCHAR(255),
    NAME NVARCHAR(255),
    POPTYPE NVARCHAR(255),
    YEAR NVARCHAR(255),
    POPULATION NVARCHAR(255)
)
WITH
(
    LOCATION = 'NorthCarolinaPopulation',
    DATA_SOURCE = ClusterinoSpark
);
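At this point, a plain SELECT against the external table comes back fine, albeit with everything as strings; something simple like this is enough to confirm the plumbing works:

-- Everything comes back as NVARCHAR, but the connection itself works.
SELECT TOP(10) ncp.*
FROM dbo.NorthCarolinaPopulationSpark ncp;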
Let’s suppose that I know SUMLEV ought to be an integer and change its definition to SUMLEV INT. Well, then I get an error:

The nice thing is that this error message does cover all of the type errors, so you can fix them in one go rather than one at a time.
Spark Controls the Horizontal, Spark Controls the Vertical
If you do define your Spark DataFrames well, you get a much happier result. Here’s me creating a better-looking DataFrame in Spark:
import org.apache.spark.sql.functions._
spark.sql("""
SELECT
    INT(SUMLEV) AS SummaryLevel,
    INT(COUNTY) AS CountyID,
    INT(PLACE) AS PlaceID,
    BOOLEAN(PRIMGEO_FLAG) AS IsPrimaryGeography,
    NAME AS Name,
    POPTYPE AS PopulationType,
    INT(YEAR) AS Year,
    INT(POPULATION) AS Population
FROM NorthCarolinaPopulation
WHERE
    POPULATION <> 'A'
""")
.write.format("orc").saveAsTable("NorthCarolinaPopulationTyped")
And with that, I can create a typed external table:
CREATE EXTERNAL TABLE dbo.NorthCarolinaPopulationTypedSpark
(
    SummaryLevel INT,
    CountyID INT,
    PlaceID INT,
    IsPrimaryGeography BIT,
    Name NVARCHAR(255),
    PopulationType NVARCHAR(255),
    Year INT,
    Population INT
)
WITH
(
    LOCATION = 'NorthCarolinaPopulationTyped',
    DATA_SOURCE = ClusterinoSpark
);
Now everybody is happy.
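With the types lining up and PUSHDOWN = ON on the data source, ordinary T-SQL works against the table, and simple filters and aggregations become candidates for pushdown to Spark instead of streaming every row back. A minimal example of the kind of query I mean:

-- Aggregate against the typed external table; with pushdown enabled,
-- PolyBase can hand work like this off to Spark.
SELECT
    ncp.Year,
    SUM(ncp.Population) AS TotalPopulation
FROM dbo.NorthCarolinaPopulationTypedSpark ncp
GROUP BY
    ncp.Year
ORDER BY
    ncp.Year;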
Driver Problems are Endemic
Admittedly, this could be in part because I’ve been looking at early CTPs, but I have had driver problems with PolyBase connecting to pretty much everything. Some drivers work while others fail: I had one Spark driver work and several more fail, and for Apache Hive the hit rate was similar. The problem wasn’t the hop between my driver and Spark or Hive, either; the errors were coming from inside the house, that is, from the PolyBase engine itself.
I’d like to be able to cross this complaint off entirely with the newer CTPs. I haven’t tried Spark and Hive against CTP 3.0 yet, but once CTP 3.1 is out, I’ll rebuild my test machines and try again.
Conclusion
In this post, we saw a quick demonstration of creating an external table which hits Apache Spark. I don’t have any good performance metrics for this because my demo cluster is a single node, but functionality-wise it’s all there, and Spark is generally faster than MapReduce.
Coda: Not all that Glitters is Spark
With this ODBC functionality, PolyBase opens the door to other technologies. One that I will talk about in PolyBase Revealed is Apache Hive, as I alluded to above.
As for the other?

