首先为这个数据库开启Service Broker
-- Enable Service Broker on the T_EIP_UnityStore database.
ALTER DATABASE [T_EIP_UnityStore]
    SET ENABLE_BROKER;
创建MessageType
-- Message type for ACL payloads; bodies are validated as well-formed XML.
CREATE MESSAGE TYPE [SendACLDetails]
    VALIDATION = WELL_FORMED_XML;
创建Contract
-- Contract under which only the conversation initiator may send
-- SendACLDetails messages.
CREATE CONTRACT [ACLMainContract]
(
    [SendACLDetails] SENT BY INITIATOR
);
创建发送到队列和从队列进行读取的存储过程
插入队列数据的存储过程
-- Enqueues one ACL change request on the svrACLUpd Service Broker service.
--
-- Parameters:
--   @AclRights  XML document passed as text. It is cast to XML before the
--               transaction is opened, so text that is not valid XML raises
--               a conversion error without leaving a transaction open.
--
-- Each call opens a new dialog from svrACLUpd to itself on ACLMainContract
-- (encryption off) and sends a single SendACLDetails message; the dialog
-- creation and the send are committed together.
--
-- NOTE(review): this procedure never ends the conversation it opens; confirm
-- that the receiving side (or a cleanup job) issues END CONVERSATION.
CREATE PROCEDURE [dbo].[KY_SendACLToQue]
    @AclRights varchar(max)
AS
BEGIN
    DECLARE @Message XML;
    DECLARE @Handle UNIQUEIDENTIFIER;

    SET @Message = CAST(@AclRights AS XML);

    BEGIN TRANSACTION;

    BEGIN DIALOG CONVERSATION @Handle
        FROM SERVICE svrACLUpd
        TO SERVICE 'svrACLUpd'
        ON CONTRACT ACLMainContract
        WITH ENCRYPTION = OFF;

    SEND ON CONVERSATION @Handle
        MESSAGE TYPE SendACLDetails (@Message);

    COMMIT;
