I'm trying to create a queue and a callback that triggers when a message is queued, but I can't get the callback to trigger. What am I doing wrong?
我正在尝试创建一个队列和一个在消息排队时触发的回调,但是我无法触发回调。我究竟做错了什么?
I have a trigger that enqueues a message. I can see the message in the queue message table, and I can dequeue and process it by hand; I just can't get the callback to fire on enqueue.
我有一个将消息排入队列的触发器,我可以在队列消息表上看到它,我可以手动将它出列并处理它,我只是无法在enqueue上触发回调。
-- Set up the AQ objects used by the callback experiment: a queue table that
-- allows multiple consumers, a queue stored in that table, and then start it.
DECLARE
    c_queue_table CONSTANT VARCHAR2(30) := 'queue_message_table';
    c_queue_name  CONSTANT VARCHAR2(30) := 'message_queue';
BEGIN
    -- Payload rows are instances of the object type queue_message_type.
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => c_queue_table,
        queue_payload_type => 'queue_message_type',
        multiple_consumers => TRUE
    );

    DBMS_AQADM.CREATE_QUEUE(
        queue_name  => c_queue_name,
        queue_table => c_queue_table
    );

    DBMS_AQADM.START_QUEUE(queue_name => c_queue_name);
END;
CREATE OR REPLACE PROCEDURE queue_callback(
    context  RAW,
    reginfo  SYS.AQ$_REG_INFO,
    descr    SYS.AQ$_DESCRIPTOR,
    payload  RAW,
    payloadl NUMBER
) IS
    -- Notification callback: dequeues the message named in the descriptor on
    -- behalf of the notified consumer, passes it to handle_message, and commits.
    --
    -- Parameters:
    --   context  - not read by this procedure
    --   reginfo  - not read by this procedure
    --   descr    - supplies msg_id, consumer_name and queue_name for the dequeue
    --   payload  - not read by this procedure
    --   payloadl - not read by this procedure
    l_dequeue_opts DBMS_AQ.DEQUEUE_OPTIONS_T;
    l_msg_props    DBMS_AQ.MESSAGE_PROPERTIES_T;
    l_message      queue_message_type;
    l_result       VARCHAR2(200);
    l_msgid        RAW(16);
BEGIN
    DBMS_OUTPUT.PUT_LINE('Callback');

    -- Target exactly the message and consumer this notification was raised for.
    l_dequeue_opts.msgid         := descr.msg_id;
    l_dequeue_opts.consumer_name := descr.consumer_name;

    DBMS_AQ.DEQUEUE(
        queue_name         => descr.queue_name,
        dequeue_options    => l_dequeue_opts,
        message_properties => l_msg_props,
        payload            => l_message,
        msgid              => l_msgid
    );

    -- The return value of handle_message is captured but not used further.
    l_result := handle_message(l_message);

    COMMIT;
END queue_callback;
-- Subscribe a local consumer to message_queue and register QUEUE_CALLBACK as
-- the PL/SQL notification handler for that consumer.
BEGIN
    DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name => 'message_queue',
        -- Fix: the agent's address (2nd attribute) must be NULL for a consumer
        -- that reads from this queue directly. It was 'message_queue', and a
        -- non-NULL address marks the agent as a propagation recipient, so the
        -- registration below was never notified for it.
        subscriber => SYS.AQ$_AGENT('queue_subscriber', NULL, NULL)
    );

    DBMS_AQ.REGISTER(
        SYS.AQ$_REG_INFO_LIST(
            SYS.AQ$_REG_INFO(
                'MESSAGE_QUEUE:QUEUE_SUBSCRIBER',  -- <queue>:<consumer> being watched
                DBMS_AQ.NAMESPACE_AQ,
                'plsql://QUEUE_CALLBACK',          -- procedure invoked on notification
                HEXTORAW('FF')                     -- context value for the registration
            )
        ),
        1  -- number of entries in the registration list
    );
