基于Kafka和Elasticsearch构建实时站内搜索功能的实践

时间:2022-03-25 01:01:12

作者:京东物流 纪卓志

目前我们在构建一个多租户多产品类网站,为了让用户更好的找到他们所需要的产品,我们需要构建站内搜索功能,并且它应该是实时更新的。本文将会讨论构建这一功能的核心基础设施,以及支持此搜索能力的技术栈。

问题的定义与决策

为了构建一个快速、实时的搜索引擎,我们必须做出某些设计决策。我们使用 MySQL 作为主数据库存储,因此有以下选择:

  1. 直接在 MySQL 数据库中查询用户在搜索框中输入的每个关键词,就像 %#{word1}%#{word2}%... 这样。 ????
  2. 使用一个高效的搜索数据库,如 Elasticsearch。????

考虑到我们是一个多租户应用程序,同时被搜索的实体可能需要大量的关联操作(如果我们使用的是 MySQL 一类的关系型数据库),因为不同类型的产品有不同的数据结构,所以我们还可以能需要同时遍历多个数据表来查询用户输入的关键词。所以我们决定不使用直接在 MySQL 中查询关键词的方案。????

因此,我们必须决定一种高效、可靠的方式,将数据实时地从 MySQL 迁移到 Elasticsearch 中。接下来需要做出如下的决定:

  1. 使用 Worker 定期查询 MySQL 数据库,并将所有变化的数据发送到 Elasticsearch。????
  2. 在应用程序中使用 Elasticsearch 客户端,将数据同时写入到 MySQL 和 Elasticsearch 中。????
  3. 使用基于事件的流引擎,将 MySQL 数据库中的数据更改作为事件,发送到流处理服务器上,经过处理后将其转发到 Elasticsearch。????

选项 1 并不是实时的,所以可以直接排除,而且即使我们缩短轮询间隔,也会造成全表扫描给数据库造成查询压力。除了不是实时的之外,选项 1 无法支持对数据的删除操作,如果对数据进行了删除,那么我们需要额外的表记录之前存在过的数据,这样才能保证用户不会搜索到已经删除了的脏数据。对于其他两种选择,不同的应用场景做出的决定可能会有所不同。在我们的场景中,如果选择选项 2,那么我们可以预见一些问题:如过 Elasticsearch 建立网络连接并确认更新时速度很慢,那么这可能会降低我们应用程序的速度;或者在写入 Elasticsearch 时发生了未知异常,我们该如何对这一操作进行重试来保证数据完整性;不可否认开发团队中不是所有开发人员都能了解所有的功能,如果有开发人员在开发新的与产品有关的业务逻辑时没有引入 Elasticsearch 客户端,那么我们将在 Elasticsearch 中更新这次数据的更改,无法保证 MySQL 与 Elasticsearch 间的数据一致性。

接下来我们该考虑如何将 MySQL 数据库中的数据更改作为事件,发送到流处理服务器上。我们可以在数据库变更后,在应用程序中使用消息管道的客户端同步地将事件发送到消息管道,但是这并没有解决上面提到的使用 Elasticsearch 客户端带来的问题,只不过是将风险从 Elasticsearch 转移到了消息管道。最终我们决定通过采集 MySQL Binlog,将 MySQL Binlog 作为事件发送到消息管道中的方式来实现基于事件的流引擎。关于 binlog 的内容可以点击链接,在这里不再赘述。

服务简介

基于Kafka和Elasticsearch构建实时站内搜索功能的实践

为了对外提供统一的搜索接口,我们首先需要定义用于搜索的数据结构。对于大部分的搜索系统而言,对用户展示的搜索结果通常包括为标题内容,这部分内容我们称之可搜索内容(Searchable Content)。在多租户系统中我们还需要在搜索结果中标示出该搜索结果属于哪个租户,或用来过滤当前租户下可搜索的内容,我们还需要额外的信息来帮助用户筛选自己想要搜索的产品类别,我们将这部分通用的但不用来进行搜索的内容称为元数据(Metadata)。最后,在我们展示搜索结果时可能希望根据不同类型的产品提供不同的展示效果,我们需要在搜索结果中返回这些个性化展示所需要的原始内容(Raw Content)。到此为止我们可以定义出了存储到 Elasticsearch 中的通用数据结构:

