Windows Azure Service Bus Topics实现系统松散耦合

时间:2022-07-04 18:22:52

前言

Windows Azure中的服务总线(Service Bus)提供了多种功能, 包括队列(Queue), 主题(Topic),中继(Relay),和通知中心(Notification Hub).

1) 有关Windows Azure Service Bus 的中继功能, 我们在 《在IIS上发布基于Windows Azure Service Bus的WCF服务》做了详细介绍。

2) 有关Windows Azure 通知中心(Notification Hub), 我们在《如何利用Windows Azure的Service Bus Notification Hub推送通知》做了详细介绍。通过Service Bus Notification Hub,我们可以非常方便地向所有注册终端批量推送通知。

那么Service Bus中的主题(Topic)又是起什么作用的呢?

正文

关于Service Bus的Topic是什么的问题,我们必须先了解Service Bus中的另外一个功能Queue. 顾名思义,Service Bus Queue就是一个队列,用来存储消息。消息的发送者和消息的消费者互相没有关联,从而实现系统的松散耦合。

当然,我们知道Windows Azure Storage里面也提供了一个叫做Queue的功能,Windows Azure Queue和Service Bus Queue之间具体的区别,您可以参考下面的白皮书:

Windows Azure Queues and Windows Azure Service Bus Queues

http://msdn.microsoft.com/en-us/library/windowsazure/hh767287.aspx

假设我们有这样一种情形:

我们有一个商品销售公司,其有很多的零售店,分别位于上海,无锡等。各零售店每售出一件商品,其都会和总部的库存系统通讯,通知该件商品又售出一件 。 对于这样的应用,我们不需要零售端和库存系统同时在线。 因此,零售端只需要发送一条消息给Service Bus Queue。 而库存系统一旦上线之后,其从Queue中接受消息,并修改库存。其架构图如下所示:

Windows  Azure Service Bus Topics实现系统松散耦合

对于这样的情形,Service Bus 确实可以满足要求了。

但是,考虑另外一种情形:除了和库存系统通讯之外,假设上海和无锡的店长需要实时的统计商品的销售状况。在采用队列的情况下, 所有的消息一旦被receive之后,就会被delete,因此无法被多次consume。也就是说这些消息被库存系统consume之后,Queue中是不存在任何message来给店长进行统计之用的。

针对这样的需求,Service Bus Topic可以完美地解决这样的需求:Service Bus Topic可以创建多个订阅(Subscription),然后将所有的消息根据订阅的规则复制并发往相应的订阅队列中(Subscription Queue)。 也就是说, 对于一个Topic,如果有多个感兴趣的订阅(Subscription),那么其会自动将这些消息复制并发往所有订阅队列中。

结合上面的场景,我们可以采取下面的解决方案:

  1. 我们创建一个Service Bus Topic,假设叫做DataCollectionQueue.
  2. 针对这个Topic,我们可以创建2个 subscription:

1)      Inventory subscription, 这个subscription里面的消息被库存系统收取。

2)      Dashboard subscription,这个subscription里面的消息被统计系统收取。

因此,采取了Service Bus Topic后,该系统的架构将变为如下:

Windows  Azure Service Bus Topics实现系统松散耦合

实例演示

下面我们就实例演示一下如何使用Service Bus Topic。

·         前提条件:

1)      Visual Studio 2012

2)      Windows Azure SDK2.0

3)      Windows Azure 开发账号

·         实施

1. 在这个实例中, 我们将分为2部分:消息发送端(您可以理解为终端POS的角色) 和消息接收端(您可以理解为库存系统等)。

2. 在消息发送部分,我们将会创建1个名为DataCollectionTopic的Service Bus Topic, 然后添加2个对该Topic感兴趣的订阅,分别为Shanghai subscription和Wuxi subscription。 然后,我们发送100条消息。在发送消息的时候,我们设置了一个Filter,使得具有不同属性的消息分别到了不同Subscription Queue里面。 相关示例代码如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text; using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging; namespace ServiceBusTopicDemo_Sender
{
class Program
{
static void Main(string[] args)
{
var connectionString=System.Configuration.ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
var myNamespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); //Create Topic
if (!myNamespaceManager.TopicExists("DataCollectionTopic"))
{
myNamespaceManager.CreateTopic("DataCollectionTopic");
Console.WriteLine("Service Bus Topic is created");
} //Create Subscription if (!myNamespaceManager.SubscriptionExists("DataCollectionTopic", "Shanghai"))
{
myNamespaceManager.CreateSubscription("DataCollectionTopic","Shanghai",new SqlFilter("site = 'Shanghai'"));
Console.WriteLine("Subscription named Shanghai is created");
} if (!myNamespaceManager.SubscriptionExists("DataCollectionTopic", "Wuxi"))
{
myNamespaceManager.CreateSubscription("DataCollectionTopic", "Wuxi", new SqlFilter("site = 'Wuxi'"));
Console.WriteLine("Subscription named Wuxi is created");
} //Begin to send messages
TopicClient client = TopicClient.CreateFromConnectionString(connectionString, "DataCollectionTopic"); for (int i = ; i < ; i++)
{
BrokeredMessage message = new BrokeredMessage("Test message " + i.ToString());
if (i % == )
message.Properties["site"] = "Shanghai";
else
message.Properties["site"] = "Wuxi";
client.Send(message);
Console.WriteLine(i + + " messages are sent");
} }
}
}

运行后,我们可以看到,100条消息被分别发往Shanghai subscription和Wuxi subscription的队列中, 如下所示:

Windows  Azure Service Bus Topics实现系统松散耦合

3. 在消息接收端,我们需要从你所感兴趣的Subscription Queue中接收相应的消息。下面的代码演示了如何从Shanghai Subscription 的Queue中接收相应消息并作后期处理,比如更新库存系统等。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text; using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging; namespace ServiceBusTopicDemo_Consumer
{
class Program
{
static void Main(string[] args)
{
var connectionString = System.Configuration.ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
SubscriptionClient subClient = SubscriptionClient.CreateFromConnectionString(connectionString, "DataCollectionTopic","Shanghai");
subClient.Receive(); while (true)
{
BrokeredMessage message = subClient.Receive(); if (message != null)
{
try
{
// process the message like update the inventory system.
Console.WriteLine("Body: " + message.GetBody<string>());
Console.WriteLine("MessageID: " + message.MessageId);
Console.WriteLine("Site: " + message.Properties["site"]);
message.Complete();
}
catch (Exception)
{
message.Abandon();
} } else
break;
}
} }
}

运行时的结果如下:

Windows  Azure Service Bus Topics实现系统松散耦合

4. 在Shanghai subscription队列里面的消息被接收之后, 我们看到该队列也重新变为了空, 如下所示:

Windows  Azure Service Bus Topics实现系统松散耦合

参考文档

How to Use Service Bus Topics/Subscriptions

http://www.windowsazure.com/en-us/develop/net/how-to-guides/service-bus-topics/

An Introduction to Service Bus Topics

http://blogs.msdn.com/b/appfabric/archive/2011/05/25/an-introduction-to-service-bus-topics.aspx

Windows Azure Queues and Windows Azure Service Bus Queues - Compared and Contrasted

http://msdn.microsoft.com/en-us/library/windowsazure/hh767287.aspx

Service Bus Topics

http://msdn.microsoft.com/en-us/library/windowsazure/hh532029.aspx

希望以上内容对您有所帮助

Winston He