Flink Java Example之AsyncIOExample详解

时间:2025-04-13 07:22:54

我们学习完Flink相关概念之后发现对Flink编程和程序还是一无所知。这时候我们就需要官方的代码example进行学习和研究,本文就官网github的AsyncIOExample的例子进行详细的代码注释。(ps:其实大家应该都能看懂哈)

Flink Example版本:1.8

Flink 所有java的example代码的Github地址:github

AsyncIO的原理:Flink 原理与实现:Aysnc I/O

AsyncIOExample的例子比较简单:在Flink代码中的嵌入式Flink迷你集群上模拟运行作业。

过程:将数据源发送的数据流元素交给AsyncFunction(异步函数)处理,对每个元素模拟发了出外部请求。AsyncFunction处理过程中会模拟发送请求处理失败产生异常的情形,然后通过checkpoint机制进行job的恢复。整个处理流式任务的过程可结合日志和代码仔细体会即可。

从main开始看,比较清晰,所以我把代码顺序调整了一下,把main放到了最前面:

package ;

import ;
import ;
import ;
import ;
import ;
import ;
import ;

import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import org.;
import org.;

/**
 * 示例演示如何使用AsyncFunction:触发异步I/O操作的函数
 * Example to illustrates how to use {@link AsyncFunction}.
 */
public class AsyncIOExample {

	private static final Logger LOG = ();
	
	/**
	 * 定义一些常量,用于equals比较
	 */
	private static final String EXACTLY_ONCE_MODE = "exactly_once";
	private static final String EVENT_TIME = "EventTime";
	private static final String INGESTION_TIME = "IngestionTime";
	private static final String ORDERED = "ordered";
	
	
	public static void main(String[] args) throws Exception {

		//1.获取一个流式环境
		StreamExecutionEnvironment env = ();

		//用来设置或解析参数
		final ParameterTool params = (args);
		//状态存放路径
		final String statePath;
		//checkPoint的模式
		final String cpMode;
		//最大数量
		final int maxCount;
		//睡眠因素
		final long sleepFactor;
		//失败率
		final float failRatio;
		//等待模式
		final String mode;
		//task任务数
		final int taskNum;
		//时间类型
		final String timeType;
		//关闭等待秒数
		final long shutdownWaitTS;
		//超时时间
		final long timeout;

		try {
			/**
			 * 2.此处为job设置参数值
			 * :如果键存在返回值,否则返回给定的默认值。
			 */
			statePath = ("fsStatePath", null);
			cpMode = ("checkpointMode", "exactly_once");
			maxCount = ("maxCount", 1000);
			sleepFactor = ("sleepFactor", 100);
			failRatio = ("failRatio", 0.001f);
			mode = ("waitMode", "ordered");
			taskNum = ("waitOperatorParallelism", 1);
			timeType = ("eventType", "EventTime");
			shutdownWaitTS = ("shutdownWaitTS", 20000);
			timeout = ("timeout", 10000L);
		} catch (Exception e) {
			printUsage();
			throw e;
		}
		
		//用于拼接config
		StringBuilder configStringBuilder = new StringBuilder();
		
		//获取换行符\n:规避了linux和windows换行符的区别
		final String lineSeparator = ("");
		
		//开始拼接参数
		configStringBuilder
			.append("Job configuration").append(lineSeparator)
			.append("FS state path=").append(statePath).append(lineSeparator)
			.append("Checkpoint mode=").append(cpMode).append(lineSeparator)
			.append("Max count of input from source=").append(maxCount).append(lineSeparator)
			.append("Sleep factor=").append(sleepFactor).append(lineSeparator)
			.append("Fail ratio=").append(failRatio).append(lineSeparator)
			.append("Waiting mode=").append(mode).append(lineSeparator)
			.append("Parallelism for async wait operator=").append(taskNum).append(lineSeparator)
			.append("Event type=").append(timeType).append(lineSeparator)
			.append("Shutdown wait timestamp=").append(shutdownWaitTS);
		//打印拼接好的参数
		(());
		
		//此处为null,并没有走这步
		if (statePath != null) {
			// setup state and checkpoint mode
			(new FsStateBackend(statePath));
		}
		
		/**
		 * 3.根据参数cpMode的值为作业设置checkpoint的类型
		 * param1:checkpoint的时间间隔
		 */
		if (EXACTLY_ONCE_MODE.equals(cpMode)) {
			(1000L, CheckpointingMode.EXACTLY_ONCE);
		}
		else {
			(1000L, CheckpointingMode.AT_LEAST_ONCE);
		}

		//4.判断时间类型,如果是eventTime:事件真实发生时间同时设置watermark
		if (EVENT_TIME.equals(timeType)) {
			();
		}

		//否则为ingestionTime:进入flink系统的时间
		else if (INGESTION_TIME.equals(timeType)) {
			();
		}
		
		//5.流式环境创建一个数据源:创建单个整数的输入流
		DataStream<Integer> inputStream = (new SimpleSource(maxCount));

		//6.创建async函数,它将“等待”一段时间来模拟异步i/o的过程
		AsyncFunction<Integer, String> function =
				new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS);
		
