基于kafka-net实现的可以长链接的消息生产者

时间:2022-11-26 15:18:32

  今天有点时间,我就来说两句。最近接触的Kafka相关的东西要多一些,其实以前也接触过,但是在项目使用中的经验不是很多。最近公司的项目里面使用了Kafka消息中间件,由于以前的人员编写的客户端的类不是很好,没有设计的概念,就是一个简单类的功能罗列,没有考虑到后期的扩展和维护(以后可能会兼容其他形式的消息队列,要做到无缝衔接),所以这个重构的任务就落到我的身上。

  先说说我的感受,然后再贴出代码的实现吧。我第一次是基于Confluent.Kafka编写的Kafka消息生产者,后来经过测试,同步操作的时间比较长,要完成20万数据发送消息并更新到数据库的时间大概是16-18分钟,这个结果有点让人不能接受。为了提高性能,也做了很多测试,都没有办法解决这个问题。后来抱着试试看的想法,我又基于kafka-net重新实现了Kafka消息的生产者。经过测试,完成同样的任务,时间大概需要3分钟左右。两种实现方法完成同样的任务,都是以同步的方式生产消息,并将消息成功发送到Broker后,再将数据插入到数据库做记录。大家不要纠结为什么这样使用消息队列,这是上头的做法,我还不能做大的改动,我也无奈。

  目前看,基于kafka-net实现的消息生产者在生产消息并发送成功所需要的时间要比基于Confluent.Kafka实现的消息生产者的所需要的时间要少,尤其是发送的数据越多,这个时间的差距越大。具体的原因还不清楚,如果有高手可以不吝赐教。好了,我该上代码了。

  开始代码之前,要说明一点:Confluent.Kafka的Broker是不需要带Http://这个前缀的,但是 kafka-net 的Broker是有http://这个前缀的,大家要注意这个,刚开始的时候我也被坑了一下子。

using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks; namespace Enterprise.Framework.MessageQueue
{
/// <summary>
/// 消息生产者的接口定义,所有消息生产者的实现必须继承该接口
/// </summary>
public interface IMessageProducer
{
/// <summary>
/// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里
/// </summary>
/// <param name="topic">发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常</param>
/// <param name="message">需要发送的消息内容,该参数不能为空,空值会抛出异常</param>
void Produce(string topic, string message);
}
}
 using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks; namespace Enterprise.Framework.MessageQueue
{
/// <summary>
/// Kafka消息生产者的接口定义,所有Kafka消息生产者的实现必须继承该接口
/// </summary>
public interface IKafkaMessageProducer : IMessageProducer
{
/// <summary>
/// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里
/// </summary>
/// <param name="topic">发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常</param>
/// <param name="message">需要发送的消息内容,该参数不能为空,空值会抛出异常</param>
/// <param name="producedAction">当消息生产完成并成功发送到服务器后,可以对成功生产并发送的消息执行代理所封装方法的操作,默认值为空</param>
void Produce(string topic, string message, Action<MessageResult> producedAction = null);
}
}
 using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text; namespace Enterprise.Framework.AbstractInterface
{
/// <summary>
/// 该抽象类定义了所有需要释放资源类型的抽象类
/// </summary>
public abstract class DisposableBase : IDisposable
{
private bool disposed = false; /// <summary>
/// 实现IDisposable中的Dispose方法
/// </summary>
public void Dispose()
{
//必须为true
Dispose(true);
//通知垃圾回收机制不再调用终结器(析构器)
GC.SuppressFinalize(this);
} /// <summary>
/// 不是必要的,提供一个Close方法仅仅是为了更符合其他语言(如C++)的规范
/// </summary>
public void Close()
{
Dispose();
} /// <summary>
/// 必须,以备程序员忘记了显式调用Dispose方法
/// </summary>
~DisposableBase()
{
//必须为false
Dispose(false);
} /// <summary>
/// 非密封类修饰用protected virtual
/// 密封类修饰用private
/// </summary>
/// <param name="disposing">是否要清理托管资源,true表示需要清理托管资源,false表示不需要清理托管资源</param>
protected virtual void Dispose(bool disposing)
{
if (disposed)
{
return;
}
if (disposing)
{
// 清理托管资源
DisposeManagedResources();
}
// 清理非托管资源
DisposeUnmanagedResource();
//让类型知道自己已经被释放
disposed = true;
} /// <summary>
/// 释放托管资源
/// </summary>
protected abstract void DisposeManagedResources(); /// <summary>
/// 释放非托管资源
/// </summary>
protected abstract void DisposeUnmanagedResource();
}
}
 using KafkaNet;
