It’s Hall of Fame season again!

Let’s Go Tribe has an excellent breakdown of the new 2015 HOF Ballot. They have both the first timers and the guys who have been on the ballot for a while. Jay Jaffe’s breakdown is even more detailed and awesome, because he’s Jay Jaffe and is rapidly becoming my favorite baseball writer.

The inner asshole in me would love to see Biggio held off for a second year, maybe missing by one vote because somebody spells his name wrong or something. Randy Johnson is the slam dunk candidate in this class; Pedro is only slightly less slam-dunkier because of the brevity of his career, but it’s probably going to be his year too. Biggio probably gets in, too. However, I think that’s it for 2015’s ballot. Piazza’s got an outside chance, and I expect John Smoltz will make a nice showing his first year on the ballot, but it’s hard to see anybody apart from that making a serious impact.

Reducing maximum eligibility from 15 years to 10 years isn’t a horrible move, but they really need to increase the number of ballot slots, maybe to 15. I’ll probably be covering this until ballots are released, so stay tuned for updates.

BIML, Part 6—Lookups

Last time around, we built a BIML package which generates one data flow per table we wish to migrate. Before loading the customer’s data, we needed to delete existing data for that customer. This works fine with a toy database, but once you’re talking about millions or billions of rows in fact tables, the delete-and-then-reload model doesn’t scale at all. Instead, I’d like to insert a record into the new node only if it does not already exist there. The way that I’m going to implement this is by using a Lookup component in each data flow, between the SQL data source and SQL destination.
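To make the goal concrete, here’s a rough T-SQL sketch of the same “insert only what’s missing” logic, using FactInternetSales from AdventureWorksDW as an illustration. The OldNode and NewNode database names and the @CustomerKey value are placeholders for this sketch only; the package itself does this work row-by-row in the data flow rather than in one set-based statement.

-- Illustrative only: insert one customer's rows that don't already exist on the new node.
DECLARE @CustomerKey INT = 1234;

INSERT INTO NewNode.dbo.FactInternetSales
	(SalesOrderNumber, SalesOrderLineNumber, CustomerKey /* , other columns */)
SELECT o.SalesOrderNumber, o.SalesOrderLineNumber, o.CustomerKey /* , other columns */
FROM OldNode.dbo.FactInternetSales o
WHERE o.CustomerKey = @CustomerKey
	AND NOT EXISTS
	(
		SELECT 1
		FROM NewNode.dbo.FactInternetSales n
		WHERE n.SalesOrderNumber = o.SalesOrderNumber
			AND n.SalesOrderLineNumber = o.SalesOrderLineNumber
	);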

Doing this is going to require a few changes to the BIML. Let’s start with the final product, and then I’ll walk through each of the changes individually.

<#@ template language="C#" hostspecific="true" #>
<#@ import namespace="System.Data" #>

<Biml xmlns="http://schemas.varigence.com/biml.xsd">
	<#
	string metadataConnectionString ="Provider=SQLNCLI10;Server=LOCALHOST;Initial Catalog=AdventureWorksDW2012;Integrated Security=SSPI;";
	DataTable tables = ExternalDataAccess.GetDataTable(metadataConnectionString,
	"SELECT SchemaName, TableName, Section, JoinCriteria, SelectCriteria, WhereCriteria, NeedToDelete, LookupColumns, LookupColumnXML, CASE WHEN WhereCriteria = '1=1' THEN '1=1 \"' ELSE REPLACE(WhereCriteria, '?', '\" + (DT_WSTR, 12)@[$Project::CustomerID]') END AS ExpressionWhereCriteria FROM dbo.MigrationStep ORDER BY Section, SchemaName, TableName");
	DataTable sections = ExternalDataAccess.GetDataTable(metadataConnectionString,
	"SELECT DISTINCT Section FROM dbo.MigrationStep ORDER BY Section");
	DataTable sectionsDesc = ExternalDataAccess.GetDataTable(metadataConnectionString,
	"SELECT DISTINCT Section FROM dbo.MigrationStep ORDER BY Section DESC"); #>
	<PackageProjects>
		<PackageProject Name="MigrateWarehouseData" ProtectionLevel="DontSaveSensitive">
			<Parameters>
				<Parameter Name="CustomerID" DataType="Int32">1234</Parameter>
				<Parameter Name="OldWarehouseServer" DataType="String">LOCALHOST</Parameter>
				<Parameter Name="OldWarehouseDatabase" DataType="String">AdventureWorksDW2012</Parameter>
				<Parameter Name="NewWarehouseServer" DataType="String">LOCALHOST</Parameter>
				<Parameter Name="NewWarehouseDatabase" DataType="String">AdventureWorksDW2012Node2</Parameter>
				<Parameter Name="PerformMigration" DataType="Boolean">True</Parameter>
				<Parameter Name="PerformDeletion" DataType="Boolean">True</Parameter>
			</Parameters>
			<Packages>
				<Package PackageName="MigrateCustomer" />
			</Packages>
		</PackageProject>
	</PackageProjects>
	<Connections>
		<Connection Name="OldWarehouseNode" ConnectionString="Data Source=LOCALHOST;Initial Catalog=AdventureWorksDW2012;Provider=SQLNCLI11.1;Integrated Security=SSPI;Auto Translate=False;" />
		<Connection Name="NewWarehouseNode" ConnectionString="Data Source=LOCALHOST;Initial Catalog=AdventureWorksDW2012Node2;Provider=SQLNCLI11.1;Integrated Security=SSPI;Auto Translate=False;" />
	</Connections>
	<Packages>
		<Package Name="MigrateCustomer" ConstraintMode="Linear" ProtectionLevel="DontSaveSensitive">
			<Connections>
				<Connection ConnectionName="OldWarehouseNode">
					<Expressions>
						<Expression ExternalProperty="InitalCatalog">@[$Project::OldWarehouseDatabase]</Expression>
						<Expression ExternalProperty="ServerName">@[$Project::OldWarehouseServer]</Expression>
					</Expressions>
				</Connection>
				<Connection ConnectionName="NewWarehouseNode">
					<Expressions>
						<Expression ExternalProperty="InitialCatalog">@[$Project::NewWarehouseDatabase]</Expression>
						<Expression ExternalProperty="ServerName">@[$Project::NewWarehouseServer]</Expression>
					</Expressions>
				</Connection>
			</Connections>
			<Tasks>
				<Container Name="Delete From New Tables" ConstraintMode="Linear">
					<Expressions>
						<Expression ExternalProperty="Disable">!(@[$Project::PerformDeletion])</Expression>
					</Expressions>
					<Tasks>
						<# foreach (DataRow row in sectionsDesc.Rows) { #>
						<Container Name="Delete From New Table <#=row[0]#>" ConstraintMode="Linear">
							<Tasks>
								<Container Name="Start Deleting..." ConstraintMode="Linear"></Container>
								<# foreach (DataRow trow in tables.Rows) {
							if (Convert.ToInt32(trow[2]) == Convert.ToInt32(row[0]) && Convert.ToBoolean(trow[6])) { #>
								<ExecuteSQL Name="Prepare <#=trow[1]#>" ConnectionName="NewWarehouseNode">
									<DirectInput>
										DELETE FROM [<#=trow[0]#>].[<#=trow[1]#>];
									</DirectInput>
									<Parameters>
										<Parameter Name="0" VariableName="MigrateWarehouseData.CustomerID" DataType="Int32" />
									</Parameters>
								</ExecuteSQL>
								<# }
							} #>
							</Tasks>
						</Container>
						<# } #>
					</Tasks>
				</Container>
				
				<Container Name="Migrate To New Table" ConstraintMode="Linear">
					<Expressions>
						<Expression ExternalProperty="Disable">!(@[$Project::PerformMigration])</Expression>
					</Expressions>
					<Tasks>
						<# foreach (DataRow row in sections.Rows) { #>
						<Container Name="Migrate To New Table <#=row[0]#>" ConstraintMode="Linear">
							<Tasks>
								<# foreach (DataRow trow in tables.Rows) {
							if (Convert.ToInt32(trow[2]) == Convert.ToInt32(row[0])) {#>
								<Dataflow Name="Migrate <#=trow[1]#>" DefaultBufferSize="104857600">
									<Transformations>
										<OleDbSource Name="Old Node" ConnectionName="OldWarehouseNode">
											<DirectInput>
												SELECT 
													<#=trow[4]#> 
												FROM [<#=trow[0]#>].[<#=trow[1]#>] x 
													<#=trow[3]#> 
												WHERE
													<#=trow[5]#>;
											</DirectInput>
											<Parameters>
												<Parameter Name="0" VariableName="MigrateWarehouseData.CustomerID" />
											</Parameters>
										</OleDbSource>
										<# if(!Convert.ToBoolean(trow[6])) {#>
										<Lookup Name="Lookup" OleDbConnectionName="NewWarehouseNode" CacheMode="Full" NoMatchBehavior="RedirectRowsToNoMatchOutput">
											<DirectInput>
												SELECT
													<#=trow[7]#>
												FROM [<#=trow[0]#>].[<#=trow[1]#>] x
											</DirectInput>
											<Inputs>
												<#=trow[8]#>
											</Inputs>
											<Outputs></Outputs>
										</Lookup>
										<# } #>
										<OleDbDestination Name="New Node" ConnectionName="NewWarehouseNode" BatchSize="10000" KeepIdentity=<# if(!Convert.ToBoolean(trow[6])) { #> "true" <# } else { #> "false" <# } #> UseFastLoadIfAvailable="true" MaximumInsertCommitSize="2000000">
										<# if(!Convert.ToBoolean(trow[6])) {#>
											<InputPath OutputPathName="Lookup.NoMatch"></InputPath>
										<# } #>
											<ExternalTableOutput Table="[<#=trow[0]#>].[<#=trow[1]#>]"></ExternalTableOutput>
										</OleDbDestination>
									</Transformations>
									<Expressions>
										<Expression ExternalProperty="[Lookup].[SqlCommand]">
											"SELECT DISTINCT
												<#=trow[7]#>
											FROM [<#=trow[0]#>].[<#=trow[1]#>] x
												<#=trow[3]#>
											WHERE
												<#=trow[9]#>;"
										</Expression>
									</Expressions>
								</Dataflow>
								<# }
							} #>
							</Tasks>
						</Container>
						<# } #>
					</Tasks>
				</Container>
			</Tasks>
		</Package>
	</Packages>
</Biml>

Comparing this BIML to the previous set, you can see that we’re up to 143 lines of code. That seems like a lot, but once we look at what we’re getting in return, I think the extra few dozen lines are completely worth it.

Our first major change starts in the Delete container. Previously, we deleted from every table unconditionally. Now, we only want to delete from a table if its NeedToDelete field has a value of 1. Here’s what we do now:

<Container Name="Delete From New Tables" ConstraintMode="Linear">
	<Expressions>
		<Expression ExternalProperty="Disable">!(@[$Project::PerformDeletion])</Expression>
	</Expressions>
	<Tasks>
		<# foreach (DataRow row in sectionsDesc.Rows) { #>
		<Container Name="Delete From New Table <#=row[0]#>" ConstraintMode="Linear">
			<Tasks>
				<Container Name="Start Deleting..." ConstraintMode="Linear"></Container>
				<# foreach (DataRow trow in tables.Rows) {
			if (Convert.ToInt32(trow[2]) == Convert.ToInt32(row[0]) && Convert.ToBoolean(trow[6])) { #>
				<ExecuteSQL Name="Prepare <#=trow[1]#>" ConnectionName="NewWarehouseNode">
					<DirectInput>
						DELETE FROM [<#=trow[0]#>].[<#=trow[1]#>];
					</DirectInput>
					<Parameters>
						<Parameter Name="0" VariableName="MigrateWarehouseData.CustomerID" DataType="Int32" />
					</Parameters>
				</ExecuteSQL>
				<# }
			} #>
			</Tasks>
		</Container>
		<# } #>
	</Tasks>
</Container>

For each section, we create a container called “Delete From New Table [Section #]” and loop through the table set. For each table, if that table’s section matches the current section and its NeedToDelete flag is set to 1, we add an Execute SQL task to delete data from that table. Otherwise, we skip the table entirely.
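To make the metadata side concrete, here’s the kind of thing MigrationStep might contain. The specific rows below are made up for illustration, but Section controls which container a table lands in, and NeedToDelete controls whether it gets a deletion task (and, as we’ll see shortly, whether it gets a lookup):

-- Hypothetical contents of dbo.MigrationStep; the values are illustrative, not the real metadata.
SELECT SchemaName, TableName, Section, NeedToDelete
FROM dbo.MigrationStep
ORDER BY Section, SchemaName, TableName;

/* Sample results (hypothetical):
	SchemaName  TableName          Section  NeedToDelete
	dbo         DimCustomer        1        0
	dbo         DimProduct         1        1
	dbo         FactInternetSales  2        0
*/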

Handling lookups requires code changes in a couple of places. Let’s look at them, starting with the lookup component itself.

<# if(!Convert.ToBoolean(trow[6])) {#>
	<Lookup Name="Lookup" OleDbConnectionName="NewWarehouseNode" CacheMode="Full" NoMatchBehavior="RedirectRowsToNoMatchOutput">
		<DirectInput>
			SELECT
				<#=trow[7]#>
			FROM [<#=trow[0]#>].[<#=trow[1]#>] x
		</DirectInput>
		<Inputs>
			<#=trow[8]#>
		</Inputs>
		<Outputs></Outputs>
	</Lookup>
<# } #>

So, if NeedToDelete is set to 0, I add a lookup component. I call the component Lookup and have it look up against the new warehouse node. You’ll notice that I set CacheMode to “Full” and the no-match behavior to redirect records to the no-match output. I want a full cache because I’d rather not hit the database over and over again for every row; that would be a horrible performance problem. You’ll also notice the special select statement: I’m using the LookupColumns column of MigrationStep, which holds the set of key columns associated with this table. In AdventureWorksDW2012, that’s almost always a single column, but there is one table with a two-part key. Then, once I have that query, I need to populate the Inputs section. To do that, I use the contents of LookupColumnXML, an XML representation of my primary key columns; it tells SSIS how to join each incoming row from the old node against the lookup’s new-node data. Because this is a simple existence check, I don’t need any output columns, so that section can stay blank.
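As a purely illustrative example of what those two metadata columns might hold (assuming the lookup’s Inputs section takes Column elements with SourceColumn and TargetColumn attributes), you could populate them like this:

-- Hypothetical LookupColumns and LookupColumnXML values.
-- LookupColumns feeds the lookup's SELECT list; LookupColumnXML becomes its Inputs section.
UPDATE dbo.MigrationStep
SET LookupColumns = 'x.CustomerKey',
	LookupColumnXML = '<Column SourceColumn="CustomerKey" TargetColumn="CustomerKey" />'
WHERE SchemaName = 'dbo' AND TableName = 'DimCustomer';

-- A table with a two-part key needs both columns in each value (FactInternetSales is just an example):
UPDATE dbo.MigrationStep
SET LookupColumns = 'x.SalesOrderNumber, x.SalesOrderLineNumber',
	LookupColumnXML = '<Column SourceColumn="SalesOrderNumber" TargetColumn="SalesOrderNumber" />'
		+ '<Column SourceColumn="SalesOrderLineNumber" TargetColumn="SalesOrderLineNumber" />'
WHERE SchemaName = 'dbo' AND TableName = 'FactInternetSales';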

I have a problem with this bit as it currently exists: I’m selecting all of the rows from my new warehouse node’s table and putting them into cache. If I have a billion rows and my primary key is 8 bytes, that’s 8GB of memory usage for that cache. On a beefy server, 8GB of memory won’t bring a machine down, but suppose I only need 100,000 of those billion rows; in that case, I’ve wasted a lot of time on rows that I am guaranteed never to use.

As such, I really want to filter the lookup query by joining back to CustomerKey. Unfortunately, the Lookup component does not support query parameters. Yes, you could use a cache component to stage the filtered data and feed it to the lookup, but there’s an easier way: override the lookup’s query with an expression on the data flow.

<Expressions>
	<Expression ExternalProperty="[Lookup].[SqlCommand]">
		"SELECT DISTINCT
			<#=trow[7]#>
		FROM [<#=trow[0]#>].[<#=trow[1]#>] x
			<#=trow[3]#>
		WHERE
			<#=trow[9]#>;"
	</Expression>
</Expressions>

In this case, you’ll see that I introduce a WHERE clause along with the join criteria necessary to tie back to DimCustomer and its all-important CustomerKey. I should note that the expression is very finicky: you need to use “ExternalProperty” and the value needs to be exactly “[Lookup].[SqlCommand]” if “Lookup” is the name of your lookup component. If you drop the brackets, BIML won’t be able to tie the expression to the component, and you’ll either get an error or the expression simply won’t be applied. The upside, however, is that the lookup cache now holds only the records I absolutely need.

In order to support this, I needed to change my SQL query in line 8 just a little bit:

"SELECT SchemaName, TableName, Section, JoinCriteria, SelectCriteria, WhereCriteria, NeedToDelete, LookupColumns, LookupColumnXML, CASE WHEN WhereCriteria = '1=1' THEN '1=1 \"' ELSE REPLACE(WhereCriteria, '?', '\" + (DT_WSTR, 12)@[$Project::CustomerID]') END AS ExpressionWhereCriteria FROM dbo.MigrationStep ORDER BY Section, SchemaName, TableName");

This replaces the ? in the “x.CustomerKey = ?” pattern with " + (DT_WSTR, 12)@[$Project::CustomerID], turning the parameterized filter into a string concatenation of the CustomerID project parameter that the SSIS expression language can evaluate.
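If you want to see what that CASE expression produces, here’s a quick standalone test using a hypothetical WhereCriteria value:

-- Quick demonstration of the WhereCriteria -> ExpressionWhereCriteria transformation.
DECLARE @WhereCriteria NVARCHAR(200) = N'x.CustomerKey = ?';

SELECT CASE WHEN @WhereCriteria = '1=1'
		THEN '1=1 "'
		ELSE REPLACE(@WhereCriteria, '?', '" + (DT_WSTR, 12)@[$Project::CustomerID]')
	END AS ExpressionWhereCriteria;

-- Result: x.CustomerKey = " + (DT_WSTR, 12)@[$Project::CustomerID]
-- i.e., the parameter marker becomes a string-concatenated cast of the CustomerID project parameter.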

Now that I have my lookup set up and separating matches from non-matches, I want to hook the destination up to the lookup component. The important thing here is that I need to connect to the no-match path, not the match path. The default is the match path, so I need to make a change to my OleDbDestination call:

<OleDbDestination Name="New Node" ConnectionName="NewWarehouseNode" BatchSize="10000" KeepIdentity=<# if(!Convert.ToBoolean(trow[6])) { #> "true" <# } else { #> "false" <# } #> UseFastLoadIfAvailable="true" MaximumInsertCommitSize="2000000">
<# if(!Convert.ToBoolean(trow[6])) {#>
	<InputPath OutputPathName="Lookup.NoMatch"></InputPath>
<# } #>
	<ExternalTableOutput Table="[<#=trow[0]#>].[<#=trow[1]#>]"></ExternalTableOutput>
</OleDbDestination>

This was a very small change. Basically, if NeedToDelete is set to 0, then add an InputPath tag and tell BIML that you want the NoMatch path of your lookup component.

The end result of all of this, once you generate the package from the BIML, is this thing of beauty:

[Image: LookupSuccess]