I’ve posted the CDC demo packages and scripts from my TechEd session to my SkyDrive share.
The CDC Control Task is used for a number of different operations. This post lists each of those operations, and briefly describes when to use them.
The Mark initial load start operation is used when executing an initial load from an active database without a snapshot. It is invoked at the beginning of an initial-load package to record the current LSN of the source database before the initial-load package starts reading the source tables. A walkthrough of how this process works can be found in my CDC in SSIS for SQL Server 2012 post.
The Mark initial load end operation is used when executing an initial load from an active database without a snapshot. It is invoked at the end of an initial-load package to record the current LSN of the source database after the initial-load package has finished reading the source tables. This LSN is determined by recording the current time when the operation occurs, and then querying the cdc.lsn_time_mapping table in the CDC database for a change that occurred after that time.
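To illustrate, the lookup that Mark initial load end performs can be sketched manually. This is a hedged sketch, not the task’s actual implementation; it returns the first LSN mapped to a transaction that ended after a recorded point in time:

```sql
-- Sketch: find the first LSN recorded after the time the initial load finished.
DECLARE @load_end_time datetime = GETDATE();

SELECT TOP 1 [start_lsn]
FROM [cdc].[lsn_time_mapping]
WHERE [tran_end_time] > @load_end_time
ORDER BY [tran_end_time] ASC;

-- The built-in sys.fn_cdc_map_time_to_lsn(N'smallest greater than', @load_end_time)
-- performs a similar time-to-LSN mapping.
```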
The Mark CDC start operation is used when the initial load is made from a snapshot database or from a quiescent database. It is invoked at any point within the initial-load package. The operation accepts a parameter that can be a snapshot LSN, the name of a snapshot database (from which the snapshot LSN will be derived automatically), or it can be left empty, in which case the current database LSN is used as the start LSN for the change processing package. This operation is used as an alternative to the Mark initial load start/end operations.
The Get processing range operation is used in a change processing package before invoking the data flow that contains the CDC Source component. It establishes a range of LSNs that the CDC Source reads when invoked. The range is stored in an SSIS package variable (the StateVariable property) that is used by the CDC Source during data flow processing.
The Mark processed range operation is used in a change processing package at the end of a CDC run (after the CDC data flow has completed successfully) to record the last LSN that was fully processed in the run. The next time Get processing range is invoked, this position determines the start of the next processing range.
The Reset CDC state operation is used to reset the persistent CDC state associated with the current CDC context. After this operation runs, the current maximum LSN of the database (as returned by sys.fn_cdc_get_max_lsn) becomes the start of the next processing range. An example of when this operation is useful is when you want to process only newly created change records and ignore all older ones.
Notes
The CDC Source offers five different ways to retrieve change data. The format you receive the data in is determined by the “CDC processing mode” you select in the CDC Source Editor UI.
The two primary modes are All and Net. Net is the best mode for most ETL processes, although it does introduce some overhead on the source system; see the descriptions of the modes below for details.
To describe the modes, we’ll use the following scenario: we INSERT three new customers into the source table, UPDATE the last name of two of them, and then DELETE one of the customers we just added. The SQL statements for these steps look like this:
```sql
USE [CDCTest]
GO

--
-- INSERT - Add new customers
--
SET IDENTITY_INSERT [DimCustomer_CDC] ON

INSERT INTO [dbo].[DimCustomer_CDC]
    (CustomerKey, CustomerAlternateKey, FirstName, LastName)
VALUES
    (30000, N'AW00030000', N'Mister30000', N'NotSmith'),
    (30001, N'AW00030001', N'Mister30001', N'NotSmith'),
    (30002, N'AW00030002', N'Mister30002', N'NotSmith')

SET IDENTITY_INSERT [DimCustomer_CDC] OFF
GO

--
-- UPDATE - Set last names
--
UPDATE [dbo].[DimCustomer_CDC]
SET LastName = N'SMITH'
WHERE CustomerKey IN (30001, 30002)
GO

--
-- DELETE - Remove recently added customer
--
DELETE FROM [dbo].[DimCustomer_CDC]
WHERE CustomerKey = 30002
GO
```
In the All processing mode, a row is returned for each change applied to the source table. In this example, the CDC Source returns six rows: three INSERTs, two UPDATEs, and one DELETE. The rows are ordered by LSN (i.e. in the order in which they were applied). This mode is typically used in ETL processes that require heavy auditing (usually for compliance reasons). While this mode has less overhead on the source system (it simply returns the captured rows, and requires no additional roll-up processing), it does complicate the ETL process, as you will need to merge the changes yourself.
The order that the operations took place can be determined by the __$seqval field.
This processing mode is similar to using the cdc.fn_cdc_get_all_changes function.
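As a sketch, the equivalent query against the change function would look something like the following. The function name is derived from the capture instance, which I’m assuming here is the default (dbo_DimCustomer_CDC):

```sql
-- Sketch: retrieve all captured changes for the capture instance,
-- ordered the way they were applied.
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn(N'dbo_DimCustomer_CDC');
DECLARE @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();

SELECT *
FROM cdc.fn_cdc_get_all_changes_dbo_DimCustomer_CDC(@from_lsn, @to_lsn, N'all')
ORDER BY __$start_lsn, __$seqval;
```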
Note: You will (typically) not want to use the CDC Splitter when processing all changes, as sending your change rows down multiple paths will make it difficult to apply them in the correct order.
The All with old values processing mode is similar to All, except that you get two rows for each UPDATE statement: a row containing the Before Update values, and one containing the After Update values. Using our example, the CDC Source returns eight rows: three INSERTs, two UPDATEs with Before Update values, two UPDATEs with After Update values, and one DELETE.
To distinguish between old and new values, check the __$operation value. A value of 3 indicates the row has Before Update values, 4 means After Update values.
When using the Net processing mode, the CDC server “rolls up” all changes for a key into a single row. In this example, the CDC Source returns two rows: two INSERTs for the new rows. The row for CustomerKey 30001 already has the updated LastName value, and CustomerKey 30002 is not returned at all, since it was ultimately deleted. This mode does have a performance impact, as it requires the source server to apply the roll-up logic to determine the final state of each row. However, it greatly simplifies the ETL logic needed for processing the change data; see the CDC Overview post for an example of how to process Net changes.
This processing mode is similar to using the cdc.fn_cdc_get_net_changes function.
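A sketch of the equivalent query, again assuming the default capture instance name dbo_DimCustomer_CDC:

```sql
-- Sketch: retrieve net changes (one row per key) for the capture instance.
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn(N'dbo_DimCustomer_CDC');
DECLARE @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();

SELECT *
FROM cdc.fn_cdc_get_net_changes_dbo_DimCustomer_CDC(@from_lsn, @to_lsn, N'all');
```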
Note: This mode requires that the source table has a primary key or unique index. It also requires that the @supports_net_changes parameter was set to True when CDC was enabled on the source table. See sys.sp_cdc_enable_table for more details.
The Net with update mask processing mode is similar to Net, except that there is an additional Boolean column (__$&lt;column-name&gt;__Changed) for each source column that indicates whether that column’s value changed. This may be a useful processing mode if you are only monitoring/processing changes to certain fields, or wish to provide additional processing logic when changes are made to a specific field.
Note: In this example, the __Changed field would be True for all rows, since they are all new (from the perspective of this load). If a change was made to an existing row, unchanged columns would have a False value.
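Under the covers, this per-column information comes from the __$update_mask column of the net-changes function (its all with mask row filter option). A manual equivalent, assuming the default capture instance name, might look like:

```sql
-- Sketch: test whether the LastName column changed, using the update mask
-- returned by the net changes function.
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn(N'dbo_DimCustomer_CDC');
DECLARE @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();
DECLARE @lastname_ord int =
    sys.fn_cdc_get_column_ordinal(N'dbo_DimCustomer_CDC', N'LastName');

SELECT CustomerKey,
       sys.fn_cdc_is_bit_set(@lastname_ord, __$update_mask) AS LastNameChanged
FROM cdc.fn_cdc_get_net_changes_dbo_DimCustomer_CDC(@from_lsn, @to_lsn, N'all with mask');
```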
The key things to keep in mind for the Net with merge processing mode come from the cdc.fn_cdc_get_net_changes entry in Books Online, which says this about the all with merge row_filter_option:
Returns the LSN of the final change to the row in the metadata column __$start_lsn. The column __$operation will be one of two values: 1 for delete and 5 to indicate that the operation needed to apply the change is either an insert or an update. The column __$update_mask is always NULL.
Because the logic to determine the precise operation for a given change adds to query complexity, this option is designed to improve query performance when it is sufficient to indicate that the operation needed to apply the change data is either an insert or an update, but it is not necessary to explicitly distinguish between the two. This option is most attractive in target environments where a merge operation is available directly, such as a SQL Server 2008 environment.
You’ll use the following data flow pattern when using this change mode:
This pattern is similar to the approach I described in my previous CDC post, except 1) new rows are not inserted into the final table within the data flow and 2) MERGE is used instead of UPDATE FROM … SELECT.
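As a sketch of that pattern: assuming the data flow lands the Net with merge rows in a hypothetical staging table stg_DimCustomer_MERGE (which keeps the __$operation column), the batch MERGE statement might look like the following. Table and column names are illustrative, and only a few columns are shown:

```sql
-- Sketch: apply staged "net with merge" rows to the destination.
-- __$operation = 1 means delete; 5 means insert-or-update.
MERGE [DimCustomer_Destination] AS dest
USING [stg_DimCustomer_MERGE] AS stg
    ON dest.[CustomerKey] = stg.[CustomerKey]
WHEN MATCHED AND stg.[__$operation] = 1 THEN
    DELETE
WHEN MATCHED AND stg.[__$operation] = 5 THEN
    UPDATE SET dest.LastName     = stg.LastName,
               dest.YearlyIncome = stg.YearlyIncome
WHEN NOT MATCHED BY TARGET AND stg.[__$operation] = 5 THEN
    INSERT ([CustomerKey], [LastName], [YearlyIncome])
    VALUES (stg.[CustomerKey], stg.[LastName], stg.[YearlyIncome]);
```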
SQL Server 2012 introduces new components that make it easier to do Change Data Capture (CDC) using SSIS. This blog post provides a quick walkthrough of how to use them.
Note: The documentation for the CDC components is not in Books Online yet. It will be appearing in the RTM refresh.
The CDC Control task is used to control the life cycle of change data capture (CDC) packages. It handles CDC package synchronization with the initial load package, and the management of the Log Sequence Number (LSN) ranges that are processed in a run of a CDC package. In addition, the CDC Control task deals with error scenarios and recovery.
The CDC source reads a range of change data from CDC change tables and delivers the changes downstream to other SSIS components.
The CDC splitter splits a single flow of change rows from a CDC Source component into different data flows for Insert, Update and Delete operations. It is essentially a “smart” Conditional Split transform that automatically handles the standard values of the __$operation column.
For sample data, we will create a new database (CDCTest), and select a subset of rows from the AdventureWorksDW DimCustomer table into a sample table (DimCustomer_CDC). This will become the Source table for this demo.
```sql
USE [CDCTest]
GO

SELECT *
INTO DimCustomer_CDC
FROM [AdventureWorksDW].[dbo].[DimCustomer]
WHERE CustomerKey < 11500
```
We then enable CDC on the database, and create a capture instance for the DimCustomer_CDC table.
```sql
USE [CDCTest]
GO

EXEC sys.sp_cdc_enable_db
GO

-- add a primary key to the DimCustomer_CDC table so we can enable support for net changes
IF NOT EXISTS (SELECT * FROM sys.indexes
               WHERE object_id = OBJECT_ID(N'[dbo].[DimCustomer_CDC]')
                 AND name = N'PK_DimCustomer_CDC')
ALTER TABLE [dbo].[DimCustomer_CDC]
ADD CONSTRAINT [PK_DimCustomer_CDC] PRIMARY KEY CLUSTERED
(
    [CustomerKey] ASC
)
GO

EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'DimCustomer_CDC',
    @role_name = N'cdc_admin',
    @supports_net_changes = 1
GO
```
We can see that a number of tables have been added under the cdc schema, and that SQL agent jobs have been created to capture changes being made to this table.
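To double-check what was created, queries along these lines (illustrative; not part of the original demo scripts) list the cdc schema objects and the SQL Agent jobs:

```sql
-- Tables created under the cdc schema (change tables plus metadata tables)
SELECT name
FROM sys.tables
WHERE schema_id = SCHEMA_ID(N'cdc');

-- Capture and cleanup jobs for CDC-enabled databases
EXEC sys.sp_cdc_help_jobs;
```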
For the Destination, we’ll create a separate table – DimCustomer_Destination – with the same structure as the Source.
```sql
SELECT TOP 0 *
INTO DimCustomer_Destination
FROM DimCustomer_CDC
```
In real life this would be in a separate database, and usually on a completely different server (otherwise, why are you mirroring the changes?), but for the purposes of this walkthrough, we’ll keep it all together.
We’re ready to start consuming changes with SSIS.
Our processing logic will be split into two packages – an Initial Load package that will read all of the data in the source table, and an Incremental Load package that will process change data on subsequent runs.
This package will only be run once, and handles the initial load of data in the source table (DimCustomer_CDC). The package uses the following logic:
Package creation steps:
Create a new SSIS package
Add a CDC Control Task. Double click the Control Task to bring up the editor.
Add a Data Flow Task, and connect it to the CDC Control Task
Add a second CDC Control Task. Connect the success constraint of the Data Flow Task to it.
The package will now look like this:
When we run the package, all of the data currently in the Source table will be transferred to the Destination, and the initial CDC state markers will be created. If we select from the cdc_states table, we can see that there is now a “CDC_State” entry. Note, the state entry is an encoded string that is used by the CDC components – you should not have to edit or deal with it directly.
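The state table can be inspected directly; assuming the default table name the CDC Control Task creates, the check is simply:

```sql
-- One row per CDC context; [state] is the encoded string maintained by
-- the CDC components - treat it as opaque.
SELECT [name], [state]
FROM [dbo].[cdc_states];
```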
This package will be run every time we want to grab the latest changes from our Source table. It will store the CDC state every time it runs, ensuring that we only pick up new changes every time we run the package. It will use the following logic:
Package creation steps:
Add an Execute SQL Task to create staging tables
```sql
IF NOT EXISTS (SELECT * FROM sys.objects
               WHERE object_id = OBJECT_ID(N'[dbo].[stg_DimCustomer_UPDATES]')
                 AND type IN (N'U'))
BEGIN
    SELECT TOP 0 *
    INTO stg_DimCustomer_UPDATES
    FROM DimCustomer_Destination
END

IF NOT EXISTS (SELECT * FROM sys.objects
               WHERE object_id = OBJECT_ID(N'[dbo].[stg_DimCustomer_DELETES]')
                 AND type IN (N'U'))
BEGIN
    SELECT TOP 0 *
    INTO stg_DimCustomer_DELETES
    FROM DimCustomer_Destination
END
```
Add a CDC Control Task. Connect the Execute SQL task to the CDC Control Task
Add a Data Flow Task. Connect it to the CDC Control Task.
Add a CDC Source component
Add a CDC Splitter transform
Add an ADO.NET Destination – rename it to “New rows”
Add two more ADO.NET Destinations, mapping the DeleteOutput to the stg_DimCustomer_DELETES table, and UpdateOutput to stg_DimCustomer_UPDATES. We will update the final Destination table using batch SQL statements after this data flow. An alternative design here would be to use an OLE DB Command transform to perform the updates and deletes. The OLE DB Command approach has some performance problems though, as the transform operates on a row-by-row basis (i.e. it issues one query per row).
Back in the Control Flow, add two Execute SQL tasks. These tasks will perform the batch update/delete using the data we loaded into the staging tables. The queries look like this (note, I took columns out of the update statement to keep things short – normally you’d include all of the columns here):
```sql
--
-- These queries go into the incremental load package, and do not need to be run directly
--

-- batch update
UPDATE dest
SET dest.FirstName    = stg.FirstName,
    dest.MiddleName   = stg.MiddleName,
    dest.LastName     = stg.LastName,
    dest.YearlyIncome = stg.YearlyIncome
FROM [DimCustomer_Destination] dest
INNER JOIN [stg_DimCustomer_UPDATES] stg
    ON stg.[CustomerKey] = dest.[CustomerKey]

-- batch delete
DELETE FROM [DimCustomer_Destination]
WHERE [CustomerKey] IN
(
    SELECT [CustomerKey]
    FROM [dbo].[stg_DimCustomer_DELETES]
)
```
Add a CDC Control Task. It should have the same settings as the first CDC Control Task in the package, except the CDC control operation is Mark processed range.
Finally, add an Execute SQL Task to drop the staging tables. Alternatively, you can leave the staging tables in place, just truncate them.
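A cleanup script along these lines would work for that task (illustrative; the original post doesn’t show this script):

```sql
-- Drop the staging tables...
IF OBJECT_ID(N'[dbo].[stg_DimCustomer_UPDATES]', N'U') IS NOT NULL
    DROP TABLE [dbo].[stg_DimCustomer_UPDATES];

IF OBJECT_ID(N'[dbo].[stg_DimCustomer_DELETES]', N'U') IS NOT NULL
    DROP TABLE [dbo].[stg_DimCustomer_DELETES];

-- ...or keep them in place and truncate instead:
-- TRUNCATE TABLE [dbo].[stg_DimCustomer_UPDATES];
-- TRUNCATE TABLE [dbo].[stg_DimCustomer_DELETES];
```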
Your package should look like this:
If we run the Incremental Load package at this point, it should run successfully, but not transfer any rows. That’s because we haven’t made any changes yet to the Source table. Let’s do that now by running the following script against the Source table:
```sql
USE [CDCTest]
GO

-- Transfer the remaining customer rows
SET IDENTITY_INSERT DimCustomer_CDC ON

INSERT INTO DimCustomer_CDC
(
    CustomerKey, GeographyKey, CustomerAlternateKey, Title, FirstName,
    MiddleName, LastName, NameStyle, BirthDate, MaritalStatus, Suffix,
    Gender, EmailAddress, YearlyIncome, TotalChildren, NumberChildrenAtHome,
    EnglishEducation, SpanishEducation, FrenchEducation, EnglishOccupation,
    SpanishOccupation, FrenchOccupation, HouseOwnerFlag, NumberCarsOwned,
    AddressLine1, AddressLine2, Phone, DateFirstPurchase, CommuteDistance
)
SELECT
    CustomerKey, GeographyKey, CustomerAlternateKey, Title, FirstName,
    MiddleName, LastName, NameStyle, BirthDate, MaritalStatus, Suffix,
    Gender, EmailAddress, YearlyIncome, TotalChildren, NumberChildrenAtHome,
    EnglishEducation, SpanishEducation, FrenchEducation, EnglishOccupation,
    SpanishOccupation, FrenchOccupation, HouseOwnerFlag, NumberCarsOwned,
    AddressLine1, AddressLine2, Phone, DateFirstPurchase, CommuteDistance
FROM [AdventureWorksDW].[dbo].[DimCustomer]
WHERE CustomerKey >= 11500

SET IDENTITY_INSERT DimCustomer_CDC OFF
GO

-- give customers 11000 through 11010 a raise
UPDATE DimCustomer_CDC
SET YearlyIncome = YearlyIncome + 10
WHERE CustomerKey >= 11000 AND CustomerKey <= 11010
GO
```
If we enable a Data Viewer in the Incremental Load package and run it, we’ll see that the CDC Source picks up all of the rows we’ve changed. We can see that some of the rows are __$operation = 4 (update), while the rest are 2 (new rows).
When the package completes, we see that the data flow moved a total of 17,995 rows (11 updates, and the rest are inserts).
Because the CDC Control Task updated LSN values stored in the CDC state table, if we run the package a second time (without making any changes to the source table), we see that no rows get transferred in the data flow.
I hope you found this walkthrough of the new CDC components in SQL Server 2012 helpful. I will continue posting more information about the CDC Components in follow up posts. Please let me know if you have any specific questions / topics you’d like me to describe further.