Logstash导入数据到ElasticSearch

时间:2022-11-17 13:39:41

一:在Windows环境

  1 下载解压Logstash的压缩包

  2 在Logstash的压缩包中安装Logstash-jdbc-input插件:

    在Bin命令行下运行命令: .\logstash-plugin install logstash-jdbc-input 

    及可以安装成功,(该插件主要是用来进行增量同步时记录跟踪数值的最大值或者时间的最新时间)

      2 将oracle的驱动放入Logstash的解压包中

      3 在Logstash的包中创建jdbc.conf

            3.1 Logstash的jdbc.conf基本结构:

input {
    stdin { }
         jdbc {
            jdbc_driver_library => "E:\es\logstash-6.2.4\ojdbc14-10.2.0.4.0.jar"
            jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
            jdbc_connection_string => "jdbc:oracle:thin:@//60.205.228.254:1521/orcl"
            jdbc_user => "DJP"
            jdbc_password=> "123456"
            schedule => "* * * * *"
            statement_filepath => "E:\es\logstash-6.2.4\sql\demon.sql"
            #是否记录上次Logstash的运行的时间
            last_run_metadata_path => "E:\es\logstash-6.2.4\status\last_time.txt"
            #是否强制导入的字母的大小写默认为true 强制为小写
            # lowercase_column_names => false
            #是否保存上次运行的状态,不保存在last_run_metadata_path中
            #record_last_run =>
            # 是否跟踪某个字段的值
            use_column_value => true
            tracking_column => id
            #设置跟踪字段的类型  number和Timetape类型
            #tracking_column_type =>“”
            
            }
}
output {
  elasticsearch {
    index => "addIndex02"
    hosts => ["193.112.52.129:9200"]
    }
  stdout { codec => rubydebug }
}

      4 整理sql语句在一个文件下

SELECT
    ID,
    TRAIN_SYSTEM,
    CAR_BRAND,
    CAR_NAME
FROM
    TB_CAR_MODEL
WHERE ID > :sql_last_value ORDER BY ID

在进行增量同步的时候一定要添加 ORDER BY ID 以确保保存最大的ID

      5 启动sql语句进行导入数据