ElasticSearch-jdbc从Mysql数据库导入和更新数据

时间:2022-09-21 17:29:50

环境

需要的资源和版本:
- git客户端 下载
- Elasticsearch 版本 2.2.0 安装教程
- elasticsearch-jdbc 版本 2.2 下载
- ik分词器 版本 1.8 下载
(下载后解压将ik文件夹放在elasticsearch-2.2.0\plugins目下,重启elasticsearch即可)
ElasticSearch-jdbc从Mysql数据库导入和更新数据

一、安装 jdbc

下载完后解压,示例如下:

ElasticSearch-jdbc从Mysql数据库导入和更新数据

提示:如果导入的数据量很大,先修改elasticsearch的内存
打开目录D:\elasticsearch-2.2.0\bin
修改elasticsearch.in.bat和elasticsearch.in.sh文件(好像只改前面一个就可以)

if "%ES_MIN_MEM%" == "" (
set ES_MIN_MEM=4g
)

if "%ES_MAX_MEM%" == "" (
set ES_MAX_MEM=10g
)

这是设置后的,实际情况根据自己电脑配置。

二、使用jdbc

1.配置环境变量

变量名:JDBC_IMPORTER_HOME
变量值:D:\elasticsearch-2.2.0\elasticsearch-jdbc-2.2.0.0

ElasticSearch-jdbc从Mysql数据库导入和更新数据

2.在mysql数据库里加时间戳(只导入数据不更新的可以不加)

名:updateupdate_time
类型:timestamp
默认:CURRENT_TIMESTAMP

ElasticSearch-jdbc从Mysql数据库导入和更新数据

3.数据导入

进入D:\elasticsearch-2.2.0\elasticsearch-jdbc-2.2.0.0\bin目录
创建 mysql-import.sh 的base脚本,内容如下(使用时把注释去掉):

#!/bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib

echo '
{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test", #数据库名
"user" : "root", #数据库用户名
"password" : "root", #数据库密码
"sql" : "select *, id as _id from user", #sql插入语句
"index" : "user", #索引名
"type" : "user", #type名
"index_settings" : {
"analysis" : {
"analyzer" : {
"ik" : {
"tokenizer" : "ik"
}
}
}
},
"type_mapping": {
"article" : {
"properties" : {
"id" : {
"type" : "integer",
"index" : "not_analyzed"
},
"subject" : {
"type" : "string",
"analyzer" : "ik"
},
"author" : {
"type" : "string",
"analyzer" : "ik"
},
"create_time" : {
"type" : "date"
},
"update_time" : {
"type" : "date"
}
}
}
}
}
}
' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter

不使用ik分词器时:

#!/bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib
echo '{
"type" : "jdbc",
"jdbc": {
"elasticsearch.autodiscover":true,
"elasticsearch.cluster":"elasticsearch", #集群名
"url":"jdbc:mysql://127.0.0.1:3306/test", #数据库名
"user":"root", #数据库用户名
"password":"root", #数据库密码
"sql":"select *, id as _id from user", #sql插入语句
"elasticsearch" : {
"host" : "localhost",
"port" : 9300
},
"index" : "user", #索引名
"type" : "user" #type名
}
}'| java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter

直接点击import.sh运行脚本即可。

4.增量索引、更新

进入D:\elasticsearch-2.2.0\elasticsearch-jdbc-2.2.0.0\bin目录
创建 mysql-update.sh 的base脚本,内容如下(使用时把注释去掉):

#!/bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib

echo '
{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "root",
"password" : "root",
"statefile" : "user.json",
"schedule" : "0 0-59 0-23 ? * *",
"sql" : [
{
"statement" : "select * from user where update_time > ?",
"parameter" : [ "$metrics.lastexecutionstart" ]
}
],
"index" : "user",
"type" : "user",
"index_settings" : {
"analysis" : {
"analyzer" : {
"ik" : {
"tokenizer" : "ik"
}
}
}
},
"type_mapping": {
"article" : {
"properties" : {
"id" : {
"type" : "integer",
"index" : "not_analyzed"
},
"subject" : {
"type" : "string",
"analyzer" : "ik"
},
"author" : {
"type" : "string",
"analyzer" : "ik"
},
"create_time" : {
"type" : "date"
},
"update_time" : {
"type" : "date"
}
}
}
}
}
}
' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter

运行mysql-update.sh脚本
可以看到 命令行端 被占用,一直在运行,并且在 mysql-update.sh 的同级目录下生成了一个user .json 的文件,sql 语句中需要的数据 lastexecutionstart 就保存在该文件中 。