elasticsearch使用river同步mysql数据

时间:2024-09-04 19:37:38

====== mysql的river介绍======
      - 什么是river?river代表es的一个数据源,也是其它存储方式(如:数据库)同步数据到es的一个方法。它是以插件方式存在的一个es服务,通过读取river中的数据并把它索引到es中,官方的river有couchDB的,RabbitMQ的,Twitter的,Wikipedia的。这里主要研究针对mysql的river。
      - mysql的river插件:mysql的river安装见https://github.com/jprante/elasticsearch-river-jdbc,就不具体介绍了。
      - 实践测试:
      * 环境:
       服务器172.16.170.21   数据库:profile 表user
      * 创建索引
        curl -XPUT 'http://localhost:9200/profile'
      * 创建数据表与索引映射
curl -XPUT 'http://localhost:9200/profile/user/_mapping' -d '
{
    "user": {
        "properties": {
            "id": {
                "type": "string",
                "store": "yes"
            },
            "name": {
                "type": "string",
                "store": "yes"
            },
            "login_name": {
                "type": "string",
                "store": "yes"
            }
        }
    }
}'

      * 运行river同步数据

curl -XPUT 'http://localhost:9200/_river/who_jdbc_river/_meta' -d '{
    "type": "jdbc",
    "jdbc": {
        "driver": "com.mysql.jdbc.Driver",
        "url": "jdbc:mysql://localhost:3306/profile",
        "user": "root",
        "password": "root",
        "sql": "select id as _id,name,login_name from user",
        "index": "profile",
        "type": "user",
        "bulk_size": 100,
        "max_bulk_requests": 30,
        "bulk_timeout": "10s",
        "flush_interval": "5s",
        "schedule": "0 0-59 0-23 ? * *"
    }
}'

      * 增量更新索引
增量更新,表需要维护时间戳,发现时间戳更新的列需要更新
curl -XPUT 'http://localhost:9200/_river/who_jdbc_river/_meta' -d '{
    "type": "jdbc",
    "jdbc": {
        "driver": "com.mysql.jdbc.Driver",
        "url": "jdbc:mysql://localhost:3306/profile",
        "user": "root",
        "password": "root",
        "sql": [
            {
                "statement": "select id as _id,name,login_name from user where mytimestamp > ?",
                "parameter": [
                    "$river.state.last_active_begin"
                ]
            }
        ],
        "index": "profile",
        "type": "user",
        "bulk_size": 100,
        "max_bulk_requests": 30,
        "bulk_timeout": "10s",
        "flush_interval": "5s",
        "schedule": "0 0-59 0-23 ? * *"
    }
}'

删除river

curl -XDELETE 'localhost:9200/_river/report_jdbc_river'