环境准备:
1)需要在maven工程中引入依赖:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId> org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>0.8.1</version> <exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions> </dependency>
2)本机是否能telnet 192.178.0.111 9092(kafaka所部署的vmw虚拟机)通? 如果telnet端口不通,则需要关闭192.178.0.111的防火墙:
systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall开机启动
一、生产者
首先看以下两种实现示例:
package com.dx; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*; import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.Date; public class ProducerTest {
public static void main(String[] args) {
producer_test1(args); producer_test2();
} private static void producer_test2() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.178.0.111:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>("kafakatopic", Integer.toString(i), Integer.toString(i))); producer.close();
} private static void producer_test1(String[] args) {
String arg0 = args != null && args.length > 0 ? args[0] : "10";
long events = Long.parseLong(arg0);
Random rnd = new Random(); // /opt/kafka_2.12-1.1.0/bin/kafka-console-producer.sh --broker-list 192.178.0.111:9092 --sync --topic kafkatopic
Properties props = new Properties();
props.put("bootstrap.servers", "192.178.0.111:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 配置partitionner选择策略,可选配置
props.put("partitioner.class", "com.dx.SimplePartitioner2"); Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = "192.178.0." + rnd.nextInt(255);
String msg = runtime + ",www.example.com," + ip;
ProducerRecord<String, String> data = new ProducerRecord<String, String>("kafakatopic", ip, msg);
Future<RecordMetadata> send = producer.send(data,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
}
producer.close();
}
}
SimplePartitioner2.java
package com.dx; import java.util.List;
import java.util.Map; import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo; public class SimplePartitioner2 implements Partitioner {
public void configure(Map<String, ?> map) {
} public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int partition = 0;
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
} return partition;
} public void close() {
}
}
参数设置备注:
1)bootstrap.servers --设置生产者需要连接的kafka地址
2)acks --回令类型
3)retries --重试次数
4)batch.size --批量提交大小
5)linger.ms --提交延迟等待时间(等待时间内可以追加提交)
6)buffer.memory --缓存大小
7)key.serializer|value.serializer --序列化方法
需要注意的有两点:
1、acks回令。如果必须等待回令,那么设置acks为all;否则,设置为-1;等待回令会有性能损耗。
2、生产者在发送消息的过程中,会自己默认批量提交。所以,如果单条指令的发送请求,记得发送完后flush才能生效。
3、SimplePartitioner2.java为kafaka分区,可选项。
二、消费者
以下实现示例:
package com.dx; import kafka.consumer.Consumer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Properties;
import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import java.util.List; /**
* zk启动:sh /opt/zookeeper-3.4.11/bin/zkServer.sh start &
* kafka启动:sh /opt/kafka_2.12-1.1.0/bin/kafka-server-start.sh /opt/kafka_2.12-1.1.0/config/server.properties &
*/
public class ConsumerTest {
public static void main(String[] args) {
Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.178.0.111:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG ,"test") ;
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("kafakatopic")); while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
三、测试
先启动productor运行,之后在启动consumer运行。在consumer打印结果如下:
offset = , key = 192.178.0.20, value = ,www.example.com,192.178.0.20
offset = , key = 192.178.0.143, value = ,www.example.com,192.178.0.143
offset = , key = 192.178.0.113, value = ,www.example.com,192.178.0.113
offset = , key = 192.178.0.110, value = ,www.example.com,192.178.0.110
offset = , key = 192.178.0.232, value = ,www.example.com,192.178.0.232
offset = , key = 192.178.0.96, value = ,www.example.com,192.178.0.96
offset = , key = 192.178.0.76, value = ,www.example.com,192.178.0.76
offset = , key = 192.178.0.78, value = ,www.example.com,192.178.0.78
offset = , key = 192.178.0.80, value = ,www.example.com,192.178.0.80
offset = , key = 192.178.0.177, value = ,www.example.com,192.178.0.177
offset = , key = , value =
offset = , key = , value =
offset = , key = , value =
offset = , key = , value =
offset = , key = , value =
offset = , key = , value =
offset = , key = , value =
offset = , key = , value =
offset = , key = , value =
offset = , key = , value =
kafka_2.12-1.1.0 生产与消费java实现示例的更多相关文章
-
Java实现远程服务生产与消费(RPC)的4种方法-RMI,WebService,HttpClient,RestTemplate
目录 一. 通过rmi实现远程服务的生产与消费 远程服务提供者实现. 创建rmi-provider项目(Maven) 远程服务消费者实现 创建rmi-consumer项目 二. 通过WebServic ...
-
kafka的Java客户端示例代码(kafka_2.12-0.10.2.1)
使用0.9开始增加的KafkaProducer和KafkaConsumer. Pom.xml <project xmlns="http://maven.apache.org/POM/4 ...
-
kafka_2.11-0.8.2.1+java 生产消费程序demo示例
Kafka学习8_kafka java 生产消费程序demo示例 kafka是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个demo程序供大家参考.kaf ...
-
Kafka 使用Java实现数据的生产和消费demo
前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布 ...
-
Python 基于Python结合pykafka实现kafka生产及消费速率&;主题分区偏移实时监控
基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控 By: 授客 QQ:1033553122 1.测试环境 python 3.4 zookeeper- ...
-
c语言使用librdkafka库实现kafka的生产和消费实例(转)
关于librdkafka库的介绍,可以参考kafka的c/c++高性能客户端librdkafka简介,本文使用librdkafka库来进行kafka的简单的生产.消费 一.producer librd ...
-
kafka之三:kafka java 生产消费程序demo示例
kafka是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个demo程序供大家参考.kafka的安装请参考官方文档. 首先我们需要新建一个maven项目,然后在 ...
-
C# 多线程详解 Part.04(Lock、Monitor、生产与消费)
系列1 曾经说过:每个线程都有自己的资源,但代码区是共享的,即每个线程都可以执行相同的函数. 这可能带来的问题就是多个线程同时执行一个函数,并修改同一变量值,这将导致数据的 ...
-
《多线程操作之生产者消费者》(单生产单消费&;多生产多消费)
说明1:假设有一个放商品的盘子(此盘子只能放下一个商品).生产者每次生产一个商品之后,放到这个盘子里,然后唤醒消费者来消费这个面包.消费者消费完这个商品之后,就唤醒生产者生产下一个商品.前提是,只有盘 ...
随机推荐
-
ajaxSubmit
$('button').on('click', function() { $('form').on('submit', function() { var title = $('in ...
-
HQ-SSAO (High-Quality SSAO)
踩了前前后后无数坑,实现方式都试过了10几种,终于得到这个方案.虽说比不上2015最新的far-field AO,但至少在near/middle-field上,算是state of arts的实现了. ...
-
Material Design入门(三)
本文主要包括 CollapsingToolbarLayout实现滚动动画效果 ViewPager+tabLayout实现左右类Tab效果 控件介绍 这次需要用到得新控件比较多,主要有以下几个: Coo ...
-
Demo学习: Dialogs Anonymous Callback
Dialogs\Dialogs Anonymous Callback 窗体回调函数使用. 1. 标准回调函数 ShowMessage(const Msg: string; CallBack: TUni ...
-
基于visual Studio2013解决C语言竞赛题之0404循环求和
题目 解决代码及点评 这道题考验for循环和一个简单的算法 因为每次累加的值有规律,后面一次累加是前面一次累加的两倍 所以可以用简单的循环,计算累加项和累加结果 /************ ...
-
ASP.NET MVC基于标注特性的Model验证:将ValidationAttribute应用到参数上
原文:ASP.NET MVC基于标注特性的Model验证:将ValidationAttribute应用到参数上 ASP.NET MVC默认采用基于标准特性的Model验证机制,但是只有应用在Model ...
-
Vue表单控件绑定
前面的话 本文将详细介绍Vue表单控件绑定 基础用法 可以用 v-model 指令在表单控件元素上创建双向数据绑定.它会根据控件类型自动选取正确的方法来更新元素.v-model本质上不过是语法糖,它负 ...
-
关于linux下ntp时间同步服务的安装与配置
1.安装ntp服务,要使用时间同步.那么服务端与客户端都需要使用如下命令安装NTP软件包 [root@ ~]# yum install ntp -y 2.如果只是作为客户端的话,配置则可以非常简单,编 ...
-
利用QT、QWebview、ffmpeg实现的屏幕录制方案
.katex { display: block; text-align: center; white-space: nowrap; } .katex-display > .katex > ...
-
freeRTOS中文实用教程3--中断管理之计数信号量
1.前言 在中断不频繁的系统中,使用二值信号量没有问题,但是中断频繁发生时,则会有中断丢失的问题. 因为中断发生时延迟任务执行,延迟任务执行的过程中,如果又来了两次中断,则只会处理第一次,第二次将会丢 ...