用户数据同步

时间:2022-09-19 18:52:15

文件名是今日, 数据是昨天的数据。
dayid=`date -d "1 days ago" +%Y%m%d  `
##############################################
# 功能: 开始日期加上偏移天数后的日期字符串# 入参: 开始日期,yyyymmdd#        偏移天数# 返回值:开始日期加上偏移天数日期,yyyymmdd###############################################function toDate(){   startdate=$1;   days=$2;   timestamp_startdate=`date -d ${startdate} +%s`   timestamp_resultdate=`expr ${timestamp_startdate} '+' ${days} '*' 86400`   resultdate=`date -d @${timestamp_resultdate} +%Y%m%d`   echo $resultdate}filedayid=`toDate $dayid 1` spark-submit --class="com.zyuc.stat.iot.etl.UserInfoETL"  \--master yarn-client  \--name UserInfoETL \--conf "spark.app.appName=UserInfoETL" \--conf "spark.app.dataDayid=${dayid}" \--conf "spark.app.userTable=iot_customer_userinfo" \--conf "spark.app.syncType=full" \--conf "spark.app.inputPath=/hadoop/IOT/ANALY_PLATFORM/BasicData/UserInfo/"  \--conf "spark.app.outputPath=/hadoop/IOT/ANALY_PLATFORM/BasicData/output/UserInfo/"  \--conf "spark.app.fileWildcard=all_userinfo_qureyes_${filedayid}*" \--conf "spark.app.vpdnInput=/hadoop/IOT/ANALY_PLATFORM/BasicData/VPDNProvince/"  \--conf "spark.app.vpdnWildcard=vpdninfo.txt" \--conf spark.yarn.executor.memoryOverhead=700 \--executor-memory 2G \--executor-cores  1 \--num-executors  6 \/slview/test/zcw/jars/userETL.jar





dayid=`date -d "1 days ago" +%Y%m%d  `
##############################################
# 功能: 开始日期加上偏移天数后的日期字符串# 入参: 开始日期,yyyymmdd#        偏移天数# 返回值:开始日期加上偏移天数日期,yyyymmdd###############################################function toDate(){   startdate=$1;   days=$2;   timestamp_startdate=`date -d ${startdate} +%s`   timestamp_resultdate=`expr ${timestamp_startdate} '+' ${days} '*' 86400`   resultdate=`date -d @${timestamp_resultdate} +%Y%m%d`   echo $resultdate}filedayid=`toDate $dayid 1` 
spark-submit --class="com.zyuc.stat.iot.etl.UserInfoETL"  \--master yarn-client  \--name UserInfoETL \--conf "spark.app.appName=UserInfoETL" \--conf "spark.app.dataDayid=${dayid}" \--conf "spark.app.userTable=iot_customer_userinfo" \--conf "spark.app.syncType=incr" \--conf "spark.app.inputPath=/hadoop/IOT/ANALY_PLATFORM/BasicData/UserInfo/"  \--conf "spark.app.outputPath=/hadoop/IOT/ANALY_PLATFORM/BasicData/output/UserInfo/"  \--conf "spark.app.fileWildcard=incr_userinfo_qureyes_${filedayid}*" \--conf "spark.app.vpdnInput=/hadoop/IOT/ANALY_PLATFORM/BasicData/VPDNProvince/"  \--conf "spark.app.vpdnWildcard=vpdninfo.txt" \--conf spark.yarn.executor.memoryOverhead=700 \--executor-memory 2G \--executor-cores  1 \--num-executors  6 \/slview/test/zcw/jars/userETL.jar


--conf "spark.app.fileWildcard= 如下: all_userinfo_qureyes_20170714*
incr_userinfo_qureyes_20170715*



create table iot_customer_userinfo(vpdncompanycode string, mdn string, imsicdma string, imsilte string, iccid string, imei string, company string, nettype string, vpdndomain string, isvpdn string, subscribetimeaaa string, subscribetimehlr string, subscribetimehss string, subscribetimepcrf string, firstactivetime string, userstatus string, atrbprovince string, userprovince string, crttime string, custProvince string) partitioned by (d int)  stored as orc location '/hadoop/IOT/ANALY_PLATFORM/BasicData/output/UserInfo/data/';

alter table iot_customer_userinfo add IF NOT EXISTS partition(d='20170714');

shell 调度脚本: 增量数据:
$ cat userETL.sh dayid=$1
if [ -z $dayid ] ; then
dayid=`date -d "1 days ago" "+%Y%m%d"`
fi
##############################################
# 功能: 开始日期加上偏移天数后的日期字符串
# 入参: 开始日期,yyyymmdd
# 偏移天数
# 返回值:开始日期加上偏移天数日期,yyyymmdd
###############################################
function toDate()
{
startdate=$1;
days=$2;
timestamp_startdate=`date -d ${startdate} +%s`
timestamp_resultdate=`expr ${timestamp_startdate} '+' ${days} '*' 86400`
resultdate=`date -d @${timestamp_resultdate} +%Y%m%d`
echo $resultdate
}
filedayid=`toDate $dayid 1`
spark-submit --class="com.zyuc.stat.iot.etl.UserInfoETL" \
--master yarn-client \
--name UserInfoETL \
--conf "spark.app.appName=UserInfoETL" \
--conf "spark.app.dataDayid=${dayid}" \
--conf "spark.app.userTable=iot_customer_userinfo" \
--conf "spark.app.syncType=incr" \
--conf "spark.app.inputPath=/hadoop/IOT/ANALY_PLATFORM/BasicData/UserInfo/" \
--conf "spark.app.outputPath=/hadoop/IOT/ANALY_PLATFORM/BasicData/output/UserInfo/" \
--conf "spark.app.fileWildcard=incr_userinfo_qureyes_${filedayid}*" \
--conf "spark.app.vpdnInput=/hadoop/IOT/ANALY_PLATFORM/BasicData/VPDNProvince/" \
--conf "spark.app.vpdnWildcard=vpdninfo.txt" \
--conf spark.yarn.executor.memoryOverhead=700 \
--executor-memory 2G \
--executor-cores 1 \
--num-executors 6 \
/slview/test/zcw/shell/userinfo/jars/userETL.jar >/slview/test/zcw/shell/userinfo/logs/${dayid}.log 2>&1