		//7.给流作业添加一个异步的操作
		DataStream<String> result;
		if ((mode)) {
			/**
			 * AsyncDataStream帮助DataStream应用一个异步的函数
			 * 参数:1.数据流源 2.异步函数 3.完成异步操作的超时时长 4.时间单位 5.触发的异步i/o操作的最大数目
			 * setParallelism :设置此操作函数的并行度。
			 */
			result = (
				inputStream,
				function,
				timeout,
				,
				20).setParallelism(taskNum);
		}
		else {
			result = (
				inputStream,
				function,
				timeout,
				,
				20).setParallelism(taskNum);
		}
		//(assigner)
		//8.执行流作业
		("Async IO Example");
		();
	}

	/**
	 * 定义了一个简单的流数据源,实现了 flink数据源通用接口SourceFunction
	 * flink内部的SourceContext调用该类的run方法开始发送数据
	 * 具体功能:一个数据流 -> 不断发送一个从0递增的整数
	 */
	private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
		private static final long serialVersionUID = 1L;
		
		//初始化数据源的是否正常运行,默认值:true
		private volatile boolean isRunning = true;
		//计数器
		private int counter = 0;
		//起始值
		private int start = 0;
		
		/**
		 * 储存快照状态:每次作业执行成功后,会保存成功的上一条数据的状态,也就是保存start的值
		 */
		@Override
		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
			return (start);
		}
		
		/**
		 * 当执行到某个流作业发生异常时,Flink会调用此方法,将状态还原到上一次的成功checkpoint的那个状态点
		 */
		@Override
		public void restoreState(List<Integer> state) throws Exception {
			//找到最新的一次checkpoint成功时start的值
			for (Integer i : state) {
				 = i;
			}
		}
		
		/**
		 * 静态内部类的构造方法:用于初始化计算器的值
		 * @param maxNum
		 */
		public SimpleSource(int maxNum) {
			 = maxNum;
		}

		@Override
		public void run(SourceContext<Integer> ctx) throws Exception {
			/**
			 * 如果:(起始值小于<计算器的值 || 计数器的值 等于 -1) && 数据源是运行的,执行while内部方法。
			 * 也就是说:我们可以设置计算器的值让数据源发送指定次数元素,或者设置计算器的值为-1,让数据源一直发送数据
			 */
			while ((start < counter || counter == -1) && isRunning) {
				/**
				 * 检查点锁
				 * 通过使用提供的检查点锁定对象来保护同步块中元素的状态更新和释放
				 */
				synchronized (()) {
					/**
					 * Flink数据源开始发送元素:元素值为start的值
					 */
					(start);
					/**
					 * 每次发送完一次数据,start值+1
					 */
					++start;
					/**
					 * 如果计数器设置的初始值为-1,直到start值为Integer.MAX_VALUE时重新赋值为0
					 */
					if (start == Integer.MAX_VALUE) {
						start = 0;
					}
				}
				//每发送完一次数据,停顿10毫秒
				(10L);
			}
		}
		
		/**
		 * 实现需要确保在调用此方法后源将跳出while循环
		 */
		@Override
		public void cancel() {
			isRunning = false;
		}
	}


	/**
	 * 一个异步函数的示例:用线程池模拟多个异步操作
	 * 具体功能:处理流任务的异步函数
	 * An sample of {@link AsyncFunction} using a thread pool and executing working threads
	 * to simulate multiple async operations.
	 *
	 * <p>For the real use case in production environment, the thread pool may stay in the
	 * async client.
	 */
	private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
		private static final long serialVersionUID = 2098635244857937717L;
		
		//定义一个线程池服务
		private transient ExecutorService executorService;

		/**
		 * 这个是模拟耗时的异步操作用的:就是假装这个异步操作很耗时,耗时时长为sleepFactor
		 */
		private final long sleepFactor;

		/**
		 * 这个是模拟异步操作出现了异常:就是假装我的流任务的异步操作出现异常啦~ 会报错:Exception : wahahaha...
		 */
		private final float failRatio;
		
		private final long shutdownWaitTS;
		
		//异步函数构造方法
		SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
			 = sleepFactor;
			 = failRatio;
			 = shutdownWaitTS;
		}
		
		/**
		 * 函数的初始化方法:就是构造此异步函数后,会执行该初始化方法
		 */
		@Override
		public void open(Configuration parameters) throws Exception {
			(parameters);
			//初始化线程池大小
			executorService = (30);
		}
		
		/**
		 * 最后一次作业执行后,调动该方法关闭一些资源操作
		 */
		@Override
		public void close() throws Exception {
			();
			//优雅的关闭线程池服务
			(shutdownWaitTS, , executorService);
		}
		
		/**
		 * 真正执行异步IO的方法
		 * 这里用线程池模拟 source支持异步发送数据流
		 */
		@Override
		public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
			(() -> {
				//模拟元素的操作时长:就是这个元素与外部系统交互的时长,然后sleep这么长的时间
				long sleep = (long) (().nextFloat() * sleepFactor);
				try {
					(sleep);
					//模拟触发异常:就是与外部系统交互时,假装出错发出了一个异常,此处可以查看日志,观察flink如何checkpoint恢复
					if (().nextFloat() < failRatio) {
						(new Exception("wahahahaha..."));
					} else {
						//模拟交互成功后,将结果返回给Flink
						(
							("key-" + (input % 10)));
					}
				} catch (InterruptedException e) {
					(new ArrayList<>(0));
				}
			});
		}
	}

	/**
	 * 打印flink的一些参数信息
	 */
	private static void printUsage() {
		("To customize example, use: AsyncIOExample [--fsStatePath <path to fs state>] " +
				"[--checkpointMode <exactly_once or at_least_once>] " +
				"[--maxCount <max number of input from source, -1 for infinite input>] " +
				"[--sleepFactor <interval to sleep for each stream element>] [--failRatio <possibility to throw exception>] " +
				"[--waitMode <ordered or unordered>] [--waitOperatorParallelism <parallelism for async wait operator>] " +
				"[--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>]" +
				"[--timeout <Timeout for the asynchronous operations>]");
	}

}

