main:
package com;
import .Tuple2;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
* 启动类
*
* @author duhai
* @date 2021年10月8日
*/
public class IotMain {
/**
* ****注意:<br/>
* 之前和之后是两套环境,创建的类不共享<br/>
* 比如下面的main方法中的((""));<br/>
* 初始化了PropertiesUtils,但是在flink 的流程中还需要再初始化一次,否则无法使用,会报错 <br/>
*
* execute之前的类有:<br/>
* DataSourceGet和DataSourceGet里面的KafkaProduceUtils能使用main方法中初始化的PropertiesUtils<br/>
*
* execute之后的类有:<br/>
* FibonacciSource、MyProcessFunction、LogSink,和里面创建的类,<br/>
* 这个三个类和里面的类需要额外的初始化一次PropertiesUtils,即使用的是FibonacciSource里面初始化的KafkaProduceUtils
*
* ------同理:<br/>
* 之前的log消息也不会打印在Flink Dashboard的日志中<br/>
*
* @param args
* @throws Exception
*/
public static void main(final String[] args) throws Exception {
// 0.获取环境
StreamExecutionEnvironment env = ();
// 检查点 every 5000 msecs
(5000);
// 1.获取参数
final JSONObject argsObject = getFormArgs(args);
((""));
(());
//
DataStreamSource<Tuple2<Integer, Integer>> dataStreamSource = env
.addSource(new DataSourceGet().getFibonacciSource(argsObject));
SingleOutputStreamOperator<String> process = (new MyProcessFunction());
(new LogSink());
("iot test");
}
/**
* 拆解获取到参数:<br/>
* flink-1.13.2/bin/flink run -d
* /data/flink/demo/iot-rule-engine-criterion-1.0.
* =/data/flink/demo <br/>
* 中获取到
*/
private static JSONObject getFormArgs(final String[] args) {
// 参数map
JSONObject argsObject = new JSONObject();
// 循环参数和值
for (int i = 0; i < ; i++) {
String[] args_is = args[i].split("=");
if (args_is.length == 2) {
(args_is[0], args_is[1]);
}
}
return argsObject;
}
}
DataSourceGet:
package ;
import ;
import ;
public class DataSourceGet {
private KafkaProduceUtils kafkaProduceUtils = new KafkaProduceUtils();
public FibonacciSource getFibonacciSource(final JSONObject argsObject) {
//
("dh_test_dh", "123");
//
FibonacciSource fibonacciSource = new FibonacciSource(argsObject);
return fibonacciSource;
}
}
MyProcessFunction:
package ;
import .Tuple2;
import ;
import ;
import .;
import ;
/**
* 流程处理函数
*
* @author duhai
* @date 2021年10月8日
*/
public class MyProcessFunction extends ProcessFunction<Tuple2<Integer, Integer>, String> {
private static final long serialVersionUID = 665249935064432746L;
private static Logger logger = ();
private JedisUtil jedisUtil = new JedisUtil();
@Override
public void processElement(final Tuple2<Integer, Integer> data, //
final ProcessFunction<Tuple2<Integer, Integer>, String>.Context context, //
final Collector<String> out) throws Exception {
final Integer key = data.f0;
final Integer value = data.f1;
// 从redis获取规则
String redisValue = ("afcs", "event_47_post");
if (()) {
("##### redisValue为:" + redisValue + "#####");
}
("key:" + key + ",value:" + value);
}
}
LogSink:
package ;
import ;
import ;
import .;
/**
* sink节点
*
* @author duhai
* @date 2021年10月9日
*/
public class LogSink extends RichSinkFunction<String> {
private static final long serialVersionUID = -8897600652941199311L;
private static Logger logger = ();
/**
* open方法在sink第一次启动时调用,一般用于sink的初始化操作
*/
@Override
public void open(final Configuration parameters) throws Exception {
(parameters);
}
/**
* invoke方法是sink数据处理逻辑的方法,source端传来的数据都在invoke方法中进行处理
* 其中invoke方法中第一个参数类型与RichSinkFunction<String>中的泛型对应。 第二个参数 为一些上下文信息
*/
@Override
public void invoke(final String value, final Context context) throws Exception {
("\r\n##### value:" + value + " #####\r\n");
}
/**
* close方法在sink结束时调用,一般用于资源的回收操作
*/
@Override
public void close() throws Exception {
();
}
}
JedisUtil:
package ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
* redis工具类
*
* @author duhai
* @date 2021年10月8日
*/
public class JedisUtil implements Serializable {
private static final long serialVersionUID = 2712033148155101965L;
private static volatile JedisPool jedisPool = null;
/**
* 初始化
*
* @return
*/
public JedisPool getJedisPool() {
if (jedisPool == null) {
synchronized (this) {
if (jedisPool == null) {
String redisHost = ("");
String redisPort = ("");
String redisPass = ("");
String database = ("");
String redisPoolMaxIdle = ("-idle");
JedisPoolConfig config = new JedisPoolConfig();
// 控制一个pool可分配多少个jedis实例,通过()来获取;
// 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
(500);
// 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
((redisPoolMaxIdle));
// 表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
(1000 * 100);
// 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
(true);
// redis如果设置了密码:
jedisPool = new JedisPool(config, redisHost, (redisPort), 10000, redisPass,
(database));
}
}
}
return jedisPool;
}
/**
* 从jedis连接池中获取获取jedis对象
*
* @return
*/
public Jedis getJedis() {
return getJedisPool().getResource();
}
/**
* 回收jedis(放到finally中)
*
* @param jedis
*/
public void returnJedis(final Jedis jedis) {
if (null != jedis) {
();
}
}
/**
* 删除keys对应的记录,可以是多个key
*
* @param String
* ... keys
* @return 删除的记录数
*/
public long del(final String... keys) {
Jedis jedis = getJedis();
try {
long count = (keys);
return count;
} finally {
returnJedis(jedis);
}
}
/**
* 从hash中删除指定的存储
*
* @param String
* key
* @param String
* fieid 存储的名字
* @return 状态码,1成功,0失败
*/
public long hdel(final String key, final String fieid) {
Jedis jedis = getJedis();
try {
long s = (key, fieid);
return s;
} finally {
returnJedis(jedis);
}
}
/**
* 添加一个对应关系
*
* @param String
* key
* @param String
* fieid
* @param String
* value
* @return 状态码 1成功,0失败,fieid已存在将更新,也返回0
**/
public long hset(final String key, final String fieid, final String value) {
Jedis jedis = getJedis();
try {
long s = (key, fieid, value);
return s;
} finally {
returnJedis(jedis);
}
}
/**
* 返回hash中指定存储位置的值
*
* @param String
* key
* @param String
* fieid 存储的名字
* @return 存储对应的值
*/
public String hget(final String key, final String fieid) {
Jedis jedis = getJedis();
try {
String s = (key, fieid);
return s;
} finally {
returnJedis(jedis);
}
}
/**
* 返回hash中指定存储位置的值
*
* @param String
* key
* @return 存储对应的值
*/
public Map<String, String> hgetAll(final String key) {
Jedis jedis = getJedis();
try {
Map<String, String> map = (key);
return map;
} finally {
returnJedis(jedis);
}
}
/**
* 根据key获取记录
*
* @param String
* key
* @return 值
*/
public String get(final String key) {
Jedis jedis = getJedis();
try {
String value = (key);
return value;
} finally {
returnJedis(jedis);
}
}
/**
* 添加记录,如果记录已存在将覆盖原有的value
*
* @param String
* key
* @param String
* value
* @return 状态码
*/
public String set(final String key, final String value) {
return set((key), (value));
}
/**
* 添加记录,如果记录已存在将覆盖原有的value
*
* @param byte[]
* key
* @param byte[]
* value
* @return 状态码
*/
public String set(final byte[] key, final byte[] value) {
Jedis jedis = getJedis();
try {
String status = (key, value);
return status;
} finally {
returnJedis(jedis);
}
}
}
KafkaProduceUtils:
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import .;
/**
* KafkaProduce发送消息到AMQP
*
* @author duhai
* @date 2020年5月11日
*/
public class KafkaProduceUtils implements Serializable {
private static final long serialVersionUID = 3442009340575361234L;
private static Logger logger = ();
private static volatile KafkaProducer<String, String> kafkaProducer;
/**
*
* @return
*/
public KafkaProducer<String, String> getkafkaProducer() {
if (kafkaProducer == null) {
synchronized (this) {
if (kafkaProducer == null) {
("##### new KafkaProducer 开始#####");
String bootstrap = ("-servers");
String retry = ("");
String acks = ("");
final Map<String, Object> props = new HashMap<String, Object>();
(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
(ProducerConfig.RETRIES_CONFIG, (retry));
(ProducerConfig.ACKS_CONFIG, acks);
(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, );
(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, );
kafkaProducer = new KafkaProducer<String, String>(props);
("##### new KafkaProducer 结束#####");
}
}
}
return kafkaProducer;
}
/**
* 生产消息
*
* @param topic
* @param value
*/
public void producer(final String topic, final String value) {
("##### kafkaProduceUtils开始,topic=" + topic + ",value=" + value);
// 发送消息
getkafkaProducer().send(new ProducerRecord<String, String>(topic, value));
("##### kafkaProduceUtils结束 #####");
}
}
PropertiesUtils:
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import .;
import .;
/**
* 先从外部获取配置文件,没有则从内部获取
*
* @author duhai
* @date 2021年10月13日
*/
public class PropertiesUtils implements Serializable {
private static final long serialVersionUID = 315601047419047701L;
private static Logger logger = ();
private static Properties properties = null;
/**
* 初始化propertiies
*/
public static void initPropertiesByKey(final String propPath) {
properties = new Properties();
try {
if ((propPath)) {
// 优先从项目路径获取连接信息
// String confPath = ("");
String confPath = propPath + + "";
final File file = new File(confPath);
if (()) {
("配置文件路径---->>" + confPath);
("配置文件路径---->>" + confPath);
final InputStream in = new FileInputStream(new File(confPath));
(in);
} else {// 未传入路径时,读取classpath路径
("项目路径confPath[" + confPath + "]下无连接信息,默认从classpath路径下加载properties文件");
("项目路径confPath[" + confPath + "]下无连接信息,默认从classpath路径下加载properties文件");
final InputStream in = ()
.getResourceAsStream("");
(in);
}
} else {// propPath属性为空
("项目路径propPath没有传入,默认从classpath路径下加载properties文件");
("项目路径propPath没有传入,默认从classpath路径下加载properties文件");
final InputStream in = ()
.getResourceAsStream("");
(in);
}
} catch (IOException e) {
(e);
}
}
/**
*
* 根据key获取value值
*
* @param key
* @return
*/
public static String getPropertiesByKey(final String key) {
String value = (key);
return value == null ? "" : value;
}
}
#kafka
-servers=10.11.11.11:9092
-id=aa
#kafka amqp
-servers=10.11.11.12:9092
=1
=0
#
=10.11.11.11
=6379
=111111
=1
-idle=8
#
=test
=10.11.11.11
=5672
=admin
=admin
=/vadmin_host
#
=jdbc:mysql://10.11.11.11:3306/aa_aa?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull
=root
=111111