写了一个小项目:自定义 Java 对象,并将其发送到 Kafka。
Kafka 的安装与搭建这里就不再描述了,解压后简单配置即可。
直接进入正题吧
一. 自定义 Java 对象并实现序列化(省略 getter/setter 方法)
/**
 * Message payload that travels through Kafka in Java's built-in object-serialization format.
 *
 * <p>Getters and setters are omitted here for brevity.
 *
 * <p>No explicit {@code serialVersionUID} is declared, so the JVM derives one from the class
 * shape. Declaring one now would make payloads that were written by the current class
 * unreadable, which is why it is intentionally left as is.
 *
 * <p>NOTE(review): Java-native deserialization should only be applied to bytes coming from
 * trusted producers; consider a data-only format (JSON, protobuf) if the topic is shared.
 */
public class Document implements Serializable {
    private String title;
    private String content;
    private String id;
    private String date;
    private long updatetime;

    // getters and setters omitted

    /**
     * Serializes this document with {@link ObjectOutputStream}.
     *
     * @return the complete serialized form of this instance, never {@code null}
     * @throws IllegalStateException if the object cannot be written; the underlying
     *         {@link IOException} is attached as the cause (previously the error was only
     *         printed and a possibly truncated array was returned)
     */
    public byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the object stream flushes it, so the buffer is complete once the try exits.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(this);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to serialize Document", e);
        }
        return buffer.toByteArray();
    }

    /**
     * Rebuilds a document from bytes produced by {@link #toBytes()}.
     *
     * <p>The method does not read any state of the instance it is called on.
     *
     * @param bytes serialized document, or {@code null} for an absent payload
     * @return the deserialized document, or {@code null} when {@code bytes} is {@code null}
     * @throws IllegalArgumentException if the bytes are not a readable serialized object or
     *         the serialized class cannot be found; the original exception is the cause
     */
    public Document toDocument(byte[] bytes) {
        if (bytes == null) {
            // An absent payload maps to an absent document instead of a NullPointerException.
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Document) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalArgumentException(
                    "Failed to deserialize Document from " + bytes.length + " bytes", e);
        }
    }
}
二. 自定义Encoder
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import com.thunisoft.data.domain.Document;
/**
 * Kafka value serializer that delegates to {@code Document.toBytes()}.
 */
public class DocumentEncoder implements Serializer<Document> {

    /**
     * No-op: this serializer has no settings.
     *
     * @param map serializer configuration (ignored)
     * @param b   whether the serializer is used for keys (ignored)
     */
    @Override
    public void configure(Map<String, ?> map, boolean b) {
        // nothing to configure
    }

    /**
     * Converts a document to its byte form.
     *
     * @param s        topic the record is being sent to (unused)
     * @param document value to serialize; may be {@code null}
     * @return the serialized bytes, or {@code null} when {@code document} is {@code null}
     */
    @Override
    public byte[] serialize(String s, Document document) {
        if (document == null) {
            // A record without a value must not fail with a NullPointerException here.
            return null;
        }
        return document.toBytes();
    }

    /** No-op: this serializer holds no resources. */
    @Override
    public void close() {
        // nothing to release
    }
}
三. producer实现
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Service;
import com.thunisoft.data.domain.Document;
import com.thunisoft.data.fy.api.kafka.DocumentProducer;
import com.thunisoft.data.fy.api.kafka.domain.DocumentEncoder;
import com.thunisoft.data.fy.constant.Constants;
/**
 * Publishes {@code Document} values to the topic named by {@code Constants.TOPIC}.
 *
 * <p>All instances share one statically created {@link KafkaProducer}.
 */
public class DocumentProducer {
    private static final Properties props = buildProperties();
    private static final KafkaProducer<String, Document> producer =
            new KafkaProducer<String, Document>(props);

    static {
        // send() only hands the record to the producer; closing the producer on JVM shutdown
        // gives pending records a chance to be delivered before the process exits.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                producer.close();
            }
        }, "document-producer-shutdown"));
    }

    /** Assembles the producer configuration: broker list plus key and value serializers. */
    private static Properties buildProperties() {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_METDATA_BROKERS);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DocumentEncoder.class.getName());
        // To use a custom partitioner, additionally set ProducerConfig.PARTITIONER_CLASS_CONFIG
        // to the partitioner's class name (e.g. DocumentPartitioner).
        return config;
    }

    /**
     * Sends one document, without a key, to {@code Constants.TOPIC}.
     *
     * <p>The result returned by {@code send} is not inspected here.
     *
     * @param document value of the record to publish
     * @throws IOException never thrown by this implementation; the clause is kept so that
     *         existing callers that catch or declare it keep compiling
     */
    public void produce(Document document) throws IOException {
        producer.send(new ProducerRecord<String, Document>(Constants.TOPIC, document));
    }
}
四. 自定义Decoder
import com.bigdata.frame.data.Document;
import kafka.serializer.Decoder;
/**
 * Decoder that turns raw message bytes back into a {@code Document}
 * by delegating to {@code Document.toDocument(byte[])}.
 */