{
	"searchable": {
		"title": "string",
		"content": "string"
	},
	"metadata": {
		"tenant_id": "long",
		"type": "long",
		"created_at": "date",
		"created_by": "string",
		"updated_at": "date",
		"updated_by": "string"
	},
	"raw": {}
}

基础设施

Apache Kafka: Apache Kafka 是开源的分布式事件流平台。我们使用 Apache kafka 作为数据库事件(插入、修改和删除)的持久化存储。

mysql-binlog-connector-java: 我们使用 mysql-binlog-connector-java 从 MySQL Binlog 中获取数据库事件,并将它发送到 Apache Kafka 中。我们将单独启动一个服务来完成这个过程。

在接收端我们也将单独启动一个服务来消费 Kafka 中的事件,并对数据进行处理然后发送到 Elasticsearch 中。

Q:为什么不使用Elasticsearch connector之类的连接器对数据进行处理并发送到Elasticsearch中?
A:在我们的系统中是不允许将大文本存入到MySQL中的,所以我们使用了额外的对象存储服务来存放我们的产品文档,所以我们无法直接使用连接器将数据发送到Elasticsearch中。
Q:为什么不在发送到Kafka前就将数据进行处理?
A:这样会有大量的数据被持久化到Kafka中,占用Kafka的磁盘空间,而这部分数据实际上也被存储到了Elasticsearch。
Q:为什么要用单独的服务来采集binlog,而不是使用Filebeat之类的agent?
A:当然可以直接在MySQL数据库中安装agent来直接采集binlog并发送到Kafka中。但是在部分情况下开发者使用的是云服务商或其他基础设施部门提供的MySQL服务器,这种情况下我们无法直接进入服务器安装agent,所以使用更加通用的、无侵入性的C/S结构来消费MySQL的binlog。

配置技术栈

我们使用 docker 和 docker-compose 来配置和部署服务。为了简单起见,MySQL 直接使用了 root 作为用户名和密码,Kafka 和 Elasticsearch 使用的是单节点集群,且没有设置任何鉴权方式,仅供开发环境使用,请勿直接用于生产环境。

version: "3"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: app
    ports:
      - 3306:3306
    volumes:
      - mysql:/var/lib/mysql
  zookeeper:
    image: bitnami/zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - zookeeper:/bitnami
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    volumes:
      - kafka:/bitnami
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    volumes:
      - elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
volumes:
  mysql:
    driver: local
  zookeeper:
    driver: local
  kafka:
    driver: local
  elasticsearch:
    driver: local

在服务启动成功后我们需要为 Elasticsearch 创建索引,在这里我们直接使用 curl 调用 Elasticsearch 的 RESTful API,也可以使用 busybox 基础镜像创建服务来完成这个步骤。

# Elasticsearch
curl "http://localhost:9200/search" -XPUT -d '
{
  "mappings": {
    "properties": {
      "searchable": {
        "type": "nested",
        "properties": {
          "title": {
            "type": "text"
          },
          "content": {
            "type": "text"
          }
        }
      },
      "metadata": {
        "type": "nested",
        "properties": {
          "tenant_id": {
            "type": "long"
          },
          "type": {
            "type": "integer"
          },
          "created_at": {
            "type": "date"
          },
          "created_by": {
            "type": "keyword"
          },
          "updated_at": {
            "type": "date"
          },
          "updated_by": {
            "type": "keyword"
          }
        }
      },
      "raw": {
        "type": "nested"
      }
    }
  }
}'

核心代码实现(SpringBoot + Kotlin)

