In the first two articles in this series, Incremental Loads with Change Data Capture – Enabling Change Data Capture and Incremental Loads with Change Data Capture – Detecting Changes, I outlined the steps required to enable change data capture (CDC) and then how to use CDC to detect data changes. In this final article of the series, I will outline the steps required to build a SQL Server Integration Services (SSIS) package that uses the change data to build a Slowly Changing Dimension (SCD).
Stored Procedure
To begin, let’s create a stored procedure that will act as the source for our change data.
IF OBJECT_ID('dbo.ChangeDetection_Customer') IS NOT NULL
    DROP PROC dbo.ChangeDetection_Customer;
GO
CREATE PROC dbo.ChangeDetection_Customer
AS
DECLARE
    @fromDateTime datetime,
    @toDateTime datetime;
--Obtain the low point from the tracking table
SELECT
    @fromDateTime = LastChangeTime
FROM dbo.ChangeTables
WHERE
    TableName = 'Customers';
--Obtain the high point using the get max lsn function
SELECT
    @toDateTime = sys.fn_cdc_map_lsn_to_time(sys.fn_cdc_get_max_lsn());
--select change data using wrapper function
SELECT *
FROM dbo.fn_net_changes_dbo_Customers(@fromDateTime, @toDateTime, 'all with mask');
--update last change time for this table only
UPDATE dbo.ChangeTables
SET
    LastChangeTime = @toDateTime
WHERE
    TableName = 'Customers';
GO
The stored procedure leverages a wrapper function (dbo.fn_net_changes_dbo_Customers). This function is the source of the change data and was created in Article 2. A few changes were made to the function. In the next section I will explain the changes.
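The procedure depends on the dbo.ChangeTables tracking table from earlier in the series. As a rough sketch only, assuming that table has just the two columns the procedure references (TableName and LastChangeTime) and that the capture instance is named dbo_Customers (inferred from the function name), the table could be seeded and the procedure exercised as follows. The column types and the seeding logic are assumptions, not the original script.

```sql
-- Sketch only: the column types are assumptions; the real table comes from the earlier articles.
IF OBJECT_ID('dbo.ChangeTables') IS NULL
BEGIN
    CREATE TABLE dbo.ChangeTables
    (
        TableName      sysname  NOT NULL PRIMARY KEY,
        LastChangeTime datetime NULL
    );
END;

-- Seed the low point with the earliest time available for the (assumed) capture instance.
IF NOT EXISTS (SELECT 1 FROM dbo.ChangeTables WHERE TableName = 'Customers')
    INSERT INTO dbo.ChangeTables (TableName, LastChangeTime)
    SELECT 'Customers',
           sys.fn_cdc_map_lsn_to_time(sys.fn_cdc_get_min_lsn('dbo_Customers'));

-- Return the pending changes and advance the low point.
EXEC dbo.ChangeDetection_Customer;

-- Check where the low point now sits.
SELECT TableName, LastChangeTime
FROM dbo.ChangeTables
WHERE TableName = 'Customers';
```

Because the procedure moves LastChangeTime forward on every call, running it a second time right away should return only changes captured since the first call.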
Wrapper Function
The wrapper function requires three parameters. The first two, start_time and end_time, are self-explanatory. The third, the row filter option, is less obvious. It accepts three possible values:
1. All – returns the final content of a changed row and the change operation.
2. All with mask – returns the same thing as All. If an update flag list was specified when the wrapper function was generated, this value is required to populate the update mask.
3. All with merge – returns all of the final content, but the change operations are limited to D (all deletes) and M (inserts and updates).
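To compare the three options side by side, the wrapper can be called once per value over the same time window. This is a minimal sketch that assumes the capture instance is named dbo_Customers (inferred from the function name) and that the window runs from the earliest to the latest available change; the variable names are illustrative.

```sql
-- Sketch only: the capture instance name dbo_Customers is an assumption.
DECLARE @start_time datetime,
        @end_time   datetime;

-- Build a time window that spans the changes currently available.
SELECT @start_time = sys.fn_cdc_map_lsn_to_time(sys.fn_cdc_get_min_lsn('dbo_Customers')),
       @end_time   = sys.fn_cdc_map_lsn_to_time(sys.fn_cdc_get_max_lsn());

-- Same window, three row filter options.
SELECT * FROM dbo.fn_net_changes_dbo_Customers(@start_time, @end_time, 'all');
SELECT * FROM dbo.fn_net_changes_dbo_Customers(@start_time, @end_time, 'all with mask');
SELECT * FROM dbo.fn_net_changes_dbo_Customers(@start_time, @end_time, 'all with merge');
```

Comparing the __CDC_OPERATION column across the three result sets is a quick way to see how each option reports inserts and updates.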
When I generated the function I did not supply an update flag list, so no update flag columns are included in the result set. So how am I going to detect column-level changes?
When I first started working with CDC and building this process I did include an update flag list, but I found that it added unnecessary complexity to the process, which I will explain later. As a result, I decided to generate the wrapper function without the parameter and then modify it to accommodate my needs. The following is a snippet of the modified function.
select
    [CustomerID],
    [FirstName],
    [LastName],
    [CompanyName],
    [ZipCode],
    case [__$operation]
        when 1 then 'D'
        when 2 then 'I'
        when 3 then 'UO'
        when 4 then 'UN'
        when 5 then 'M'
        else null
    end as [__CDC_OPERATION],
    case
        when sys.fn_cdc_is_bit_set(4, __$update_mask) = 1
            then 1
        else 0
    end as hasChanged
from [cdc].[fn_cdc_get_net_changes_dbo_Customers](@from_lsn, @to_lsn, @row_filter_option)
I want to highlight one thing here. Instead of letting the wrapper create an update flag column for each column whose changes I want to track, I used the sys.fn_cdc_is_bit_set scalar function. This function indicates whether a captured column has changed by checking whether the bit at the column's ordinal position is set in a bit mask. It accepts two parameters: the ordinal position of the column and the bit mask. The bit mask is returned by the underlying change function as the __$update_mask column, and you can obtain the ordinal position of a column by using the sys.fn_cdc_get_column_ordinal scalar function. If the column has changed, the function returns 1; otherwise it returns 0.
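As an illustration of how the two functions fit together, the sketch below looks up a column's ordinal by name rather than hard-coding it, then tests the mask for that column. It assumes the capture instance is named dbo_Customers (inferred from the function name) and picks ZipCode purely as an example; the alias ZipCodeChanged is hypothetical.

```sql
-- Sketch only: the capture instance name and the choice of ZipCode are assumptions.
DECLARE @zip_ordinal int        = sys.fn_cdc_get_column_ordinal('dbo_Customers', 'ZipCode'),
        @from_lsn    binary(10) = sys.fn_cdc_get_min_lsn('dbo_Customers'),
        @to_lsn      binary(10) = sys.fn_cdc_get_max_lsn();

SELECT
    CustomerID,
    ZipCode,
    __$operation,
    -- Test the bit for the ZipCode column in each row's update mask.
    sys.fn_cdc_is_bit_set(@zip_ordinal, __$update_mask) AS ZipCodeChanged
FROM cdc.fn_cdc_get_net_changes_dbo_Customers(@from_lsn, @to_lsn, 'all with mask');
```

Looking the ordinal up by name avoids depending on a fixed position such as 4, which would silently point at a different column if the capture instance were recreated with a different column list.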
So what does this mean? For any columns that I want to track history on (Type II), I can simply modify the function to include another WHEN in the CASE expression. For example, see the following script:
when sys.fn_cdc_is_bit_set(5, __$update_mask) = 1
The line above is the additional WHEN, which checks another column; it goes inside the same CASE expression that produces hasChanged and returns the same result of 1. If a change occurs on either column, hasChanged is 1, so I only need to check one column instead of checking a flag for each tracked column. This in turn gives me one consolidated method for detecting changed column data. As a result, if I had a dimension with Type II columns, I could check the single column and determine how to handle that row.
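Put together, the consolidated check might look like the following sketch. It runs directly against the change function rather than inside the wrapper, assumes the capture instance is named dbo_Customers, and reuses ordinals 4 and 5 from the text without knowing which columns they correspond to.

```sql
-- Sketch only: ordinals 4 and 5 come from the article; the capture instance name is an assumption.
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_Customers'),
        @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();

SELECT
    CustomerID,
    CASE
        WHEN sys.fn_cdc_is_bit_set(4, __$update_mask) = 1 THEN 1  -- first tracked column changed
        WHEN sys.fn_cdc_is_bit_set(5, __$update_mask) = 1 THEN 1  -- second tracked column changed
        ELSE 0                                                    -- no tracked column changed
    END AS hasChanged
FROM cdc.fn_cdc_get_net_changes_dbo_Customers(@from_lsn, @to_lsn, 'all with mask');
```

Each additional Type II column adds one more WHEN branch, while downstream logic still reads only hasChanged.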
SSIS Package
Now that everything needed is in place, I will provide the steps to build the SSIS package. First, create a connection to a data source; this is the database that contains the change tables. Then add an Execute SQL Task. This task simply truncates the staging table, which I will discuss later in the article. Open the Execute SQL Task Editor and paste the following code into the SQLStatement property:
TRUNCATE TABLE dbo.Stage.DimCustomer
Next, drop a Data Flow Task onto the control flow design surface. Drag the green precedence constraint from the Execute SQL Task onto the Data Flow Task. Activate the Data Flow tab and drag an OLE DB Source onto the data flow design surface. The query used for the OLE DB Source is the stored procedure that was created earlier. In the OLE DB Source Editor, set the data access mode to SQL command and paste EXEC dbo.ChangeDetection_Customer in the textbox labeled SQL command text. The results will be limited to the rows that have changed since the previous load.
Before leaving the OLE DB Source Editor, click Columns in the left navigation pane. In the Output Column column, change the value for CustomerID to CustomerAK.
Next, drag a Conditional Split transformation onto the data flow design surface and connect the OLE DB Source to it. Configure it so that any rows that were inserted are sent to one output and all remaining rows are sent to another output. To do this, paste the following expression in the column labeled Condition:
__CDC_OPERATION == "I"
Then type New Row in the column labeled OutputName. Lastly, type Stage Data in the textbox labeled Default Output Name towards the bottom of the editor.
Drag a Derived Column transformation onto the design surface and drag one of the green data flow paths from the Conditional Split onto it. The Input Output Selection dialog box will appear.
Select New Row from the Output drop-down list. Next, double-click the Derived Column transformation to open the editor and insert the following into the column labeled Expression:
(DT_DBDATE)"1/1/1900"
Next, enter EffectiveDate in the column labeled Derived Column Name.
To complete the process of loading new rows into the SCD, drag an OLE DB Destination onto the design surface and connect the Derived Column transformation to it. Open the editor, select the destination data source, and select the dimension table that will hold the final data. Typically, this data source will be the data warehouse where the dimension resides. Then click Mappings in the left navigation pane and ensure that all the columns are mapped accordingly.
All of the new rows will be inserted in a batch. How to handle the other data required a little more thought. Instead of replicating the existing Slowly Changing Dimension Transformation that performs row-by-row operations, I decided to perform batch operations on the additional three sets of data.
Each of the remaining sets of data will require some type of data manipulation (DML) operation, Insert or Update. The most effective approach is to perform the operations in batches. As a result, I will insert the data into a staging table and run three different statements to complete the load. The script to create the staging table will be included in the download file at the end of the article.
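The actual scripts ship with the download, so the following is only a sketch of what set-based statements of this kind could look like. The table names dbo.DimCustomer and dbo.Stage_DimCustomer, the ExpirationDate column, and the choice of FirstName, LastName, and CompanyName as Type I columns are all hypothetical; CustomerAK, __CDC_OPERATION, and hasChanged come from the data flow described in this article.

```sql
-- Sketch only: table names, ExpirationDate, and the Type I column list are assumptions.

-- Expire Deleted Rows: close the current dimension row for every staged delete.
UPDATE d
SET    d.ExpirationDate = GETDATE()
FROM   dbo.DimCustomer AS d
JOIN   dbo.Stage_DimCustomer AS s
       ON s.CustomerAK = d.CustomerAK
WHERE  s.__CDC_OPERATION = 'D'
  AND  d.ExpirationDate IS NULL;

-- Apply Type I Changes: overwrite in place when no history-tracked column changed.
UPDATE d
SET    d.FirstName   = s.FirstName,
       d.LastName    = s.LastName,
       d.CompanyName = s.CompanyName
FROM   dbo.DimCustomer AS d
JOIN   dbo.Stage_DimCustomer AS s
       ON s.CustomerAK = d.CustomerAK
WHERE  s.__CDC_OPERATION = 'UN'
  AND  s.hasChanged = 0
  AND  d.ExpirationDate IS NULL;
```

A Type II statement would follow the same join pattern for rows where hasChanged = 1: expire the current row, then insert a new row from staging. Each statement touches all qualifying rows in a single pass, which is what distinguishes this approach from row-by-row processing.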
To do this, first activate the Control Flow tab, then right-click in the empty space and select Variables from the menu that appears. Add a new variable named BatchRows with a data type of Int32. Ensure the scope of the variable is the entire package.
Now back to the data flow. Drag a Row Count transformation onto the data flow and drag the remaining green data flow path (the Stage Data output) from the Conditional Split onto it. Open the editor for the Row Count transformation and select the BatchRows variable from the drop-down list labeled VariableName.
Then drag an OLE DB Destination onto the design surface. Drag the data flow path from the Row Count Task onto it. Next configure the destination so that the data is inserted into the stage table. Once complete the data flow will resemble the following screen shot:
Now return to the control flow and drag a Sequence Container onto the design surface. Drag the precedence constraint from the Data Flow onto the container. Double-click the constraint and select Expression and Constraint from the drop down list labeled Evaluation Operation. Then type @BatchRows > 0 in the textbox labeled Expression.
Next, drag three Execute SQL Tasks into the container. Name them Expire Deleted Rows, Apply Type I Changes, and Apply Type II Changes. Configure each one to use the data source where the dimension resides. I have included three SQL scripts in the download file, each named after the task it belongs to. Copy and paste each script into the corresponding task: open the Execute SQL Task Editor, click the ellipsis next to the SQLStatement property, and paste the script into the dialog box that appears. Now the control flow should resemble the following screen shot:
I could have used the OLE DB command, which performs row-by-row operations, on the data flow to add the data, but after testing the approach I concluded that the batch approach was more efficient and several times faster. I have included the row-by-row approach in the download. Give it a try if you do not believe me.
Download: CDCSSISSCD.zip
As always if you have any questions or concerns regarding this article please feel free to email me at pleblanc@pragmaticworks.com.
Talk to you soon,
Patrick LeBlanc, MCTS, Founder SQLLunch.com