Masstransit作为.Net平台下的一款优秀的开源产品却没有得到应有的关注,这段时间有机会阅读了Masstransit的源码,我觉得我有必要普及一下这个框架的使用。
值得一提的是Masstransit的源码写的非常优秀,值得每个想提高自己编程能力的.Net选手阅读,整个代码看起来赏心悦目。反之,每次打开自己公司项目的时候心情都异常沉重。所以不是.Net不行,还是咱们水平不行。
学会了Masstransit你再也不用羡慕别人有Dubbo、Mule、Akka什么的了,当然在某些方面他们的使用场景还是有一些区别。另外插播一条广告:本人目前在西安求职中,如果那位同学有好的工作机会希望能够帮忙推荐。
阅读本篇文章的前提是你需要对消息队列有一些了解,特别是RabbitMq,Masstransit作为一款轻量级的ESB默认支持RabbitMq和MSMQ。本文的例子都使用RabbitMq来介绍,所以你最好能读一下我之前写的《如何优雅的使用RabbitMq》。
简单来说,Masstransit提供了使用消息队列场景的一种抽象,也就是说,如果你有使用消息队列的需求,都可以通过Masstransit来完成,当然如果仅仅是拿消息队列来发个短信、邮件之类的并不能体现出Masstransit的优越性。当整个业务系统都通过Masstransit过来构建和交互的时候,才能真正体现ESB的价值所在。
我写了5不同场景个Demo,方便大家学习和参考。我会重点讲解Real World的案例,也就是如何在真实场景使用Masstransit。如果仅仅是把一些组件融入到了项目中并且能够运行,并不能算是一个合格的架构师,一个合格的架构师一定是可以将某个组件以最佳实践的方式融入到了自己的项目中,并且能够为开发者提供清晰且合理的抽象,然后针对这一方案制定一些约定和规则,随着项目的推进,整个项目的代码都能够有章可循,始终在架构师的掌控之中。
一、发送命令模型(Send Command Pattern)
这种模型最常见的就是CQRS中C,用来向DomainHandler发送一个Command。另外系统的发送邮件服务、发送短信服务也可以通过这种模式来实现。这种模型跟邮递员向邮箱投递邮件有点相似。这一模型的特点是你需要知道对方终结点的地址,意味着你要明确要向哪个地址发送消息。从Masstransit提供的api就可以看出来:
var endPoint =await bus.GetSendEndpoint(sendToUri);
var command = new GreetingCommandA()
{
Id = Guid.NewGuid(),
DateTime = DateTime.Now
}; await endPoint.Send(command);
这个Demo主要由2个工程组成,Client发送消息到Server,Server来响应这一消息。
二、发布/订阅模型(publish/subscribe pattern)
之所以有基于消息传递的分布式应用这种架构模式,很大程度上就是依靠这种模式来完成。一个典型的例子是子系统A发布了一条消息,子系统B和子系统C都可以订阅这一消息并异步处理该消息。而这一过程对子系统A来说是不关心的。从而减少不同的子系统之间的耦合,提高系统的可扩展性。
三、消息的继承层次
用过RabbitMQ的同学应该知道,RabbitMQ提供了3中类型的Exchange,分别为direct、fanout和topic。所有这一切都是为了提供一种路由消息的机制。而这一切是通过匹配一种字符串类型的routingKey来实现的,当然有了Masstransit你就不用这么费劲了。C#作为一种强类型的语言,我们可以通过设计消息的继承层次来实现消息的路由机制。比如我们可以设计下面的消息继承体系:
public interface IMessage
{
Guid Id { get; set; }
} public class Message : IMessage
{
public Guid Id { get; set; }
public string Type { get; set; }
} public class UserUpdatedMessage : Message
{
public Guid Id { get; set; }
}
有了这样的继承体系,我们可以定义下面的Consumer类型:
public class BaseInterfaceMessageConsumer:IConsumer<IMessage>
{
public async Task Consume(ConsumeContext<IMessage> context)
{
await Console.Out.WriteLineAsync($"consumer is BaseInterfaceMessageConsumer,message type is {context.Message.GetType()}");
}
}
还可以定义下面的Consumer类型:
public class UserUpdatedMessageConsumer: IConsumer<UserUpdatedMessage>
{
public async Task Consume(ConsumeContext<UserUpdatedMessage> context)
{
await Console.Out.WriteLineAsync($"consumer is UserUpdatedMessageConsumer,message type is {context.Message.GetType()}");
}
}
这样就可以路由不同的消息到相应的Consumer中了。
四、使用Topshelf来构建windows服务
我们最终要将consumer程序集打成windows服务来安装在产品环境下,Topshelf为我们提供了一组DSL描述的api来创建window服务:
HostFactory.Run(x =>
{
x.Service<GreetingServer>(s =>
{
s.ConstructUsing(name => new GreetingServer());
s.WhenStarted(tc => tc.Start());
s.WhenStopped(tc => tc.Stop());
});
x.StartAutomatically();
x.RunAsLocalSystem();
x.SetDescription("A greeting service");
x.SetDisplayName("Greeting Service");
x.SetServiceName("GreetingService");
});
五、RPC调用(request/response pattern)
我们还可以通过Masstransit实现RPC调用:
var response = await client.Request(new SimpleRequest() {CustomerId = customerId}); Console.WriteLine("Customer Name: {0}", response.CusomerName);
这有点像是一个webservice调用,不过在ESB的设计中我们应该尽量避免这种设计,特别是在异构系统之间,应该尽量采用send command pattern和publish/subscriber pattern。
六、正式场景该如何使用Masstransit
在使用Masstranit的正式场景中,我们主要考虑以下几个方面:
1、配置方式
定义一个抽象类,用来统一配置方式:
public abstract class BusConfiguration
{
public abstract string RabbitMqAddress { get; }
public abstract string QueueName { get; }
public abstract string RabbitMqUserName { get; }
public abstract string RabbitMqPassword { get; }
public abstract Action<IRabbitMqBusFactoryConfigurator,IRabbitMqHost> Configuration { get; } public virtual IBus CreateBus()
{
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(RabbitMqAddress), hst =>
{
hst.Username(RabbitMqUserName);
hst.Password(RabbitMqPassword);
}); Configuration?.Invoke(cfg, host);
}); return bus;
}
}
具体的项目会继承该配置类做对应的配置:如UserManagementBusConfiguration、UserManagementServiceBusConfiguration等
2、能够跟DI容器结合,本例以Castle Windsor Container为例:
在web项目中添加ServiceBusInstaller:
public class ServiceBusInstaller:IWindsorInstaller
{
public void Install(IWindsorContainer container, IConfigurationStore store)
{
container.Register(
Component.For<IBus, IBusControl>()
.Instance(UserManagementBusConfiguration.BusInstance)
.LifestyleSingleton());
}
}
然后我们就可以在controller中注入IBus了:
private readonly IUserProvider _userProvider;
private readonly IBus _bus; public ValuesController(IUserProvider userProvider,IBus bus)
{
_userProvider = userProvider;
_bus = bus;
} [HttpGet]
[Route("api/values/createuser")]
public string CreateUser()
{
//save user in local db _bus.Publish(new UserCreatedEvent() {UserName = "Tom", Email = "tom@google.com"}); return "create user named Tom";
}
同样的道理,在consumer项目中也可以做同样的配置,添加ConsumersInstaller:
public class ConsumersInstaller:IWindsorInstaller
{
public void Install(IWindsorContainer container, IConfigurationStore store)
{
container.Register(
Classes.FromThisAssembly().BasedOn(typeof (IConsumer)).WithServiceBase().WithServiceSelf().LifestyleTransient());
}
}
在Consumer中注入一个组件试试:
public class UserCreatedEventConsumer : IConsumer<UserCreatedEvent>
{
private readonly GreetingWriter _greetingWriter; public UserCreatedEventConsumer(GreetingWriter greetingWriter)
{
_greetingWriter = greetingWriter;
} public async Task Consume(ConsumeContext<UserCreatedEvent> context)
{
_greetingWriter.SayHello(); await Console.Out.WriteLineAsync($"user name is {context.Message.UserName}");
await Console.Out.WriteLineAsync($"user email is {context.Message.Email}");
}
}
把web项目和consumer服务都跑起来看看:
3、重试配置
cfg.UseRetry(Retry.Interval(3, TimeSpan.FromMinutes(1)));
消息消费失败后重试3次,每次间隔1分钟
4、限速器
cfg.UseRateLimit(1000, TimeSpan.FromSeconds(1));
每分钟消息消费数限定在1000之内
5、熔断器
cfg.UseCircuitBreaker(cb =>
{
cb.TrackingPeriod = TimeSpan.FromMinutes(1);
cb.TripThreshold = 15;
cb.ActiveThreshold = 10;
});
参照Martin Folwer对熔断器模式的描述:CircuitBreaker
6、异常处理
public class UserUpdatedEventComsumer
:IConsumer<UserUpdatedEvent>
,IConsumer<Fault<UserUpdatedEvent>>
{
public Task Consume(ConsumeContext<UserUpdatedEvent> context)
{
throw new System.NotImplementedException();
} public async Task Consume(ConsumeContext<Fault<UserUpdatedEvent>> context)
{
await Console.Out.WriteLineAsync($"catch exception: {context.Message.Message}");
}
}
只要继承于对应的Fault<TMessage>即可为对应的消息编写异常处理。
7、单元测试(待续)
8、消息定时发送(待续)
9、自定义中间件(待续)
10、自定义观察者(待续)
11、长生命周期的消费者:Turnout(待续)
12、长生命周期的状态机:saga(待续)
13、Routing slip pattern的实现:Courier(待续)
整个Demo代码提供下载:http://git.oschina.net/richieyangs/RabbitMQ.Practice