图片 2

ETL之增量抽取方式

Posted by

 图片 1

8、比较和分析

代码如下:(以下同步适用于SQL SERVER 不同DB的表增量同步)

、特定数据库方式(ORACLE)
以下介绍常见的针对特有数据库系统的增景抽取方式。
7.1
ORACLE改变数据捕获(CHANGEDDATACAPTURE,CDC)方式:ORACLECDC特性是在ORAELE9I数据库中引入的。CDC能够帮助识别从上次抽取之后发生变化的数据。
利用CDC,在对源表进行INSERT、UPCLATE或DELETE等操作的同时就可以提取数据,并且变化的数据被保存在数据库的变化表中。这样就可以捕获发生变化的数据,然后利用数据库视图以一种可控的方式提供给ETL抽取进程,作为增量抽取的依据。CDC方式对源表数据变化情况的捕获有两种方式:同步CDC和异步CDC。同步CDC使用源数据库触发器来捕获变更的数据。这种方式是实时的,没有任何延迟。当DML操作提交后,变更表中就产生了变更数据。异步CDC使用数据库重做日志(REDOLOG)文件,在源数据库发生变更以后,才进行数据捕获。
7.2
ORACLE闪回查询方式:ORACLE9I以上版本的数据库系统提供了闪回查询机制,允许用户查询过去某个时刻的数据库状态。这样,抽取进程可以将源数据库的(BI)
当前状态和上次抽取时刻的状态进行对比,快速得出源表数据记录的变化情况。

TableName:要同步的表名,UPTime每一次同步的触发时间点(可更改),sys_tamp行变更时间戳(不可更改),LastUPstamp行最后有效变量时间戳(可以更新)

4、全表比对方式

如上步骤即可实现可靠的同步,有人可能有疑问,这样就能实现可靠同步吗?我这里解释一下:

对于建立了业务系统的生产数据库,可以在数据库中创建业务日志表,当特定需要监控的业务数据发生变化时,由相应的业务系统程序模块来更新维护日志表内容。增量抽取时,

 上述同步代码逻辑很简单,可以参照之前的文章,这里主要是说明几个重要点:

全表比对即在增量抽取时,ETL进程逐条比较源表和目标表的记录,将新增和修改的记录读取出来。优化之后的全部比对方式是采用MD5校验码,需要事先为要抽取的表建立一个结构类似的MD5临时表,该临时表记录源表的主键值以及根据源表所有字段的数据计算出来的(BI)

2.2使用LastUPstamp作为过滤条件,查询>源DB的源表中时间戳字段,这样就可以查询出自上一次同步触发点到当前时间待同步的记录(增、改)

图片 2

3.1同步触发时记录当前触发时间点,并取得上一次的触发时间点(这里的上一次触发时间点是指上一次开始准备同步的记录时间点,确保从上一次查询到同步完成之间的时间点都包括其中,防止漏数据)

这样,对表T的所有DML操作就记录在增量日志表DML_LOG中,注意增量日志表中并没有完全记录增量数据本身,只是记录了增量数据的来源。进行增量ETL时,只需要根据增量日志表中的记录情况,反查源表得到真正的增量数据。
SQL代码
(1)创建增量日志表DML_LOG:
CREATE TABLE DML_LOG(
ID NUMBER PRIMARY KEY, //自增主键
TABLE NAME VARCHAR2(200). //源表名称
RECORD ID NUMBER, //源表增量记录的主键值
DML TYPE CH根(1)。∥增量类型,I表示新增:U表示更新;D表示删除
EXECUTE DATE DATE //发生时间
);

有些场景下,需要隔离不同的DB,彼此DB之间不能互相访问,但实际的业务场景又需要从A
DB访问B DB的情形,这时怎么办?我认为有如下常规的三种方案:

