ServiceBroker创建流程

时间:2021-03-07 04:15:48

首先为这个数据库开启Service Broker

-- Turn on Service Broker message delivery for the T_EIP_UnityStore database.
-- NOTE(review): the ALTER DATABASE documentation says this option needs an
-- exclusive database lock, so the statement may wait while other sessions
-- are connected -- confirm it is run when the database is otherwise idle.
ALTER DATABASE [T_EIP_UnityStore]
    SET ENABLE_BROKER;

创建MessageType

-- Message type used for ACL payloads; Service Broker rejects any message
-- body of this type that is not well-formed XML.
CREATE MESSAGE TYPE [SendACLDetails]
    VALIDATION = WELL_FORMED_XML;

创建Contract

-- Contract for ACL conversations: only the initiating side may send
-- SendACLDetails messages.
CREATE CONTRACT [ACLMainContract]
(
    [SendACLDetails] SENT BY INITIATOR
);

创建向队列发送消息和从队列读取消息的存储过程

插入队列数据的存储过程

-- Sends one ACL payload as a Service Broker message.
--
-- Parameters:
--   @AclRights  XML document passed as text; it is cast to XML before being
--               sent, so a malformed document fails at the cast.
--
-- Behaviour:
--   Opens a new dialog from service svrACLUpd to service 'svrACLUpd' on
--   contract ACLMainContract (encryption off) and sends @AclRights as a
--   SendACLDetails message, all inside one transaction.
--
-- NOTE(review): this procedure never issues END CONVERSATION for the dialog
-- it opens -- confirm that conversations are cleaned up elsewhere.
CREATE PROCEDURE [dbo].[KY_SendACLToQue]
    @AclRights varchar(max)
AS
BEGIN
    DECLARE @Message XML;
    DECLARE @Handle UNIQUEIDENTIFIER;

    SET @Message = CAST(@AclRights AS XML);

    BEGIN TRANSACTION;

    -- Each call starts its own dialog; the handle is only used for the SEND below.
    BEGIN DIALOG CONVERSATION @Handle
        FROM SERVICE svrACLUpd
        TO SERVICE 'svrACLUpd'
        ON CONTRACT ACLMainContract
        WITH ENCRYPTION = OFF;

    SEND ON CONVERSATION @Handle
        MESSAGE TYPE SendACLDetails (@Message);

    -- The former debug line (a commented-out PRINT of the message) had been
    -- split across two lines, leaving a bare CAST(...) expression that made
    -- the procedure fail to compile; it has been removed.

    COMMIT;
END

GO

从队列中读取数据并处理的存储过程

