![使用Flink时从Kafka中读取Array[Byte]类型的Schema 使用Flink时从Kafka中读取Array[Byte]类型的Schema](https://image.shishitao.com:8440/aHR0cHM6Ly9ia3FzaW1nLmlrYWZhbi5jb20vdXBsb2FkL2NoYXRncHQtcy5wbmc%2FIQ%3D%3D.png?!?w=700&webp=1)
使用Flink时,如果从Kafka中读取输入流,默认提供的是String类型的Schema:
val myConsumer = new FlinkKafkaConsumer08[String]("Topic名称", new SimpleStringSchema(), properties);
如果存入Kafka中的数据不是JSON,而是Protobuf类型的数据,需要用二进制的Schema进行接收,可以自己实现一个类,很简单,只有一行代码:
class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]]{
@throws[IOException]
override def deserialize(message: Array[Byte]): Array[Byte] = message
}
然后使用时,如下所示:
val myConsumer = new FlinkKafkaConsumer08[String]("Topic名称", new ByteArrayDeserializationSchema[Array[Byte]](), properties);