Elasticsearch:跟踪 ElasticSearch 日志摄取中的缓慢

时间:2021-04-20 00:43:43

我们想跟踪日志的摄取是否有超出我们 Elasticsearch 可接受延迟的额外延迟。 因此,我们已按照之前文章 “Elasticsearch:在 Elasticsearch 中计算摄取延迟并存储摄取时间以提高可观察性” 中提供的步骤进行操作。

1. 创建如下的一个 ingest pipeline

PUT _ingest/pipeline/calculate_lag
{
  "description": "Add an ingest timestamp and calculate ingest lag",
  "processors": [
    {
      "set": {
        "field": "_source.ingest_time",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """ 
            if(ctx.containsKey("ingest_time") && ctx.containsKey("event_timestamp")) { 
              ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['event_timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000; 
            } 
        """
      }
    }
  ]
}

注意:此脚本是用 Painless 脚本编写的,它以秒为单位跟踪摄取时间延迟。更多关于 Painless 的编程请参考 “Elastic:开发者上手指南” 中的 “Painless 编程” 部分。

2. 添加如下的 pipeline 到你想要的索引配置中

如果你的索引是新的并且你正在尝试这些东西,则可以执行此步骤。 如果你在运行以下命令之前已经有一个包含数据的索引,请阅读下面的注释。

PUT my_index/_settings
{
  "index.default_pipeline": "calculate_lag"
}

注意:

  • 如果你只是继续并仅将上述 PUT 方法与 index.default_pipeline 一起应用,它可能会弄乱你已经存在的索引设置。 确保首先获得索引的设置并将此管道添加为 ndex.final_pipeline 以及你已有的任何设置,然后进行应用
  • 下面是一个名为 my_index 的示例索引设置
PUT my_index/_settings
{
  "index": {
    "routing": {
      "allocation": {
        "total_shards_per_node": "3"
      }
    },
    "mapping": {
      "total_fields": {
        "limit": "2000"
      }
    },
    "refresh_interval": "30s",
    "default_pipeline": "pipeline_default",
    "number_of_replicas": "1"
  }
}
  • 再次应用完整设置以及添加新字段以将管道添加为 final_pipeline。
PUT my_index/_settings
{
  "index": {
    "routing": {
      "allocation": {
        "total_shards_per_node": "3"
      }
    },
    "mapping": {
      "total_fields": {
        "limit": "2000"
      }
    },
    "refresh_interval": "30s",
    "final_pipeline": "calculate_lag",
    "default_pipeline": "pipeline_default",
    "number_of_replicas": "1"
  }
}

注意:使用 index.final_pipeline 索引设置来设置最终管道。 Elasticsearch 在请求或默认管道之后应用此管道,即使两者均未指定。如果你之前已经有一个 ingest pipeline,设置 calculate_tag 将使得我们计算出摄入的时间差。

这将在获取的日志记录中添加名为 lag_in_seconds 的新字段。 你可以再次查看索引设置的变化以进行交叉验证。 基本上,这表示记录创建的 event_timestamp与它被摄取的时间之间的时间差,即 _ingest.timestamp。 如果你的日志记录具有不同的时间戳文件名,请相应地进行修改。