执行main方法后,完整的日志如下(ps其实看日志挺有意思的,可以看到flink很多过程~):

15:34:25,957 INFO                     - Job configuration
FS state path=null
Checkpoint mode=exactly_once
Max count of input from source=1000
Sleep factor=100
Fail ratio=0.001
Waiting mode=ordered
Parallelism for async wait operator=1
Event type=EventTime
Shutdown wait timestamp=20000
15:34:26,130 INFO    - Running job on local embedded Flink mini cluster
15:34:26,276 INFO                - Starting Flink Mini Cluster
15:34:26,279 INFO                - Starting Metrics Registry
15:34:26,327 INFO             - No metrics reporter configured, no metrics will be exposed/reported.
15:34:26,327 INFO                - Starting RPC Service(s)
15:34:26,584 INFO  .slf4j.Slf4jLogger                                  - Slf4jLogger started
15:34:26,603 INFO                - Trying to start actor system at :0
15:34:26,636 INFO  .slf4j.Slf4jLogger                                  - Slf4jLogger started
15:34:26,675 INFO                                            - Starting remoting
15:34:26,929 INFO                                            - Remoting started; listening on addresses :[://flink-metrics@172.16.126.14:61481]
15:34:26,934 INFO                - Actor system started at ://flink-metrics@172.16.126.14:61481
15:34:26,936 INFO                - Starting high-availability services
15:34:26,948 INFO                        - Created BLOB server storage directory C:\Users\BONC\AppData\Local\Temp\blobStore-094c3bce-9270-4e01-8c4b-38768a5893c1
15:34:26,954 INFO                        - Started BLOB server at 0.0.0.0:61482 - max concurrent requests: 50 - max backlog: 1000
15:34:26,958 INFO                - Created BLOB cache storage directory C:\Users\BONC\AppData\Local\Temp\blobStore-17590f89-39e2-4415-abfb-537ea3ed075e
15:34:26,961 INFO                - Created BLOB cache storage directory C:\Users\BONC\AppData\Local\Temp\blobStore-089378be-f96d-468a-a962-9546304b6eb2
15:34:26,961 INFO                - Starting 1 TaskManger(s)
15:34:26,963 INFO         - Starting TaskManager with ResourceID: 15962233-f7e2-44c1-a442-cacdc19f537b
15:34:27,009 INFO       - Temporary file directory 'C:\Users\BONC\AppData\Local\Temp': total 100 GB, usable 5 GB (5.00% usable)
15:34:27,160 INFO    - Allocated 401 MB for network buffer pool (number of memory segments: 12856, bytes per segment: 32768).
15:34:27,164 INFO          - Starting the network environment and its components.
15:34:27,166 WARN        - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
15:34:27,166 INFO       - Limiting managed memory to 0.7 of the currently free heap space (2527 MB), memory will be allocated lazily.
15:34:27,170 INFO            - I/O manager uses directory C:\Users\BONC\AppData\Local\Temp\flink-io-55753bcf-1076-4999-8ed1-57558521bf42 for spill files.
15:34:27,227 INFO    - Messages have a max timeout of 10000 ms
15:34:27,237 INFO                - Starting RPC endpoint for  at akka://flink/user/taskmanager_0 .
15:34:27,250 INFO          - Start job leader service.
15:34:27,251 INFO                    - User file cache uses directory C:\Users\BONC\AppData\Local\Temp\flink-dist-cache-740d690b-ccc2-4c10-b38a-9e6256fc6f08
15:34:27,289 INFO      - Starting rest endpoint.
15:34:27,476 WARN             - Log file environment variable '' is not set.
15:34:27,477 WARN             - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable '' or configuration key 'Key: '' , default: null (fallback keys: [{key=, isDeprecated=true}])'.
15:34:27,485 INFO      - Failed to load web based job submission extension. Probable reason: flink-runtime-web is not in the classpath.
15:34:27,803 INFO      - Rest endpoint listening at localhost:61501
15:34:27,805 INFO    - Proposing leadership to contender @31be6b49 @ http://localhost:61501
15:34:27,817 INFO                - Starting RPC endpoint for  at akka://flink/user/resourcemanager .
15:34:27,832 INFO      - http://localhost:61501 was granted leadership with leaderSessionID=cd92eef3-c2f3-4fc5-9234-309493602b84
15:34:27,832 INFO                - Starting RPC endpoint for  at akka://flink/user/dispatcher .
15:34:27,833 INFO    - Received confirmation of leadership for leader http://localhost:61501 , session=cd92eef3-c2f3-4fc5-9234-309493602b84
15:34:27,843 INFO    - Proposing leadership to contender @214a971 @ akka://flink/user/dispatcher
15:34:27,844 INFO    - Proposing leadership to contender @6544adb @ akka://flink/user/resourcemanager
15:34:27,845 INFO                - Flink Mini Cluster started successfully
15:34:27,847 INFO        - Dispatcher akka://flink/user/dispatcher was granted leadership with fencing token b51436a0-a06d-456f-bd26-693e6ac9b5f9
15:34:27,851 INFO        - Recovering all persisted jobs.
15:34:27,852 INFO    - Received confirmation of leadership for leader akka://flink/user/dispatcher , session=b51436a0-a06d-456f-bd26-693e6ac9b5f9
15:34:27,870 INFO    - ResourceManager akka://flink/user/resourcemanager was granted leadership with fencing token 866212de9f97abcf1f26c94a26294b48
15:34:27,870 INFO    - Starting the SlotManager.
15:34:27,872 INFO    - Received confirmation of leadership for leader akka://flink/user/resourcemanager , session=1f26c94a-2629-4b48-8662-12de9f97abcf
15:34:27,874 INFO              - Connecting to ResourceManager akka://flink/user/resourcemanager(866212de9f97abcf1f26c94a26294b48).
15:34:27,879 INFO              - Resolved ResourceManager address, beginning registration
15:34:27,879 INFO              - Registration at ResourceManager attempt 1 (timeout=100ms)
15:34:27,883 INFO        - Received JobGraph submission 53d61cb3e5941169150beec14320d110 (Async IO Example).
15:34:27,883 INFO        - Submitting job 53d61cb3e5941169150beec14320d110 (Async IO Example).
15:34:27,885 INFO    - Registering TaskManager with ResourceID 15962233-f7e2-44c1-a442-cacdc19f537b (akka://flink/user/taskmanager_0) at ResourceManager
15:34:27,886 INFO              - Successful registration at resource manager akka://flink/user/resourcemanager under registration id 3913ce156df2c5ced9df21c9edfa7931.
15:34:27,923 INFO                - Starting RPC endpoint for  at akka://flink/user/jobmanager_1 .
15:34:27,931 INFO                    - Initializing job Async IO Example (53d61cb3e5941169150beec14320d110).
15:34:27,936 INFO                    - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=0) for Async IO Example (53d61cb3e5941169150beec14320d110).
15:34:27,957 INFO          - Job recovers via failover strategy: full graph restart
15:34:27,984 INFO                    - Running initialization on master for job Async IO Example (53d61cb3e5941169150beec14320d110).
15:34:27,984 INFO                    - Successfully ran initialization on master in 0 ms.
15:34:28,001 INFO                    - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
15:34:28,010 INFO    - Proposing leadership to contender @288896b8 @ akka://flink/user/jobmanager_1
15:34:28,012 INFO             - JobManager runner for job Async IO Example (53d61cb3e5941169150beec14320d110) was granted leadership with session id 2a4b2033-ba80-4056-9b04-b4bcd97848f2 at akka://flink/user/jobmanager_1.
15:34:28,014 INFO                    - Starting execution of job Async IO Example (53d61cb3e5941169150beec14320d110) under job master id 9b04b4bcd97848f22a4b2033ba804056.
15:34:28,015 INFO          - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state CREATED to RUNNING.
15:34:28,025 INFO          - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from CREATED to SCHEDULED.
15:34:28,036 INFO        - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{163e56cc239963ee2ea561e91c9e273b}]
15:34:28,043 INFO    - Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=2a4b2033-ba80-4056-9b04-b4bcd97848f2
15:34:28,044 INFO                    - Connecting to ResourceManager akka://flink/user/resourcemanager(866212de9f97abcf1f26c94a26294b48)
15:34:28,045 INFO                    - Resolved ResourceManager address, beginning registration
15:34:28,045 INFO                    - Registration at ResourceManager attempt 1 (timeout=100ms)
15:34:28,046 INFO    - Registering job manager 9b04b4bcd97848f22a4b2033ba804056@akka://flink/user/jobmanager_1 for job 53d61cb3e5941169150beec14320d110.
15:34:28,050 INFO    - Registered job manager 9b04b4bcd97848f22a4b2033ba804056@akka://flink/user/jobmanager_1 for job 53d61cb3e5941169150beec14320d110.
15:34:28,052 INFO                    - JobManager successfully registered at ResourceManager, leader id: 866212de9f97abcf1f26c94a26294b48.
15:34:28,053 INFO        - Requesting new slot [SlotRequestId{163e56cc239963ee2ea561e91c9e273b}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
15:34:28,054 INFO    - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 53d61cb3e5941169150beec14320d110 with allocation id 4e3bf224392a78dad3e11bc84402898f.
15:34:28,056 INFO              - Receive slot request 4e3bf224392a78dad3e11bc84402898f for job 53d61cb3e5941169150beec14320d110 from resource manager with leader id 866212de9f97abcf1f26c94a26294b48.
15:34:28,056 INFO              - Allocated slot for 4e3bf224392a78dad3e11bc84402898f.
15:34:28,056 INFO          - Add job 53d61cb3e5941169150beec14320d110 for job leader monitoring.
15:34:28,057 INFO       - Checkpoint triggering task Source: Custom Source -> async wait operator (1/1) of job 53d61cb3e5941169150beec14320d110 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
15:34:28,058 INFO          - Try to register at job manager akka://flink/user/jobmanager_1 with leader id 2a4b2033-ba80-4056-9b04-b4bcd97848f2.
15:34:28,058 INFO          - Resolved JobManager address, beginning registration
15:34:28,058 INFO          - Registration at JobManager attempt 1 (timeout=100ms)
15:34:28,060 INFO          - Successful registration at job manager akka://flink/user/jobmanager_1 for job 53d61cb3e5941169150beec14320d110.
15:34:28,061 INFO              - Establish JobManager connection for job 53d61cb3e5941169150beec14320d110.
15:34:28,064 INFO              - Offer reserved slots to the leader of job 53d61cb3e5941169150beec14320d110.
15:34:28,068 INFO          - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from SCHEDULED to DEPLOYING.
15:34:28,068 INFO          - Deploying Source: Custom Source -> async wait operator (1/1) (attempt #0) to 15962233-f7e2-44c1-a442-cacdc19f537b @ 127.0.0.1 (dataPort=-1)
15:34:28,085 INFO              - Received task Source: Custom Source -> async wait operator (1/1).
15:34:28,085 INFO                       - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from CREATED to DEPLOYING.
15:34:28,085 INFO                       - Creating FileSystem stream leak safety net for task Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) [DEPLOYING]
15:34:28,085 INFO        - Activate slot 4e3bf224392a78dad3e11bc84402898f.
15:34:28,088 INFO                       - Loading JAR files for task Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) [DEPLOYING].
15:34:28,089 INFO                       - Registering task at network: Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) [DEPLOYING].
15:34:28,096 INFO                       - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from DEPLOYING to RUNNING.
15:34:28,096 INFO          - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from DEPLOYING to RUNNING.
15:34:28,098 INFO             - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
15:34:29,061 INFO       - Triggering checkpoint 1 @ 1563003269057 for job 53d61cb3e5941169150beec14320d110.
15:34:29,086 INFO       - Completed checkpoint 1 for job 53d61cb3e5941169150beec14320d110 (626 bytes in 28 ms).
15:34:29,967 INFO                       - Attempting to fail task externally Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b).
15:34:29,968 INFO                       - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from RUNNING to FAILED.
: An async function call terminated with an exception. Failing the AsyncWaitOperator.
	at (:137)
	at (:85)
	at (:748)
