Last time around, we built a BIML package which generates one data flow per table we wish to migrate. Before loading the customer’s data, we needed to delete existing data for that customer. This works fine with a toy database, but once you’re talking about millions or billions of rows of data in fact tables, the delete-and-then-reload model doesn’t scale at all. Instead, I’d like to insert into the new node only if that record does not already exist. The way that I’m going to implement this is by using a Lookup component in each data flow, between the SQL data source and SQL destination.

Doing this is going to require a few changes to the BIML. Let’s start with the final product, and then I’ll walk through each of the changes individually.

<#@ template language="C#" hostspecific="true" #>
<#@ import namespace="System.Data" #>

<Biml xmlns="http://schemas.varigence.com/biml.xsd">
	<#
	string metadataConnectionString ="Provider=SQLNCLI10;Server=LOCALHOST;Initial Catalog=AdventureWorksDW2012;Integrated Security=SSPI;";
	DataTable tables = ExternalDataAccess.GetDataTable(metadataConnectionString,
	"SELECT SchemaName, TableName, Section, JoinCriteria, SelectCriteria, WhereCriteria, NeedToDelete, LookupColumns, LookupColumnXML, CASE WHEN WhereCriteria = '1=1' THEN '1=1 \"' ELSE REPLACE(WhereCriteria, '?', '\" + (DT_WSTR, 12)@[$Project::CustomerID]') END AS ExpressionWhereCriteria FROM dbo.MigrationStep ORDER BY Section, SchemaName, TableName");
	DataTable sections = ExternalDataAccess.GetDataTable(metadataConnectionString,
	"SELECT DISTINCT Section FROM dbo.MigrationStep ORDER BY Section");
	DataTable sectionsDesc = ExternalDataAccess.GetDataTable(metadataConnectionString,
	"SELECT DISTINCT Section FROM dbo.MigrationStep ORDER BY Section DESC"); #>
	<PackageProjects>
		<PackageProject Name="MigrateWarehouseData" ProtectionLevel="DontSaveSensitive">
			<Parameters>
				<Parameter Name="CustomerID" DataType="Int32">1234</Parameter>
				<Parameter Name="OldWarehouseServer" DataType="String">LOCALHOST</Parameter>
				<Parameter Name="OldWarehouseDatabase" DataType="String">AdventureWorksDW2012</Parameter>
				<Parameter Name="NewWarehouseServer" DataType="String">LOCALHOST</Parameter>
				<Parameter Name="NewWarehouseDatabase" DataType="String">AdventureWorksDW2012Node2</Parameter>
				<Parameter Name="PerformMigration" DataType="Boolean">True</Parameter>
				<Parameter Name="PerformDeletion" DataType="Boolean">True</Parameter>
			</Parameters>
			<Packages>
				<Package PackageName="MigrateCustomer" />
			</Packages>
		</PackageProject>
	</PackageProjects>
	<Connections>
		<Connection Name="OldWarehouseNode" ConnectionString="Data Source=LOCALHOST;Initial Catalog=AdventureWorksDW2012;Provider=SQLNCLI11.1;Integrated Security=SSPI;Auto Translate=False;" />
		<Connection Name="NewWarehouseNode" ConnectionString="Data Source=LOCALHOST;Initial Catalog=AdventureWorksDW2012Node2;Provider=SQLNCLI11.1;Integrated Security=SSPI;Auto Translate=False;" />
	</Connections>
	<Packages>
		<Package Name="MigrateCustomer" ConstraintMode="Linear" ProtectionLevel="DontSaveSensitive">
			<Connections>
				<Connection ConnectionName="OldWarehouseNode">
					<Expressions>
						<Expression ExternalProperty="InitialCatalog">@[$Project::OldWarehouseDatabase]</Expression>
						<Expression ExternalProperty="ServerName">@[$Project::OldWarehouseServer]</Expression>
					</Expressions>
				</Connection>
				<Connection ConnectionName="NewWarehouseNode">
					<Expressions>
						<Expression ExternalProperty="InitialCatalog">@[$Project::NewWarehouseDatabase]</Expression>
						<Expression ExternalProperty="ServerName">@[$Project::NewWarehouseServer]</Expression>
					</Expressions>
				</Connection>
			</Connections>
			<Tasks>
				<Container Name="Delete From New Tables" ConstraintMode="Linear">
					<Expressions>
						<Expression ExternalProperty="Disable">!(@[$Project::PerformDeletion])</Expression>
					</Expressions>
					<Tasks>
						<# foreach (DataRow row in sectionsDesc.Rows) { #>
						<Container Name="Delete From New Table <#=row[0]#>" ConstraintMode="Linear">
							<Tasks>
								<Container Name="Start Deleting..." ConstraintMode="Linear"></Container>
								<# foreach (DataRow trow in tables.Rows) {
							if (Convert.ToInt32(trow[2]) == Convert.ToInt32(row[0]) && Convert.ToBoolean(trow[6])) { #>
								<ExecuteSQL Name="Prepare <#=trow[1]#>" ConnectionName="NewWarehouseNode">
									<DirectInput>
										DELETE FROM [<#=trow[0]#>].[<#=trow[1]#>];
									</DirectInput>
									<Parameters>
										<Parameter Name="0" VariableName="MigrateWarehouseData.CustomerID" DataType="Int32" />
									</Parameters>
								</ExecuteSQL>
								<# }
							} #>
							</Tasks>
						</Container>
						<# } #>
					</Tasks>
				</Container>
				
				<Container Name="Migrate To New Table" ConstraintMode="Linear">
					<Expressions>
						<Expression ExternalProperty="Disable">!(@[$Project::PerformMigration])</Expression>
					</Expressions>
					<Tasks>
						<# foreach (DataRow row in sections.Rows) { #>
						<Container Name="Migrate To New Table <#=row[0]#>" ConstraintMode="Linear">
							<Tasks>
								<# foreach (DataRow trow in tables.Rows) {
							if (Convert.ToInt32(trow[2]) == Convert.ToInt32(row[0])) {#>
								<Dataflow Name="Migrate <#=trow[1]#>" DefaultBufferSize="104857600">
									<Transformations>
										<OleDbSource Name="Old Node" ConnectionName="OldWarehouseNode">
											<DirectInput>
												SELECT 
													<#=trow[4]#> 
												FROM [<#=trow[0]#>].[<#=trow[1]#>] x 
													<#=trow[3]#> 
												WHERE
													<#=trow[5]#>;
											</DirectInput>
											<Parameters>
												<Parameter Name="0" VariableName="MigrateWarehouseData.CustomerID" />
											</Parameters>
										</OleDbSource>
										<# if(!Convert.ToBoolean(trow[6])) {#>
										<Lookup Name="Lookup" OleDbConnectionName="NewWarehouseNode" CacheMode="Full" NoMatchBehavior="RedirectRowsToNoMatchOutput">
											<DirectInput>
												SELECT
													<#=trow[7]#>
												FROM [<#=trow[0]#>].[<#=trow[1]#>] x
											</DirectInput>
											<Inputs>
												<#=trow[8]#>
											</Inputs>
											<Outputs></Outputs>
										</Lookup>
										<# } #>
										<OleDbDestination Name="New Node" ConnectionName="NewWarehouseNode" BatchSize="10000" KeepIdentity=<# if(!Convert.ToBoolean(trow[6])) { #> "true" <# } else { #> "false" <# } #> UseFastLoadIfAvailable="true" MaximumInsertCommitSize="2000000">
										<# if(!Convert.ToBoolean(trow[6])) {#>
											<InputPath OutputPathName="Lookup.NoMatch"></InputPath>
										<# } #>
											<ExternalTableOutput Table="[<#=trow[0]#>].[<#=trow[1]#>]"></ExternalTableOutput>
										</OleDbDestination>
									</Transformations>
									<Expressions>
										<Expression ExternalProperty="[Lookup].[SqlCommand]">
											"SELECT DISTINCT
												<#=trow[7]#>
											FROM [<#=trow[0]#>].[<#=trow[1]#>] x
												<#=trow[3]#>
											WHERE
												<#=trow[9]#>
										</Expression>
									</Expressions>
								</Dataflow>
								<# }
							} #>
							</Tasks>
						</Container>
						<# } #>
					</Tasks>
				</Container>
			</Tasks>
		</Package>
	</Packages>
</Biml>

Comparing this BIML to the previous set, you can see that we’re up to 143 lines of code. That seems like a lot, but once we look at what we’re getting in return, I think the extra few dozen lines of code are completely worth it.

Our first major change starts in the Delete container. Previously, we forced deleting from each table. Now, we only delete from a table if its NeedToDelete field has a value of 1. Here’s what we do now:

<Container Name="Delete From New Tables" ConstraintMode="Linear">
	<Expressions>
		<Expression ExternalProperty="Disable">!(@[$Project::PerformDeletion])</Expression>
	</Expressions>
	<Tasks>
		<# foreach (DataRow row in sectionsDesc.Rows) { #>
		<Container Name="Delete From New Table <#=row[0]#>" ConstraintMode="Linear">
			<Tasks>
				<Container Name="Start Deleting..." ConstraintMode="Linear"></Container>
				<# foreach (DataRow trow in tables.Rows) {
			if (Convert.ToInt32(trow[2]) == Convert.ToInt32(row[0]) && Convert.ToBoolean(trow[6])) { #>
				<ExecuteSQL Name="Prepare <#=trow[1]#>" ConnectionName="NewWarehouseNode">
					<DirectInput>
						DELETE FROM [<#=trow[0]#>].[<#=trow[1]#>];
					</DirectInput>
					<Parameters>
						<Parameter Name="0" VariableName="MigrateWarehouseData.CustomerID" DataType="Int32" />
					</Parameters>
				</ExecuteSQL>
				<# }
			} #>
			</Tasks>
		</Container>
		<# } #>
	</Tasks>
</Container>

For each section (taken in descending order), we create a container called “Delete From New Table [Section #]” and loop through the table set. For each table, if that table’s section is equal to the current section and that table’s NeedToDelete flag is set to 1, then we add an ExecuteSQL task and delete data from that table. Otherwise, we generate no delete task for that table and leave its existing rows in place.
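To make the nested loops a little more concrete, here is a minimal sketch of the same filtering logic, written in Python purely for illustration. The rows stand in for dbo.MigrationStep; the section numbers and NeedToDelete flags are made up for the example and are not the real metadata.

```python
# Hypothetical rows standing in for dbo.MigrationStep.
# Section numbers and need_to_delete flags are assumptions for illustration.
steps = [
    {"schema": "dbo", "table": "DimCustomer", "section": 1, "need_to_delete": False},
    {"schema": "dbo", "table": "DimDate", "section": 1, "need_to_delete": True},
    {"schema": "dbo", "table": "FactInternetSales", "section": 2, "need_to_delete": False},
    {"schema": "dbo", "table": "FactCurrencyRate", "section": 2, "need_to_delete": True},
]


def delete_plan(steps):
    """Return (section, statement) pairs in the order the nested loops emit them."""
    plan = []
    # Outer loop: distinct sections in descending order, like sectionsDesc.
    for section in sorted({s["section"] for s in steps}, reverse=True):
        # Inner loop: every table, keeping only those in this section
        # that are flagged for deletion.
        for s in steps:
            if s["section"] == section and s["need_to_delete"]:
                plan.append((section, f"DELETE FROM [{s['schema']}].[{s['table']}];"))
    return plan


plan = delete_plan(steps)
for section, statement in plan:
    print(section, statement)
```

Only the two flagged tables get a delete statement, and the higher-numbered section comes first.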

The way that I handle lookups is with code changes in a couple of places. Let’s look at them, starting first with the lookup itself.

<# if(!Convert.ToBoolean(trow[6])) {#>
	<Lookup Name="Lookup" OleDbConnectionName="NewWarehouseNode" CacheMode="Full" NoMatchBehavior="RedirectRowsToNoMatchOutput">
		<DirectInput>
			SELECT
				<#=trow[7]#>
			FROM [<#=trow[0]#>].[<#=trow[1]#>] x
		</DirectInput>
		<Inputs>
			<#=trow[8]#>
		</Inputs>
		<Outputs></Outputs>
	</Lookup>
<# } #>

So, if NeedToDelete is set to 0, I add a lookup component. I want to call my component Lookup and have it look up against the new warehouse node. You’ll notice that I set CacheMode to “Full” and my no-match behavior to redirect records to the no-match output. I want a full cache because I’d rather not hit the database over and over again for every row; that would be a horrible performance problem. You’ll also notice that I have a special select statement: I’m using the LookupColumns column of MigrationStep, which is the set of key columns associated with this table. In AdventureWorksDW2012, that’s almost always a single column, but there is one table with a two-part key. Then, once I have that query, I need to fill in the Inputs section. To do that, I use the contents of LookupColumnXML, which is an XML representation of my primary key columns. This tells SSIS how to join each incoming row from the old node to my lookup component, with its new node data. Because this is a simple existence check, I don’t need any output columns, so that section can be blank.
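The post doesn’t show what LookupColumns and LookupColumnXML actually hold, so here is a hedged sketch of one plausible shape, again in Python purely for illustration. It assumes the select list is the key columns prefixed with the x alias and that the XML is one Biml Column element per key column, mapping the incoming column to the lookup column of the same name. The function name and the two-part key columns are hypothetical.

```python
from xml.sax.saxutils import quoteattr


def lookup_metadata(key_columns):
    """Build an assumed LookupColumns / LookupColumnXML pair for a list of key columns."""
    # Assumed select-list format: each key column qualified with the "x" alias.
    select_list = ", ".join(f"x.{col}" for col in key_columns)
    # Assumed XML format: one <Column> per key, joining source to target by name.
    column_xml = "\n".join(
        f"<Column SourceColumn={quoteattr(col)} TargetColumn={quoteattr(col)} />"
        for col in key_columns
    )
    return select_list, column_xml


# Single-column key.
print(lookup_metadata(["CustomerKey"]))

# Two-part key (column names chosen only as an example).
select_list, column_xml = lookup_metadata(["SalesOrderNumber", "SalesOrderLineNumber"])
print(select_list)
print(column_xml)
```

Whatever the real values look like, the point is the same: one value feeds the lookup’s SELECT and the other feeds its Inputs section, and both have to name the same key columns.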

I have a problem with this bit as it currently exists: I’m selecting all of the rows from my new warehouse node’s table and putting them into cache. If I have a billion rows and my primary key is 8 bytes, that’s 8GB of memory usage for that cache. On a beefy server, 8GB of memory won’t bring a machine down, but suppose I only need 100,000 of those billion rows; in that case, I’ve wasted a lot of time on rows that I am guaranteed never to use.
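The arithmetic behind that complaint is simple enough to sketch. This counts key bytes only, so treat it as a lower bound; any per-row overhead in the lookup cache would push the real numbers higher. The function name is just for illustration.

```python
def key_cache_bytes(row_count, key_bytes=8):
    """Lower bound on cache size: key bytes only, ignoring per-row overhead."""
    return row_count * key_bytes


full_table = key_cache_bytes(1_000_000_000)  # every row in the new node's table
one_customer = key_cache_bytes(100_000)      # only the rows we could ever match

print(full_table / 10**9, "GB vs", one_customer / 10**6, "MB")
print("ratio:", full_table // one_customer)
```

Caching the full table costs 8 GB against 0.8 MB for the rows that matter, a factor of 10,000.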

As such, I really want to use an expression which joins back to CustomerKey. Unfortunately, the Lookup component does not support parameters in this way. Yes, you could use a cache component, save the data that way, and use it in lookups, but there’s an easier way to do this: use the data flow’s expression.

<Expressions>
	<Expression ExternalProperty="[Lookup].[SqlCommand]">
		"SELECT DISTINCT
			<#=trow[7]#>
		FROM [<#=trow[0]#>].[<#=trow[1]#>] x
			<#=trow[3]#>
		WHERE
			<#=trow[9]#>
	</Expression>
</Expressions>

In this case, you’ll see that I introduce a WHERE clause along with the join criteria necessary to tie back to DimCustomer and its all-important CustomerKey. I should note that the expression is very finicky: you need to use “ExternalProperty” and the value needs to be exactly “[Lookup].[SqlCommand]” if “Lookup” is the name of your lookup component. If you drop the brackets, BIML won’t be able to make the connection and you’ll either get an error or won’t have the expression work as you expect. The positive to this, however, is that I only get the records I absolutely need.

In order to support this, I needed to change my SQL query in line 8 just a little bit:

"SELECT SchemaName, TableName, Section, JoinCriteria, SelectCriteria, WhereCriteria, NeedToDelete, LookupColumns, LookupColumnXML, CASE WHEN WhereCriteria = '1=1' THEN '1=1 \"' ELSE REPLACE(WhereCriteria, '?', '\" + (DT_WSTR, 12)@[$Project::CustomerID]') END AS ExpressionWhereCriteria FROM dbo.MigrationStep ORDER BY Section, SchemaName, TableName");

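This replaces the ? placeholder in WhereCriteria, so a pattern like x.CustomerKey = ? becomes x.CustomerKey = " + (DT_WSTR, 12)@[$Project::CustomerID]. The double quote closes the string literal that the expression opened, and the project parameter, cast to a string, is concatenated onto the end. A WhereCriteria of 1=1 has no placeholder, so it just gets the closing quote.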
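If the CASE and REPLACE are hard to read through all of the escaping, this small Python sketch mimics them so you can see the two possible outputs. It is only an illustration: the function name is mine, and Python’s exact string comparison is stricter than SQL Server’s default collation would be.

```python
def expression_where_criteria(where_criteria):
    """Mimic the CASE/REPLACE that builds ExpressionWhereCriteria."""
    if where_criteria == "1=1":
        # No placeholder: just close the string literal.
        return '1=1 "'
    # Close the string literal, then concatenate the project parameter cast to a string.
    return where_criteria.replace("?", '" + (DT_WSTR, 12)@[$Project::CustomerID]')


print(expression_where_criteria("x.CustomerKey = ?"))
print(expression_where_criteria("1=1"))
```

Either way, the value ends the string literal that the data flow expression opened, which is why the replacement text starts with a double quote.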

Now that I have my lookup set up and filtering out matches from non-matches, I want to hook the destination to my lookup component. The important thing here is that I need to connect to the no-match path, not the match path. The default is to use the match path, so I need to make a change to my OleDbDestination call:

<OleDbDestination Name="New Node" ConnectionName="NewWarehouseNode" BatchSize="10000" KeepIdentity=<# if(!Convert.ToBoolean(trow[6])) { #> "true" <# } else { #> "false" <# } #> UseFastLoadIfAvailable="true" MaximumInsertCommitSize="2000000">
<# if(!Convert.ToBoolean(trow[6])) {#>
	<InputPath OutputPathName="Lookup.NoMatch"></InputPath>
<# } #>
	<ExternalTableOutput Table="[<#=trow[0]#>].[<#=trow[1]#>]"></ExternalTableOutput>
</OleDbDestination>

This was a very small change. Basically, if NeedToDelete is set to 0, then add an InputPath tag and tell BIML that you want the NoMatch path of your lookup component.

The end result of all of this, once you generate the BIML, is this thing of beauty:

[Image: LookupSuccess]
