Singer 学习十二 指南

时间:2021-05-15 16:49:49

版本0.3.0
tap是一个应用程序,需要一个配置文件和可选的状态文件作为输入,并产生有序的流记录, 状态和模式信息作为输出。
一个记录是任何类型的JSON编码的数据。tap 状态消息用于保留一个调用之间的信息。一个模式消息描述stream 中的
数据类型。Tap可以用任何编程语言实现。
tap设计用于从数据库和Web服务API等源生成数据流,以用于数据集成或ETL管道。

概要

 
tap --config CONFIG [--state STATE] [--catalog CATALOG]
CONFIG is a required argument that points to a JSON file containing any
configuration parameters the Tap needs.
STATE is an optional argument pointing to a JSON file that the
Tap can use to remember information from the previous invocation,
like, for example, the point where it left off.
CATALOG is an optional argument pointing to a JSON file that the
Tap can use to filter which streams should be synced.

输入

配置

配置包含Tap需要的任何参数,以便从源中提取数据。通常,这将包括API或数据源的凭据。配置的格式因Tap而异,
但必须采用JSON编码,配置的根必须是对象。
对于需要在运行期间更改配置的tap,tap应将更改写回提供的配置文件,以便可以在后续运行中使用更改。

state

JSON编码状态用于在Tap的调用之间保持信息。
希望持久状态的Tap应该在处理流时定期将STATE消息写入stdout,并且应该期望--state STATE参数命名的文件具有与它发出
的STATE消息的值相同的格式。
状态的一个常见用例是在最后一次调用中断的流中记录该点。如果在没有--state STATE 参数的情况下调用Tap ,它应该从流的
开头或某个适当的默认位置开始。如果使用--state STATE 参数调用它,它应该在状态文件中读取并从流中的相应位置开始。

catalog

catalog 是一个JSON编码文件,列出了可用的stream及其schema。*是一个对象,其中一个名为streams的键指向一个流对象数组。
可以修改每个流对象的元数据,以选择是否应复制流和/或其字段,以及应如何复制数据(FULL TABLE与INCREMENTAL)。

示例调用

从头开始同步,无catalog

 
$ tap --config config.json
 

从存储状态开始与目录同步

$ tap --config config.json --state state.json --catalog catalog.json

输出

Tap以 stdout JSON格式输出结构化消息,每行一条消息。可以发出日志和其他信息以stderr 帮助调试。
流式传输器在成功时以退出代码退出,在失败时退出非零。
正文包含编码为JSON映射的消息,每行一条消息。每条消息都必须包含一个type属性。type 允许任何消息,
并且types被解释为不区分大小写。以下types具有特定含义:

record 消息

RECORD消息包含数据流中的数据。它们必须具有以下属性:
record 必填。包含流数据点的JSON映射
stream 必填。流的字符串名称
time_extracted 可选。在源中观察此记录的时间。这应该是 RFC3339格式的日期时间,如“2017-11-20T16:45:33.000Z”。
单个Tap可以输出具有不同流名称的RECORD消息。单个RECORD条目可能只包含单个流的记录。
例:
注意:每条消息必须在它们自己的行上,但这里的示例使用多行来提高可读性。

 
 {
  "type": "RECORD",
  "stream": "users",
  "time_extracted": "2017-11-20T16:45:33.000Z",
  "record": {
    "id": 0,
    "name": "Chris"
  }
}
 

SCHEMA消息

SCHEMA消息描述流中数据的数据类型。它们必须具有以下属性:

schema 必填。一个JSON模式,用于描述data RECORD 的 属性stream

stream 必填。此架构描述的流的字符串名称

key_properties 必填。一个字符串列表,指示哪些属性构成此流的主键。列表中的每个项目都必须是架构中定义的*属性的名称。key_properties必须提供值,但它可能是一个空列表,表示没有主键。

bookmark_properties 可选。一个字符串列表,指示抽头用作书签的属性。列表中的每个项目都必须是架构中定义的*属性的名称。

单个Tap可以输出具有不同流名称的SCHEMA消息。如果来自流的RECORD消息之前没有SCHEMA该流的 消息,则假定它是无模式的。

例:

注意:每条消息必须在它们自己的行上,但这里的示例使用多行来提高可读性。

{
  "type": "SCHEMA",
  "stream": "users",
  "schema": {
    "properties": {
      "id": {
        "type": "integer"
      },
      "name": {
        "type": "string"
      },
      "updated_at": {
        "type": "string",
        "format": "date-time"
      }
    }
  },
  "key_properties": ["id"],
  "bookmark_properties": ["updated_at"]
}
 

state 消息

STATE消息包含Tap希望保留的状态。STATE消息具有以下属性:
value 必填。JSON格式的状态值
STATE值的语义不是规范的一部分,应由每个Tap独立确定。

 
{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Chris"}}
{"type": "RECORD", "stream": "users", "record": {"id": 2, "name": "Mike"}}
{"type": "SCHEMA", "stream": "locations", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "locations", "record": {"id": 1, "name": "Philadelphia"}}
{"type": "STATE", "value": {"users": 2, "locations": 1}}
 

版本

Tap的API包含其输入和输出 - 包括其配置,它如何解释状态,以及它产生的数据是如何结构化和解释的。点击应该遵循 语义版本控制,这意味着对其中任何一个的更改应该是新的MAJOR版本,并且向后兼容的更改应该是新的MINOR版本。

参考资料

https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md