flink demo

时间:2025-04-13 07:34:28

main:

package com;

import .Tuple2;
import ;
import ;
import ;

import ;
import ;
import ;
import ;
import ;

/**
 * 启动类
 * 
 * @author duhai
 * @date 2021年10月8日
 */
public class IotMain {

	/**
	 * ****注意:<br/>
	 * 之前和之后是两套环境,创建的类不共享<br/>
	 * 比如下面的main方法中的((""));<br/>
	 * 初始化了PropertiesUtils,但是在flink 的流程中还需要再初始化一次,否则无法使用,会报错 <br/>
	 * 
	 * execute之前的类有:<br/>
	 * DataSourceGet和DataSourceGet里面的KafkaProduceUtils能使用main方法中初始化的PropertiesUtils<br/>
	 * 
	 * execute之后的类有:<br/>
	 * FibonacciSource、MyProcessFunction、LogSink,和里面创建的类,<br/>
	 * 这个三个类和里面的类需要额外的初始化一次PropertiesUtils,即使用的是FibonacciSource里面初始化的KafkaProduceUtils
	 * 
	 * ------同理:<br/>
	 * 之前的log消息也不会打印在Flink Dashboard的日志中<br/>
	 * 
	 * @param args
	 * @throws Exception
	 */
	public static void main(final String[] args) throws Exception {
		// 0.获取环境
		StreamExecutionEnvironment env = ();
		// 检查点 every 5000 msecs
		(5000);
		// 1.获取参数
		final JSONObject argsObject = getFormArgs(args);
		((""));
		(());
		//
		DataStreamSource<Tuple2<Integer, Integer>> dataStreamSource = env
				.addSource(new DataSourceGet().getFibonacciSource(argsObject));

		SingleOutputStreamOperator<String> process = (new MyProcessFunction());

		(new LogSink());
		("iot test");
	}

	/**
	 * 拆解获取到参数:<br/>
	 * flink-1.13.2/bin/flink run -d
	 * /data/flink/demo/iot-rule-engine-criterion-1.0.
	 * =/data/flink/demo <br/>
	 * 中获取到
	 */
	private static JSONObject getFormArgs(final String[] args) {
		// 参数map
		JSONObject argsObject = new JSONObject();
		// 循环参数和值
		for (int i = 0; i < ; i++) {
			String[] args_is = args[i].split("=");
			if (args_is.length == 2) {
				(args_is[0], args_is[1]);
			}
		}
		return argsObject;
	}

}

DataSourceGet:

package ;

import ;
import ;

public class DataSourceGet {

	private KafkaProduceUtils kafkaProduceUtils = new KafkaProduceUtils();

	public FibonacciSource getFibonacciSource(final JSONObject argsObject) {

		//
		("dh_test_dh", "123");
		
		//
		FibonacciSource fibonacciSource = new FibonacciSource(argsObject);
		return fibonacciSource;
	}

}

MyProcessFunction:

package ;

import .Tuple2;
import ;
import ;
import .;

import ;

/**
 * 流程处理函数
 * 
 * @author duhai
 * @date 2021年10月8日
 */
public class MyProcessFunction extends ProcessFunction<Tuple2<Integer, Integer>, String> {

	private static final long serialVersionUID = 665249935064432746L;

	private static Logger logger = ();

	private JedisUtil jedisUtil = new JedisUtil();

	@Override
	public void processElement(final Tuple2<Integer, Integer> data, //
			final ProcessFunction<Tuple2<Integer, Integer>, String>.Context context, //
			final Collector<String> out) throws Exception {

		final Integer key = data.f0;
		final Integer value = data.f1;

		// 从redis获取规则
		String redisValue = ("afcs", "event_47_post");
		if (()) {
			("##### redisValue为:" + redisValue + "#####");
		}

		("key:" + key + ",value:" + value);
	}

}

LogSink:

package ;

import ;
import ;
import .;

/**
 * sink节点
 * 
 * @author duhai
 * @date 2021年10月9日
 */
public class LogSink extends RichSinkFunction<String> {

	private static final long serialVersionUID = -8897600652941199311L;
	private static Logger logger = ();

	/**
	 * open方法在sink第一次启动时调用,一般用于sink的初始化操作
	 */
	@Override
	public void open(final Configuration parameters) throws Exception {
		(parameters);
	}

