作者 王枫 发布于2014年5月15日
综述
本文介绍建立一个在Azure上使用Azure服务总线, 高吞吐量短信平台的必要步骤。在这篇文章中提出的解决方案是在响应由客户的具体要求,建立一个基于Windows Azure技术的复杂远程信息处理应用。
在Windows Azure中的通讯服务
Windows Azure平台通过不同的技术支持信息通信:
- 存储队列
- 服务总线
以下各段将给予每个类型的信息传递技术的概览。
存储队列
您可以使用Windows Azure中的队列,支持组件之间的异步通信。应用程序中的组件可以发布信息到一个队列。其他组件可以接收信息,并提供处理。队列中提供组件之间耐用的持久性存储,以及负载调平,负载均衡,和缩放的好处。
服务总线
Windows Azure服务总线为广泛的交流,大型活动分布,命名和服务发布提供了一个托管,安全和广泛可用的基础设施。服务总线为Windows Communication Foundation(WCF)和其他服务端点提供连接选项 - 包括REST端点 - 否则将很难或根本不可能达到。
针对Windows Server的服务总线是一组可安装的组件,为Windows Sever上提供Windows Azure服务总线的信息传递功能。适用于Windows Server的服务总线,使您能够在自我管理的环境,并在开发计算机上构建,测试和松耦合运行,信息驱动的应用程序。
通讯设计模式
使用存储队列构建松散耦合的信息传递方案
Windows Azure的队列存储是一个用于存储大量的信息,可以在世界任何地方通过身份验证的调用使用HTTP或HTTPS访问服务。一个单一的队列的信息可高达64KB的大小,队列可以包含数百万条信息,高达100TB的总极限容量的存储帐户。常见队列存储用途包括:
- 建异步处理积压的工作
- Windows Azure Web角色中传递信息到Windows Azure Worker角色
队列服务包含以下组件:
URL格式:队列可以通过以下URL格式访问:
http://<storage account>.queue.core.windows.net/<queue>
下面的URL地址针对图中的队列:
http://myaccount.queue.core.windows.net/imagesToDownload
- 存储帐户:所有Windows Azure存储是通过一个存储帐户来访问的。存储帐户是访问队列的*别的命名空间。帐户内表格和队列内容存储的总大小不能超过100TB。
- 队列: 队列中包含一系列信息。所有信息必须在队列中。
- Message: 一个信息,无论任何格式的最大上限是64KB
一个涉及多个组件之间的存储队列典型的架构设计模式,可以类似于下列:
在这个模式中,多个前端Web Role客户端发送信息到一个或多个存储队列,然后多个Worker Role实例从一个或多个队列读取信息及进行处理。
上述架构可以通过以下方式缩放:
- 存储队列:当信息并发数量增加,可以增加更多的存储队列
- Worker Role实例:当信息的数量增加,Worker Role实例的数量可以增加,从而使更多的计算资源可用于处理从队列中的信息。
不过,也有数个点,我们需要考虑到,以确保在现实中的架构可以灵活扩展:
- 存储帐户:一个单一的存储帐户的可扩展性目标是每秒5000条信息
- 存储队列:一个单一队列的可扩展性目标是能够处理最多每秒500条信息
由于上述两个存储帐户和存储队列的限制,如果我们希望能够同时处理大量信息,我们需要分割我们的存储基础设施成多个队列和/或存储帐户。
例如,一个客户端以每秒1000条信息的速度传送到Azure存储队列中,我们希望我们的解决方案以无积压及最快的速度来处理信息,那么我们就需要使用4个队列及分割我们我们的信息传递到4个队列中,看下图。
每个队列每秒可以处理500个信息,包括入口和出口,如果我们想每个队列尽快处理信息,那么每个队列入口和出口每秒应该处理不超过250个信息。既然我们的入口和出口每秒要处理1000条信息,因此,我们需要共4个队列。Web Role客户端将需要分割信息及均匀地分配到可用的队列以分担工作负载。
在另一情况下,客户端以每秒10,000条的速度发送信息到存储队列,那么我们就需要创建多个存储帐户分担工作量。为了处理每秒10,000个信息,我们需要2个存储帐户,每个存储账户下, 需要20个队列。
利用服务总线主题发布订阅信息
服务总线主题和订阅支持信息发布/订阅传递通信模式。当使用主题和订阅,分布式应用程序个组件不直接彼此沟通,而是通过一个主题,充当中介的信息交换。
相比在服务总线队列,每个信息是由单一消费者处理的,主题及订阅使用发布/订阅模式提供了一个对多个形式的通信。一个主题注册多个订阅是可能的。当一个信息被发送到一个话题后,它会被每个订阅独立处理。您可以选择为基于每一个订阅基础上的主题, 注册过滤规则,它可以让你过滤/限制主题订阅中接收的信息。
服务总线主题和订阅,让你在横跨非常大量的用户和应用程序下, 可以扩展到处理一个非常大的信息数目。
自动转发链接服务总线实体来缩放单个主题
自动转发功能能让您使一个订阅或队列链到相同的服务命名下另一个队列或主题。当自动转发启用,服务总线自动移除被放置在第一个队列或订阅(源)的信息,并把它们放入第二队列或主题(目标)。
您可以使用自动转发向外扩充一个单独主题。服务总线限制一个主题订阅的数量。可以通过创建第二层主题容纳更多的订阅。需要注意的是,即使你并不受服务总线订阅数目的限制,增加了第二层的主题,可以提高你的主题的整体吞吐量。
当把别主题串联起来而得到一个拥有许多订阅的复合主题时,建议你有一个中等数量的第一级主题订阅及许多二级的主题订阅。例如,第一级以20个主题订阅,彼等各自链接到第二层200个主题订阅,允许比第一层200个主题订阅每个链接有20个第二层主题订阅更高的吞吐量。
参考案例
下面的示例使用中国的Azure公共云计算平台,及解释利用服务总线在中国Azure平台专门所需的编码。使用的Azure SDK 2.0及此示例介绍了Windows Azure服务总线在Azure SDK 2.0提供的,新的事件驱动编程模式。
Scenario场景
上面的例子是基于建立连接汽车远程信息处理方案,其中远程信息处理单位不断发送车数据到云端。各种车辆数据还有多个接收器。为了支持非常高吞吐量的汽车数据到云中,例子采用了2层自动转发的向外扩展做法。在第一层,每款车数据主题有少量订阅, 例如10 个,每个订阅自动转发到第二层主题。每个第二层主题然后有多个订阅 -接收器 Worker Roles. 每个接收器Worker Role包含多个实例。
取决于实际吞吐量要求,第一层和第二层的主题可以独立扩展, 增加的主题及订阅的数量。以及每个接收器Worker Role可以不断增加对Worker Role实例的向外扩展。
连接到服务总线
中国Azure比全球Azure平台有不同端点URL,全球Azure所提供的一些文档和示例代码并不适合使用在Azure中国环境。下面是需要创建连接到Azure中国服务总线的示例代码:
NamespaceManager namespaceManager;
MessagingFactory messagingFactory;
string namespaceAddress = "<your-namespace>";
string issuerName = "<your-issuer-name>";
string issuerKey = "<your-issuer-key>";
string endpointuriAddress = string.Format("sb://{0}.servicebus.chinacloudapi.cn",
namespaceAddress);
string stsendpointAddress = string.Format("https://{0}-sb.accesscontrol.chinacloudapi.
cn/", NamespaceAddress); TokenProvider tp = TokenProvider.CreateSharedSecretTokenProvider(issuerName,
issuerKey, new Uri(stsendpointAddress));
namespaceManager = new NamespaceManager(new Uri(endpointuriAddress), tp );
messagingFactory = MessagingFactory.Create(endpointuriAddress, tp);
创建主题及自动转发订阅
public bool CreateTopic(string topicNamePrefix, int topicSuffixIndex, int NumOfTopics)
{
bool success;
try
{
var topic = this.namespaceManager.CreateTopic(string.Format("{0}_{1}",
topicNamePrefix, topicSuffixIndex.ToString())); for (int i = 0; i < NumOfTopics; i++)
{
CreateSubscription(topicSuffixIndex, i, topic.Path);
}
success = true;
}
catch (Exception)
{
success = false;
}
return success;
} public bool CreateSubscription( int topicSuffixIndex, int subscriptionIndex,
string srcTopic)
{
bool success = false;
string destTopic = srcTopic + subscriptionIndex.ToString();
TopicDescription dTopic = null;
try
{
dTopic = this.namespaceManager.GetTopic(destTopic);
}
catch (Microsoft.ServiceBus.Messaging.MessagingEntityNotFoundException)
{
dTopic = null;
} if (dTopic == null)
{
dTopic = this.namespaceManager.CreateTopic(destTopic); var subscription = this.namespaceManager.CreateSubscription(dTopic.Path, "
Subscription_0" ); } SubscriptionDescription srcSubscription = new SubscriptionDescription(srcTopic,
string.Format("Subscription{0}_{1}", topicSuffixIndex.ToString(), subscriptionIndex.ToString()));
srcSubscription.ForwardTo = dTopic.Path;
var ForwardSubscription = this.namespaceManager.CreateSubscription(srcSubscription,
new SqlFilter(string.Format("PartitionID='{0}'", subscriptionIndex.ToString()))); return success;
}
执行下面的代码片段创建的主题和订阅。下面的代码执行的结果将创建5个第一层主题及10个第二个层主题。每个第一层主题将有10个自动转发订阅到10个第二层主题。
for (int i = 0; i < 5 i++)
{
CreateTopic(“SBMessageTopic”, i, 10);
}
当从Azure管理门户网站查看时,可以查看以下主题:
第一层有10个主题订阅:
第二层主题包含2个订阅为2个接收器的Work Role。
发送信息个到服务总线主题
下面的示例代码并行发送一批信息到其中一个第一层主题:
public bool SendMessage(string msgid, string message, string label, int nIterations,
int NumOfPartitions )
{
bool success = false;
string topicName = “sbmessagetopic_0”; TopicClient topicClient = this.messagingFactory.CreateTopicClient(topicName); List<CustomMessage> MessagesList = new List<CustomMessage>();
for (int i = 0; i < nIterations; i++)
{
CustomMessage customMessage = new CustomMessage() { Body = message, Date =
DateTime.Now, Label = label , MSGID=msgid};
MessagesList.Add(customMessage);
} Parallel.ForEach<CustomMessage>(MessagesList, new Action<CustomMessage>(
(CarDataMessage) =>
{
BrokeredMessage bm = null;
try
{
bm = new BrokeredMessage(CarDataMessage); bm.Properties["PartitionID"] = RandomDigit(1, NumOfPartitions); topicClient.Send(bm); success = true;
}
catch (Exception)
{
// TODO: do something
}
finally
{
if (bm != null)
{
bm.Dispose();
}
}
}
)); return success;
} public static string RandomDigit(int size, int length)
{
Random random = new Random();
const string digitString = "0123456789";
var chars = Enumerable.Range(0, size)
.Select(x => digitString[random.Next(0, length)]);
return new string(chars.ToArray());
} public class CustomMessage
{
private string msgid;
private DateTime date;
private string body;
private string label; public DateTime Date
{
get { return this.date; }
set { this.date = value; }
} public string Body
{
get { return this.body; }
set { this.body = value; }
} public string Label
{
get { return this.label; }
set { this.label = value; }
} public string MSGID
{
get { return this.msgid; }
set { this.msgid = value; }
}
}
下面的代码片段执行发送信息指令,它发送1000个信息到第一层主题并从第一层主题订阅随机转发每个信息到第二层主题其中一个":
SendMessage(“Your-Msg-ID”, “Message Text” , “Message Label”, 1000, 10);
同时发送大量的信息时,建议利用大型Worker Role并且有多个实例,以提高的并行度。
接收信息
下面的示例代码在Work Role启动由Azure SDK 2.0提供的事件驱动信息泵:
public void StartMessagePump(string topicName, string subscriptionName)
{
SubscriptionClient subscriptionClient = this.messagingFactory.
CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.PeekLock);
var eventDrivenMessagingOptions = new OnMessageOptions();
eventDrivenMessagingOptions.AutoComplete = true;
eventDrivenMessagingOptions.ExceptionReceived += OnExceptionReceived;
eventDrivenMessagingOptions.MaxConcurrentCalls = 256; subscriptionClient.OnMessage(OnMessageArrived, eventDrivenMessagingOptions);
}
当信息到达时, OnMessage()被调用:
/// <summary>
/// This event will be called each time a message arrives.
/// </summary>
/// <param name="message"></param>
private static void OnMessageArrived(BrokeredMessage message)
{
var custmsg = message.GetBody<CustomMessage>(); System.Diagnostics.PerformanceCounter perfCounterReceived =
new System.Diagnostics.PerformanceCounter("ServiceBusDemo", "Message Receive/
Sec", RoleEnvironment.CurrentRoleInstance.Id, false);
if (perfCounterReceived != null)
{
perfCounterReceived.ReadOnly = false;
perfCounterReceived.IncrementBy(1);
} Trace.WriteLine(string.Format(" > {0} - Received message for Vehicle: {1} (Thread: {2})",
DateTime.Now, custmsg.MSGID, Thread.CurrentThread.ManagedThreadId)); if (custmsg.MSGID == string.Empty)
throw new InvalidOperationException("Invalid Message Id: " + custmsg.MSGID);
}
和下面的功能在发生错误时被调用时:
/// <summary>
/// Event handler for each time an error occurs.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
static void OnExceptionReceived(object sender, ExceptionReceivedEventArgs e)
{
if (e != null && e.Exception != null)
{
Trace.WriteLine(string.Format(" > Exception received: {0}", e.Exception.Message));
}
}
下面的代码片段是Worker Role执行run()方法进行订阅到第二层主题。这个例子进行订阅10个第二层主题:
for (int i = 0; i < 10; i++)
{
string topic = string.Format("sbmessagetopic_0{0}", i.ToString());
StartMessagePump(topic, "Subscription_1");
}
while (true)
{
Thread.Sleep(10000);
Trace.TraceInformation("SBMessageConsumer1 Working", "Information");
}
测试服务总线吞吐量
上面的测试是通过使用一个2芯信息产生者Worker Role实例和一个2芯信息的接收者实例,在1个第一层主题和10个第二层主题。吞吐量保持在每秒1000 - 1200信息之间。
该解决方案可以通过加入多个第一层主题,以及更多的第二层主题进一步向外扩展。当适当数量的信息接收器的情况下,服务总线解决方案可以轻松地向外扩展支持每秒数千或数万的信息吞吐量。
小结
本文描述了在Azure上处理应用程序和设备间进行信息传递的不同信息传递组件。此外,这篇文章亦介绍了不同的方式在Azure上来扩展信息传递的基础设施,以向外扩展的方法来处理大量信息传递并发。
感谢马国耀对本文的审校。
本文转载自:http://www.infoq.com/cn/articles/experience-of-scalable-messaging-in-azure-environment