-- Receives one message from dbo.queSendACL and applies the file ACLs it
-- describes.
--
-- Flow:
--   1. Registers the linked server 'uuc_server' (dropped again at the end).
--   2. Inside transaction tr, RECEIVEs at most one message and reads the
--      header fields and the FileAcls/FileACL entries from its XML body.
--   3. For every FileACL entry calls dbo.KY_SaveFileAcl, either directly or,
--      when isACLSplit = 'true' and the entry is not an individual
--      (OrgTypeId <> 1), once per row returned by KY_GetAllEmpListByorg.
--   4. Calls [T_EIP_USPIndex].dbo.KY_DeleteACLQueue and commits.
--   5. On error: rolls back, then either re-sends the message with an
--      incremented <RetryCount> (while RetryCount < 3) or writes an error
--      log row through KY_InsertACLUpdateLog.
--
-- NOTE(review): rolling back the transaction also undoes the RECEIVE, so the
-- failed message goes back on the queue while the retry path additionally
-- sends a copy; repeated rollbacks can also trip the queue's poison-message
-- handling. Confirm whether the failed message should instead be consumed.
-- NOTE(review): the linked server is created and dropped on every call with
-- hard-coded credentials; a concurrent call or a leftover definition would
-- make sp_addlinkedserver fail before the TRY block -- confirm this is intended.
CREATE PROCEDURE [dbo].[KY_ReceiveACLFromQue]
AS
BEGIN
    -- Message envelope
    DECLARE @Handle UNIQUEIDENTIFIER;
    DECLARE @MessageType SYSNAME;
    DECLARE @Message XML;

    -- Header fields read from /CusFileACLXML
    DECLARE @FileId INT;
    DECLARE @OperatorId INT;
    DECLARE @AppId INT;
    DECLARE @InfoId VARCHAR(50);
    DECLARE @isACLSplit VARCHAR(10);
    DECLARE @PID TINYINT;
    DECLARE @RetryCount INT;

    -- Per-entry working variables (T-SQL variables are batch-scoped, so
    -- declaring them here is equivalent to declaring them inside the loops).
    DECLARE @i INT;
    DECLARE @Count INT;
    DECLARE @FileACLXML XML;
    DECLARE @childXML XML;
    DECLARE @EncryptFileId VARCHAR(32);
    DECLARE @OrgId INT;
    DECLARE @OrgName VARCHAR(200);
    DECLARE @OrgTypeId INT;
    DECLARE @FileRight VARCHAR(16);
    DECLARE @tmpCount INT;
    DECLARE @j INT;
    DECLARE @OrganizeId INT;
    DECLARE @OrganizeName VARCHAR(200);
    DECLARE @messageStr VARCHAR(MAX);

    EXEC sp_addlinkedserver 'uuc_server', '', 'SQLOLEDB', '***\app0c';
    EXEC sp_addlinkedsrvlogin 'uuc_server', 'false', NULL, 'sa', '***';
    -- 'rpc out' allows calling stored procedures on the linked server.
    EXEC sp_serveroption 'uuc_server', 'rpc out', 'true';

    BEGIN TRY
        BEGIN TRANSACTION tr;

        RECEIVE TOP (1)
            @Handle = conversation_handle,
            @MessageType = message_type_name,
            @Message = message_body
        FROM dbo.queSendACL;

        IF (@Handle IS NOT NULL AND @Message IS NOT NULL)
        BEGIN
            -- query(...)/text() cast through NVARCHAR yields '' for a missing
            -- element, which the INT/TINYINT casts turn into 0.
            SET @OperatorId = CAST(CAST(@Message.query('/CusFileACLXML/OperatorId/text()') AS NVARCHAR(MAX)) AS INT);
            SET @FileId = CAST(CAST(@Message.query('/CusFileACLXML/FileId/text()') AS NVARCHAR(MAX)) AS INT);
            SET @AppId = CAST(CAST(@Message.query('/CusFileACLXML/AppId/text()') AS NVARCHAR(MAX)) AS INT);
            SET @InfoId = CAST(CAST(@Message.query('/CusFileACLXML/InfoId/text()') AS NVARCHAR(MAX)) AS varchar(50));
            SET @isACLSplit = CAST(CAST(@Message.query('/CusFileACLXML/isACLSplit/text()') AS NVARCHAR(MAX)) AS varchar(10));
            SET @PID = CAST(CAST(@Message.query('/CusFileACLXML/PID/text()') AS NVARCHAR(MAX)) AS TINYINT);

            SET @i = 1;
            SET @Count = 0;
            SET @FileACLXML = @Message.query('/CusFileACLXML/FileAcls/FileACL');
            SET @Count = @FileACLXML.value('count(/FileACL)', 'int');

            -- One iteration per <FileACL> entry, addressed by position.
            WHILE @i <= @Count
            BEGIN
                SELECT @childXML = @FileACLXML.query('/FileACL[position()=sql:variable("@i")]');

                SET @EncryptFileId = @childXML.value('(/FileACL/EncryptFileId)[1]', 'varchar(32)');
                SET @OrgId = @childXML.value('(/FileACL/OrgId)[1]', 'int');
                SET @OrgName = @childXML.value('(/FileACL/OrgName)[1]', 'VARCHAR(200)');
                SET @OrgTypeId = @childXML.value('(/FileACL/OrgTypeId)[1]', 'int');
                SET @FileRight = @childXML.value('(/FileACL/FileRight)[1]', 'VARCHAR(16)');

                -- Does the ACL have to be expanded down to individuals?
                IF @isACLSplit = 'true'
                BEGIN
                    -- OrgTypeId 1 = individual: save as-is.
                    IF @OrgTypeId = 1
                    BEGIN
                        EXEC [dbo].[KY_SaveFileAcl] @OperatorId, '', @AppId, @InfoId, @FileId, @EncryptFileId, @OrgId, @OrgName, @OrgTypeId, @FileRight, @PID;
                    END
                    ELSE
                    BEGIN
                        -- Expand the organisation into the rows returned by
                        -- KY_GetAllEmpListByorg and save one ACL per row.
                        CREATE TABLE #tmpTable (OrganizeId INT, OrganizeName nvarchar(200), Flag int);

                        INSERT INTO #tmpTable (OrganizeId, OrganizeName)
                        EXEC KY_GetAllEmpListByorg @OrgId, @OrgTypeId;

                        SET @j = 1;
                        SELECT @tmpCount = COUNT(OrganizeId) FROM #tmpTable;

                        -- Flag marks rows that have already been handled.
                        WHILE @j <= @tmpCount
                        BEGIN
                            SET @OrganizeId = (SELECT TOP (1) OrganizeId FROM #tmpTable WHERE Flag IS NULL);
                            SET @OrganizeName = (SELECT OrganizeName FROM #tmpTable WHERE OrganizeId = @OrganizeId);

                            UPDATE #tmpTable SET Flag = 1 WHERE OrganizeId = @OrganizeId;

                            EXEC [dbo].[KY_SaveFileAcl] @OperatorId, '', @AppId, @InfoId, @FileId, @EncryptFileId, @OrganizeId, @OrganizeName, 1, @FileRight, @PID;

                            SET @j = @j + 1;
                        END

                        DROP TABLE #tmpTable;
                    END
                END
                ELSE
                BEGIN
                    EXEC [dbo].[KY_SaveFileAcl] @OperatorId, '', @AppId, @InfoId, @FileId, @EncryptFileId, @OrgId, @OrgName, @OrgTypeId, @FileRight, @PID;
                END

                SET @i = @i + 1;
            END

            EXEC [T_EIP_USPIndex].dbo.KY_DeleteACLQueue @OperatorId, @FileId;
        END

        COMMIT TRANSACTION tr;
    END TRY
    BEGIN CATCH
        -- Only roll back when a transaction is still open: an unconditional,
        -- named ROLLBACK raises its own error if the transaction was already
        -- ended or if tr is not the outermost transaction.
        IF @@TRANCOUNT > 0
            ROLLBACK TRANSACTION;

        SET @RetryCount = CAST(CAST(@Message.query('/CusFileACLXML/RetryCount/text()') AS NVARCHAR(MAX)) AS INT);

        -- A NULL @RetryCount (no message body) falls through to the ELSE branch.
        IF (@RetryCount < 3)
        BEGIN
            PRINT '重试处理';

            SET @RetryCount = @RetryCount + 1;

            EXEC [T_EIP_USPIndex].dbo.KY_UpdateACLQueuE 3, @RetryCount, @OperatorId, @FileId;

            -- Replace, rather than append, the counter: leaving the previous
            -- <RetryCount> in place makes the next read concatenate the
            -- values (e.g. "1" and "2" become 12) and ends retries early.
            SET @Message.modify('delete /CusFileACLXML/RetryCount');
            SET @Message.modify('insert
<RetryCount>{sql:variable("@RetryCount")}</RetryCount>
into (/CusFileACLXML)[1]');

            SET @messageStr = CAST(@Message AS VARCHAR(MAX));

            EXEC dbo.KY_SendACLToQue @messageStr;
        END
        ELSE
        BEGIN
            SET @messageStr = CAST(@Message AS VARCHAR(MAX));

            -- Record the failure in the queue error log.
            PRINT '记录队列错误日志';

            EXEC KY_InsertACLUpdateLog 1, @OperatorId, '', @InfoId, @AppId, @FileId, @EncryptFileId, NULL,
                NULL, NULL, NULL, NULL, NULL, @messageStr, 0, 'ACL队列处理错误';
        END
    END CATCH

    EXEC sp_dropserver 'uuc_server', 'droplogins';
END

GO

创建队列

-- Queue that stores the ACL messages; created enabled (STATUS = ON).
-- No ACTIVATION clause is specified here.
CREATE QUEUE [queSendACL]
    WITH STATUS = ON;

创建服务

-- Service bound to queSendACL; accepts conversations on ACLMainContract.
CREATE SERVICE [svrACLUpd]
    ON QUEUE [queSendACL]
    ([ACLMainContract]);