In batch data synchronization, some tables keep growing as business data accumulates day after day. For these tables, truncating and fully reloading on every sync wastes a large amount of server resources once the data volume is large. Instead, each sync pulls only the incremental data (updated and newly inserted rows) and merges it with the full snapshot obtained in the previous sync cycle, producing the latest full snapshot. A schematic of Taobao order-transaction synchronization is shown below.

The traditional integration approach is a merge (update + insert), but most current big-data platforms do not support update. The solution Alibaba recommends is a full outer join plus a full reload with insert overwrite: on a daily schedule, full-outer-join the day's incremental data with the previous day's full snapshot, then reload the result as the new full snapshot. At large data volumes, a full rewrite performs much better than update. In addition, if you worry about bad updates, you can partition the table so that each day keeps its own full snapshot, retained for a short window (e.g. 3-7 days).
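The full outer join + insert overwrite pattern can be sketched as follows. This is a minimal illustration, not the tables used later in this article: the names `orders_full`, `orders_delta`, `order_id`, `amount`, `status`, `ds`, and the `${bizdate}` variables are all hypothetical placeholders.

```sql
-- Merge today's increment into yesterday's snapshot and write it out as
-- today's snapshot partition. Where a key exists on both sides, the
-- incremental (newer) values win via coalesce.
insert overwrite table orders_full partition (ds = '${bizdate}')
select coalesce(d.order_id, f.order_id) as order_id,
       coalesce(d.amount,   f.amount)   as amount,
       coalesce(d.status,   f.status)   as status
  from orders_delta d
  full outer join
       (select * from orders_full where ds = '${bizdate_minus_1}') f
    on d.order_id = f.order_id;
```

One caveat: `coalesce` cannot express "the increment set this column back to null"; in that case the row is usually taken wholesale from the delta side instead, e.g. `if(d.order_id is not null, d.amount, f.amount)`.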

Our current approach refines this slightly. Take the order table as an example: an order is invoiced on 2020.12.01, settled on 2020.12.03, and paid on 2020.12.06, so the order first lands in the database on 2020.12.01, producing the following record:

Because the order has not yet been settled or paid on 12.01, those two columns are both null:

| Branch  | Store           | Customer   | Product              | Invoice Date | Settlement Date | Payment Date |
|---------|-----------------|------------|----------------------|--------------|-----------------|--------------|
| Nanjing | Xinjiekou Store | williams_z | Casarte refrigerator | 2020.12.01   | null            | null         |

On 12.03, if the incremental extraction only inserts and never updates, the table will contain two rows:

| Branch  | Store           | Customer   | Product              | Invoice Date | Settlement Date | Payment Date |
|---------|-----------------|------------|----------------------|--------------|-----------------|--------------|
| Nanjing | Xinjiekou Store | williams_z | Casarte refrigerator | 2020.12.01   | null            | null         |
| Nanjing | Xinjiekou Store | williams_z | Casarte refrigerator | 2020.12.01   | 2020.12.03      | null         |

On 12.06, again with insert-only incremental extraction, the table will contain three rows:

| Branch  | Store           | Customer   | Product              | Invoice Date | Settlement Date | Payment Date |
|---------|-----------------|------------|----------------------|--------------|-----------------|--------------|
| Nanjing | Xinjiekou Store | williams_z | Casarte refrigerator | 2020.12.01   | null            | null         |
| Nanjing | Xinjiekou Store | williams_z | Casarte refrigerator | 2020.12.01   | 2020.12.03      | null         |
| Nanjing | Xinjiekou Store | williams_z | Casarte refrigerator | 2020.12.01   | 2020.12.03      | 2020.12.06   |

What we actually need is for the first row to be updated in place each time, so that the table ends up with exactly one row for this order.
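For contrast, on an engine that does support the traditional merge (update + insert) path mentioned earlier, this "one row per order, updated in place" behavior is a single upsert statement. The sketch below is hypothetical ANSI-style SQL (our platform does not support it); `jlbh`/`fbid` are the record keys used later in this article, and mapping `jzrq`/`skrq02` to the settlement/payment dates is an assumption made for illustration.

```sql
-- Hypothetical MERGE upsert; shown only to contrast with the
-- partition-overwrite approach this article actually uses.
merge into dwd.dwd_trade_erp_sale_order t
using ods.ods_trade_erp_lsxsjl d
   on t.jlbh = d.jlbh and t.fbid = d.fbid
when matched then
  update set t.jzrq   = d.jzrq,    -- settlement date filled in on 12.03
             t.skrq02 = d.skrq02   -- payment date filled in on 12.06
when not matched then
  insert (jlbh, fbid, kprq, jzrq, skrq02)
  values (d.jlbh, d.fbid, d.kprq, d.jzrq, d.skrq02);
```

Since no such statement is available, the steps below emulate it with temp tables and partition-level `insert overwrite`.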

The steps of the final solution are as follows:

1. Extract every partition touched by the incremental data into temp table 0. The incremental data contains both updated rows and newly inserted rows; knowing every touched partition lets the updated rows overwrite their original partitions (keyed by invoice date) and lets the new rows be inserted into the latest date partition.

drop table if exists dwd.tmp_dwd_trade_erp_lsxsjl_0;

create table dwd.tmp_dwd_trade_erp_lsxsjl_0 tblproperties ('transactional' = 'false')
  as
    select p0.order_date,
           row_number() over (partition by 1 order by order_date) as rn
      from
        (select replace(substr(kprq, 1, 10), '-', '') as order_date
           from ods.ods_trade_erp_lsxsjl
          group by replace(substr(kprq, 1, 10), '-', '')
        ) p0;

2. Store the incremental data pulled up from the ODS layer in temp table 1.

drop table if exists dwd.tmp_dwd_trade_erp_lsxsjl_1;

-- Stores all incremental data coming up from the ODS layer
create table dwd.tmp_dwd_trade_erp_lsxsjl_1 tblproperties ('transactional' = 'false')
  as
  select  p0.jlbh,
          p0.skt_id,
          p0.deptid,
          p0.djlx,
          p0.guest,
          p0.fph,
          p0.kprq,
          p0.jzrq,
          p0.yyy_id,
          p0.sky_id,
          coalesce(cast(p0.xsje as decimal(17, 2)), 0) as xsje,
          coalesce(cast(p0.cxje as decimal(17, 2)), 0) as cxje,
          p0.th_flag,
          p0.account,
          p0.zpxx,
          p0.zpfh,
          coalesce(cast(p0.qkje as decimal(17, 2)), 0) as qkje,
          coalesce(cast(p0.zkje as decimal(17, 2)), 0) as zkje,
          p0.region,
          p0.cxy_id,
          p0.thyy,
          coalesce(cast(p0.lpje as decimal(17, 2)), 0) as lpje,
          p0.tm_flag,
          p0.fpdm,
          p0.m_jlbh,
          p0.hyid,
          p0.yydh,
          p0.tel_yddh,
          p0.shfs,
          p0.sh_address,
          p0.tel_gddh,
          p0.zjhm,
          p0.shre,
          p0.shsj,
          p0.shsj_name,
          p0.shrq,
          p0.zhsp_jlbh,
          p0.txbxw,
          p0.zh_flag,
          p0.xeflx,
          coalesce(cast(p0.zfje as decimal(17, 2)), 0) as zfje,
          p0.ysfs_flag,
          p0.mobile_flag,
          p0.fhdj_flag,
          p0.fbjf_code,
          p0.tmo2o_flag,
          p0.skrq02,
          p0.b2bi_send_flag,
          p0.old_hyid,
          p0.old_tel,
          p0.fp_type,
          p0.onlinejlbh,
          p0.vc_code,
          p0.jsjk_flag,
          p0.guest_sh,
          p0.dztd_flag,
          p0.yhzx_uid,
          p0.dsf_onlinejlbh,
          p0.etl_time,
          p0.fbid,
          p0.notes,
          replace(substr(p0.kprq, 1, 10), '-', '') as order_date,
          p1.rn
      from ods.ods_trade_erp_lsxsjl p0
      inner join dwd.tmp_dwd_trade_erp_lsxsjl_0 p1
        on replace(substr(p0.kprq, 1, 10), '-', '') = p1.order_date ;

3. (previous day's full table) inner join (temp table 0) on (full table.partition column = temp table 0.partition column). This join picks out every changed partition, and with it all of the rows inside those partitions. Those rows are then left-joined to temp table 1 and filtered to where temp table 1's key is null, which leaves exactly the rows in the changed partitions that were NOT modified. These rows are appended to temp table 1, which already held the modified rows and the rows belonging to brand-new partitions. Temp table 1 therefore now contains every row of every affected partition, so when it is written to the full table with insert overwrite, the changed partitions are fully overwritten and the new partitions are fully inserted.

insert into dwd.tmp_dwd_trade_erp_lsxsjl_1
(jlbh,skt_id,deptid,djlx,guest,fph,kprq,jzrq,yyy_id,sky_id,xsje,cxje,th_flag,account,zpxx,zpfh,qkje,zkje,region,cxy_id,thyy,lpje,tm_flag,fpdm,m_jlbh,hyid,yydh,tel_yddh,shfs,sh_address,tel_gddh,zjhm,shre,shsj,shsj_name,shrq,zhsp_jlbh,txbxw,zh_flag,xeflx,zfje,ysfs_flag,mobile_flag,fhdj_flag,fbjf_code,tmo2o_flag,skrq02,b2bi_send_flag,old_hyid,old_tel,fp_type,onlinejlbh,vc_code,jsjk_flag,guest_sh,dztd_flag,yhzx_uid,dsf_onlinejlbh,etl_time,fbid,notes,order_date,rn
)
select p0.jlbh,
       p0.skt_id,
       p0.deptid,
       p0.djlx,
       p0.guest,
       p0.fph,
       p0.kprq,
       p0.jzrq,
       p0.yyy_id,
       p0.sky_id,
       p0.xsje,
       p0.cxje,
       p0.th_flag,
       p0.account,
       p0.zpxx,
       p0.zpfh,
       p0.qkje,
       p0.zkje,
       p0.region,
       p0.cxy_id,
       p0.thyy,
       p0.lpje,
       p0.tm_flag,
       p0.fpdm,
       p0.m_jlbh,
       p0.hyid,
       p0.yydh,
       p0.tel_yddh,
       p0.shfs,
       p0.sh_address,
       p0.tel_gddh,
       p0.zjhm,
       p0.shre,
       p0.shsj,
       p0.shsj_name,
       p0.shrq,
       p0.zhsp_jlbh,
       p0.txbxw,
       p0.zh_flag,
       p0.xeflx,
       p0.zfje,
       p0.ysfs_flag,
       p0.mobile_flag,
       p0.fhdj_flag,
       p0.fbjf_code,
       p0.tmo2o_flag,
       p0.skrq02,
       p0.b2bi_send_flag,
       p0.old_hyid,
       p0.old_tel,
       p0.fp_type,
       p0.onlinejlbh,
       p0.vc_code,
       p0.jsjk_flag,
       p0.guest_sh,
       p0.dztd_flag,
       p0.yhzx_uid,
       p0.dsf_onlinejlbh,
       p0.etl_time,
       p0.fbid,
       p0.notes,
       p0.create_date,
       p1.rn
from  dwd.dwd_trade_erp_sale_order p0
  inner join dwd.tmp_dwd_trade_erp_lsxsjl_0 p1
    on p0.create_date = p1.order_date
  left join dwd.tmp_dwd_trade_erp_lsxsjl_1 p2
    on p0.jlbh = p2.jlbh and p0.fbid = p2.fbid
  where p2.jlbh is null;

4. Finally, insert overwrite the contents of temp table 1 into the full table; the changed partitions are overwritten wholesale and the new partitions are inserted:

insert overwrite table dwd.dwd_trade_erp_sale_order
        select  p0.jlbh,
                p0.skt_id,
                p0.deptid,
                p0.djlx,
                p0.guest,
                p0.fph,
                p0.kprq,
                p0.jzrq,
                p0.yyy_id,
                p0.sky_id,
                p0.xsje,
                p0.cxje,
                p0.th_flag,
                p0.account,
                p0.zpxx,
                p0.zpfh,
                p0.qkje,
                p0.zkje,
                p0.region,
                p0.cxy_id,
                p0.thyy,
                p0.lpje,
                p0.tm_flag,
                p0.fpdm,
                p0.m_jlbh,
                p0.hyid,
                p0.yydh,
                p0.tel_yddh,
                p0.shfs,
                p0.sh_address,
                p0.tel_gddh,
                p0.zjhm,
                p0.shre,
                p0.shsj,
                p0.shsj_name,
                p0.shrq,
                p0.zhsp_jlbh,
                p0.txbxw,
                p0.zh_flag,
                p0.xeflx,
                p0.zfje,
                p0.ysfs_flag,
                p0.mobile_flag,
                p0.fhdj_flag,
                p0.fbjf_code,
                p0.tmo2o_flag,
                p0.skrq02,
                p0.b2bi_send_flag,
                p0.old_hyid,
                p0.old_tel,
                p0.fp_type,
                p0.onlinejlbh,
                p0.vc_code,
                p0.jsjk_flag,
                p0.guest_sh,
                p0.dztd_flag,
                p0.yhzx_uid,
                p0.dsf_onlinejlbh,
                date_format(from_utc_timestamp(current_timestamp(), 'PRC'), 'yyyy-MM-dd HH:mm:ss') etl_time,
                p0.fbid,
                p0.notes,
                p0.order_date       
        from dwd.tmp_dwd_trade_erp_lsxsjl_1 p0;

The complete shell script is as follows:

#!/bin/bash
source /etc/profile

cur_time_s=`date "+%Y-%m-%d %H:%M:%S"`

beeline -u "${BEELINE_CONN_URL}" -n hive -e "

drop table if exists  dwd.tmp_dwd_trade_erp_lsxsjl_0;

create table dwd.tmp_dwd_trade_erp_lsxsjl_0 tblproperties ('transactional' = 'false')
  as
    select p0.order_date,            
           row_number() over(partition by 1 order by order_date) rn
    from          
    (select
           replace(substr(kprq, 1, 10), '-', '') as order_date
      from ods.ods_trade_erp_lsxsjl
      group by replace(substr(kprq, 1, 10), '-', '')
     )p0;

drop table if exists dwd.tmp_dwd_trade_erp_lsxsjl_1;

-- Stores all incremental data coming up from the ODS layer
create table dwd.tmp_dwd_trade_erp_lsxsjl_1 tblproperties ('transactional' = 'false')
  as
  select  p0.jlbh,
          p0.skt_id,
          p0.deptid,
          p0.djlx,
          p0.guest,
          p0.fph,
          p0.kprq,
          p0.jzrq,
          p0.yyy_id,
          p0.sky_id,
          coalesce(cast(p0.xsje as decimal(17, 2)), 0) as xsje,
          coalesce(cast(p0.cxje as decimal(17, 2)), 0) as cxje,
          p0.th_flag,
          p0.account,
          p0.zpxx,
          p0.zpfh,
          coalesce(cast(p0.qkje as decimal(17, 2)), 0) as qkje,
          coalesce(cast(p0.zkje as decimal(17, 2)), 0) as zkje,
          p0.region,
          p0.cxy_id,
          p0.thyy,
          coalesce(cast(p0.lpje as decimal(17, 2)), 0) as lpje,
          p0.tm_flag,
          p0.fpdm,
          p0.m_jlbh,
          p0.hyid,
          p0.yydh,
          p0.tel_yddh,
          p0.shfs,
          p0.sh_address,
          p0.tel_gddh,
          p0.zjhm,
          p0.shre,
          p0.shsj,
          p0.shsj_name,
          p0.shrq,
          p0.zhsp_jlbh,
          p0.txbxw,
          p0.zh_flag,
          p0.xeflx,
          coalesce(cast(p0.zfje as decimal(17, 2)), 0) as zfje,
          p0.ysfs_flag,
          p0.mobile_flag,
          p0.fhdj_flag,
          p0.fbjf_code,
          p0.tmo2o_flag,
          p0.skrq02,
          p0.b2bi_send_flag,
          p0.old_hyid,
          p0.old_tel,
          p0.fp_type,
          p0.onlinejlbh,
          p0.vc_code,
          p0.jsjk_flag,
          p0.guest_sh,
          p0.dztd_flag,
          p0.yhzx_uid,
          p0.dsf_onlinejlbh,
          p0.etl_time,
          p0.fbid,
          p0.notes,
          replace(substr(p0.kprq, 1, 10), '-', '') as order_date,
          p1.rn
      from ods.ods_trade_erp_lsxsjl p0
      inner join dwd.tmp_dwd_trade_erp_lsxsjl_0 p1
        on replace(substr(p0.kprq, 1, 10), '-', '') = p1.order_date ;


-- Pull the rows that sit in changed partitions of the full table but were not modified
insert into dwd.tmp_dwd_trade_erp_lsxsjl_1
(jlbh,skt_id,deptid,djlx,guest,fph,kprq,jzrq,yyy_id,sky_id,xsje,cxje,th_flag,account,zpxx,zpfh,qkje,zkje,region,cxy_id,thyy,lpje,tm_flag,fpdm,m_jlbh,hyid,yydh,tel_yddh,shfs,sh_address,tel_gddh,zjhm,shre,shsj,shsj_name,shrq,zhsp_jlbh,txbxw,zh_flag,xeflx,zfje,ysfs_flag,mobile_flag,fhdj_flag,fbjf_code,tmo2o_flag,skrq02,b2bi_send_flag,old_hyid,old_tel,fp_type,onlinejlbh,vc_code,jsjk_flag,guest_sh,dztd_flag,yhzx_uid,dsf_onlinejlbh,etl_time,fbid,notes,order_date,rn
)
select p0.jlbh,
       p0.skt_id,
       p0.deptid,
       p0.djlx,
       p0.guest,
       p0.fph,
       p0.kprq,
       p0.jzrq,
       p0.yyy_id,
       p0.sky_id,
       p0.xsje,
       p0.cxje,
       p0.th_flag,
       p0.account,
       p0.zpxx,
       p0.zpfh,
       p0.qkje,
       p0.zkje,
       p0.region,
       p0.cxy_id,
       p0.thyy,
       p0.lpje,
       p0.tm_flag,
       p0.fpdm,
       p0.m_jlbh,
       p0.hyid,
       p0.yydh,
       p0.tel_yddh,
       p0.shfs,
       p0.sh_address,
       p0.tel_gddh,
       p0.zjhm,
       p0.shre,
       p0.shsj,
       p0.shsj_name,
       p0.shrq,
       p0.zhsp_jlbh,
       p0.txbxw,
       p0.zh_flag,
       p0.xeflx,
       p0.zfje,
       p0.ysfs_flag,
       p0.mobile_flag,
       p0.fhdj_flag,
       p0.fbjf_code,
       p0.tmo2o_flag,
       p0.skrq02,
       p0.b2bi_send_flag,
       p0.old_hyid,
       p0.old_tel,
       p0.fp_type,
       p0.onlinejlbh,
       p0.vc_code,
       p0.jsjk_flag,
       p0.guest_sh,
       p0.dztd_flag,
       p0.yhzx_uid,
       p0.dsf_onlinejlbh,
       p0.etl_time,
       p0.fbid,
       p0.notes,
       p0.create_date,
       p1.rn
from  dwd.dwd_trade_erp_sale_order p0
  inner join dwd.tmp_dwd_trade_erp_lsxsjl_0 p1
    on p0.create_date = p1.order_date
  left join dwd.tmp_dwd_trade_erp_lsxsjl_1 p2
    on p0.jlbh = p2.jlbh and p0.fbid = p2.fbid
  where p2.jlbh is null;
"


if [ $? -eq 0 ]; then
  echo "step 1 success."
else
  echo "step 1 fail."
  exit 1;
fi


## Get the smallest rn among the partitions to process
query_min_rn=`beeline -u "${BEELINE_CONN_URL}" --hiveconf hive.server2.logging.operation.level=NONE -n hive --showHeader=false --silent=true -e "

  select coalesce(min(rn), 0) min_rn from dwd.tmp_dwd_trade_erp_lsxsjl_0;

"`

if [ $? -eq 0 ]; then
  echo "step 2 success."
else
  echo "step 2 fail."
  exit 1;
fi

min_rn=`echo $query_min_rn | tr -cd "[0-9]"`

if [ $? -eq 0 ]; then
  echo "step 3 success."
else
  echo "step 3 fail."
  exit 1;
fi

echo "min_rn: "$min_rn


## Get the largest rn among the partitions to process
query_max_rn=`beeline -u "${BEELINE_CONN_URL}" --hiveconf hive.server2.logging.operation.level=NONE -n hive --showHeader=false --silent=true -e "

  select coalesce(max(rn), 0) max_rn from dwd.tmp_dwd_trade_erp_lsxsjl_0;

"`

if [ $? -eq 0 ]; then
  echo "step 4 success."
else
  echo "step 4 fail."
  exit 1;
fi

max_rn=`echo $query_max_rn | tr -cd "[0-9]"`

if [ $? -eq 0 ]; then
  echo "step 5 success."
else
  echo "step 5 fail."
  exit 1;
fi

echo "max_rn: "$max_rn


## Exit immediately if there is no new data
if [ "$min_rn" -eq 0 ] && [ "$max_rn" -eq 0 ]; then

    cur_time_e=`date "+%Y-%m-%d %H:%M:%S"`
    echo "no new data. application end. start time: "$cur_time_s", end time: "$cur_time_e"."
    exit 0

fi

## Loop, inserting the data into the target table in batches
while [[ $min_rn -le $max_rn ]]
do

  rn_start=$min_rn
  min_rn=`expr $min_rn + 50`
  rn_end=`expr $min_rn - 1`
  echo "rn_start:"$rn_start
  echo "rn_end:"$rn_end


  beeline -u "${BEELINE_CONN_URL}" -n hive -e "
	
    
    insert overwrite table dwd.dwd_trade_erp_sale_order
        select  p0.jlbh,
                p0.skt_id,
                p0.deptid,
                p0.djlx,
                p0.guest,
                p0.fph,
                p0.kprq,
                p0.jzrq,
                p0.yyy_id,
                p0.sky_id,
                p0.xsje,
                p0.cxje,
                p0.th_flag,
                p0.account,
                p0.zpxx,
                p0.zpfh,
                p0.qkje,
                p0.zkje,
                p0.region,
                p0.cxy_id,
                p0.thyy,
                p0.lpje,
                p0.tm_flag,
                p0.fpdm,
                p0.m_jlbh,
                p0.hyid,
                p0.yydh,
                p0.tel_yddh,
                p0.shfs,
                p0.sh_address,
                p0.tel_gddh,
                p0.zjhm,
                p0.shre,
                p0.shsj,
                p0.shsj_name,
                p0.shrq,
                p0.zhsp_jlbh,
                p0.txbxw,
                p0.zh_flag,
                p0.xeflx,
                p0.zfje,
                p0.ysfs_flag,
                p0.mobile_flag,
                p0.fhdj_flag,
                p0.fbjf_code,
                p0.tmo2o_flag,
                p0.skrq02,
                p0.b2bi_send_flag,
                p0.old_hyid,
                p0.old_tel,
                p0.fp_type,
                p0.onlinejlbh,
                p0.vc_code,
                p0.jsjk_flag,
                p0.guest_sh,
                p0.dztd_flag,
                p0.yhzx_uid,
                p0.dsf_onlinejlbh,
                date_format(from_utc_timestamp(current_timestamp(), 'PRC'), 'yyyy-MM-dd HH:mm:ss') etl_time,
                p0.fbid,
                p0.notes,
                p0.order_date       
        from dwd.tmp_dwd_trade_erp_lsxsjl_1 p0
       where p0.rn between ${rn_start} and ${rn_end}; 
  "


  if [ $? -eq 0 ]; then
    echo "step 6 success, rn from $rn_start to $rn_end."
  else
    echo "step 6 fail."
    exit 1;
  fi


done

cur_time_e=`date "+%Y-%m-%d %H:%M:%S"`

echo "application end. start time: "$cur_time_s", end time: "$cur_time_e"."