0、前言
最近有个需求,需要使用flinksql读写redis,由于官网上并没有redis的connector,在网上找了很久,开源的几个connector又没法满足要求,所有这里就自己动手实现了一个。已经适配了各个版本的flink,从flink1.12到flink1.15。
简单介绍一下功能吧:
- 将redis作为流表时支持BLPOP、BRPOP、LPOP、RPOP、SPOP等命令;使用lua脚本封装的批量弹出提高消费性能
- 将redis作为维表时支持GET、HGET等命令;支持lookup缓存
- 将redis作为sink表时支持LPUSH、RPUSH、SADD、SET、HSET等命令;支持指定key的ttl时间
- 支持flink常见的序列化反序列化方式,如json、csv等,具体参见flink官网:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/overview/
1、redis作为流表
1.1、数据准备
@Before public void init() { /** 设置当前属于测试模式,在这个测试模式下,当流表数据消费完成后程序会停止,方便测试,这个模式默认false */ RedisOptions.IS_TEST = true; RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0); List<String> lists = new ArrayList<>(); for (int i = 0; i < 1000; i++) { lists.add("{\n" + " \"number\": " + i + ",\n" + " \"name\": \"学生" + i + "\",\n" + " \"school\": \"学校" + ((i % 3) + 1) +"\",\n" + " \"class_id\": " + ((i % 10) + 1) +"\n" + "}"); } /** * 初始化学生数据 */ for (int i = 0; i < 1; i++) { redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1))); } /** * 初始化班级数据 */ for(int i = 0;i < 10;i++) { redisOperator.set(String.valueOf(i + 1),"银河" + (i + 1) + "班"); } /** * 初始化学校班级数据 */ for(int j = 1;j < 4;j++) { for (int i = 1; i < 11; i++) { redisOperator.hset("学校" + j, String.valueOf(i), "银河" + i + "班"); } } }
1.2、使用BLPOP、BRPOP、LPOP、RPOP、SPOP消费指定的key的list或者set的数据
@Test public void testBlpopSQL() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); String source = "CREATE TABLE students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT \n" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'key'='students',\n" + " 'format'='json',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'command'='BLPOP'\n" + " )"; String sink = "CREATE TABLE sink_students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT \n" + ") \n" + "WITH (\n" + " 'connector'='print'\n" + " )"; tEnv.executeSql(source); tEnv.executeSql(sink); String sql = " insert into sink_students select * from students"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); }
2、redis作为维表(不带format)
2.1、数据准备
@Before public void init() { /** 设置当前属于测试模式,在这个测试模式下,当流表数据消费完成后程序会停止,方便测试,这个模式默认false */ RedisOptions.IS_TEST = true; RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0); List<String> lists = new ArrayList<>(); for (int i = 0; i < 1000; i++) { lists.add("{\n" + " \"number\": " + i + ",\n" + " \"name\": \"学生" + i + "\",\n" + " \"school\": \"学校" + ((i % 3) + 1) +"\",\n" + " \"class_id\": " + ((i % 10) + 1) +"\n" + "}"); } /** * 初始化学生数据 */ for (int i = 0; i < 1; i++) { redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1))); } /** * 初始化班级数据 */ for(int i = 0;i < 10;i++) { redisOperator.set(String.valueOf(i + 1),"银河" + (i + 1) + "班"); } /** * 初始化学校班级数据 */ for(int j = 1;j < 4;j++) { for (int i = 1; i < 11; i++) { redisOperator.hset("学校" + j, String.valueOf(i), "银河" + i + "班"); } } }
2.2、使用GET作为维表查询命令
@Test public void testGetSQL() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); String source = "CREATE TABLE students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " proctime as PROCTIME() \n" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'key'='students',\n" + " 'format'='json',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'command'='BLPOP'\n" + " )"; /** 这里需要注意的是,由于使用get命令,而且没有加format属性,所以维表只能有两个字段,多了也识别不到, 详细可以看源码里的注释 */ String daeamon = "CREATE TABLE classes\n" + "(\n" + " class_id BIGINT ,\n" + " class_name string " + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'lookup.cache.max-rows'='1000',\n" + " 'lookup.cache.ttl'='3600',\n" + " 'lookup.cache.load-all'='true',\n" + " 'database'='0',\n" + " 'command'='GET'\n" + " )"; String sink = "CREATE TABLE sink_students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " class_name string \n" + ") \n" + "WITH (\n" + " 'connector'='print'\n" + " )"; tEnv.executeSql(source); tEnv.executeSql(daeamon); tEnv.executeSql(sink); /** 这里join的字段必须是GET命令的key */ String sql = " insert into sink_students " + " select s.number,s.name,s.school,s.class_id,d.class_name from students s" + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); }
2.3、使用HGET作为维表查询命令
@Test public void testHGetSQL() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); String source = "CREATE TABLE students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " proctime as PROCTIME() \n" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'key'='students',\n" + " 'format'='json',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'command'='BLPOP'\n" + " )"; /** 这里需要注意的是,由于使用hget命令,而且没有加format属性,所以维表只能有三个字段,多了也识别不到, 详细可以看源码里的注释 */ String daeamon = "CREATE TABLE classes\n" + "(\n" + " school string, \n" + " class_id BIGINT ,\n" + " class_name string " + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'lookup.cache.max-rows'='1000',\n" + " 'lookup.cache.ttl'='3600',\n" + " 'lookup.cache.load-all'='true',\n" + " 'database'='0',\n" + " 'command'='HGET'\n" + " )"; String sink = "CREATE TABLE sink_students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " class_name string \n" + ") \n" + "WITH (\n" + " 'connector'='print'\n" + " )"; tEnv.executeSql(source); tEnv.executeSql(daeamon); tEnv.executeSql(sink); /** 这里需要注意的是,由于使用hget命令,这里join的参数两个参数顺序没有关系,真正执行hget命令哪个字段作为key, 哪个字段作为field只与维表定义的时候的字段顺序有关系 */ String sql = " insert into sink_students " + " select s.number,s.name,s.school,s.class_id,d.class_name from students s" + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); }
3、redis作为维表(带format)
3.1、数据准备
@Before public void init() { RedisOptions.IS_TEST = true; RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0); List<String> lists = new ArrayList<>(); for (int i = 0; i < 1000; i++) { lists.add("{\n" + " \"number\": " + i + ",\n" + " \"name\": \"学生" + i + "\",\n" + " \"school\": \"学校" + ((i % 3) + 1) +"\",\n" + " \"class_id\": " + ((i % 10) + 1) +"\n" + "}"); } /** * 初始化学生数据 */ for (int i = 0; i < 1; i++) { redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1))); } /** * 初始化班级数据 */ for(int i = 0;i < 10;i++) { JSONObject jsonObject = new JSONObject(); jsonObject.put("class_id",String.valueOf(i + 1)); jsonObject.put("class_name","银河" + (i + 1) + "班"); jsonObject.put("remark","remark" + i); redisOperator.set(String.valueOf(i + 1),jsonObject.toString()); } /** * 初始化学校班级数据 */ for(int j = 1;j < 4;j++) { for (int i = 1; i < 11; i++) { JSONObject jsonObject = new JSONObject(); jsonObject.put("class_id",String.valueOf(i)); jsonObject.put("class_name","银河" + i + "班"); jsonObject.put("remark","remark" + i); jsonObject.put("school","学校" + j); redisOperator.hset("学校" + j, String.valueOf(i), jsonObject.toString()); } } }
3.2、使用GET作为维表查询命令
@Test public void testGetSQL() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); String source = "CREATE TABLE students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " proctime as PROCTIME() \n" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'key'='students',\n" + " 'format'='json',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'command'='BLPOP'\n" + " )"; /** * 这里测试的核心是维表有format=json配置项,有了format配置项后,字段个数不受限制,但是需要注意的是,作为get命令的key的字段 * 一定要放在表申明的第一位,并且get命令的value的值使用format格式化后,比如是json格式,则json里一定要包含作为维表查询的 * join on后面带的作为key的查询列,不然会报空指针异常 */ String daeamon = "CREATE TABLE classes\n" + "(\n" + " class_id BIGINT ,\n" + " class_name string ,\n " + " remark string " + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'format'='json', \n" + " 'password'='123456',\n" + " 'lookup.cache.max-rows'='1000',\n" + " 'lookup.cache.ttl'='3600',\n" + " 'lookup.cache.load-all'='true',\n" + " 'database'='0',\n" + " 'command'='GET'\n" + " )"; String sink = "CREATE TABLE sink_students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " class_name string, \n" + " remark string " + ") \n" + "WITH (\n" + " 'connector'='print'\n" + " )"; tEnv.executeSql(source); tEnv.executeSql(daeamon); tEnv.executeSql(sink); String sql = " insert into sink_students " + " select s.number,s.name,s.school,s.class_id,d.class_name,d.remark from students s" + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); }
3.3、使用HGET作为维表查询命令
@Test public void testHGetSQL() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); String source = "CREATE TABLE students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " proctime as PROCTIME() \n" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'key'='students',\n" + " 'format'='json',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'command'='BLPOP'\n" + " )"; /** * 这里测试的核心是维表有format=json配置项,有了format配置项后,字段个数不受限制,但是需要注意的是,作为hget命令的key的字段 * 一定要放在表申明的第一位,field的字段一定要放在申明的第二位,并且hget命令的value的值使用format格式化后,比如是json格式, * 则json里一定要包含作为维表查询的 join on后面带的作为key和field的查询列,不然会报空指针异常 */ String daeamon = "CREATE TABLE classes\n" + "(\n" + " school string, \n" + " class_id BIGINT ,\n" + " class_name string, " + " remark string " + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'format'='json', \n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'lookup.cache.max-rows'='1000',\n" + " 'lookup.cache.ttl'='3600',\n" + " 'lookup.cache.load-all'='true',\n" + " 'database'='0',\n" + " 'command'='HGET'\n" + " )"; String sink = "CREATE TABLE sink_students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " class_name string, \n" + " remark string " + ") \n" + "WITH (\n" + " 'connector'='print'\n" + " )"; tEnv.executeSql(source); tEnv.executeSql(daeamon); tEnv.executeSql(sink); String sql = " insert into sink_students " + " select s.number,s.name,s.school,s.class_id,d.class_name,d.remark from students s" + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); }
4、redis作为sink表
4.1、数据准备
@Before public void init() { RedisOptions.IS_TEST = true; RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0); List<String> lists = new ArrayList<>(); for (int i = 0; i < 1000; i++) { lists.add("{\n" + " \"number\": " + i + ",\n" + " \"name\": \"学生" + i + "\",\n" + " \"school\": \"学校" + ((i % 3) + 1) +"\",\n" + " \"class_id\": " + ((i % 10) + 1) +"\n" + "}"); } /** * 初始化学生数据 */ for (int i = 0; i < 1; i++) { redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1))); } /** * 初始化班级数据 */ for(int i = 0;i < 10;i++) { redisOperator.set(String.valueOf(i + 1),"银河" + (i + 1) + "班"); } /** * 初始化学校班级数据 */ for(int j = 1;j < 4;j++) { for (int i = 1; i < 11; i++) { redisOperator.hset("学校" + j, String.valueOf(i), "银河" + i + "班"); } } }
4.2、使用LPush、RPUSH、SADD命令作为sink表写入命令
@Test public void testLPushSQL() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); String source = "CREATE TABLE students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " proctime as PROCTIME() \n" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'key'='students',\n" + " 'format'='json',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'command'='BLPOP'\n" + " )"; String daeamon = "CREATE TABLE classes\n" + "(\n" + " school string, \n" + " class_id BIGINT ,\n" + " class_name string " + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'lookup.cache.max-rows'='1000',\n" + " 'lookup.cache.ttl'='3600',\n" + " 'lookup.cache.load-all'='true',\n" + " 'database'='0',\n" + " 'command'='HGET'\n" + " )"; /** * 1、这里因为command是LPUSH,所以不需要primary key(number) not enforced, 因为这种命令只支持INSERT语义 * 2、并行度配置项sink.parallelism没有配置,默认为核心数 */ String sink = "CREATE TABLE sink_students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " class_name string \n" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'key'='sink_students_list',\n" + " 'format'='json',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'command'='LPUSH'\n" + " )"; tEnv.executeSql(source); tEnv.executeSql(daeamon); tEnv.executeSql(sink); String sql = " insert into sink_students " + " select s.number,s.name,s.school,s.class_id,d.class_name from students s" + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); }
4.2、使用SET命令作为sink表写入命令
@Test public void testSet() throws Exception { long start = System.currentTimeMillis(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); String source = "CREATE TABLE students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " proctime as PROCTIME() \n" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'key'='students',\n" + " 'format'='json',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'command'='BLPOP'\n" + " )"; String daeamon = "CREATE TABLE classes\n" + "(\n" + " school string, \n" + " class_id BIGINT ,\n" + " class_name string " + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'lookup.cache.max-rows'='1000',\n" + " 'lookup.cache.ttl'='3600',\n" + " 'lookup.cache.load-all'='true',\n" + " 'database'='0',\n" + " 'command'='HGET'\n" + " )"; /** * 1、这里因为command是SET,所以需要一个key,这里key就是使用主键,多个就用下划线拼接起来, * 2、并行度配置项sink.parallelism没有配置,默认为核心数 */ String sink = "CREATE TABLE sink_students\n" + "(\n" + " school string, \n" + " number BIGINT ,\n" + " name string,\n" + " class_id BIGINT, \n" + " class_name string, \n" + " primary key(school,number) not enforced" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'format'='json',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'command'='SET'\n" + " )"; tEnv.executeSql(source); tEnv.executeSql(daeamon); tEnv.executeSql(sink); String sql = " insert into sink_students " + " select s.school,s.number,s.name,s.class_id,d.class_name from students s" + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); long end = System.currentTimeMillis(); System.out.println("耗时:" + (end - start) + "ms"); }
4.3、使用HSET命令作为sink表写入命令(不指定key)
@Test public void testHSet() throws Exception { long start = System.currentTimeMillis(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); String source = "CREATE TABLE students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " proctime as PROCTIME() \n" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'key'='students',\n" + " 'format'='json',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'command'='BLPOP'\n" + " )"; String daeamon = "CREATE TABLE classes\n" + "(\n" + " school string, \n" + " class_id BIGINT ,\n" + " class_name string " + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'lookup.cache.max-rows'='1000',\n" + " 'lookup.cache.ttl'='3600',\n" + " 'lookup.cache.load-all'='true',\n" + " 'database'='0',\n" + " 'command'='HGET'\n" + " )"; /** * 1、这里因为command是HSET,所以需要一个key和一个field,这里是按照表申明的顺序,第一个作为key, * 第二个作为field,由于需要更新,也需要一个主键,这里最好把前两个字段一起作为主键 * 2、作为sink有一个sink.key.ttl参数可以设置key保存在redis的ttl生存时间,单位秒,默认为-1表示长期保存 */ String sink = "CREATE TABLE sink_students\n" + "(\n" + " school string, \n" + " number BIGINT ,\n" + " name string,\n" + " class_id BIGINT, \n" + " class_name string, \n" + " primary key(school,number) not enforced" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'format'='json',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'sink.parallelism' = '16',\n" + " 'sink.key.ttl' = '300',\n" + " 'command'='HSET'\n" + " )"; tEnv.executeSql(source); tEnv.executeSql(daeamon); tEnv.executeSql(sink); String sql = " insert into sink_students " + " select s.school,s.number,s.name,s.class_id,d.class_name from students s" + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); long end = System.currentTimeMillis(); System.out.println("耗时:" + (end - start) + "ms"); }
4.4、使用HSET命令作为sink表写入命令(指定key)
@Test public void testHSetWithKey() throws Exception { long start = System.currentTimeMillis(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); String source = "CREATE TABLE students\n" + "(\n" + " number BIGINT ,\n" + " name string,\n" + " school string, \n" + " class_id BIGINT, \n" + " proctime as PROCTIME() \n" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'key'='students',\n" + " 'format'='json',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'command'='BLPOP'\n" + " )"; String daeamon = "CREATE TABLE classes\n" + "(\n" + " school string, \n" + " class_id BIGINT ,\n" + " class_name string " + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'lookup.cache.max-rows'='1000',\n" + " 'lookup.cache.ttl'='3600',\n" + " 'lookup.cache.load-all'='true',\n" + " 'database'='0',\n" + " 'command'='HGET'\n" + " )"; /** * 1、这里因为command是HSET,所以需要一个key和一个field,这里配置项指定了key,那么主键拼接就作为field, * 使用hset保存到redis * 2、作为sink有一个sink.key.ttl参数可以设置key保存在redis的ttl生存时间,单位秒,默认-1表示长期保存 */ String sink = "CREATE TABLE sink_students\n" + "(\n" + " school string, \n" + " number BIGINT ,\n" + " name string,\n" + " class_id BIGINT, \n" + " class_name string, \n" + " primary key(number) not enforced" + ") \n" + "WITH (\n" + " 'connector'='redis',\n" + " 'host'='10.201.0.33', \n" + " 'port'='6379',\n" + " 'redis-mode'='single', \n" + " 'password'='123456',\n" + " 'database'='0',\n" + " 'format'='json',\n" + " 'key'='sink_students_hset',\n" + " 'batch-fetch-rows'='1000',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true',\n" + " 'sink.parallelism' = '16',\n" + " 'sink.key.ttl' = '300',\n" + " 'command'='HSET'\n" + " )"; tEnv.executeSql(source); tEnv.executeSql(daeamon); tEnv.executeSql(sink); String sql = " insert into sink_students " + " select s.school,s.number,s.name,s.class_id,d.class_name from students s" + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); long end = System.currentTimeMillis(); System.out.println("耗时:" + (end - start) + "ms"); }
5、配置说明
配置项 | 描述 |
---|---|
host | redis的host |
port | redis的port |
password | redis的password |
cluster-nodes | redis的集群节点,ip和端口之间用英文冒号分隔,多个ip端口用英文逗号分割 |
master.name | redis的sentinel模式的master节点的名称 |
sentinels.info | redis的sentinel模式的info信息 |
sentinels.password | redis的sentinel模式的密码 |
database | redis的database,一般是0~15 |
command | redis的命令,作为流表时支持BLPOP、BRPOP、LPOP、RPOP、SPOP;作为维表时支持GET、HGET;作为sink表时支持LPUSH、RPUSH、SADD、SET、HSET |
redis-mode | redis的部署模式,single、cluster、sentinel |
key | redis需要访问的key,比如数据是以某个固定的key存放在redis里,值是一个list;redis执行lpush、rpush、sadd、hset等sink使用的命令时的key; |
timeout | 连接redis的超时时间,单位毫秒 |
max-total | 连接redis的连接池的最大连接数 |
max-idle | 连接redis的连接池的最大空闲数 |
min-idle | 连接redis的连接池的最小空闲数 |
format | 格式化数据格式,如json、csv |
batch-fetch-rows | 像LPOP、BLPOP、RPOP、BRPOP这种命令每次从redis拿到数据的条数 |
lookup.cache.max-rows | 作为维表lookup模式,缓存在内存中的数据的最大条数 |
lookup.cache.ttl | 作为维表lookup模式,缓存在内存中的数据的ttl超时时间,单位秒 |
lookup.max-retries | 作为维表lookup模式,查找数据的失败重试次数 |
lookup.cache.load-all | 作为维表lookup模式,查找数据是否加载所有,主要是针对hget命令,如:HGET KEY_NAME FIELD_NAME;是否根据key查出所有field的值,这里可以根据实际hash表的大小决定是否要查询所有出来缓存起来 |
sink.max-retries | redis作为sink源时,最大重试次数 |
sink.parallelism | redis作为sink源时,sink的并行数,默认并行度为核心数 |
sink.key.ttl | redis作为sink源时,sink的数据保存在redis的ttl超时时间,单位秒,默认为-1表示长期保存 |
lookup.max-retries | 作为维表lookup模式,查找数据的失败重试次数 |
源码地址:https://gitee.com/rongdi/flinksql-connector-redis/