1、触发器方式
触发器方式是普遍采取的一种增量抽取机制。该方式是根据抽取要求,在要被抽取的源表上建立插入、修改、删除3个触发器,每当源表中的数据发生变化,就被相应的触发器将变化的数据写入一个增量日志表,ETL的增量抽取则是从增量日志表中而不是直接在源表中抽取数据,同时增量日志表中抽取过的数据要及时被标记或删除。为了简单起见,增量日志表一般不存储增量数据的所有字段信息,而只是存储源表名称、更新的关键字值和更新操作类型(KNSEN、UPDATE或DELETE),ETL增量抽取进程首先根据源表名称和更新的关键字值,从源表中提取对应的完整记录,再根据更新操作类型,对目标表进行相应的处理。

这种方案的优点是:可以根据实际情况灵活定制同步的表数据,不局限于某一张表或某一个DB,可以保证不同DB间同步表的数据一致性,让本来跨DB操作表变成了同一个DB的事情,而且可以增、删、改、查,功能不受限;缺点是灵活性太强,程序代码实现可靠的跨DB的实时同步逻辑的实现复杂度较高,对于开发人员的要求较高,如果写的同步逻辑无法保证实时、可靠、高可用,那对于业务来讲是灾难性的。

通过读日志表数据决定加载哪些数据及如何加载。日志表的维护需要由业务系统程序用代码来完成。

2.3利作BCP执行同步(详见之前文章说明)

然后,还需要对在源表中已不存在而目标表仍保留的主键值,执行DELETE操作。

            try
            {
                SqlConnection obConnSrc = new SqlConnection(connLMSStr);
                SqlConnection obConnDest = new SqlConnection(mconnCCSStr);

                string lastTamp = ClsDatabase.gGetFieldValue(obConnSrc, "update TS_SyncUptime set UPTime=GETDATE() OUTPUT (deleted.LastUPstamp) as oldtamp FROM TS_CCSUptime WHERE TableName=N'tableNameA'", "oldtamp");


                string selectSql = @"SELECT id,aaa,bbb,ccc,ddd,eee,fff  
                                  FROM tableNameA WHERE 其它同步过滤查询条件 AND CONVERT(bigint,sys_tamp)>{0}";

                selectSql = string.Format(selectSql, lastTamp);

                master.TransferBulkCopy(selectSql, obConnSrc,
                                "tableNameA", obConnDest,
                                 (stable) =>
                                 {
                                     var colMaps = new Dictionary<string, string>();
                                     foreach (DataColumn col in stable.Columns)
                                     {
                                         colMaps.Add(col.ColumnName, col.ColumnName);
                                     }
                                     return colMaps;
                                 },
                                 (tempTableName, stable, destConn, srcConn) =>
                                 {
                                     StringBuilder saveSqlBuilder = new StringBuilder("begin tran" + Environment.NewLine);

                                     string IUSql = master.BuildInsertOrUpdateToDestTableSql("tableNameA", tempTableName, new[] { "id" }, stable.ExtendedProperties[master.MapDestColNames_String], 2);
                                     saveSqlBuilder.Append(IUSql);

                                     saveSqlBuilder.AppendLine("commit");

                                     ClsDatabase.gExecCommand(destConn, saveSqlBuilder.ToString());


                                     ClsDatabase.gExecCommand(srcConn, "update TS_SyncUptime set UPTime=GETDATE(),LastUPstamp=CONVERT(bigint,sys_tamp) FROM TS_SyncUptime WHERE TableName=N'tableNameA'");

                                     return false;
                                 });


            }
            catch (Exception ex)
            {
                writeLog(ex);//记错误日志
            }

全表删除插入方式是指每次抽取前先删除目标表数据,抽取时全新加载数据。该方式实际上将增量抽取等同于全量抽取。对于数据量不大,全量抽取的时间代价小于执行增量抽取的算法和条件代价时,可以采用该方式。

2.具体关键同步逻辑如下:

2、时间戳方式

关于程序代码实现跨DB同步表数据方案,之前已有总结过,详见:https://www.cnblogs.com/zuowj/p/6264711.html 
—》4.利用BCP(sqlbulkcopy)来实现两个不同数据库之间进行数据差异传输(即:数据同步)

5、日志表方式

相关文章

Leave a Reply

电子邮件地址不会被公开。 必填项已用*标注