benthos stream nats 集成试用

时间:2022-06-30 00:55:18

测试demo 来自官方例子

使用docker-compose 进行运行

nats docker-compose file

version: '3.3'
services:
  nats:
    image: nats
    ports:
      - "4222:4222"
      - "8222:8222"
      - "6222:6222"

benthos stream 配置

参考 https://github.com/Jeffail/benthos/tree/master/resources/docker/streams

  • 整体说明
    里面的例子就是数据从文件读取到nats 之后不同的消费者进行nats 消息消费,再到webhook,webhook 标准输出

  • 文件读取说明

    读取文件,之后到nats

input:
  type: read_until
  read_until:
    condition:
      type: static
      static: false
    input:
      type: file
      file:
        path: ./sample.json
    restart_input: true
pipeline:
  processors:
    - type: throttle
      throttle:
        period: 3s
output:
  type: nats
  nats:
    subject: benthos_messages
    urls:
      - nats://localhost:4222
  • nats 消费者(三个)
input:
  type: nats
  nats:
    subject: benthos_messages
    urls:
    - nats://localhost:4222
pipeline:
  processors:
    - type: filter
      filter:
        type: jmespath
        jmespath:
          query: |
            keys(@) | contains(@, 'title')
output:
  type: http_client
  http_client:
    url: http://localhost:4195/webhooks/post/customer1
    verb: POST
  • webhook 配置
input:
  type: broker
  broker:
    inputs:
      - type: http_server
        http_server:
          path: /post/customer1
        processors:
          - type: text
            text:
              operator: prepend
              value: "Customer 1 received: "
      - type: http_server
        http_server:
          path: /post/customer2
        processors:
          - type: text
            text:
              operator: prepend
              value: "Customer 2 received: "
      - type: http_server
        http_server:
          path: /post/customer3
        processors:
          - type: text
            text:
              operator: prepend
              value: "Customer 3 received: "
output:
  type: stdout

运行

  • docker-compose
dokcer-compose up -d
  • 启动benthos,基于文件stream 配置
benthos --streams --streams-dir ./configs

运行效果

benthos stream nats 集成试用
修改sample.json 测试
benthos stream nats 集成试用

参考资料

https://github.com/Jeffail/benthos/tree/master/resources/docker/streams