flink流处理demo

时间:2025-04-13 07:23:41

flink流处理demo

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class KafkaMessageStreaming {

    static private StreamExecutionEnvironment see;

    public static void main(String[] args) throws Exception {

        see = ();
        DataStream<WikipediaEditEvent> edits = (new WikipediaEditsSource());
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
                .keyBy(new KeySelector<WikipediaEditEvent, String>() {
                    @Override
                    public String getKey(WikipediaEditEvent event) {
                        return ();
                    }
                });

        DataStream<Tuple2<String, Long>> result = keyedEdits
                .timeWindow((5))
                .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
                        ("+++++++++++++++");
                        (());

                        acc.f0 = ();
                        acc.f1 += ();
                        return acc;
                    }
                });

        ();
        ();
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Flink streaming demo: declares the Flink dependencies
     and packages a shaded (fat) JAR whose entry point is KafkaMessageStreaming.
     NOTE(review): namespace URLs, group ids and the flink.version property name
     were lost when the page was extracted and have been restored to their
     standard values; confirm against the original project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>wiki-edits</groupId>
	<artifactId>flink</artifactId>
	<version>1.0-SNAPSHOT</version>
	<properties>
		<!-- Single place to change the version shared by all Flink artifacts below. -->
		<flink.version>1.4.2</flink.version>
	</properties>
	<dependencies>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-wikiedits_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-cep_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.37</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j-impl -->
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>2.11.1</version>
		</dependency>


	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.0.0</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<!-- Writes Main-Class into the shaded JAR's manifest. -->
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>KafkaMessageStreaming</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>