	/**
	 * invoke方法是sink数据处理逻辑的方法,source端传来的数据都在invoke方法中进行处理
	 * 其中invoke方法中第一个参数类型与RichSinkFunction<String>中的泛型对应。 第二个参数 为一些上下文信息
	 */
	@Override
	public void invoke(final String value, final Context context) throws Exception {
		("\r\n##### value:" + value + " #####\r\n");
	}

	/**
	 * close方法在sink结束时调用,一般用于资源的回收操作
	 */
	@Override
	public void close() throws Exception {
		();
	}
}

JedisUtil:

package ;

import ;
import ;

import ;
import ;
import ;
import ;

/**
 * redis工具类
 * 
 * @author duhai
 * @date 2021年10月8日
 */
public class JedisUtil implements Serializable {

	private static final long serialVersionUID = 2712033148155101965L;

	private static volatile JedisPool jedisPool = null;

	/**
	 * 初始化
	 * 
	 * @return
	 */
	public JedisPool getJedisPool() {
		if (jedisPool == null) {
			synchronized (this) {
				if (jedisPool == null) {
					String redisHost = ("");
					String redisPort = ("");
					String redisPass = ("");
					String database = ("");
					String redisPoolMaxIdle = ("-idle");

					JedisPoolConfig config = new JedisPoolConfig();
					// 控制一个pool可分配多少个jedis实例,通过()来获取;
					// 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
					(500);
					// 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
					((redisPoolMaxIdle));
					// 表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
					(1000 * 100);
					// 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
					(true);
					// redis如果设置了密码:
					jedisPool = new JedisPool(config, redisHost, (redisPort), 10000, redisPass,
							(database));
				}
			}
		}
		return jedisPool;
	}

	/**
	 * 从jedis连接池中获取获取jedis对象
	 * 
	 * @return
	 */
	public Jedis getJedis() {
		return getJedisPool().getResource();
	}

	/**
	 * 回收jedis(放到finally中)
	 * 
	 * @param jedis
	 */
	public void returnJedis(final Jedis jedis) {
		if (null != jedis) {
			();
		}
	}

	/**
	 * 删除keys对应的记录,可以是多个key
	 * 
	 * @param String
	 *            ... keys
	 * @return 删除的记录数
	 */
	public long del(final String... keys) {
		Jedis jedis = getJedis();
		try {
			long count = (keys);
			return count;
		} finally {
			returnJedis(jedis);
		}
	}

	/**
	 * 从hash中删除指定的存储
	 * 
	 * @param String
	 *            key
	 * @param String
	 *            fieid 存储的名字
	 * @return 状态码,1成功,0失败
	 */
	public long hdel(final String key, final String fieid) {
		Jedis jedis = getJedis();
		try {
			long s = (key, fieid);
			return s;
		} finally {
			returnJedis(jedis);
		}
	}

	/**
	 * 添加一个对应关系
	 * 
	 * @param String
	 *            key
	 * @param String
	 *            fieid
	 * @param String
	 *            value
	 * @return 状态码 1成功,0失败,fieid已存在将更新,也返回0
	 **/
	public long hset(final String key, final String fieid, final String value) {
		Jedis jedis = getJedis();
		try {
			long s = (key, fieid, value);
			return s;
		} finally {
			returnJedis(jedis);
		}
	}

	/**
	 * 返回hash中指定存储位置的值
	 * 
	 * @param String
	 *            key
	 * @param String
	 *            fieid 存储的名字
	 * @return 存储对应的值
	 */
	public String hget(final String key, final String fieid) {
		Jedis jedis = getJedis();
		try {
			String s = (key, fieid);
			return s;
		} finally {
			returnJedis(jedis);
		}
	}

	/**
	 * 返回hash中指定存储位置的值
	 * 
	 * @param String
	 *            key
	 * @return 存储对应的值
	 */
	public Map<String, String> hgetAll(final String key) {
		Jedis jedis = getJedis();
		try {
			Map<String, String> map = (key);
			return map;
		} finally {
			returnJedis(jedis);
		}
	}

	/**
	 * 根据key获取记录
	 * 
	 * @param String
	 *            key
	 * @return 值
	 */
	public String get(final String key) {
		Jedis jedis = getJedis();
		try {
			String value = (key);
			return value;
		} finally {
			returnJedis(jedis);
		}
	}