END
GO
从队列中读取数据并处理的存储过程
-- Receives at most one message from dbo.queSendACL and applies the file ACL
-- entries it carries.
--
-- Message fields read here (root element CusFileACLXML):
--   OperatorId, FileId, AppId, InfoId, isACLSplit, PID, optional RetryCount,
--   and FileAcls/FileACL entries holding EncryptFileId, OrgId, OrgName,
--   OrgTypeId and FileRight.
--
-- Processing:
--   * Every FileACL entry is saved through KY_SaveFileAcl. When isACLSplit
--     is 'true' and OrgTypeId is not 1, the entry is first expanded through
--     KY_GetAllEmpListByorg and saved once per returned id with type 1.
--   * After all entries, KY_DeleteACLQueue is called in T_EIP_USPIndex.
--   * On error the transaction is rolled back. While RetryCount is below 3
--     the message is re-sent through KY_SendACLToQue with RetryCount
--     incremented; otherwise the failure is written through
--     KY_InsertACLUpdateLog.
--
-- NOTE(review): the linked server 'uuc_server' is created and dropped on
-- every call with credentials embedded in the procedure text. Consider a
-- permanently configured linked server; left unchanged here because the
-- called procedures may depend on it.
-- NOTE(review): the rollback in the CATCH block also undoes the RECEIVE, so
-- the original message appears to return to the queue while a retry copy is
-- sent as well. Confirm whether this duplication is intended.
-- NOTE(review): the received conversation is never ended in this procedure.
CREATE PROCEDURE [dbo].[KY_ReceiveACLFromQue]
AS
BEGIN
    -- Received message.
    DECLARE @Handle UNIQUEIDENTIFIER;
    DECLARE @MessageType SYSNAME;
    DECLARE @Message XML;

    -- Header fields of the message.
    DECLARE @FileId INT;
    DECLARE @OperatorId INT;
    DECLARE @AppId INT;
    DECLARE @InfoId VARCHAR(50);
    DECLARE @isACLSplit VARCHAR(10);
    DECLARE @PID TINYINT;
    DECLARE @RetryCount INT;
    DECLARE @messageStr VARCHAR(MAX);

    -- Per-entry working variables.
    DECLARE @i INT;
    DECLARE @Count INT;
    DECLARE @FileACLXML XML;
    DECLARE @childXML XML;
    DECLARE @EncryptFileId VARCHAR(32);
    DECLARE @OrgId INT;
    DECLARE @OrgName VARCHAR(200);
    DECLARE @OrgTypeId INT;
    DECLARE @FileRight VARCHAR(16);
    DECLARE @OrganizeId INT;
    DECLARE @OrganizeName VARCHAR(200);

    exec sp_addlinkedserver 'uuc_server','','SQLOLEDB','***\app0c'
    exec sp_addlinkedsrvlogin 'uuc_server','false',null,'sa','***'
    -- Allows calling stored procedures on the linked server.
    exec sp_serveroption 'uuc_server','rpc out','true'

    BEGIN TRY
        BEGIN TRANSACTION tr;

        RECEIVE TOP (1)
            @Handle = conversation_handle,
            @MessageType = message_type_name,
            @Message = message_body
        FROM dbo.queSendACL;

        IF (@Handle IS NOT NULL AND @Message IS NOT NULL)
        BEGIN
            SET @OperatorId = CAST(CAST(@Message.query('/CusFileACLXML/OperatorId/text()') AS NVARCHAR(MAX)) AS INT);
            SET @FileId = CAST(CAST(@Message.query('/CusFileACLXML/FileId/text()') AS NVARCHAR(MAX)) AS INT);
            SET @AppId = CAST(CAST(@Message.query('/CusFileACLXML/AppId/text()') AS NVARCHAR(MAX)) AS INT);
            SET @InfoId = CAST(CAST(@Message.query('/CusFileACLXML/InfoId/text()') AS NVARCHAR(MAX)) AS varchar(50));
            SET @isACLSplit = CAST(CAST(@Message.query('/CusFileACLXML/isACLSplit/text()') AS NVARCHAR(MAX)) AS varchar(10));
            SET @PID = CAST(CAST(@Message.query('/CusFileACLXML/PID/text()') AS NVARCHAR(MAX)) AS TINYINT);

            SET @FileACLXML = @Message.query('/CusFileACLXML/FileAcls/FileACL');
            SET @Count = @FileACLXML.value('count(/FileACL)', 'int');
            SET @i = 1;

            WHILE @i <= @Count
            BEGIN
                SELECT @childXML = @FileACLXML.query('/FileACL[position()=sql:variable("@i")]');
                SET @EncryptFileId = @childXML.value('(/FileACL/EncryptFileId)[1]', 'varchar(32)');
                SET @OrgId = @childXML.value('(/FileACL/OrgId)[1]', 'int');
                SET @OrgName = @childXML.value('(/FileACL/OrgName)[1]', 'VARCHAR(200)');
                SET @OrgTypeId = @childXML.value('(/FileACL/OrgTypeId)[1]', 'int');
                SET @FileRight = @childXML.value('(/FileACL/FileRight)[1]', 'VARCHAR(16)');

                -- Does the ACL need to be expanded down to individuals?
                IF @isACLSplit = 'true'
                BEGIN
                    -- Individual: save the entry as-is.
                    IF @OrgTypeId = 1
                    BEGIN
                        EXEC [dbo].[KY_SaveFileAcl] @OperatorId,'',@AppId,@InfoId,@FileId,@EncryptFileId,@OrgId,@OrgName,@OrgTypeId,@FileRight,@PID;
                    END
                    ELSE
                    BEGIN
                        CREATE TABLE #tmpTable (OrganizeId INT, OrganizeName NVARCHAR(200), Flag INT);

                        INSERT INTO #tmpTable (OrganizeId, OrganizeName)
                        EXEC KY_GetAllEmpListByorg @OrgId, @OrgTypeId;

                        -- Work through the unprocessed rows. Id and name are read
                        -- from the same row, so duplicate ids cannot break a scalar
                        -- subquery, and rows without an id are skipped instead of
                        -- being saved with NULL.
                        WHILE EXISTS (SELECT 1 FROM #tmpTable WHERE Flag IS NULL AND OrganizeId IS NOT NULL)
                        BEGIN
                            SELECT TOP (1)
                                @OrganizeId = OrganizeId,
                                @OrganizeName = OrganizeName
                            FROM #tmpTable
                            WHERE Flag IS NULL
                              AND OrganizeId IS NOT NULL;

                            UPDATE #tmpTable SET Flag = 1 WHERE OrganizeId = @OrganizeId;

                            EXEC [dbo].[KY_SaveFileAcl] @OperatorId,'',@AppId,@InfoId,@FileId,@EncryptFileId,@OrganizeId,@OrganizeName,1,@FileRight,@PID;
                        END

                        DROP TABLE #tmpTable;
                    END
                END
                ELSE
                BEGIN
                    EXEC [dbo].[KY_SaveFileAcl] @OperatorId,'',@AppId,@InfoId,@FileId,@EncryptFileId,@OrgId,@OrgName,@OrgTypeId,@FileRight,@PID;
                END

                SET @i = @i + 1;
            END

            EXEC [T_EIP_USPIndex].dbo.KY_DeleteACLQueue @OperatorId, @FileId;
        END

        COMMIT TRANSACTION tr;
    END TRY
    BEGIN CATCH
        -- Roll back only if a transaction is still open; an unconditional named
        -- rollback would raise a second error inside the CATCH block.
        IF @@TRANCOUNT > 0
            ROLLBACK TRANSACTION;

        -- Read the first RetryCount only; a missing element counts as 0.
        -- When no message was received @RetryCount stays NULL and the
        -- logging branch below is taken.
        IF @Message IS NOT NULL
            SET @RetryCount = ISNULL(@Message.value('(/CusFileACLXML/RetryCount/text())[1]', 'int'), 0);

        IF (@RetryCount < 3)
        BEGIN
            PRINT '重试处理'
            SET @RetryCount = @RetryCount + 1;
            EXEC [T_EIP_USPIndex].dbo.KY_UpdateACLQueuE 3, @RetryCount, @OperatorId, @FileId;

            -- Replace the counter instead of appending another element, so the
            -- message always carries exactly one RetryCount.
            SET @Message.modify('delete /CusFileACLXML/RetryCount');
            SET @Message.modify('insert <RetryCount>{sql:variable("@RetryCount")}</RetryCount> as last into (/CusFileACLXML)[1]');

            SET @messageStr = CAST(@Message AS VARCHAR(MAX));
            EXEC dbo.KY_SendACLToQue @messageStr;
        END
        ELSE
        BEGIN
            SET @messageStr = CAST(@Message AS VARCHAR(MAX));
            -- Record the queue error in the log.
            PRINT '记录队列错误日志'
            EXEC KY_InsertACLUpdateLog 1,@OperatorId,'',@InfoId,@AppId,@FileId,@EncryptFileId,NULL,
                NULL,NULL,NULL,NULL,NULL,@messageStr,0,'ACL队列处理错误';
        END
    END CATCH

    exec sp_dropserver 'uuc_server','droplogins'
END
GO
创建队列
-- Queue that stores the ACL messages; created in the enabled state.
CREATE QUEUE [queSendACL]
    WITH STATUS = ON;
创建服务
-- Service bound to queSendACL that accepts conversations on ACLMainContract.
CREATE SERVICE [svrACLUpd]
    ON QUEUE [queSendACL]
    ([ACLMainContract]);