END;
2 个解决方案
#1
At first glance, it appears you're neither starting the queue (dbms_aqadm.start_queue) nor enqueueing anything to it (dbms_aq.enqueue).
乍一看,您似乎既没有启动队列(dbms_aqadm.start_queue),也没有将任何内容排入队列(dbms_aq.enqueue)。
I'd recommend following this demo.
我建议按照这个演示。
#2
You need to be careful with the database version: some bugs have been reported with Oracle AQ. In particular, I followed this link to build my own sample and executed the demo in an Oracle 11gR2 Enterprise database. I was able to enqueue, dequeue, and purge the queue, but the listener created with Dbms_Aq.Register didn't work. I then ran the same example on a freshly downloaded Oracle 11g R2 XE database and it worked.
您需要小心数据库版本。有关Oracle Aq问题的一些错误已被报道。特别是我按照这个链接构建了自己的示例,在Oracle 11gR2企业数据库中执行演示。我能够排队,出队,清除队列,但用Dbms_Aq.Register创建的监听器不起作用。我运行了下载Oracle 11g R2 xe数据库的相同示例,但它确实有效。
The same example was also run in an Oracle 10gR2 instance, where it worked perfectly.
在Oracle 10gR2实例中运行了相同的示例,它运行良好。
There are some things you need to be careful about when using AQ:
使用aq时需要注意一些事项:
- use the appropriate parameters when adding the subscriber
- use the appropriate namespace when registering the listener with Dbms_Aq.Register
- set the multiple consumers flag when declaring the queue table
- grant the appropriate permissions on the packages and for handling the queues
- use the qualified name of the queue in some cases if it doesn't work otherwise.
使用添加订户的适当参数
使用适当的命名空间向Dbms_Aq.Register注册侦听器
使用多个consumer标志声明队列表
对包使用适当的权限并处理队列
如果它不起作用,在某些情况下使用队列的限定名称。
First create the schema
首先创建架构
-- Run as SYSDBA: create the aqadmin user and grant it the roles and package
-- privileges used by the rest of the demo.
CONNECT / AS SYSDBA
-- @?/rdbms/admin/dbmsaqad.sql --(install if you don't have aq installed yet)

-- create the user and permissions
CREATE USER aqadmin IDENTIFIED BY aqadmin
    DEFAULT TABLESPACE users
    TEMPORARY TABLESPACE temp;

GRANT CREATE SESSION TO aqadmin;
GRANT CONNECT, RESOURCE TO aqadmin;
GRANT AQ_ADMINISTRATOR_ROLE TO aqadmin IDENTIFIED BY aqadmin;

-- Direct execute rights on the AQ packages.
GRANT EXECUTE ON DBMS_AQ TO aqadmin;
GRANT EXECUTE ON DBMS_AQADM TO aqadmin;
Create the ddl objects
创建ddl对象
-- Log table: the notification callback inserts one row per dequeued message.
CREATE TABLE demo_queue_message_table (
    message VARCHAR2(4000)
);
Create the aq-objects
创建aq对象
-- Payload object type for demo_queue: a single text attribute.
CREATE OR REPLACE TYPE demo_queue_payload_type AS OBJECT (
    message VARCHAR2(4000)
);
/
-- Create the multi-consumer queue table, create demo_queue in it, and start it.
DECLARE
    c_queue_table CONSTANT VARCHAR2(30) := 'demo_queue_table';
    c_queue_name  CONSTANT VARCHAR2(30) := 'demo_queue';
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => c_queue_table,
        queue_payload_type => 'demo_queue_payload_type',
        multiple_consumers => TRUE
    );

    DBMS_AQADM.CREATE_QUEUE(
        queue_name  => c_queue_name,
        queue_table => c_queue_table
    );

    DBMS_AQADM.START_QUEUE(queue_name => c_queue_name);
END;
/
CREATE OR REPLACE PROCEDURE demo_queue_callback_procedure(
    context  RAW,
    reginfo  SYS.AQ$_REG_INFO,
    descr    SYS.AQ$_DESCRIPTOR,
    payload  RAW,
    payloadl NUMBER
) IS
    -- Notification callback for demo_queue: dequeues the message named in the
    -- descriptor for the notified consumer, logs its text together with the
    -- dequeue time into demo_queue_message_table, and commits.
    --
    -- Only descr is read (msg_id, consumer_name, queue_name); the remaining
    -- parameters are not used by this procedure.
    l_dequeue_opts DBMS_AQ.DEQUEUE_OPTIONS_T;
    l_msg_props    DBMS_AQ.MESSAGE_PROPERTIES_T;
    l_msgid        RAW(16);
    l_payload      demo_queue_payload_type;
BEGIN
    -- Target exactly the message and consumer this notification was raised for.
    l_dequeue_opts.msgid         := descr.msg_id;
    l_dequeue_opts.consumer_name := descr.consumer_name;

    DBMS_AQ.DEQUEUE(
        queue_name         => descr.queue_name,
        dequeue_options    => l_dequeue_opts,
        message_properties => l_msg_props,
        payload            => l_payload,
        msgid              => l_msgid
    );

    INSERT INTO demo_queue_message_table (message)
    VALUES (
        'Message [' || l_payload.message || '] ' ||
        'dequeued at [' ||
        TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3') ||
        ']'
    );

    COMMIT;