Binlog 采集端:

    override fun run() {
        client.serverId = properties.serverId
        val eventDeserializer = EventDeserializer()
        eventDeserializer.setCompatibilityMode(
            EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
        )
        client.setEventDeserializer(eventDeserializer)
        client.registerEventListener {
            val header = it.getHeader<EventHeader>()
            val data = it.getData<EventData>()
            if (header.eventType == EventType.TABLE_MAP) {
                tableRepository.updateTable(Table.of(data as TableMapEventData))
            } else if (EventType.isRowMutation(header.eventType)) {
                val events = when {
                    EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
                    EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
                    EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
                    else -> emptyList()
                }
                logger.info("Mutation events: {}", events)
                for (event in events) {
                    kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
                }
            }
        }
        client.connect()
    }

在这段代码里面,我们首先是对 binlog 客户端进行了初始化,随后开始监听 binlog 事件。binlog 事件类型有很多,大部分都是我们不需要关心的事件,我们只需要关注 TABLE_MAP 和 WRITE/UPDATE/DELETE 就可以。当我们接收到 TABLE_MAP 事件,我们会对内存中的数据库表结构进行更新,在后续的 WRITE/UPDATE/DELETE 事件中,我们会使用内存缓存的数据库结构进行映射。整个过程大概如下所示:

Table: ["id", "title", "content",...]
Row: [1, "Foo", "Bar",...]
=>
{
	"id": 1,
	"title": "Foo",
	"content": "Bar"
}

随后我们将收集到的事件发送到 Kafka 中,并由 Event Processor 进行消费处理。

事件处理器

@Component
class KafkaBinlogTopicListener(
    val binlogEventHandler: BinlogEventHandler
) {

    companion object {
        private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
    }

    private val objectMapper = jacksonObjectMapper()

    @KafkaListener(topics = ["binlog"])
    fun process(message: String) {
        val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
        logger.info("Consume binlog event: {}", binlogEvent)
        binlogEventHandler.handle(binlogEvent)
    }
}

首先使用 SpringBoot Message Kafka 提供的注解对事件进行消费,接下来将事件委托到 binlogEventHandler 去进行处理。实际上 BinlogEventHandler 是个自定义的函数式接口,我们自定义事件处理器实现该接口后通过 Spring Bean 的方式注入到 KafkaBinlogTopicListener 中。

@Component
class ElasticsearchIndexerBinlogEventHandler(
    val restHighLevelClient: RestHighLevelClient
) : BinlogEventHandler {
    override fun handle(binlogEvent: BinlogEvent) {
        val payload = binlogEvent.payload as Map<*, *>
        val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
        // Should delete from Elasticsearch
        if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
            val deleteRequest = DeleteRequest()
            deleteRequest
                .index("search")
                .id(documentId)
            restHighLevelClient.delete(deleteRequest, DEFAULT)
        } else {
            // Not ever WRITE or UPDATE, just reindex
            val indexRequest = IndexRequest()
            indexRequest
                .index("search")
                .id(documentId)
                .source(
                    mapOf<String, Any>(
                        "searchable" to mapOf(
                            "title" to payload["title"],
                            "content" to payload["content"]
                        ),
                        "metadata" to mapOf(
                            "tenantId" to payload["tenantId"],
                            "type" to payload["type"],
                            "createdAt" to payload["createdAt"],
                            "createdBy" to payload["createdBy"],
                            "updatedAt" to payload["updatedAt"],
                            "updatedBy" to payload["updatedBy"]
                        )
                    )
                )
            restHighLevelClient.index(indexRequest, DEFAULT)
        }
    }
}

在这里我们只需要简单地判断是否为删除操作就可以,如果是删除操作需要在 Elasticsearch 中将数据删除,而如果是非删除操作只需要在 Elasticsearch 重新按照为文档建立索引即可。这段代码简单地使用了 Kotlin 中提供的 mapOf 方法对数据进行映射,如果需要其他复杂的处理只需要按照 Java 代码的方式编写处理器即可。

总结

其实 Binlog 的处理部分有很多开源的处理引擎,包括 Alibaba Canal,本文使用手动处理的方式也是为其他使用非 MySQL 数据源的同学类似的解决方案。大家可以按需所取,因地制宜,为自己的网站设计属于自己的实时站内搜索引擎!