Flink 写入 StarRocks 案例

时间:2025-02-11 08:13:19
import com.starrocks.connector.flink.StarRocksSink; import com.starrocks.connector.flink.table.StarRocksSinkOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; public class StarRocksSinkExample { static class Book { public Book(Long id, String title, String authors, Integer year) { this.id = id; this.title = title; this.authors = authors; this.year = year; } final Long id; final String title; final String authors; final Integer year; } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.fromElements( new Book(101L, "Stream Processing with Apache Flink", "F* Hueske, Vasiliki Kalavri", 2019), new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018), new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017), new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017) ).addSink( StarRocksSink.sink( TableSchema.builder() .field("id", DataTypes.BIGINT()) .field("title", DataTypes.VARCHAR(100)) .field("authors", DataTypes.VARCHAR(100)) .field("year", DataTypes.INT()) .build(), StarRocksSinkOptions.builder() .withProperty("connector","starrocks") .withProperty("jdbc-url","jdbc:mysql://:9030?characterEncoding=utf-8&useSSL=false") .withProperty("load-url",":8030;:8030;:8030") .withProperty("username","root") .withProperty("password","pass") .withProperty("table-name","book") .withProperty("database-name","database_name") .withProperty(".column_separator", "\\x01") .withProperty(".row_delimiter", "\\x02") .withProperty("-ms", "10000") .build(), (slot,book)->{ slot[0] = book.id; slot[1] = book.title; slot[2] = book.authors; slot[3] = book.year; } ) ); env.execute(); } }