public class DocumentDecoder implements Decoder<Document> {

    /**
     * Decodes one message payload.
     *
     * @param bytes raw payload of the message
     * @return whatever {@code Document.toDocument} produces for these bytes
     */
    @Override
    public Document fromBytes(byte[] bytes) {
        // toDocument is an instance method, so a fresh Document serves as the receiver.
        return new Document().toDocument(bytes);
    }
}
五. 编写Consumer
import com.bigdata.frame.constant.Constants;
import com.bigdata.frame.kafka.DocumentConsumer;
import com.bigdata.frame.kafka.domain.DocumentDecoder;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.Properties;
/**
 * Holds the ZooKeeper-based consumer configuration and lazily creates a single shared
 * {@link ConsumerConnector}.
 */
public class DocumentConsumer {
    private static Properties props = buildProperties();
    private static ConsumerConnector consumer;

    /** Builds the consumer settings once, when the class is initialized. */
    private static Properties buildProperties() {
        Properties config = new Properties();
        // ZooKeeper connection string
        config.put("zookeeper.connect", Constants.ZOOKEERER_CONNECT);
        // consumer group this consumer belongs to
        config.put("group.id", Constants.KAFKA_GROUP_ID);
        // maximum time the client waits while connecting to ZooKeeper
        config.put("zookeeper.connection.timeout.ms", Constants.ZOOLEEPER_CONNECT_SESSION_TOMEOUT);
        // keep: rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms
        // ZooKeeper session timeout
        config.put("zookeeper.session.timeout.ms", Constants.ZOOKEEPER_SESSION_TIMEOUT);
        config.put("zookeeper.sync.time.ms", "200");
        config.put("auto.commit.interval.ms", "1000");
        config.put("auto.offset.reset", "smallest");
        config.put("rebalance.max.retries", "5");
        config.put("rebalance.backoff.ms", "1200");
        // serializer class
        // NOTE(review): confirm whether the consumer actually reads this property.
        config.put("serializer.class", DocumentDecoder.class.getName());
        return config;
    }

    /**
     * Returns the shared connector, creating it from the static configuration on first use.
     *
     * <p>The lazy creation is not synchronized.
     *
     * @return the shared {@link ConsumerConnector}
     */
    public ConsumerConnector getConsumer() {
        if (consumer != null) {
            return consumer;
        }
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        return consumer;
    }
}
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
 * Placeholder partitioner: every record is assigned to the same fixed partition.
 */
public class DocumentPartitioner implements Partitioner {
    /** The only partition this implementation ever selects. */
    private static final int FIXED_PARTITION = 0;

    /**
     * Chooses the partition for a record; all arguments are ignored.
     *
     * @param s       first (String) argument, unused
     * @param o       second argument, unused
     * @param bytes   third argument, unused
     * @param o1      fourth argument, unused
     * @param bytes1  fifth argument, unused
     * @param cluster cluster argument, unused
     * @return always {@code 0}
     */
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        return FIXED_PARTITION;
    }

    /** No-op: nothing to release. */
    @Override
    public void close() {
        // intentionally empty
    }

    /**
     * No-op: this partitioner has no settings.
     *
     * @param map partitioner configuration (ignored)
     */
    @Override
    public void configure(Map<String, ?> map) {
        // intentionally empty
    }
}
七. 测试代码(先是 Producer 端,然后是 Consumer 端)
/**
 * Producer-side smoke test: builds a sample document and publishes it.
 *
 * @param args command-line arguments (unused)
 * @throws java.io.IOException declared because {@code DocumentProducer.produce} declares it;
 *         without this clause the call below does not compile
 */
public static void main(String[] args) throws java.io.IOException {
    DocumentProducer producer = new DocumentProducer();
    Document document = new Document();
    document.setTitle("测试");
    document.setContent("这是Producer测试");
    producer.produce(document);
}
/**
 * Consumer-side smoke test: opens one stream for {@code topic} and prints every document
 * read from it.
 *
 * <p>{@code topic} and {@code consumer} are not declared in this method; they are expected to
 * be provided by the enclosing class.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Request exactly one stream for the topic.
    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topic, 1);

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    DocumentDecoder valueDecoder = new DocumentDecoder();

    Map<String, List<KafkaStream<String, Document>>> streamsByTopic =
            consumer.createMessageStreams(streamsPerTopic, keyDecoder, valueDecoder);
    List<KafkaStream<String, Document>> topicStreams = streamsByTopic.get(topic);
    KafkaStream<String, Document> firstStream = topicStreams.get(0);

    ConsumerIterator<String, Document> messages = firstStream.iterator();
    while (messages.hasNext()) {
        Document received = messages.next().message();
        System.out.println(received.toString());
    }
}