END demo_queue_callback_procedure;
/
-- Add a subscriber to demo_queue and register the PL/SQL callback for it.
DECLARE
    -- Agent with only a name; address and protocol are left NULL.
    l_subscriber SYS.AQ$_AGENT :=
        SYS.AQ$_AGENT('demo_queue_subscriber', NULL, NULL);
BEGIN
    DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name => 'demo_queue',
        subscriber => l_subscriber
    );

    DBMS_AQ.REGISTER(
        SYS.AQ$_REG_INFO_LIST(
            SYS.AQ$_REG_INFO(
                'DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER',       -- <queue>:<consumer>
                DBMS_AQ.NAMESPACE_AQ,
                'plsql://DEMO_QUEUE_CALLBACK_PROCEDURE',  -- procedure to invoke
                HEXTORAW('FF')                            -- context value
            )
        ),
        1  -- number of entries in the registration list
    );
END;
/
And finally test the queue
最后测试队列
-- Smoke test: enqueue one message whose text is the current timestamp, then
-- commit so the message becomes visible to consumers.
DECLARE
    l_enqueue_opts DBMS_AQ.ENQUEUE_OPTIONS_T;
    l_msg_props    DBMS_AQ.MESSAGE_PROPERTIES_T;
    l_msgid        RAW(16);
    l_payload      demo_queue_payload_type :=
        demo_queue_payload_type(
            TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3')
        );
BEGIN
    DBMS_AQ.ENQUEUE(
        queue_name         => 'demo_queue',
        enqueue_options    => l_enqueue_opts,
        message_properties => l_msg_props,
        payload            => l_payload,
        msgid              => l_msgid
    );

    COMMIT;
END;
/
#1
At first glance, it appears you're neither starting the queue (dbms_aqadm.start_queue) nor enqueueing anything to it (dbms_aq.enqueue).
乍一看,您似乎既没有启动队列(dbms_aqadm.start_queue),也没有将任何内容排入队列(dbms_aq.enqueue)。
I'd recommend following this demo.
我建议按照这个演示。
#2
You need to be careful with the database version: some bugs have been reported with Oracle AQ. In particular, I followed this link to build my own sample and executed the demo in an Oracle 11gR2 Enterprise database. I was able to enqueue, dequeue, and purge the queue, but the listener created with Dbms_Aq.Register didn't work. I then ran the same example on a freshly downloaded Oracle 11g R2 XE database and it worked.
您需要小心数据库版本。有关Oracle Aq问题的一些错误已被报道。特别是我按照这个链接构建了自己的示例,在Oracle 11gR2企业数据库中执行演示。我能够排队,出队,清除队列,但用Dbms_Aq.Register创建的监听器不起作用。我运行了下载Oracle 11g R2 xe数据库的相同示例,但它确实有效。
The same example was also run in an Oracle 10gR2 instance, where it worked perfectly.
在Oracle 10gR2实例中运行了相同的示例,它运行良好。
There are some things you need to be careful about when using AQ:
使用aq时需要注意一些事项:
- use the appropriate parameters when adding the subscriber
- use the appropriate namespace when registering the listener with Dbms_Aq.Register
- set the multiple consumers flag when declaring the queue table
- grant the appropriate permissions on the packages and for handling the queues
- use the qualified name of the queue in some cases if it doesn't work otherwise.
使用添加订户的适当参数
使用适当的命名空间向Dbms_Aq.Register注册侦听器
使用多个consumer标志声明队列表
对包使用适当的权限并处理队列
如果它不起作用,在某些情况下使用队列的限定名称。
First create the schema
首先创建架构
-- Run as SYSDBA: create the aqadmin user and grant it the roles and package
-- privileges used by the rest of the demo.
CONNECT / AS SYSDBA
-- @?/rdbms/admin/dbmsaqad.sql --(install if you don't have aq installed yet)

-- create the user and permissions
CREATE USER aqadmin IDENTIFIED BY aqadmin
    DEFAULT TABLESPACE users
    TEMPORARY TABLESPACE temp;

GRANT CREATE SESSION TO aqadmin;
GRANT CONNECT, RESOURCE TO aqadmin;
GRANT AQ_ADMINISTRATOR_ROLE TO aqadmin IDENTIFIED BY aqadmin;

-- Direct execute rights on the AQ packages.
GRANT EXECUTE ON DBMS_AQ TO aqadmin;
GRANT EXECUTE ON DBMS_AQADM TO aqadmin;
Create the ddl objects
创建ddl对象
-- Log table: the notification callback inserts one row per dequeued message.
CREATE TABLE demo_queue_message_table (
    message VARCHAR2(4000)
);
Create the aq-objects
创建aq对象
-- Payload object type for demo_queue: a single text attribute.
CREATE OR REPLACE TYPE demo_queue_payload_type AS OBJECT (
    message VARCHAR2(4000)
);
/
-- Create the multi-consumer queue table, create demo_queue in it, and start it.
DECLARE
    c_queue_table CONSTANT VARCHAR2(30) := 'demo_queue_table';
    c_queue_name  CONSTANT VARCHAR2(30) := 'demo_queue';
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => c_queue_table,
        queue_payload_type => 'demo_queue_payload_type',
        multiple_consumers => TRUE
    );

    DBMS_AQADM.CREATE_QUEUE(
        queue_name  => c_queue_name,
        queue_table => c_queue_table
    );

    DBMS_AQADM.START_QUEUE(queue_name => c_queue_name);
