应用实例操作步骤如下:
一:建立两个用来交换消息的数据库实例(下面的脚本以 SSB_Book 为例),并在这些实例中启用 Service Broker 活动,T-SQL 如下:
-- Step 1: create the sample database (when missing) and enable it for
-- Service Broker activity.
USE master;
GO

-- Only create SSB_Book when it is not there yet, so the script can be re-run.
IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = N'SSB_Book')
    CREATE DATABASE SSB_Book;
GO

-- Switch Service Broker message delivery on for this database.
-- NOTE(review): ENABLE_BROKER needs exclusive access to the database; close
-- other connections to SSB_Book first if this statement appears to hang.
ALTER DATABASE SSB_Book SET ENABLE_BROKER;
GO

-- Flag the database as TRUSTWORTHY.
ALTER DATABASE SSB_Book SET TRUSTWORTHY ON;
GO

-- Everything that follows is created inside SSB_Book.
USE SSB_Book;
GO
-- One row per publication.
--   Publication : key of the publication; sp_ProcessPublicationRequest stores
--                 the conversation handle of the publishing dialog here.
--   Subject     : subject text used to match publications with subscriptions.
--   OriginalXml : the request message as it was received.
CREATE TABLE Publications
(
    Publication UNIQUEIDENTIFIER NOT NULL PRIMARY KEY,
    Subject     NVARCHAR(MAX)    NOT NULL,
    OriginalXml XML              NOT NULL
);
GO
-- One row per subscription.
--   Subscriber  : key of the subscription; sp_ProcessSubscriptionRequest stores
--                 the conversation handle of the subscribing dialog here.
--   Subject     : subject the subscriber wants to receive articles for.
--   OriginalXml : the request message as it was received.
CREATE TABLE Subscriptions
(
    Subscriber  UNIQUEIDENTIFIER NOT NULL PRIMARY KEY,
    Subject     NVARCHAR(MAX)    NOT NULL,
    OriginalXml XML              NOT NULL
);
GO
二:建立消息类型、契约(Contract)、队列和服务,并通过服务把指定的队列绑定到契约
-- Example of a message type whose body is validated against a concrete XML
-- schema: first register the schema collection, then reference it from the
-- message type.
CREATE XML SCHEMA COLLECTION ExpenseReportSchema AS
N'<?xml version="1.0" encoding="UTF-16" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
targetNamespace="http://Adventure-Works.com/schemas/expenseReport"
xmlns:expense="http://Adventure-Works.com/schemas/expenseReport"
elementFormDefault="qualified"
>
<xsd:complexType name="expenseReportType">
<xsd:sequence>
<xsd:element name="EmployeeName" type="xsd:string"/>
<xsd:element name="EmployeeID" type="xsd:string"/>
<xsd:element name="ItemDetail"
type="expense:ItemDetailType" maxOccurs="unbounded"/>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="ItemDetailType">
<xsd:sequence>
<xsd:element name="Date" type="xsd:date"/>
<xsd:element name="CostCenter" type="xsd:string"/>
<xsd:element name="Total" type="xsd:decimal"/>
<xsd:element name="Currency" type="xsd:string"/>
<xsd:element name="Description" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>
<xsd:element name="ExpenseReport" type="expense:expenseReportType"/>
</xsd:schema>' ;

-- Messages of this type must be valid XML according to ExpenseReportSchema.
CREATE MESSAGE TYPE
    [//Adventure-Works.com/Expenses/SubmitExpense]
    VALIDATION = VALID_XML WITH SCHEMA COLLECTION ExpenseReportSchema ;
GO
------------------------
USE SSB_Book;
GO

-- Part 1: Service Broker infrastructure
-- Message types

-- Request to publish a subject; the body must be well-formed XML.
CREATE MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/PublishMessage]
    VALIDATION = WELL_FORMED_XML;
GO

-- Article payload; the body is not validated.
CREATE MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
    VALIDATION = NONE;
GO

-- Request to subscribe to a subject; the body must be well-formed XML.
CREATE MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
    VALIDATION = WELL_FORMED_XML;
GO
-- Contracts

