es增量自定义更新的脚本

时间:2025-03-23 09:38:36

安装需要的软件

sudo apt-get install python-pip
sudo pip install elasticsearch;
sudo apt-get install python-dev
sudo pip install MySQL-python

导入脚本

#!/bin/bash
# Import rows of one MySQL table into Elasticsearch using elasticsearch-jdbc.
#
# Usage: <this script> <table> <since>
#   $1  MySQL table name; also used as the Elasticsearch document type.
#   $2  lower bound for dtTime, e.g. "2015-01-01 00:00:00"; only rows with a
#       later dtTime are selected.
#
# Exit status: non-zero if arguments are missing or the importer fails.
set -e

if [ "$#" -lt 2 ]; then
  echo "usage: $0 <table> <since-datetime>" >&2
  exit 1
fi

bin=/usr/local/elasticsearch-jdbc-1.5.2.0/bin
lib=/usr/local/elasticsearch-jdbc-1.5.2.0/lib

# The importer reads its JSON definition from stdin. The positional
# parameters are spliced in double-quoted so that the space inside the
# datetime (and any glob characters) is passed through verbatim instead of
# being word-split by the shell.
# NOTE(review): the main classes and the log4j2.xml name below follow the
# stock elasticsearch-jdbc 1.5.x example script; confirm against the
# installed distribution.
echo '{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://192.168.10.29:3306/db_1",
"user" : "root",
"password" : "root",
"sql" : "select * from '"${1}"' where dtTime>\"'"${2}"'\" ",
"index": "db_1",
"type": "'"${1}"'"
}
}' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile="${bin}/log4j2.xml" \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter

# No explicit "$?" check is needed: with `set -e` a failing importer has
# already terminated the script with java's non-zero exit status.

python调用实现增量添加:

#!/usr/bin/env python

from datetime import datetime
from elasticsearch import Elasticsearch
import MySQLdb
import time
import os
import subprocess

# Module-level Elasticsearch client configured for host 192.168.10.29;
# getLastTime() and insert() refer to it as the global `es`.
es=Elasticsearch("192.168.10.29")

def now():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string.

    Used only to timestamp progress messages.
    """
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))

def getLastTime(tableName):
    """Return the newest ``dtTime`` already indexed for ``tableName``.

    Runs a ``max`` aggregation on the ``dtTime`` field of index ``db_1``,
    restricted to documents of type ``tableName``.

    Args:
        tableName: Elasticsearch document type to inspect.

    Returns:
        The maximum ``dtTime`` formatted as ``YYYY-MM-DD HH:MM:SS``, or
        ``'2015-01-01 00:00:00'`` when the aggregation yields no value
        (no documents indexed yet).
    """
    query = {
        "aggs": {
            "max": {
                "max": {"field": "dtTime"}
            }
        }
    }
    response = es.search(index="db_1", doc_type=tableName, body=query)
    dt = response['aggregations']['max']['value']

    if dt is None:
        return '2015-01-01 00:00:00'
    # The aggregation value is divided by 1000 before formatting, i.e. it is
    # treated as epoch milliseconds.
    # NOTE(review): formatted in local time; confirm this matches how dtTime
    # is stored in MySQL, otherwise the SQL lower bound is shifted.
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(dt / 1000))

def insert(tableName,dtLastTime):
    """Run the shell import script for one table, then refresh the index.

    Args:
        tableName: table / document type passed as the script's first argument.
        dtLastTime: lower bound for ``dtTime`` passed (double-quoted) as the
            script's second argument.

    Returns:
        None. Prints "Import failed" and returns early when the script exits
        with a non-zero status.
    """
    # NOTE(review): the script file name after ".../bin/" appears to be
    # missing from this path; fill in the real import script name before use.
    command = '/usr/local/elasticsearch-jdbc-1.5.2.0/bin/ %s "%s"'%(tableName,str(dtLastTime))

    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(tableName+" startTime:"+str(dtLastTime))
    print(command)

    # shell=True is required because the command is one string containing a
    # quoted argument. NOTE(review): tableName comes from the importinfo
    # table; make sure it is trusted, since it reaches the shell unescaped.
    retCode = subprocess.call(command,shell=True)

    if retCode!=0:
        print("Import failed")
        return
    print("%s Import finished"%(now()))
    # Refresh so the newly imported documents are visible to the next search.
    # NOTE(review): assumed to be indices.refresh — confirm.
    es.indices.refresh(index="db_1")

def increment():
    """Incrementally import every table listed in MySQL table ``importinfo``.

    Reads ``vTableName`` from ``importinfo`` in database ``db_1``; for each
    table, looks up the newest ``dtTime`` already indexed via getLastTime()
    and calls insert() to import rows newer than that.
    """
    conn = MySQLdb.connect(host='192.168.10.29',port=3306,user='root',passwd='root',db ='db_1',)
    try:
        cur = conn.cursor()
        try:
            cur.execute('select vTableName,dtLastTime from importinfo')
            rows = cur.fetchall()
        finally:
            cur.close()
    finally:
        # Always release the connection, even if the query fails.
        conn.close()

    for line in rows:
        tableName = line[0]
        # The dtLastTime column (line[1]) is not used: the starting point
        # comes from Elasticsearch itself, so a NULL value there is harmless.
        dtLastTime = getLastTime(tableName)
        insert(tableName,dtLastTime)

# Run the incremental import only when this file is executed as a script.
if __name__=="__main__":
    increment()
    # Debug aid: query the newest indexed dtTime for a single table instead.
    #getLastTime("achi")