Monitoring a directory with shell scripts to automatically load data into a Hive table

Posted: 2022-12-11 11:54:17

sh createtb.sh "tablename;field1,field2,field3,field4,field5,field6,field7;partition1,partition2"

Database name: observation (hard-coded in the script)

Table name: tablename

Field names: field1,field2,field3,field4,field5,field6,field7

Partition names: partition1,partition2
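The whole definition travels as one quoted argument; a minimal sketch of the same pattern-substitution splitting that the scripts below perform (hypothetical sample values, field list shortened):

args="tablename;field1,field2,field3;partition1,partition2"
group=(${args//;/ })            # -> tablename  field1,field2,field3  partition1,partition2
fields=(${group[1]//,/ })       # -> field1 field2 field3
partitions=(${group[2]//,/ })   # -> partition1 partition2
echo "table=${group[0]} fields=${fields[*]} partitions=${partitions[*]}"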

File layout

Local files to upload: /home/hive/observation/$table/*.txt

File name format: xx01_xx02_xxxx.txt (xx01 is the value for the first partition, xx02 for the second)

For example, for a file named 2019_10_02.txt (under /home/hive/observation/tablename), the script generates the Hive load statement:

load data local inpath '/home/hive/observation/tablename/2019_10_02.txt' into table observation.tablename partition(partition1='2019',partition2='10');
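The mapping from file name to partition values can be checked in isolation; a minimal sketch using the same parameter expansions the load scripts below rely on:

file="/home/hive/observation/tablename/2019_10_02.txt"
name=${file##*/}       # strip the directory: 2019_10_02.txt
stem=${name%%.*}       # strip the .txt suffix: 2019_10_02
parts=(${stem//_/ })   # split on "_": 2019 10 02
echo "partition1='${parts[0]}', partition2='${parts[1]}'"
# prints: partition1='2019', partition2='10'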

Script code

1. Table-creation script createtb.sh (takes the field and partition lists as input)

#!/bin/bash
echo "---------createtb.sh--------------"
# Sample input:
#string="tablename;field1,field2,field3,field4,field5,field6,field7;partition1,partition2"
string=$*
# Split the argument on ";" into table name, field list, and partition list
group=(${string//;/ })
for i in ${group[@]};do
    ((m++))
    if [[ $m -eq 1 ]];then
        tables=$i
    elif [[ $m -eq 2 ]];then
        fields=$i
    else partitions=$i
    fi
done

echo "tables:" $tables
echo "fields:" $fields
echo "partitions:" $partitions
echo "-----------------"
field=(${fields//,/ })
partition=(${partitions//,/ })

# Create the table's data directory and open up its permissions
$(mkdir -p /home/hive/observation/$tables ; chmod 777 /home/hive/observation/$tables)
# Database name; hard-coded here rather than taken as a parameter
database="observation"

# The Hive DDL is assembled from three parts:
# h1 is the head of the CREATE TABLE statement, h2 the column list, h3 the tail
h1="create table $database.$tables"
# Create the file temp in the current directory, truncating it if it already exists
$(> temp)
# Append each field to temp as "name varchar(255),"; echo -n suppresses the newline
for i in ${field[@]};do
    echo -n $i" varchar(255)," >> temp
done
# h2 is the string read back from temp, with the trailing comma stripped
temp=$(cat temp)
h2="(${temp%*,})"
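# A quick illustration of the "%*," strip with hypothetical values:
#   temp="field1 varchar(255),field2 varchar(255),"
#   echo "(${temp%*,})"   # -> (field1 varchar(255),field2 varchar(255))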

# Create the file tmp in the current directory, truncating it if it already exists
$(> tmp)
# Append each partition column to tmp the same way, comma-separated
for i in ${partition[@]};do
    echo -n $i" varchar(255)," >> tmp
done
# h3 is the tail of the statement, built from tmp with the trailing comma stripped
tmp=$(cat tmp)
h3="
partitioned by
(${tmp%*,})
row format delimited
fields terminated by '\t'
lines terminated by '\n'
stored as textfile
location '/user/hive/warehouse/$database.db/$tables';
"

echo $h1$h2$h3
$(hive -e "$h1$h2$h3")
$(rm -rf tmp temp)
echo "-------create hive table successfully--------"

# Create the per-table date log (used by monitor.sh to track what has been loaded)
$(mkdir -p /home/log/hive/observation ; touch /home/log/hive/observation/$tables.log)
# Alternative ways to hand off to the load script, kept for reference:
#/bin/bash /home/hive/loadtb.sh
#exec /home/hive/loadtb.sh
#source /home/hive/loadtb.sh
#fork /home/hive/loadtb.sh
# First argument: table name; second: partition fields
#/home/hive/loadtb.sh $tables $partitions
# Run the monitor once now, then register it to run every minute via cron
/home/hive/monitor.sh "$*"
echo "* * * * * su - root /home/hive/monitor.sh \"$*\"" >> /var/spool/cron/root
echo "* * * * * root /home/hive/monitor.sh \"$*\"" >> /etc/crontab

Invocation example:

/home/hive/createtb.sh "tablename;field1,field2,field3,field4,field5,field6,field7;partition1,partition2"
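For that invocation, the statement assembled into $h1$h2$h3 and passed to hive -e comes out roughly as follows (wrapped here for readability):

create table observation.tablename
(field1 varchar(255),field2 varchar(255),field3 varchar(255),field4 varchar(255),field5 varchar(255),field6 varchar(255),field7 varchar(255))
partitioned by
(partition1 varchar(255),partition2 varchar(255))
row format delimited
fields terminated by '\t'
lines terminated by '\n'
stored as textfile
location '/user/hive/warehouse/observation.db/tablename';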

2. Directory-monitoring script monitor.sh

#!/bin/bash
# ------------- Receive arguments and parse the string --------------
echo "---------monitor.sh--------------"
string=$*
group=(${string//;/ })
for i in ${group[@]};do
    ((m++))
    if [[ $m -eq 1 ]];then
        tables=$i
    elif [[ $m -eq 2 ]];then
        fields=$i
    else partitions=$i
    fi
done
tables=(${tables//;/ })
field=(${fields//,/ })
partition=(${partitions//,/ })
# ------------- End of argument parsing --------------

# The first argument is the table name, which is also the name of the monitored directory
tables=$tables
echo "tables:$tables"
echo "---------- monitoring directory ----------"
# Count the files currently in the data directory
let "total=$(ls /home/hive/observation/$tables | wc -l)"
echo "number of data files: ${total}"
let "logline=$(cat /home/log/hive/observation/$tables.log | wc -l)"
echo "log file line count: ${logline}"

# If the log file is empty, initialize it
if (( logline==0 ));then
    $(touch /home/log/hive/observation/$tables.log)
    # Seed the log with 10 default timestamp lines
    for (( i=0;i<10;i++ ));do
        $(echo "2020-01-01 00:00:00.00000000$i" >> /home/log/hive/observation/$tables.log)
    done
fi
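# After seeding, the log holds ten placeholder timestamps:
#   2020-01-01 00:00:00.000000000
#   ...
#   2020-01-01 00:00:00.000000009
# Any real file date will be newer than these, so the first run loads everything.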

# Read the last date recorded in the log
logdate=$(tail -n 1 /home/log/hive/observation/$tables.log)
echo "latest date in log: $logdate"
# Get the newest modification date in the data directory
newdate=$(ls --full-time -lt /home/hive/observation/$tables | tail -n -$total | awk '{print $6,$7}' | head -n 1)
echo "newest date in data directory: $newdate"
# Convert both times to epoch seconds
LOGDATE=$(date -d "$logdate" +%s)
NEWDATE=$(date -d "$newdate" +%s)
echo "latest log timestamp: $LOGDATE"
echo "newest data timestamp: $NEWDATE"
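# For example: date -d "2020-04-16 17:03:48.000000000" +%s prints an epoch value
# such as 1587027828 (the exact number depends on the machine's timezone).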

# Collect the dates of all files in the data directory, newest first
alldate=$(ls --full-time -lt /home/hive/observation/$tables | tail -n -$total | awk '{print $6,$7}')
echo "all dates in data directory:"
echo "$alldate"
# Length of the concatenated date string
let len=${#alldate}
echo "date string length: $len"
# The entries are newline-separated and each date itself contains a space,
# so slice the string in fixed 30-character chunks
for((i=0;i<total;i++));do
    let start=i*30
    date[$i]=${alldate:$start:30}
done
# Trim the trailing newline from each slice
for((i=0;i<total;i++));do
    date[$i]=${date[$i]:0:29}
done
echo "dates after slicing:"
echo "${date[*]}"

((n=-1))
echo "-------- start scanning the data directory ----------"
# Walk the date array, which runs from newest to oldest
for((i=0;i<${#date[*]};i++));do
    # Epoch timestamp of the current entry
    TIMECR=$(date -d "${date[$i]}" +%s)
    #echo $TIMECR
    # If the log's latest timestamp matches this entry
    if [[ $LOGDATE -eq $TIMECR ]];then
        #if [[ "$LTIME" = "${date[$i]}" ]];then
        #echo "$LTIME";echo ${date[$i]}
        #echo "$i,$LTIME,${date[$i]}"
        echo $i
        (( n=$i ))  # remember the matching index
        echo "found a matching date; the entries before it are the new ones"
        echo "${date[$i]} ${date[0]}"
        echo "number of files to load: $i"
        break
    fi
    #echo ${date[$i]}
done
echo "-------- finished scanning the data directory ----------"

echo "n=$n"
echo "newest match is entry #$n"
echo "newest data: ${date[0]}"

# n == -1 means the log's latest date was not found among the files,
# so treat everything as new and load all data into the Hive table
if (( n == -1 ));then
    echo "n == -1, first load, loading all data into the hive table, ${date[0]}, $newdate" >> /home/hive/judge.log
    # Drop the first line of the log
    $(sed -i '1d' /home/log/hive/observation/$tables.log)
    # Append the newest date just read to the log
    echo "${date[0]}" >> /home/log/hive/observation/$tables.log
    echo "" >> /home/log/hive/observation/$tables.log
    /home/hive/loadtb_all.sh $tables $partitions

# n == 0 means entry 0 (the newest file) is already in the log, so there is nothing to do
elif (( n == 0 ));then
    # Drop the first line of the log
    $(sed -i '1d' /home/log/hive/observation/$tables.log)
    echo "n == 0, no new data, ${date[0]} "
    echo "n == 0, no new data, ${date[0]} " >> /home/hive/judge.log

else
    echo "newest data: ${date[0]}"
    # Drop the first line of the log
    $(sed -i '1d' /home/log/hive/observation/$tables.log)
    # Append the newest date just read to the log
    echo "${date[0]}" >> /home/log/hive/observation/$tables.log
    #echo "" >> /home/log/hive/observation/$tables.log
    #$(sed -i '1s//2020-04-16 17:03:48.000000000/' /home/log/hive/observation/tablename.log)
    # Call the load script: arg1 = table name, arg2 = partition fields, arg3 = index n of the match
    echo "first argument: $tables, second argument: $partitions, third argument: $n, newest data: ${date[0]}" >> /home/hive/judge.log
    /home/hive/loadtb_mid.sh $tables $partitions $n
fi

Invocation example:

/home/hive/monitor.sh "tablename;field1,field2,field3,field4,field5,field6,field7;partition1,partition2"
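The fixed-width slicing above works because each "YYYY-MM-DD HH:MM:SS.NNNNNNNNN" entry printed by ls --full-time is 29 characters plus a newline; a standalone sketch with hypothetical sample data:

alldate="2020-04-16 17:03:48.000000000
2020-04-15 09:12:00.000000000"
total=2
for ((i=0;i<total;i++));do
    let start=i*30
    d=${alldate:$start:30}   # 30-char slice, including the newline
    d=${d:0:29}              # trim back to the 29-char timestamp
    echo "entry $i: [$d] epoch: $(date -d "$d" +%s)"
done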

3. First-load script loadtb_all.sh (loads everything into the Hive table)

#!/bin/bash
echo "------loadtb_all.sh--start------"
# First argument: table name
table=$1
echo "table name: $table"
# Second argument: partition fields
partitions=$2
echo "partition fields: $partitions"
# Directory to load data from
DIR=/home/hive/observation/$table
partkey=(${partitions//,/ })
# Number of partition columns
let m=${#partkey[@]}
h1="load data local inpath '/home/hive/observation/$table"
h3="' into table observation.$table partition"

allfiles=$(ls /home/hive/observation/$table/*.txt)
echo "allfiles:$allfiles"
echo "allfiles:$allfiles" >> /home/hive/allfiles_all.log

# Walk the file list and derive the partition values from each file name
for file in ${allfiles};do
    # Reset the per-file partition string
    $(> part_all)
    echo "$file" >> /home/hive/allfiles_all.log
    # Keep only the file name after the last /
    h2="${file##*/}"
    echo "h2:$h2" >> /home/hive/allfiles_all.log
    # Drop the .txt suffix
    str=${h2%%.*}
    echo "str:$str" >> /home/hive/allfiles_all.log
    # Split the name on "_" into partition values
    partvalue=(${str//_/ })
    # Build the partition clause
    for (( i=0;i<m;i++ ));do
        echo -n "${partkey[$i]}='${partvalue[$i]}'," >> part_all
    done
    h4_tmp=$(cat part_all)
    h4="(${h4_tmp%*,});"
    echo "$h1/$h2$h3$h4"
    $(hive -e "$h1/$h2$h3$h4")
    #hive -e "load data local inpath '/home/hive/observation/$table' into table observation.$table partition(partition1=$i,partition2=$i);"
done

$(rm -rf part_all)

echo "------loadtb_all.sh--end------"
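loadtb_all.sh is normally invoked from monitor.sh, but it can also be run by hand to backfill a table; a hypothetical manual run:

# load every *.txt under /home/hive/observation/tablename
/home/hive/loadtb_all.sh tablename partition1,partition2
# for 2019_10_02.txt the loop assembles and executes:
#   load data local inpath '/home/hive/observation/tablename/2019_10_02.txt' into table observation.tablename partition(partition1='2019',partition2='10');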

4. Incremental-load script loadtb_mid.sh (Nth load into the Hive table)

#!/bin/bash
echo "--------loadtb_mid.sh--start------"
# First argument: table name
table=$1
echo "table name: $table"
# Second argument: partition fields
partitions=$2
echo "partition fields: $partitions"
# Third argument: n, the index of the newest matched entry
echo "third argument: $3"
echo "all arguments: $*"
N=$(($3))
echo "N is: $N"
partkey=(${partitions//,/ })
# Number of partition columns
let m=${#partkey[@]}
h1="load data local inpath '/home/hive/observation/$table"
h3="' into table observation.$table partition"
# Select the N newest files; several variants were tried and the last assignment wins
allfiles=$(ls --full-time -lt /home/hive/observation/$table/*.txt | awk '{print $9}' | head -n ${N})
#allfiles=$(ls --full-time -lt /home/hive/observation/$table/*.txt | head -n ${N})
allfiles=$(ls /home/hive/observation/$table/*.txt | tail -n ${N})

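The listing is cut off here in the source. The per-file loop that follows would mirror the one in loadtb_all.sh, only over the $N files selected above; a minimal sketch under that assumption (part_mid is a hypothetical scratch file name):

for file in ${allfiles};do
    $(> part_mid)
    h2="${file##*/}"          # bare file name
    str=${h2%%.*}             # drop the .txt suffix
    partvalue=(${str//_/ })   # split into partition values
    for (( i=0;i<m;i++ ));do
        echo -n "${partkey[$i]}='${partvalue[$i]}'," >> part_mid
    done
    h4_tmp=$(cat part_mid)
    h4="(${h4_tmp%*,});"
    echo "$h1/$h2$h3$h4"
    $(hive -e "$h1/$h2$h3$h4")
done
$(rm -rf part_mid)
echo "--------loadtb_mid.sh--end------"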