-- Publish contract: the initiator sends both the publish request and the
-- articles that belong to it.
CREATE CONTRACT [http://ssb.csharp.at/SSB_Book/c10/PublishContract]
(
    [http://ssb.csharp.at/SSB_Book/c10/PublishMessage] SENT BY INITIATOR,
    [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage] SENT BY INITIATOR
)
GO

-- Subscribe contract: the initiator sends the subscription request and the
-- target answers with articles.
CREATE CONTRACT [http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
(
    [http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage] SENT BY INITIATOR,
    [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]   SENT BY TARGET
)
GO
-- Queues and services
-- Publisher queue and its service. The service accepts dialogs on both the
-- publish contract and the subscribe contract.
CREATE QUEUE [PublisherQueue];
GO
CREATE SERVICE [PublisherService] ON QUEUE [PublisherQueue]
(
    [http://ssb.csharp.at/SSB_Book/c10/PublishContract],
    [http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
);
GO
-- Subscriber queues and their services. No contract list is given, so these
-- services are only used to start dialogs (see the subscribe scripts below).
CREATE QUEUE SubscriberQueue1;
GO
CREATE SERVICE SubscriberService1 ON QUEUE SubscriberQueue1;
GO
CREATE QUEUE SubscriberQueue2;
GO
CREATE SERVICE SubscriberService2 ON QUEUE SubscriberQueue2;
GO
-- Author queue and its service (used to start publishing dialogs).
CREATE QUEUE AuthorQueue;
GO
CREATE SERVICE AuthorService ON QUEUE AuthorQueue;
GO
三:编写存储过程
-- Part 3: stored procedures
-- Procedure 1: sp_PublishPublication
-- Inserts one row into Publications.
--   @Publication : key of the new publication row
--   @Subject     : subject of the publication
--   @OriginalXml : XML stored alongside the publication
-- NOTE(review): the "sp_" prefix is normally reserved for system procedures;
-- the name is kept because other procedures in this script call it.
CREATE PROCEDURE sp_PublishPublication
    @Publication UNIQUEIDENTIFIER,
    @Subject     NVARCHAR(MAX),
    @OriginalXml XML
AS
BEGIN
    INSERT INTO Publications (Publication, Subject, OriginalXml)
    VALUES
    (
        @Publication,
        @Subject,
        @OriginalXml
    );
END
GO
-- Procedure 2: sp_RemovePublication
-- Deletes the Publications row whose key equals @Publication (no-op when no
-- such row exists).
CREATE PROCEDURE sp_RemovePublication
    @Publication UNIQUEIDENTIFIER
AS
BEGIN
    DELETE FROM Publications
    WHERE Publication = @Publication;
END
GO
-- Procedure 3: sp_ProcessPublicationRequest
-- Handles a publish request received on @Conversation.
--   @Conversation : conversation handle the request arrived on; it is also
--                   used as the key of the Publications row
--   @Message      : raw message body, expected to contain
--                   <Publish><Subject>...</Subject></Publish> in the
--                   PublishSubscribe namespace
-- When a subject is present the publication is stored; otherwise the
-- conversation is ended with an error and any stored publication for it is
-- removed.
CREATE PROCEDURE sp_ProcessPublicationRequest
    @Conversation UNIQUEIDENTIFIER,
    @Message      VARBINARY(MAX)
AS
BEGIN
    DECLARE @Request XML;
    DECLARE @Subject NVARCHAR(MAX);

    SELECT @Request = CAST(@Message AS XML);

    -- The elements live in the PublishSubscribe namespace, so it is declared
    -- as the default namespace for the XPath below.
    WITH XMLNAMESPACES (DEFAULT 'http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe')
    SELECT @Subject = @Request.value(N'(//Publish/Subject)[1]', N'NVARCHAR(MAX)');

    IF (@Subject IS NOT NULL)
    BEGIN
        -- Pass the already converted XML instead of converting @Message again.
        EXEC sp_PublishPublication @Conversation, @Subject, @Request;
    END
    ELSE
    BEGIN
        END CONVERSATION @Conversation
            WITH ERROR = 1
            DESCRIPTION = N'The publication is missing a subject';
        EXEC sp_RemovePublication @Conversation;
    END
END
GO
-- Procedure 4: sp_SendOnPublication
-- Forwards an article to every subscriber of the publication's subject.
--   @Publication : key of the publication (Publications.Publication)
--   @Article     : article body; when NULL an ArticleMessage without a body
--                  is sent
-- Subscribers are the Subscriptions rows whose Subject equals the Subject of
-- the given publication. Each Subscriber value is used as the conversation
-- handle to send on. All sends happen inside one transaction.
CREATE PROCEDURE sp_SendOnPublication
    @Publication UNIQUEIDENTIFIER,
    @Article     VARBINARY(MAX)
AS
BEGIN
    DECLARE @Subscription UNIQUEIDENTIFIER;
    DECLARE @cursorSubscriptions CURSOR;

    SET @cursorSubscriptions = CURSOR LOCAL SCROLL FOR
        SELECT s.Subscriber
        FROM Subscriptions AS s
        INNER JOIN Publications AS p
            ON s.Subject = p.Subject
        WHERE p.Publication = @Publication;

    BEGIN TRANSACTION;

    OPEN @cursorSubscriptions;
    FETCH NEXT FROM @cursorSubscriptions
        INTO @Subscription;

    WHILE (@@FETCH_STATUS = 0)
    BEGIN
        IF (@Article IS NOT NULL)
        BEGIN
            -- Forward the article received from the author to this subscriber.
            SEND ON CONVERSATION @Subscription
                MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage] (@Article);
        END
        ELSE
        BEGIN
            -- No body available: send an empty ArticleMessage.
            SEND ON CONVERSATION @Subscription
                MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage];
        END

        FETCH NEXT FROM @cursorSubscriptions
            INTO @Subscription;
    END

    CLOSE @cursorSubscriptions;
    DEALLOCATE @cursorSubscriptions;

    COMMIT;
END
GO
-- Procedure 5: sp_RemoveSubscriber
-- Deletes the Subscriptions row whose key equals @Subscriber (no-op when no
-- such row exists).
CREATE PROCEDURE sp_RemoveSubscriber
    @Subscriber UNIQUEIDENTIFIER
AS
BEGIN
    DELETE FROM Subscriptions
    WHERE Subscriber = @Subscriber;
END
GO
-- Procedure 6: sp_PublishSubscriber
-- Inserts one row into Subscriptions.
--   @Subscriber  : key of the new subscription row
--   @Subject     : subject the subscriber is interested in
--   @OriginalXml : XML stored alongside the subscription
CREATE PROCEDURE sp_PublishSubscriber
    @Subscriber  UNIQUEIDENTIFIER,
    @Subject     NVARCHAR(MAX),
    @OriginalXml XML
AS
BEGIN
    INSERT INTO Subscriptions (Subscriber, Subject, OriginalXml)
    VALUES
    (
        @Subscriber,
        @Subject,
        @OriginalXml
    );
END
GO
-- Procedure 7: sp_ProcessSubscriptionRequest
-- Handles a subscription request received on @Conversation.
--   @Conversation : conversation handle the request arrived on; it is also
--                   used as the key of the Subscriptions row
--   @Message      : raw message body, expected to contain
--                   <Request><Subject>...</Subject></Request> in the
--                   PublishSubscribe namespace
-- When a subject is present the subscription is stored; otherwise the
-- conversation is ended with an error and any stored subscription for it is
-- removed.
CREATE PROCEDURE sp_ProcessSubscriptionRequest
    @Conversation UNIQUEIDENTIFIER,
    @Message      VARBINARY(MAX)
AS
BEGIN
    DECLARE @Request XML;
    DECLARE @Subject NVARCHAR(MAX);

    SELECT @Request = CAST(@Message AS XML);

    -- The elements live in the PublishSubscribe namespace, so it is declared
    -- as the default namespace for the XPath below.
    WITH XMLNAMESPACES (DEFAULT 'http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe')
    SELECT @Subject = @Request.value(N'(//Request/Subject)[1]', N'NVARCHAR(MAX)');

    IF (@Subject IS NOT NULL)
    BEGIN
        -- Pass the already converted XML instead of converting @Message again.
        EXEC sp_PublishSubscriber @Conversation, @Subject, @Request;
    END
    ELSE
    BEGIN
        END CONVERSATION @Conversation
            WITH ERROR = 1
            DESCRIPTION = N'The subscriber is missing a subject';
        EXEC sp_RemoveSubscriber @Conversation;
    END
END
GO
-- Procedure 8: sp_PublisherService
-- Message loop of the publisher: receives messages from PublisherQueue one at
-- a time and dispatches them by message type until a receive times out.
--   PublishMessage   -> sp_ProcessPublicationRequest
--   SubscribeMessage -> sp_ProcessSubscriptionRequest
--   ArticleMessage   -> sp_SendOnPublication
--   Error/EndDialog  -> end the conversation and remove its publication and
--                       subscription rows
--   anything else    -> raise an error, roll back and return
-- Each message is received and processed in its own transaction.
CREATE PROCEDURE sp_PublisherService
AS
BEGIN
    DECLARE @Conversation    UNIQUEIDENTIFIER;
    DECLARE @Message         VARBINARY(MAX);
    DECLARE @MessageTypeName SYSNAME;

    BEGIN TRANSACTION;

    -- Wait up to 1000 ms for the first message. On timeout the variables stay
    -- NULL and the loop below is skipped.
    WAITFOR
    (
        RECEIVE TOP (1)
            @Conversation    = conversation_handle,
            @Message         = message_body,
            @MessageTypeName = message_type_name
        FROM PublisherQueue
    ), TIMEOUT 1000;

    WHILE (@Conversation IS NOT NULL)
    BEGIN
        IF (@MessageTypeName = 'http://ssb.csharp.at/SSB_Book/c10/PublishMessage')
        BEGIN
            EXEC sp_ProcessPublicationRequest @Conversation, @Message;
        END
        ELSE IF (@MessageTypeName = 'http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage')
        BEGIN
            EXEC sp_ProcessSubscriptionRequest @Conversation, @Message;
        END
        ELSE IF (@MessageTypeName =
            'http://ssb.csharp.at/SSB_Book/c10/ArticleMessage')
        BEGIN
            EXEC sp_SendOnPublication @Conversation, @Message;
        END
        ELSE IF (@MessageTypeName IN (
            N'http://schemas.microsoft.com/SQL/ServiceBroker/Error',
            N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'))
        BEGIN
            END CONVERSATION @Conversation;

            -- Drop whatever this conversation registered earlier.
            IF (EXISTS (SELECT * FROM Publications
                        WHERE Publication = @Conversation))
            BEGIN
                EXEC sp_RemovePublication @Conversation;
            END
            IF (EXISTS (SELECT * FROM Subscriptions
                        WHERE Subscriber = @Conversation))
            BEGIN
                EXEC sp_RemoveSubscriber @Conversation;
            END
        END
        ELSE
        BEGIN
            -- Unexpected message
            RAISERROR (N'Received unexpected message type: %s', 16, 1,
                @MessageTypeName);
            ROLLBACK;
            RETURN;
        END

        COMMIT;

        -- Reset the handle so a timeout on the next receive ends the loop.
        SELECT @Conversation = NULL;

        BEGIN TRANSACTION;
        WAITFOR
        (
            RECEIVE TOP (1)
                @Conversation    = conversation_handle,
                @Message         = message_body,
                @MessageTypeName = message_type_name
            FROM PublisherQueue
        ), TIMEOUT 1000;
    END

    -- Close the transaction opened for the receive that timed out.
    COMMIT;
END
GO
-- 9: subscription request script, run on behalf of the subscriber.
-- Opens a dialog from SubscriberService2 to PublisherService on the subscribe
-- contract and sends a SubscribeMessage asking for Subject2.
DECLARE @dialog_handle UNIQUEIDENTIFIER;
DECLARE @request_body  NVARCHAR(MAX);

SET @request_body =
N'<?xml version="1.0"?>
<Request xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject2</Subject>
</Request>';

BEGIN DIALOG CONVERSATION @dialog_handle
    FROM SERVICE [SubscriberService2]
    TO SERVICE 'PublisherService'
    ON CONTRACT [http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
    WITH ENCRYPTION = OFF;

SEND ON CONVERSATION @dialog_handle
    MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
    (@request_body);
GO
四:发布消息:
-- AuthorService announces a publication to PublisherService, which later
-- distributes the author's articles to the subscribers.
-- [AuthorService] sends the PublishMessage for Subject1:
DECLARE @ch UNIQUEIDENTIFIER;

BEGIN DIALOG CONVERSATION @ch
    FROM SERVICE [AuthorService]
    TO SERVICE 'PublisherService'
    ON CONTRACT [http://ssb.csharp.at/SSB_Book/c10/PublishContract]
    WITH ENCRYPTION = OFF;

SEND ON CONVERSATION @ch
    MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/PublishMessage]
    (
N'<?xml version="1.0"?>
<Publish xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject1</Subject>
</Publish>'
    );
-- End the batch here so @ch does not collide with the DECLARE @ch of the
-- next script.
GO

-- The send adds one row to PublisherQueue (the queue of 'PublisherService').
-- After dbo.sp_PublisherService has run, the message has been received,
-- sp_ProcessPublicationRequest has been called and the publication has been
-- persisted in Publications.
SELECT * FROM dbo.PublisherQueue;
SELECT * FROM Publications;
GO
五:订阅消息
-- 1: a subscriber receives articles by subscribing at PublisherService.
-- SubscriberService1 subscribes to Subject1 ---> PublisherService
DECLARE @dialog_handle UNIQUEIDENTIFIER;
DECLARE @request_body  NVARCHAR(MAX);

SET @request_body =
N'<?xml version="1.0"?>
<Request xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject1</Subject>
</Request>';

BEGIN DIALOG CONVERSATION @dialog_handle
    FROM SERVICE [SubscriberService1]
    TO SERVICE 'PublisherService'
    ON CONTRACT [http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
    WITH ENCRYPTION = OFF;

SEND ON CONVERSATION @dialog_handle
    MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
    (@request_body);
GO
-- Alternatively: SubscriberService2 subscribes to Subject2 ---> PublisherService
DECLARE @dialog_handle UNIQUEIDENTIFIER;
DECLARE @request_body  NVARCHAR(MAX);

SET @request_body =
N'<?xml version="1.0"?>
<Request xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject2</Subject>
</Request>';

BEGIN DIALOG CONVERSATION @dialog_handle
    FROM SERVICE [SubscriberService2]
    TO SERVICE 'PublisherService'
    ON CONTRACT [http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
    WITH ENCRYPTION = OFF;

SEND ON CONVERSATION @dialog_handle
    MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
    (@request_body);
GO

-- The send adds one row to PublisherQueue (the queue of 'PublisherService').
-- After dbo.sp_PublisherService has run, the message has been received,
-- sp_ProcessSubscriptionRequest has been called and the subscription has
-- been persisted in Subscriptions.
SELECT * FROM dbo.PublisherQueue;
SELECT * FROM Subscriptions;
六:
-- AuthorService sends an article for a specific subject to PublisherService,
-- which distributes it to the matching subscribers.
--
-- sp_PublisherService passes the conversation handle of an ArticleMessage to
-- sp_SendOnPublication, which looks that handle up in Publications. Such a row
-- only exists for a conversation that carried a PublishMessage before, so the
-- article has to be sent on the SAME dialog as the PublishMessage for its
-- subject. Sending it on a fresh dialog would match no publication and reach
-- no subscriber.
DECLARE @ch UNIQUEIDENTIFIER;

BEGIN DIALOG CONVERSATION @ch
    FROM SERVICE [AuthorService]
    TO SERVICE 'PublisherService'
    ON CONTRACT [http://ssb.csharp.at/SSB_Book/c10/PublishContract]
    WITH ENCRYPTION = OFF;

-- First register the publication for Subject2 on this dialog ...
SEND ON CONVERSATION @ch
    MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/PublishMessage]
    (
N'<?xml version="1.0"?>
<Publish xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject2</Subject>
</Publish>'
    );

-- ... then send the article on the same dialog.
SEND ON CONVERSATION @ch
    MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
    (
        N'This is an article on Subject2'
    );
GO

-- The sends add rows to PublisherQueue (the queue of 'PublisherService').
-- After dbo.sp_PublisherService has run, the messages have been received and
-- sp_SendOnPublication has forwarded the article to all matching subscribers.
SELECT * FROM dbo.PublisherQueue;
GO
六:应用服务JDBC调用“发送消息存储过程”、“接收消息存储过程”,供参考:
// JDBC call escape: the braces must be balanced and the bracketed procedure
// name must not contain a leading space.
cStmt = con.prepareCall("{call [dbo].[sp_PublisherService]}");
七:测试响应时间间隔
1:发送消息测试
通过登录数据库验证sql如下:
2:接收消息测试
3:涉及SQL验证:
-- Inspect the messages currently sitting in each queue.
SELECT * FROM SubscriberQueue1;
SELECT * FROM SubscriberQueue2;
SELECT * FROM dbo.PublisherQueue;
SELECT * FROM dbo.AuthorQueue;

-- Publications and subscriptions are persisted in these tables once the
-- publish / subscribe requests have been processed.
SELECT * FROM dbo.Publications;
SELECT * FROM dbo.Subscriptions;
4:原理架构图