flink流处理demo
import ;
import ;
import .Tuple2;
import ;
import ;
import ;
import ;
import ;
import ;
/**
 * Streaming demo job that reads {@code WikipediaEditEvent}s from a {@code WikipediaEditsSource},
 * keys them by a String, windows them and folds each window into a {@code Tuple2<String, Long>}.
 *
 * <p>Despite the class name, nothing Kafka-related is referenced in this class body.
 *
 * <p>NOTE(review): this copy of the file has lost many qualified names (import targets, method
 * receivers and method names), so several statements below are not valid Java as written. The
 * comments describe only what the remaining tokens show; guesses about the missing pieces are
 * marked as such and must be confirmed against the original source.
 */
public class KafkaMessageStreaming {
// Execution environment for the job; assigned at the start of main().
static private StreamExecutionEnvironment see;
/**
 * Entry point: builds the source, keying, windowing and fold steps described inline.
 *
 * @param args command-line arguments; not read anywhere in this method
 * @throws Exception declared by the signature; which call raises it is not visible in this copy
 */
public static void main(String[] args) throws Exception {
// NOTE(review): right-hand side is truncated; presumably a factory call that obtains the
// StreamExecutionEnvironment - TODO confirm.
see = ();
// Stream of edit events created from a new WikipediaEditsSource.
// NOTE(review): the call that wraps the source is missing (presumably made on `see`) - confirm.
DataStream<WikipediaEditEvent> edits = (new WikipediaEditsSource());
// Partition the stream by a String key computed for each event.
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
.keyBy(new KeySelector<WikipediaEditEvent, String>() {
@Override
public String getKey(WikipediaEditEvent event) {
// NOTE(review): the key expression is missing; which event attribute is used cannot be
// told from here.
return ();
}
});
// Per key and per time window, fold the events into a (String, Long) pair.
DataStream<Tuple2<String, Long>> result = keyedEdits
// NOTE(review): only the literal 5 survives; the time-unit wrapper around it is missing.
.timeWindow((5))
// The accumulator starts as ("", 0L) for every window.
.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
// NOTE(review): the next two statements have lost their callee; the string literal suggests
// debug output - confirm, and consider removing it if so.
("+++++++++++++++");
(());
// f0 is overwritten on every event, f1 is accumulated across the window.
// NOTE(review): both right-hand expressions are missing in this copy.
acc.f0 = ();
acc.f1 += ();
// The same accumulator instance is mutated and returned.
return acc;
}
});
// NOTE(review): two calls with missing receiver and method name; presumably one emits `result`
// and the other starts the job - TODO confirm against the original.
();
();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the wiki-edits:flink:1.0-SNAPSHOT demo artifact.
NOTE(review): this copy has lost several values (namespace/schema URL hosts, the property name,
most groupId values, one exclude groupId, the shade transformer class). They appear empty below
and must be restored from the original before the build can work. -->
<project xmlns="/POM/4.0.0"
xmlns:xsi="http:///2001/XMLSchema-instance"
xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.">
<modelVersion>4.0.0</modelVersion>
<groupId>wiki-edits</groupId>
<artifactId>flink</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- NOTE(review): the property name is missing; the value 1.4.2 is presumably the shared
version referenced by the empty ${} placeholders in the dependencies below. Confirm. -->
<>1.4.2</>
</properties>
<!-- Libraries used by the job. Every dependency that uses the ${} placeholder is expected to
resolve to the single version defined in the properties section. -->
<dependencies>
<dependency>
<groupId></groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>flink-java</artifactId>
<version>${}</version>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>${}</version>
</dependency>
<!-- fastjson is pinned to its own fixed version rather than the shared placeholder. -->
<dependency>
<groupId></groupId>
<artifactId>fastjson</artifactId>
<version>1.2.37</version>
</dependency>
<!-- log4j-slf4j-impl dependency, pinned to 2.11.1. The repository URL originally quoted here
is truncated to "/artifact/.log4j/log4j-slf4j-impl" in this copy. -->
<dependency>
<groupId>.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.11.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Shade plugin: runs the shade goal in the package phase to build a single bundled jar. -->
<plugin>
<groupId></groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Artifacts matching these patterns are left out of the shaded jar.
NOTE(review): the groupId part of the jsr305 pattern is missing in this copy. -->
<artifactSet>
<excludes>
<exclude>:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- NOTE(review): the implementation class is empty in this copy; the nested mainClass
element suggests a transformer that writes the jar manifest entry point. Confirm. -->
<transformer implementation="">
<mainClass>KafkaMessageStreaming</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Compiler plugin: compiles with source and target level 1.7. No plugin version is pinned. -->
<plugin>
<groupId></groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>