https://www.elastic.co/cn/blog/building-real-time-dashboard-applications-with-apache-flink-elasticsearch-and-kibana
F* Hueske
Gaining actionable insights from continuously produced data in real time is a common requirement for many businesses today. A widespread use case for real-time data processing is dashboarding. A typical architecture to support such a use case is based on a data stream processor, a data store with low latency read/write access, and a visualization framework.
In this blog post, we demonstrate how to build a real-time dashboard solution for stream data analytics using Apache Flink, Elasticsearch, and Kibana. The following figure depicts our system architecture.
In our architecture, Apache Flink executes stream analysis jobs that ingest a data stream, apply transformations to analyze, transform, and model the data in motion, and write their results to an Elasticsearch index. Kibana connects to the index and queries it for data to visualize. All components of our architecture are open source systems under the Apache License 2.0. We show how to implement a Flink DataStream program that analyzes a stream of taxi ride events and writes its results to Elasticsearch and give instructions on how to connect and configure Kibana to visualize the analyzed data in real-time.
Why use Apache Flink for stream processing?
Before we dive into the details of implementing our demo application, we discuss some of the features that make Apache Flink an outstanding stream processor. Apache Flink 0.10, which was recently released, comes with a competitive set of stream processing features, some of which are unique in the open source domain. The most important ones are:
- Support for event time and out of order streams: In reality, streams of events rarely arrive in the order that they are produced, especially streams from distributed systems and devices. Until now, it was up to the application programmer to correct this “time drift”, or simply ignore it and accept inaccurate results, as streaming systems (at least in the open source world) had no support for event time (i.e., processing events by the time they happened in the real world). Flink 0.10 is the first open source engine that supports out of order streams and which is able to consistently process events according to their timestamps.
- Expressive and easy-to-use APIs in Scala and Java: Flink's DataStream API ports many operators which are well known from batch processing APIs such as map, reduce, and join to the streaming world. In addition, it provides stream-specific operations such as window, split, and connect. First-class support for user-defined functions eases the implementation of custom application behavior. The DataStream API is available in Scala and Java.
- Support for sessions and unaligned windows: Most streaming systems have some concept of windowing, i.e., a grouping of events based on some function of time. Unfortunately, in many systems these windows are hard-coded and connected with the system’s internal checkpointing mechanism. Flink is the first open source streaming engine that completely decouples windowing from fault tolerance, allowing for richer forms of windows, such as sessions.
- Consistency, fault tolerance, and high availability: Flink guarantees consistent state updates in the presence of failures (often called “exactly-once processing”), and consistent data movement between selected sources and sinks (e.g., consistent data movement between Kafka and HDFS). Flink also supports worker and master failover, eliminating any single point of failure.
- Low latency and high throughput: We have clocked Flink at 1.5 million events per second per core, and have also observed latencies in the 25 millisecond range for jobs that include network data shuffling. Using a tuning knob, Flink users can navigate the latency-throughput trade-off, making the system suitable for both high-throughput data ingestion and transformations, as well as ultra low latency (millisecond range) applications.
- Connectors and integration points: Flink integrates with a wide variety of open source systems for data input and output (e.g., HDFS, Kafka, Elasticsearch, HBase, and others), deployment (e.g., YARN), as well as acting as an execution engine for other frameworks (e.g., Cascading, Google Cloud Dataflow). The Flink project itself comes bundled with a Hadoop MapReduce compatibility layer, a Storm compatibility layer, as well as libraries for machine learning and graph processing.
- Developer productivity and operational simplicity: Flink runs in a variety of environments. Local execution within an IDE significantly eases development and debugging of Flink applications. In distributed setups, Flink scales out massively. The YARN mode allows users to bring up Flink clusters in a matter of seconds. Flink serves monitoring metrics of jobs and the system as a whole via a well-defined REST interface. A built-in web dashboard displays these metrics and makes monitoring of Flink very convenient.
The combination of these features makes Apache Flink a unique choice for many stream processing applications.
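To make the event-time idea from the list above more concrete, here is a minimal sketch in plain Scala (no Flink dependency) of how a bounded amount of out-of-orderness can be corrected: a watermark trails the highest timestamp seen so far, and buffered events are released in timestamp order once the watermark has passed them. The names EventTimeSketch, Event, reorder, and maxDelayMs are hypothetical and chosen for illustration only; this is not how Flink implements event time internally.

```scala
object EventTimeSketch {
  // Hypothetical event type: an id plus the time the event occurred (epoch millis).
  final case class Event(id: String, timestamp: Long)

  // Re-orders a stream whose events arrive late by at most maxDelayMs.
  // The watermark is the highest timestamp seen so far minus maxDelayMs;
  // every buffered event at or below the watermark is safe to release.
  def reorder(arrivals: Seq[Event], maxDelayMs: Long): Vector[Event] = {
    val start = (Vector.empty[Event], Vector.empty[Event], Long.MinValue)
    val (emitted, buffered, _) = arrivals.foldLeft(start) {
      case ((out, buffer, maxSeen), event) =>
        val newMax = math.max(maxSeen, event.timestamp)
        val watermark = newMax - maxDelayMs
        val (ready, waiting) = (buffer :+ event).partition(_.timestamp <= watermark)
        (out ++ ready.sortBy(_.timestamp), waiting, newMax)
    }
    // At the end of the input, flush whatever is still buffered.
    emitted ++ buffered.sortBy(_.timestamp)
  }
}
```

If the assumption about the maximum delay holds, the released events come out in timestamp order regardless of the order in which they arrived.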
Building a demo application with Flink, Elasticsearch, and Kibana
Our demo ingests a stream of taxi ride events and identifies places that are popular within a certain period of time, i.e., every five minutes we compute the number of passengers that arrived by taxi at each location within the last 15 minutes. This kind of computation is known as a sliding window operation. We share a Scala implementation of this application (among others) on GitHub. You can easily run the application from your IDE by cloning the repository and importing the code. The repository's README file provides more detailed instructions.
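As a rough illustration of what a sliding window of 15 minutes that advances every 5 minutes means, the following plain Scala sketch computes the start times of all windows that contain a given event timestamp. The object and function names are hypothetical, and windows are assumed to be aligned to multiples of the slide and to cover the half-open interval [start, start + size).

```scala
object SlidingWindowSketch {
  // Returns the start times (epoch millis) of all sliding windows
  // [start, start + sizeMs) that contain the given timestamp,
  // assuming window starts are aligned to multiples of slideMs.
  def windowStarts(
      timestamp: Long,
      sizeMs: Long = 15 * 60 * 1000L,
      slideMs: Long = 5 * 60 * 1000L): List[Long] = {
    // Start of the most recent window that begins at or before the timestamp.
    val lastStart = timestamp - Math.floorMod(timestamp, slideMs)
    Iterator
      .iterate(lastStart)(_ - slideMs)
      .takeWhile(_ > timestamp - sizeMs)
      .toList
  }
}
```

With these settings, every event falls into three overlapping windows. An event at minute 17, for example, belongs to the windows starting at minutes 15, 10, and 5.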
Analyze the taxi ride event stream with Apache Flink
For the demo application, we generate a stream of taxi ride events from a public dataset of the New York City Taxi and Limousine Commission (TLC). The data set consists of records about taxi trips in New York City from 2009 to 2015. We took some of this data and converted it into a data set of taxi ride events by splitting each trip record into a ride start and a ride end event. The events have the following schema:
    rideId: Long
    time: DateTime // start or end time
    isStart: Boolean // true = ride start, false = ride end
    location: GeoPoint // lon/lat of pick-up or drop-off location
    passengerCnt: short
    travelDist: float // -1 on start events
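One way to picture this schema and the splitting of trip records is the following plain Scala sketch. The case classes and the TripRecord layout are hypothetical stand-ins rather than the demo's actual classes, and the time is kept as epoch milliseconds to avoid a dependency on a date-time library.

```scala
object TaxiRideModel {
  // Hypothetical stand-ins for the demo's types.
  final case class GeoPoint(lon: Double, lat: Double)

  final case class TaxiRide(
      rideId: Long,
      timeMillis: Long,      // start or end time
      isStart: Boolean,      // true = ride start, false = ride end
      location: GeoPoint,    // pick-up or drop-off location
      passengerCnt: Short,
      travelDist: Float)     // -1 on start events

  // Assumed shape of one trip record in the original data set.
  final case class TripRecord(
      rideId: Long,
      pickupMillis: Long,
      dropoffMillis: Long,
      pickup: GeoPoint,
      dropoff: GeoPoint,
      passengerCnt: Short,
      travelDist: Float)

  // Splits one trip record into a ride start and a ride end event.
  def toEvents(trip: TripRecord): Seq[TaxiRide] = Seq(
    TaxiRide(rideId = trip.rideId, timeMillis = trip.pickupMillis, isStart = true,
      location = trip.pickup, passengerCnt = trip.passengerCnt, travelDist = -1f),
    TaxiRide(rideId = trip.rideId, timeMillis = trip.dropoffMillis, isStart = false,
      location = trip.dropoff, passengerCnt = trip.passengerCnt, travelDist = trip.travelDist))
}
```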
We implemented a custom SourceFunction to serve a DataStream[TaxiRide] from the ride event data set. In order to generate the stream as realistically as possible, events are emitted according to their timestamps: two events that occurred ten minutes apart in reality are ingested by Flink ten minutes apart. A speed-up factor can be specified to “fast-forward” the stream, i.e., with a speed-up factor of 2.0, these events are served five minutes apart. Moreover, the source function adds a configurable random delay to each event to simulate real-world jitter. Given this stream of taxi ride events, our task is to compute every five minutes the number of passengers that arrived within the last 15 minutes at locations in New York City by taxi.
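The arithmetic behind the speed-up factor can be sketched in a few lines of plain Scala. The function below is hypothetical and is not taken from the demo's source function; it assumes that the per-event delay is expressed in event time and is scaled by the speed-up factor along with the event timestamps.

```scala
object ReplayTiming {
  // Wall-clock offset (in milliseconds since the start of the replay) at
  // which an event is served. Event-time distances, including the optional
  // per-event delay, are divided by the speed-up factor.
  def servingOffsetMillis(
      firstEventTime: Long,
      eventTime: Long,
      speedUp: Double,
      delayMillis: Long = 0L): Long =
    ((eventTime - firstEventTime + delayMillis) / speedUp).toLong
}
```

For two events that are ten minutes (600,000 ms) apart, a speed-up factor of 2.0 yields an offset of 300,000 ms (five minutes), and a factor of 600 yields 1,000 ms (one second).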
As a first step, we obtain a StreamExecutionEnvironment and set the TimeCharacteristic to EventTime. Event time mode guarantees consistent results even in case of historic data or data which is delivered out-of-order.
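    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)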
Next, we define the data source that generates a DataStream[TaxiRide] with at most 60 seconds serving delay (events are out of order by max. 1 minute) and a speed-up factor of 600 (10 minutes are served in 1 second).
    // Define the data source
    val rides: DataStream[TaxiRide] = env.addSource(new TaxiRideSource(
      "./data/nycTaxiData.gz", 60, 600.0f))
Since we are only interested in locations that people travel to (and not where they come from) and because the original data is a little bit messy (locations are not always correctly specified), we apply a few filters to first cleanse the data.
    val cleansedRides = rides
      // filter for ride end events
      .filter( !_.isStart )
      // filter for events in NYC
      .filter( r => NycGeoUtils.isInNYC(r.location) )
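The post does not show how NycGeoUtils.isInNYC is implemented. A simple way to approximate such a check is a bounding box, as in the following plain Scala sketch. The coordinates of the box and the Ride class are assumptions made for illustration, and the filters run on an ordinary collection instead of a DataStream.

```scala
object RideCleansing {
  final case class GeoPoint(lon: Double, lat: Double)
  // Hypothetical, reduced ride event with only the fields needed here.
  final case class Ride(isStart: Boolean, location: GeoPoint)

  // Assumed bounding box that roughly covers New York City.
  val LonWest = -74.05
  val LonEast = -73.7
  val LatSouth = 40.5
  val LatNorth = 41.0

  def isInNYC(p: GeoPoint): Boolean =
    p.lon > LonWest && p.lon < LonEast && p.lat > LatSouth && p.lat < LatNorth

  // Keeps only ride end events whose location lies inside the bounding box.
  def cleanse(rides: Seq[Ride]): Seq[Ride] =
    rides
      .filter(!_.isStart)
      .filter(r => isInNYC(r.location))
}
```

Records with missing or obviously wrong coordinates (for example 0.0/0.0) fall outside the box and are dropped together with all ride start events.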
The location of a taxi ride event is defined as a pair of continuous longitude/latitude values. We need to map them into a finite set of regions in order to be able to aggregate events by location. We do this by defining a grid of approx. 100x100 meter cells on the area of New York City. We use a utility function to map event locations to cell ids and extract the passenger count as follows:
    // map location coordinates to cell Id, timestamp, and passenger count
    val cellIds: DataStream[(Int, Long, Short)] = cleansedRides
      .map { r =>
        ( NycGeoUtils.mapToGridCell(r.location), r.time.getMillis, r.passengerCnt )
      }
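How a longitude/latitude pair can be mapped to a grid cell id, and back to the cell center, is sketched below in plain Scala. The grid origin, the cell sizes in degrees, and the number of columns are assumptions chosen so that cells measure roughly 100 x 100 meters at New York's latitude; the demo's NycGeoUtils may use different values and a different numbering scheme.

```scala
object GridSketch {
  // Assumed north-west corner of the grid and cell sizes in degrees.
  // 0.0009 degrees of latitude and 0.0012 degrees of longitude are each
  // roughly 100 meters at the latitude of New York City.
  val LonWest = -74.05
  val LatNorth = 41.0
  val DeltaLon = 0.0012
  val DeltaLat = 0.0009
  val Columns = 292 // enough cells to span 0.35 degrees of longitude

  // Maps a location to a cell id; cells are numbered row by row,
  // starting in the north-west corner. Assumes the point lies in the grid.
  def mapToGridCell(lon: Double, lat: Double): Int = {
    val x = math.floor((lon - LonWest) / DeltaLon).toInt
    val y = math.floor((LatNorth - lat) / DeltaLat).toInt
    x + y * Columns
  }

  // Returns the (lon, lat) of the center of a cell.
  def gridCellCenter(cellId: Int): (Double, Double) = {
    val x = cellId % Columns
    val y = cellId / Columns
    (LonWest + (x + 0.5) * DeltaLon, LatNorth - (y + 0.5) * DeltaLat)
  }
}
```

All locations that fall into the same cell receive the same id, which is what makes it possible to aggregate events by location.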
After these preparation steps, we have the data that we would like to aggregate. Since we want to compute the passenger count for each location (cell id), we start by keying (partitioning by key) the stream by cell id (_._1). Subsequently, we define a sliding time window and run a WindowFunction by calling apply():
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    val passengerCnts: DataStream[(Int, Long, Int)] = cellIds
      // key stream by cell Id
      .keyBy(_._1)
      // define sliding window on keyed stream
      .timeWindow(Time.minutes(15), Time.minutes(5))
      // sum the passenger counts of all events in the window
      .apply { (
        cell: Int,
        window: TimeWindow,
        events: Iterable[(Int, Long, Short)],
        out: Collector[(Int, Long, Int)]) =>
          out.collect( ( cell, window.getEnd, events.map( _._3.toInt ).sum ) )
      }
The timeWindow() operation groups stream events into finite sets of records on which a window or aggregation function can be applied. For our application, we call apply() to process the windows using a WindowFunction. The WindowFunction receives four parameters: the key of the window (here, the cell id), a Window object that contains details such as the start and end time of the window, an Iterable over all elements in the window, and a Collector to collect the records emitted by the WindowFunction. We want to count the number of passengers that arrive within the window’s time bounds. Therefore, we have to emit a single record that contains the grid cell id, the end time of the window, and the sum of the passenger counts, which is computed by extracting the individual passenger counts (the third tuple field) from the iterable (events.map( _._3.toInt )) and summing them (.sum).
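To see what such a window function does for a single window without running Flink, consider the following plain Scala sketch. Window is a hypothetical stand-in for Flink's TimeWindow, and the collector is modeled as a simple callback; the events are assumed to be (cell id, timestamp, passenger count) tuples as produced by the preceding map step.

```scala
object WindowFunctionSketch {
  // Hypothetical stand-in for Flink's TimeWindow.
  final case class Window(start: Long, end: Long)

  // Mirrors the four parameters described above: the key, the window,
  // the events in the window, and a callback that collects the output.
  // Emits exactly one record: (cell id, window end, total passengers).
  def sumPassengers(
      cell: Int,
      window: Window,
      events: Iterable[(Int, Long, Short)],
      out: ((Int, Long, Int)) => Unit): Unit =
    out((cell, window.end, events.map(_._3.toInt).sum))
}
```

Converting each count to Int before summing keeps the result type in line with the output tuple and avoids overflowing a Short when many events fall into one window.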
Finally, we translate the cell id back into a GeoPoint (referring to the center of the cell) and print the result stream to the standard output. The final env.execute() call takes care of submitting the program for execution.
    val cntByLocation: DataStream[(Int, Long, GeoPoint, Int)] = passengerCnts
      // map cell Id back to GeoPoint
      .map( r => (r._1, r._2, NycGeoUtils.getGridCellCenter(r._1), r._3 ) )

    cntByLocation
      // print to console
      .print()

    env.execute("Total passenger count per location")
If you followed the instructions to import the demo code into your IDE, you can run the SlidingArrivalCount.scala program by executing its main() method. You will see Flink’s log messages and the computed results being printed to the standard output.
You might wonder why the program produces results much faster than once every five minutes per location. This is due to the event time processing mode. Since all time-based operations (such as windows) are based on the timestamps of the events, the program becomes independent of the speed at which the data is served. This also means that you can process historic data which is read at full speed from some data store and data which is continuously produced with exactly the same program.
Our streaming program will run for a few minutes until the packaged data set is completely processed but you can terminate it at any time. As a next step, we show how to write the result stream into an Elasticsearch index.
Prepare Elasticsearch
The Flink Elasticsearch connector depends on Elasticsearch 1.7.3. Follow these steps to set up Elasticsearch and to create an index.
- Download Elasticsearch 1.7.3 as .tar (or .zip) archive here.
- Extract the archive file:
tar xvfz elasticsearch-1.7.3.tar.gz
- Enter the extracted directory and start Elasticsearch
    cd elasticsearch-1.7.3
    ./bin/elasticsearch
- Create an index called “nyc-idx”:
curl -XPUT "http://localhost:9200/nyc-idx"
- Create an index mapping called “popular-locations”:
    curl -XPUT "http://localhost:9200/nyc-idx/_mapping/popular-locations" -d'
    {
      "popular-locations" : {
        "properties" : {
          "cnt": {"type": "integer"},
          "location": {"type": "geo_point"},
          "time": {"type": "date"}
        }
      }
    }'
The SlidingArrivalCount.scala program is prepared to write data to the Elasticsearch index you just created but requires a few parameters to be set at the beginning of the main() function. Please set the parameters as follows:
    val writeToElasticsearch = true
    val elasticsearchHost = // look up the IP address in the Elasticsearch logs
    val elasticsearchPort = 9300
Now, everything is set up to fill our index with data. When you run the program by executing the main() method again, the program will write the resulting stream to the standard output as before but also insert the records into the nyc-idx Elasticsearch index.
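To give an idea of what a record in the index might look like, the following plain Scala sketch renders one result as a JSON document with the three fields of the popular-locations mapping. How the demo program actually builds its documents is not shown in this post, so the function below is an assumption. It encodes the time as epoch milliseconds and the location as a "lat,lon" string, both of which are formats that Elasticsearch accepts for date and geo_point fields.

```scala
object IndexDocumentSketch {
  final case class GeoPoint(lon: Double, lat: Double)

  // Renders one result record as a JSON document whose field names
  // match the "popular-locations" mapping: time, location, and cnt.
  def toJson(timeMillis: Long, location: GeoPoint, cnt: Int): String =
    s"""{"time":$timeMillis,"location":"${location.lat},${location.lon}","cnt":$cnt}"""
}
```

Note that the string form of a geo_point lists the latitude first, followed by the longitude.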
If you later want to clear the nyc-idx index, you can simply drop the mapping by running
curl -XDELETE 'http://localhost:9200/nyc-idx/popular-locations'
and create the mapping again with the previous command.
Visualizing the results with Kibana
In order to visualize the data that is inserted into Elasticsearch, we install Kibana 4.1.3 which is compatible with Elasticsearch 1.7.3. The setup is basically the same as for Elasticsearch.
1. Download Kibana 4.1.3 for your environment here.
2. Extract the archive file.
3. Enter the extracted folder and start Kibana by running the start script: ./bin/kibana
4. Open http://localhost:5601 in your browser to access Kibana.
Next we need to configure an index pattern. Enter the index name “nyc-idx” and click on “Create”. Do not uncheck the “Index contains time-based events” option. Now, Kibana knows about our index and we can start to visualize our data.
First click on the “Discover” button at the top of the page. You will find that Kibana tells you “No results found”.
This is because Kibana restricts time-based events by default to the last 15 minutes. Since our taxi ride data stream starts on January 1st, 2013, we need to adapt the time range that is considered by Kibana. This is done by clicking on the label “Last 15 Minutes” in the top right corner and entering an absolute time range starting at 2013-01-01 and ending at 2013-01-06.
We have told Kibana where our data is and the valid time range and can continue to visualize the data. For example we can visualize the arrival counts on a map. Click on the “Visualize” button at the top of the page, select “Tile map”, and click on “From a new search”.
See the following screenshot for the tile map configuration (left-hand side).
Another interesting visualization is to plot the number of arriving passengers over time. Click on “Visualize” at the top, select “Vertical bar chart”, and select “From a new search”. Again, have a look at the following screenshot for an example of how to configure the chart.
Kibana offers many more chart types and visualization options which are out of the scope of this post. You can easily play around with this setup, explore Kibana’s features, and implement your own Flink DataStream programs to analyze taxi rides in New York City.
We’re done and hope you had some fun
In this blog post we demonstrated how to build a real-time dashboard application with Apache Flink, Elasticsearch, and Kibana. By supporting event-time processing, Apache Flink is able to produce meaningful and consistent results even for historic data or in environments where events arrive out-of-order. The expressive DataStream API with flexible window semantics results in significantly less custom application logic compared to other open source stream processing solutions. Finally, connecting Flink with Elasticsearch and visualizing the real-time data with Kibana is just a matter of a few minutes. We hope you enjoyed running our demo application and had fun playing around with the code.
F* Hueske is a PMC member of Apache Flink. He has been contributing to Flink since its earliest days, when it started as a research project as part of his PhD studies at TU Berlin. F* did internships with IBM Research, SAP Research, and Microsoft Research and is a co-founder of data Artisans, a Berlin-based start-up devoted to fostering Apache Flink. He is interested in distributed data processing and query optimization.