Caused by: : : wahahahaha...
	at (:357)
	at (:1895)
	at (:68)
	at (:129)
	... 2 more
Caused by: : wahahahaha...
	at $$0(:325)
	at $(:511)
	at (:266)
	at (:1149)
	at $(:624)
	... 1 more
15:34:29,971 INFO                       - Triggering cancellation of task code Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b).
15:34:30,014 INFO                       - Freeing task resources for Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b).
15:34:30,031 INFO                       - Ensuring all FileSystem streams are closed for task Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) [FAILED]
15:34:30,037 INFO              - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> async wait operator 40afe509423f45913394ef061f465b1b.
15:34:30,048 INFO          - Source: Custom Source -> async wait operator (1/1) (40afe509423f45913394ef061f465b1b) switched from RUNNING to FAILED.
: An async function call terminated with an exception. Failing the AsyncWaitOperator.
	at (:137)
	at (:85)
	at (:748)
Caused by: : : wahahahaha...
	at (:357)
	at (:1895)
	at (:68)
	at (:129)
	... 2 more
Caused by: : wahahahaha...
	at $$0(:325)
	at $(:511)
	at (:266)
	at (:1149)
	at $(:624)
	... 1 more
15:34:30,048 INFO          - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state RUNNING to FAILING.
: An async function call terminated with an exception. Failing the AsyncWaitOperator.
	at (:137)
	at (:85)
	at (:748)
