新建两个控制台项目,一个生产者,一个消费者,使用Nuget安装Confluent.Kafka
生产者
static void Main(string[] args) { var config = new Dictionary<string, object> { { "bootstrap.servers", "10.202.203.29:9092" } }; bool isDOne = false; using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) { Console.WriteLine("enter your message:"); while (!isDOne) { var message = Console.ReadLine(); if (message == "exit") { isDOne = true; } var dr = producer.ProduceAsync("my-topic", null, message).Result; Console.WriteLine($"Delivered '{dr.Value}' to: {dr.TopicPartitionOffset}"); } } }
消费者
static void Main(string[] args) { Console.WriteLine("waiting for message..."); var conf = new Dictionary<string, object> { { "group.id", "test-consumer-group" }, { "bootstrap.servers", "10.202.203.29:9092" }, { }, { "auto.offset.reset", "earliest" } }; using (var consumer = new Consumer<Null, string>(conf, null, new StringDeserializer(Encoding.UTF8))) { consumer.OnMessage += (_, msg) => Console.WriteLine($"Read '{msg.Value}' from: {msg.TopicPartitionOffset}"); consumer.OnError += (_, error) => Console.WriteLine($"Error: {error}"); consumer.OnConsumeError += (_, msg) => Console.WriteLine($"Consume error ({msg.TopicPartitionOffset}): {msg.Error}"); consumer.Subscribe("my-topic"); while (true) { consumer.Poll(TimeSpan.FromMilliseconds()); } } }
此时如果启动消费者的话会报找不到主机的错误
如果从外部连接需要修改下kafka的配置
vi server.properties
启动生产者和消费者
如果消费者没有启动,生产者推送几条消息,在下次消费者启动后会接收到之前的消息
更多高级用法查看:https://github.com/confluentinc/confluent-kafka-dotnet