.net 连接kafka

时间:2024-10-06 23:03:44

新建两个控制台项目,一个生产者,一个消费者,使用Nuget安装Confluent.Kafka

.net 连接kafka

生产者

static void Main(string[] args)
        {
            var config = new Dictionary<string, object>
            {
                { "bootstrap.servers", "10.202.203.29:9092" }
            };
            bool isDOne = false;

            using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
            {
                Console.WriteLine("enter your message:");
                while (!isDOne)
                {
                    var message = Console.ReadLine();
                    if (message == "exit")
                    {
                        isDOne = true;
                    }
                    var dr = producer.ProduceAsync("my-topic", null, message).Result;
                    Console.WriteLine($"Delivered '{dr.Value}' to: {dr.TopicPartitionOffset}");
                }
            }
        }

消费者

static void Main(string[] args)
        {
            Console.WriteLine("waiting for message...");
            var conf = new Dictionary<string, object>
            {
              { "group.id", "test-consumer-group" },
              { "bootstrap.servers", "10.202.203.29:9092" },
              {  },
              { "auto.offset.reset", "earliest" }
            };

            using (var consumer = new Consumer<Null, string>(conf, null, new StringDeserializer(Encoding.UTF8)))
            {
                consumer.OnMessage += (_, msg)
                  => Console.WriteLine($"Read '{msg.Value}' from: {msg.TopicPartitionOffset}");

                consumer.OnError += (_, error)
                  => Console.WriteLine($"Error: {error}");

                consumer.OnConsumeError += (_, msg)
                  => Console.WriteLine($"Consume error ({msg.TopicPartitionOffset}): {msg.Error}");

                consumer.Subscribe("my-topic");

                while (true)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds());
                }
            }
        }

此时如果启动消费者的话会报找不到主机的错误

如果从外部连接需要修改下kafka的配置

vi server.properties

.net 连接kafka

启动生产者和消费者

.net 连接kafka

如果消费者没有启动,生产者推送几条消息,在下次消费者启动后会接收到之前的消息

.net 连接kafka

.net 连接kafka

更多高级用法查看:https://github.com/confluentinc/confluent-kafka-dotnet