我的第一个flink_java程序

时间:2022-09-21 13:41:53

之前看了视频学习第一个flink  word count使用,但是对于socket发送数据作为数据源我这里有点忘记了,加上最近有个项目要发布,一直在忙,所以迟迟无法完成;

1、首先我们要有数据源,因为不论是流计算处理还是批次处理,都需要数据源,然后经过transformation转换成我们想要的数据输出到某个地方,这里我们就输出到控制台即可;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket; public class SocketTalkServer { public static void main(String[] args) {
try {
ServerSocket server = null;
// 创建一个端口为9000监听客户端请求的serversocket
try {
server = new ServerSocket(9000);
System.out.println("服务端启动成功:服务端端口号为9000");
} catch (IOException e) {
// 如果连接不上,打印出错信息
System.out.println("can not listen to:"+e);
}
Socket serverSocket = null;
try {
// 使用accept()阻塞等待客户请求,有客户请求则产生一个Socket对象,并继续执行
serverSocket = server.accept();
// 有客户端连接
System.out.println("有个客户端连接:"+serverSocket.getInetAddress()+":"+serverSocket.getPort());
} catch (IOException e) {
// 客户端请求异常
System.out.println(e);
}
String line;
// 通过Socket对象得到输出流,构造printwriter对象
PrintWriter serverPrintWriter = new PrintWriter(serverSocket.getOutputStream());
// 通过控制台构造bufferedreader对象
BufferedReader serverInput = new BufferedReader(new InputStreamReader(System.in));
// 服务端控制台上输入的数据源字符串
String serverLine = serverInput.readLine();
// 如果输入bye,停止循环
while (!serverLine.equals("bye")){
// 向客户端输出字符串
serverPrintWriter.println(serverLine);
// 刷新输出流
serverPrintWriter.flush();
// 在系统控制台上打印输入的内容;
System.out.println("Server:"+serverLine);
// 继续输入然后重新读取字符串
serverLine = serverInput.readLine();
}
serverPrintWriter.close();
serverSocket.close();
server.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

2、编写flink计算程序,也是我的第一个程序,这里有几个步骤,我觉着视频中的老师写的非常好,就抄过来了,十分易于理解:

我的第一个flink_java程序

package com.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector; public class SocketWindowWordCountJava {
public static void main(String[] args) throws Exception {
// 获取所需要的端口号
int port = 9000;
// try{
// ParameterTool parameterTool = ParameterTool.fromArgs(args);
// port = parameterTool.getInt("port");}
// catch (Exception e){
// System.err.println("no port specified. use default 9000");
// port = 9000;
// }
// 获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String hostname = "127.0.0.1";
String delimiter = "\n";
// 链接socket获取输入的数据
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
DataStream<WordIsCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordIsCount>() {
@Override
public void flatMap(String value, Collector<WordIsCount> out) throws Exception {
String[] words = value.split("\\s");
for (String word : words) {
out.collect(new WordIsCount(word, 1L));
}
}
}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1))// 指定时间窗口大小为2秒,指定时间间隔为1秒
.sum("count");// 在这里使用sum或者reduce都可以
// 将数据打印到控制台,并设置并行度
windowCounts.print().setParallelism(1); // 这一行代码一定要实现,否则不执行
env.execute("socket window count"); } public static class WordIsCount{
public String word;
public long count; public WordIsCount(String word, long count) {
this.word = word;
this.count = count;
} public WordIsCount() {
} @Override
public String toString() {
return "WordIsCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}

我的第一个flink_java程序的更多相关文章

  1. DirectX游戏编程(一):创建一个Direct3D程序

    一.环境 Visual Studio 2012,DirectX SDK (June 2010) 二.准备 1.环境变量(如没有配置请添加) 变量名:DXSDK_DIR 变量值:D:\Software\ ...

  2. 第一个python程序

    一个python程序的两种执行方式: 1.第一种方式是通过python解释器: cmd->python->进入python解释器->编写python代码->回车. 2.第二种方 ...

  3. 编写第一个MapReduce程序—— 统计气温

    摘要:hadoop安装完成后,像学习其他语言一样,要开始写一个“hello world!” ,看了一些学习资料,模仿写了个程序.对于一个C#程序员来说,写个java程序,并调用hadoop的包,并跑在 ...

  4. 1&period;3 第一个C&num;程序

    几乎没一门编程语言的第一个程序都叫“你好,世界”,所以先在visual studio 中创建一个Helloworld程序. 各部分的详细内容: Main方法是程序运行的起点,最重要的代码就写在Main ...

  5. 一个&period;net程序员的安卓之旅-Eclipse设置代码智能提示功能

    一个.net程序员的安卓之旅-代码智能提示功能 过完年回来就决心开始学安卓开发,就网上买了个内存条加在笔记本上(因为笔记本原来2G内存太卡了,装了vs2010.SQL Server 2008.orac ...

  6. MFC-01-Chapter01&colon;Hello&comma;MFC---1&period;3 第一个MFC程序(02)

    1.3.1 应用程序对象 MFC应用程序的核心就是基于CWinApp类的应用程序对象,CWinApp提供了消息循环来检索消息并将消息调度给应用程序的窗口.当包含头文件<afxwin.h>, ...

  7. Go&excl; new Hello World&comma; 我的第一个Go程序

    以下语句摘自百度百科: Go语言是谷歌2009发布的第二款开源编程语言. Go语言专门针对多处理器系统应用程序的编程进行了优化,使用Go编译的程序可以媲美C或C++代码的速度,而且更加安全.支持并行进 ...

  8. 搭建java开发环境、使用eclipse编写第一个java程序

    搭建java开发环境.使用eclipse编写第一个java程序 一.Java 开发环境的搭建 1.首先安装java SDK(简称JDK). 点击可执行文件 jdk-6u24-windows-i586. ...

  9. 第一个ruby程序

    老实说不是很喜欢去讨论ruby和python的对比,似乎总是把两个语言放在对立的位置上,我觉得没有必要,同样是动态语言,同样是解释型脚本语言,很多特性都是互相影响的,语言本身也在不断进化,我们更应该关 ...

随机推荐

  1. Software Testing Lab1

    Junit和Hamcrest的安装需要先把两个jar文件下载到本地,具体取得方式多种多样,我是直接从同学那要的.新建一个java项目,在新建时直接在库中导入这两个包即可. 这次编程内容是一个测试三角形 ...

  2. SQL Serve里你总要去改变的3个配置选项

    你用安装向导安装了全新的SQL Server,最后你点击了完成按钮.哇噢~~~现在我们可以把我们的服务器进入生产了!抱歉,那并不是真的,因为你的全新SQL Server默认配置是错误的. 是的,你没看 ...

  3. JavaScript基础——使用JavaScript对象

    JavaScript有许多内置对象,如Number(数字).Array(数组).String(字符串).Date(日期)和Math(数学).这些内置对象都有成员属性和方法.除了JavaScript对象 ...

  4. SQLdiag-配置文件-ProfilerCollector

    上一篇,我们讲述了配置文件中与性能计数器相关的PerfmonCollector元素:这一篇我们将讲述与跟踪数据相关的ProfilerCollector元素.在上一篇中使用SD_Detailed.XML ...

  5. HDU2838Cow Sorting(树状数组)

    题目意思是说给你一列数,每次可以将相邻的两个数交换,这一步的代价是这两个数的和,求将所有数排好序的最少代价. 题解: 我们可以这么思考,由于每次都是交换相邻的两个数,所以将一个数放到它自己的位置去后, ...

  6. IOS 开发中判断字符串是否为空字符的方法

    NSUInteger是无符号的整型, NSInteger是有符号的整型,在表视图应用中常见 NSUInteger row= [indexPath row];因为这是显示tableViewCell有多少 ...

  7. openfire&plus;spark&plus;smack实现即时通讯

    近公司项目需要用到即时通讯功能,经过调研发现openfire+spark+smack可以实现.在网上找了很久,资料都十分有限,即使有些朋友实现了也说的不清不楚.于是决定自己研究,耗时一周的时间实现了文 ...

  8. 字符串相似度算法(编辑距离算法 Levenshtein Distance)

    在搞验证码识别的时候需要比较字符代码的相似度用到“编辑距离算法”,关于原理和C#实现做个记录.据百度百科介绍:编辑距离,又称Levenshtein距离(也叫做Edit Distance),是指两个字串 ...

  9. js中window&period;print&lpar;&rpar;去除页眉页脚

    //jsp打印时去除页眉页页脚 打印前加入下面代码即可 var HKEY_Root,HKEY_Path,HKEY_Key; HKEY_Root="HKEY_CURRENT_USER&quot ...

  10. SharePoint 2010 -- &period;Net托管客户端模型简单示例

    .Net托管客户端模型,是SharePoint2010推出的三种客户端模型".NET托管"."ECMAScript"."Sliverlight&quo ...