END;
/
CREATE OR REPLACE PROCEDURE demo_queue_callback_procedure(
    context  RAW,
    reginfo  SYS.AQ$_REG_INFO,
    descr    SYS.AQ$_DESCRIPTOR,
    payload  RAW,
    payloadl NUMBER
) IS
    -- Notification callback for demo_queue: dequeues the message named in the
    -- descriptor for the notified consumer, logs its text together with the
    -- dequeue time into demo_queue_message_table, and commits.
    --
    -- Only descr is read (msg_id, consumer_name, queue_name); the remaining
    -- parameters are not used by this procedure.
    l_dequeue_opts DBMS_AQ.DEQUEUE_OPTIONS_T;
    l_msg_props    DBMS_AQ.MESSAGE_PROPERTIES_T;
    l_msgid        RAW(16);
    l_payload      demo_queue_payload_type;
BEGIN
    -- Target exactly the message and consumer this notification was raised for.
    l_dequeue_opts.msgid         := descr.msg_id;
    l_dequeue_opts.consumer_name := descr.consumer_name;

    DBMS_AQ.DEQUEUE(
        queue_name         => descr.queue_name,
        dequeue_options    => l_dequeue_opts,
        message_properties => l_msg_props,
        payload            => l_payload,
        msgid              => l_msgid
    );

    INSERT INTO demo_queue_message_table (message)
    VALUES (
        'Message [' || l_payload.message || '] ' ||
        'dequeued at [' ||
        TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3') ||
        ']'
    );

    COMMIT;
END demo_queue_callback_procedure;
/
-- Add a subscriber to demo_queue and register the PL/SQL callback for it.
DECLARE
    -- Agent with only a name; address and protocol are left NULL.
    l_subscriber SYS.AQ$_AGENT :=
        SYS.AQ$_AGENT('demo_queue_subscriber', NULL, NULL);
BEGIN
    DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name => 'demo_queue',
        subscriber => l_subscriber
    );

    DBMS_AQ.REGISTER(
        SYS.AQ$_REG_INFO_LIST(
            SYS.AQ$_REG_INFO(
                'DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER',       -- <queue>:<consumer>
                DBMS_AQ.NAMESPACE_AQ,
                'plsql://DEMO_QUEUE_CALLBACK_PROCEDURE',  -- procedure to invoke
                HEXTORAW('FF')                            -- context value
            )
        ),
        1  -- number of entries in the registration list
    );
END;
/
And finally test the queue
最后测试队列
-- Smoke test: enqueue one message whose text is the current timestamp, then
-- commit so the message becomes visible to consumers.
DECLARE
    l_enqueue_opts DBMS_AQ.ENQUEUE_OPTIONS_T;
    l_msg_props    DBMS_AQ.MESSAGE_PROPERTIES_T;
    l_msgid        RAW(16);
    l_payload      demo_queue_payload_type :=
        demo_queue_payload_type(
            TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3')
        );
BEGIN
    DBMS_AQ.ENQUEUE(
        queue_name         => 'demo_queue',
        enqueue_options    => l_enqueue_opts,
        message_properties => l_msg_props,
        payload            => l_payload,
        msgid              => l_msgid
    );

    COMMIT;
END;
/