using KafkaNet.Model;
using KafkaNet.Protocol;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ThreeSoft.Framework.AbstractInterface; namespace Enterprise.Framework.MessageQueue
{
/// <summary>
/// Kafka消息生产者具体的实现类,可以针对长链接进行消息发送处理,不用频繁进行消息组件的创建和销毁的工作
/// </summary>
public sealed class KafkaMessageKeepAliveProducer : DisposableBase, IDisposable, IKafkaMessageProducer
{
#region 私有字段 private KafkaNet.Producer _producer;
private BrokerRouter _brokerRouter;
private string _broker; #endregion #region 构造函数 /// <summary>
/// 通过构造函数初始化消息队列的服务器
/// </summary>
/// <param name="broker">消息队列服务器地址,该值不能为空</param>
public KafkaMessageKeepAliveProducer(string broker)
{
if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker))
{
throw new ArgumentNullException("消息队列服务器的地址不可以为空!");
} #region kafka-net实现 Uri[] brokerUriList = null; if (broker.IndexOf(',') >= )
{
string[] brokers = broker.Split(',');
brokerUriList = new Uri[brokers.Length]; for (int i = ; i < brokers.Length; i++)
{
brokerUriList[i] = new Uri(brokers[i]);
}
}
else
{
brokerUriList = new Uri[] { new Uri(broker) };
} var kafkaOptions = new KafkaOptions(brokerUriList);
_brokerRouter = new BrokerRouter(kafkaOptions);
_producer = new KafkaNet.Producer(_brokerRouter); #endregion _broker = broker;
} #endregion #region 实例属性 /// <summary>
/// 获取消息服务器的地址
/// </summary>
public string Broker
{
get { return _broker; }
} #endregion #region 发送消息的方法 /// <summary>
/// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里
/// </summary>
/// <param name="topic">发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常</param>
/// <param name="message">需要发送的消息内容,该参数不能为空,空值会抛出异常</param>
/// <param name="producedAction">当消息生产完成并成功发送到服务器后,可以对成功生产并发送的消息执行代理所封装方法的操作</param>
public void Produce(string topic, string message, Action<MessageResult> producedAction = null)
{
#region 同步实现 var currentDatetime = DateTime.Now;
var key = currentDatetime.Second.ToString();
var events = new[] { new KafkaNet.Protocol.Message(message, key) };
List<ProduceResponse> result = _producer.SendMessageAsync(topic, events).Result; if (producedAction != null && result != null && result.Count > )
{
MessageResult messageResult = new MessageResult { Broker = Broker, GroupID = null, Message = message, Offset = result[].Offset, Partition = result[].PartitionId, Topic = result[].Topic };
producedAction(messageResult);
} #endregion
} /// <summary>
/// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里
/// </summary>
/// <param name="topic">发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常</param>
/// <param name="message">需要发送的消息内容,该参数不能为空,空值会抛出异常</param>
public void Produce(string topic, string message)
{
Produce(topic, message, null);
} #endregion #region 实现消息队列资源的释放 /// <summary>
/// 析构函数释放资源
/// </summary>
~KafkaMessageKeepAliveProducer()
{
Dispose(false);
} /// <summary>
/// 释放托管资源
/// </summary>
protected override void DisposeManagedResources()
{
if (_producer != null)
{
_producer.Dispose();
}
if (_brokerRouter != null)
{
_brokerRouter.Dispose();
}
} /// <summary>
/// 释放非托管资源
/// </summary>
protected override void DisposeUnmanagedResource(){} #endregion
}
}

好了,今天就写到这里了,每天进步一点点,努力坚持。不忘初心,继续努力吧,欢迎大家前来讨论。