Caused by: : : wahahahaha...
	at (:357)
	at (:1895)
	at (:68)
	at (:129)
	... 2 more
Caused by: : wahahahaha...
	at $$0(:325)
	at $(:511)
	at (:266)
	at (:1149)
	at $(:624)
	... 1 more
15:34:30,051 INFO              - Discarding the results produced by task execution 40afe509423f45913394ef061f465b1b.
15:34:30,054 INFO          - Try to restart or fail the job Async IO Example (53d61cb3e5941169150beec14320d110) if no longer possible.
15:34:30,054 INFO          - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state FAILING to RESTARTING.
15:34:30,055 INFO          - Restarting the job Async IO Example (53d61cb3e5941169150beec14320d110).
15:34:30,061 INFO          - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state RESTARTING to CREATED.
15:34:30,061 INFO       - Restoring job 53d61cb3e5941169150beec14320d110 from latest valid checkpoint: Checkpoint 1 @ 1563003269057 for 53d61cb3e5941169150beec14320d110.
15:34:30,068 INFO       - No master state to restore
15:34:30,069 INFO          - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state CREATED to RUNNING.
15:34:30,069 INFO          - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from CREATED to SCHEDULED.
15:34:30,071 INFO          - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from SCHEDULED to DEPLOYING.
15:34:30,071 INFO          - Deploying Source: Custom Source -> async wait operator (1/1) (attempt #1) to 15962233-f7e2-44c1-a442-cacdc19f537b @ 127.0.0.1 (dataPort=-1)
15:34:30,074 INFO              - Received task Source: Custom Source -> async wait operator (1/1).
15:34:30,074 INFO                       - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from CREATED to DEPLOYING.
15:34:30,075 INFO                       - Creating FileSystem stream leak safety net for task Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) [DEPLOYING]
15:34:30,075 INFO                       - Loading JAR files for task Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) [DEPLOYING].
15:34:30,076 INFO                       - Registering task at network: Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) [DEPLOYING].
15:34:30,079 INFO                       - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from DEPLOYING to RUNNING.
15:34:30,079 INFO             - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
15:34:30,079 INFO          - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from DEPLOYING to RUNNING.
15:34:31,003 INFO       - Triggering checkpoint 2 @ 1563003271003 for job 53d61cb3e5941169150beec14320d110.
15:34:31,009 INFO       - Completed checkpoint 2 for job 53d61cb3e5941169150beec14320d110 (636 bytes in 4 ms).
15:34:32,003 INFO       - Triggering checkpoint 3 @ 1563003272003 for job 53d61cb3e5941169150beec14320d110.
15:34:32,004 INFO       - Completed checkpoint 3 for job 53d61cb3e5941169150beec14320d110 (616 bytes in 1 ms).
15:34:33,003 INFO       - Triggering checkpoint 4 @ 1563003273003 for job 53d61cb3e5941169150beec14320d110.
15:34:33,004 INFO       - Completed checkpoint 4 for job 53d61cb3e5941169150beec14320d110 (626 bytes in 1 ms).
15:34:34,003 INFO       - Triggering checkpoint 5 @ 1563003274003 for job 53d61cb3e5941169150beec14320d110.
15:34:34,005 INFO       - Completed checkpoint 5 for job 53d61cb3e5941169150beec14320d110 (646 bytes in 2 ms).
15:34:35,003 INFO       - Triggering checkpoint 6 @ 1563003275003 for job 53d61cb3e5941169150beec14320d110.
15:34:35,004 INFO       - Completed checkpoint 6 for job 53d61cb3e5941169150beec14320d110 (626 bytes in 1 ms).
15:34:36,003 INFO       - Triggering checkpoint 7 @ 1563003276003 for job 53d61cb3e5941169150beec14320d110.
15:34:36,005 INFO       - Completed checkpoint 7 for job 53d61cb3e5941169150beec14320d110 (636 bytes in 2 ms).
15:34:37,003 INFO       - Triggering checkpoint 8 @ 1563003277003 for job 53d61cb3e5941169150beec14320d110.
15:34:37,006 INFO       - Completed checkpoint 8 for job 53d61cb3e5941169150beec14320d110 (636 bytes in 2 ms).
15:34:38,003 INFO       - Triggering checkpoint 9 @ 1563003278003 for job 53d61cb3e5941169150beec14320d110.
15:34:38,005 INFO       - Completed checkpoint 9 for job 53d61cb3e5941169150beec14320d110 (631 bytes in 2 ms).
15:34:39,003 INFO       - Triggering checkpoint 10 @ 1563003279003 for job 53d61cb3e5941169150beec14320d110.
15:34:39,005 INFO       - Completed checkpoint 10 for job 53d61cb3e5941169150beec14320d110 (641 bytes in 2 ms).
15:34:39,346 INFO                       - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from RUNNING to FINISHED.
15:34:39,346 INFO                       - Freeing task resources for Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703).
15:34:39,347 INFO                       - Ensuring all FileSystem streams are closed for task Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) [FINISHED]
15:34:39,347 INFO              - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> async wait operator e16dfc0cf2a7703cace518ff6cf85703.
15:34:39,349 INFO          - Source: Custom Source -> async wait operator (1/1) (e16dfc0cf2a7703cace518ff6cf85703) switched from RUNNING to FINISHED.
15:34:39,349 INFO          - Job Async IO Example (53d61cb3e5941169150beec14320d110) switched from state RUNNING to FINISHED.
15:34:39,349 INFO       - Stopping checkpoint coordinator for job 53d61cb3e5941169150beec14320d110.
15:34:39,349 INFO    - Shutting down
15:34:39,357 INFO        - Job 53d61cb3e5941169150beec14320d110 reached globally terminal state FINISHED.
15:34:39,357 INFO                - Shutting down Flink Mini Cluster
15:34:39,358 INFO      - Shutting down rest endpoint.
15:34:39,358 INFO              - Stopping TaskExecutor akka://flink/user/taskmanager_0.
15:34:39,359 INFO          - Stop job leader service.
15:34:39,359 INFO                    - Stopping the JobMaster for job Async IO Example(53d61cb3e5941169150beec14320d110).
15:34:39,359 INFO    - Shutting down TaskExecutorLocalStateStoresManager.
15:34:39,360 INFO        - Suspending SlotPool.
15:34:39,361 INFO                    - Close ResourceManager connection 5df31933d01f3737bf7ac1e33e11defc: JobManager is shutting down..
15:34:39,361 INFO        - Stopping SlotPool.
15:34:39,363 INFO    - Disconnect job manager 9b04b4bcd97848f22a4b2033ba804056@akka://flink/user/jobmanager_1 for job 53d61cb3e5941169150beec14320d110 from the resource manager.
15:34:39,370 INFO            - I/O manager removed spill file directory C:\Users\BONC\AppData\Local\Temp\flink-io-55753bcf-1076-4999-8ed1-57558521bf42
15:34:39,370 INFO          - Shutting down the network environment and its components.
15:34:39,378 INFO          - Stop job leader service.
15:34:39,379 INFO                    - removed file cache directory C:\Users\BONC\AppData\Local\Temp\flink-dist-cache-740d690b-ccc2-4c10-b38a-9e6256fc6f08
15:34:39,379 INFO              - Stopped TaskExecutor akka://flink/user/taskmanager_0.
15:34:39,384 INFO      - Removing cache directory C:\Users\BONC\AppData\Local\Temp\flink-web-ui
15:34:39,385 INFO      - Shut down complete.
15:34:39,387 INFO    - Shut down cluster because application is in CANCELED, diagnostics DispatcherResourceManagerComponent has been closed..
15:34:39,387 INFO        - Stopping dispatcher akka://flink/user/dispatcher.
15:34:39,387 INFO        - Stopping all currently running jobs of dispatcher akka://flink/user/dispatcher.
15:34:39,387 INFO    - Closing the SlotManager.
15:34:39,387 INFO    - Suspending the SlotManager.
15:34:39,387 INFO    - Shutting down stack trace sample coordinator.
15:34:39,387 INFO        - Stopped dispatcher akka://flink/user/dispatcher.
15:34:39,395 INFO  $RemotingTerminator         - Shutting down remote daemon.
15:34:39,396 INFO  $RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
15:34:39,494 INFO  $RemotingTerminator         - Remoting shut down.
15:34:39,526 INFO                - Stopping Akka RPC service.
15:34:39,543 INFO                - Shutting down BLOB cache
15:34:39,544 INFO                - Shutting down BLOB cache
15:34:39,547 INFO                        - Stopped BLOB server at 0.0.0.0:61482
15:34:39,547 INFO                - Stopped Akka RPC service.