	/**
	 * 添加记录,如果记录已存在将覆盖原有的value
	 * 
	 * @param String
	 *            key
	 * @param String
	 *            value
	 * @return 状态码
	 */
	public String set(final String key, final String value) {
		return set((key), (value));
	}

	/**
	 * 添加记录,如果记录已存在将覆盖原有的value
	 * 
	 * @param byte[]
	 *            key
	 * @param byte[]
	 *            value
	 * @return 状态码
	 */
	public String set(final byte[] key, final byte[] value) {
		Jedis jedis = getJedis();
		try {
			String status = (key, value);
			return status;
		} finally {
			returnJedis(jedis);
		}
	}

}

KafkaProduceUtils:

package ;

import ;
import ;
import ;

import ;
import ;
import ;
import ;
import .;

/**
 * KafkaProduce发送消息到AMQP
 * 
 * @author duhai
 * @date 2020年5月11日
 */
public class KafkaProduceUtils implements Serializable {

	private static final long serialVersionUID = 3442009340575361234L;

	private static Logger logger = ();

	private static volatile KafkaProducer<String, String> kafkaProducer;

	/**
	 * 
	 * @return
	 */
	public KafkaProducer<String, String> getkafkaProducer() {
		if (kafkaProducer == null) {
			synchronized (this) {
				if (kafkaProducer == null) {
					("##### new KafkaProducer 开始#####");
					String bootstrap = ("-servers");
					String retry = ("");
					String acks = ("");
					final Map<String, Object> props = new HashMap<String, Object>();
					(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
					(ProducerConfig.RETRIES_CONFIG, (retry));
					(ProducerConfig.ACKS_CONFIG, acks);
					(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, );
					(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, );
					kafkaProducer = new KafkaProducer<String, String>(props);
					("##### new KafkaProducer 结束#####");
				}
			}
		}
		return kafkaProducer;
	}

	/**
	 * 生产消息
	 * 
	 * @param topic
	 * @param value
	 */
	public void producer(final String topic, final String value) {
		("##### kafkaProduceUtils开始,topic=" + topic + ",value=" + value);
		// 发送消息
		getkafkaProducer().send(new ProducerRecord<String, String>(topic, value));
		("##### kafkaProduceUtils结束 #####");
	}

}

PropertiesUtils:

package ;

import ;
import ;
import ;
import ;
import ;
import ;

import .;
import .;

/**
 * 先从外部获取配置文件,没有则从内部获取
 * 
 * @author duhai
 * @date 2021年10月13日
 */
public class PropertiesUtils implements Serializable {

	private static final long serialVersionUID = 315601047419047701L;

	private static Logger logger = ();

	private static Properties properties = null;

	/**
	 * 初始化propertiies
	 */
	public static void initPropertiesByKey(final String propPath) {
		properties = new Properties();
		try {
			if ((propPath)) {
				// 优先从项目路径获取连接信息
				// String confPath = ("");
				String confPath = propPath +  + "";
				final File file = new File(confPath);
				if (()) {
					("配置文件路径---->>" + confPath);
					("配置文件路径---->>" + confPath);
					final InputStream in = new FileInputStream(new File(confPath));
					(in);
				} else {// 未传入路径时,读取classpath路径
					("项目路径confPath[" + confPath + "]下无连接信息,默认从classpath路径下加载properties文件");
					("项目路径confPath[" + confPath + "]下无连接信息,默认从classpath路径下加载properties文件");
					
					final InputStream in = ()
							.getResourceAsStream("");
					(in);
				}
			} else {// propPath属性为空
				("项目路径propPath没有传入,默认从classpath路径下加载properties文件");
				("项目路径propPath没有传入,默认从classpath路径下加载properties文件");
				
				final InputStream in = ()
						.getResourceAsStream("");
				(in);
			}

		} catch (IOException e) {
			(e);
		}
	}

	/**
	 * 
	 * 根据key获取value值
	 * 
	 * @param key
	 * @return
	 */
	public static String getPropertiesByKey(final String key) {
		String value = (key);
		return value == null ? "" : value;
	}

}

#kafka
-servers=10.11.11.11:9092
-id=aa

#kafka amqp
-servers=10.11.11.12:9092
=1
=0

#
=10.11.11.11
=6379
=111111
=1
-idle=8

#
=test
=10.11.11.11
=5672
=admin
=admin
=/vadmin_host

#
=jdbc:mysql://10.11.11.11:3306/aa_aa?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull
=root
=111111