在PL / SQL中创建队列订户的语法是什么?

时间:2021-07-12 17:40:18

I'm trying to create a queue and a callback that triggers when a message is queued, but I can't get the callback to trigger. What am I doing wrong?

我正在尝试创建一个队列和一个在消息排队时触发的回调,但是我无法触发回调。我究竟做错了什么?

I have a trigger that enqueues a message, and I can see it on the queue message table, and I can dequeue it by hand and process it, I just can't get the callback to fire on enqueue.

我有一个将消息排入队列的触发器,我可以在队列消息表上看到它,我可以手动将它出列并处理它,我只是无法在enqueue上触发回调。

BEGIN    
DBMS_AQADM.CREATE_QUEUE_TABLE (
  queue_table        => 'queue_message_table',
  queue_payload_type => 'queue_message_type',
  multiple_consumers => TRUE);

DBMS_AQADM.CREATE_QUEUE (
  queue_name  => 'message_queue',
  queue_table => 'queue_message_table');
DBMS_AQADM.START_QUEUE (queue_name => 'message_queue');
END;

CREATE OR REPLACE PROCEDURE queue_callback(
  context RAW, reginfo  SYS.AQ$_REG_INFO, descr SYS.AQ$_DESCRIPTOR, payload  RAW, payloadl NUMBER) AS

    queue_options       DBMS_AQ.DEQUEUE_OPTIONS_T;
    message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
    my_message          queue_message_type;
    ret                 varchar2(200);
    message_id          RAW(16);
BEGIN
    DBMS_OUTPUT.PUT_LINE('Callback');
    queue_options.msgid := descr.msg_id;
    queue_options.consumer_name := descr.consumer_name;

    DBMS_AQ.DEQUEUE(
        queue_name => descr.queue_name,
        dequeue_options => queue_options,
        message_properties => message_properties,
        payload => my_message,
        msgid => message_id );
    ret := handle_message(my_message);
    commit;
END;

BEGIN
  DBMS_AQADM.ADD_SUBSCRIBER (queue_name => 'message_queue',
    subscriber => SYS.AQ$_AGENT('queue_subscriber', 'message_queue',NULL));
  DBMS_AQ.REGISTER (
    SYS.AQ$_REG_INFO_LIST(
      SYS.AQ$_REG_INFO(
        'MESSAGE_QUEUE:QUEUE_SUBSCRIBER',
        DBMS_AQ.NAMESPACE_AQ,
        'plsql://QUEUE_CALLBACK',
        HEXTORAW('FF')
      )
    ), 1
  );
END;

2 个解决方案

#1


At first glance, it appears you're neither starting the queue (dbms_aqadm.start_queue), neither are you enqueueing anything to it (dbms_aq.enqueue).

乍一看,您似乎既没有启动队列(dbms_aqadm.start_queue),也没有将任何内容排入队列(dbms_aq.enqueue)。

I'd recommend following this demo.

我建议按照这个演示。

#2


You need to be careful with the database version. some bugs has been reported about issues with Oracle Aq. In particular I've followed this link to built my own sample, executing the demo in a Oracle 11gR2 enterprise database. I was abled to enqueue, dequeue, purge the queue but the listener created with Dbms_Aq.Register didn't work. I ran the same example downloading a Oracle 11g R2 xe database and it worked.

您需要小心数据库版本。有关Oracle Aq问题的一些错误已被报道。特别是我按照这个链接构建了自己的示例,在Oracle 11gR2企业数据库中执行演示。我能够排队,出队,清除队列,但用Dbms_Aq.Register创建的监听器不起作用。我运行了下载Oracle 11g R2 xe数据库的相同示例,但它确实有效。

The same example was runned in a Oracle 10gR2 instance and it works perfectly.

在Oracle 10gR2实例中运行了相同的示例,它运行良好。

There are some things that you need to be careful on using aq:

使用aq时需要注意一些事项:

  • use the appropriate parameters adding the subscriber
  • 使用添加订户的适当参数

  • use the appropriate namespace registering the listener with Dbms_Aq.Register
  • 使用适当的命名空间向Dbms_Aq.Register注册侦听器

  • use the multiple consumers flag declaring the queue table
  • 使用多个consumer标志声明队列表

  • use the appropriate permissions to packages and to handle the queues
  • 对包使用适当的权限并处理队列

  • use the qualified name of the queues in some cases if it didn't works.
  • 如果它不起作用,在某些情况下使用队列的限定名称。

' First create the schema

'首先创建架构

connect / as sysdba
-- @?/rdbms/admin/dbmsaqad.sql --(install if you don't have aq installed yet)

-- create the user and permissions
create user aqadmin identified by aqadmin default tablespace users temporary tablespace temp;
GRANT create session TO aqadmin;
grant connect, resource to aqadmin;
GRANT aq_administrator_role TO aqadmin IDENTIFIED BY aqadmin;
GRANT execute ON dbms_aq TO aqadmin;
GRANT execute ON dbms_aqadm TO aqadmin;

Create the ddl objects

创建ddl对象

CREATE TABLE demo_queue_message_table
( message VARCHAR2(4000) );

Create the aq-objects

创建aq对象

create or replace type demo_queue_payload_type as object(message varchar2(4000)) ;
/

begin
  DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table => 'demo_queue_table', queue_payload_type => 'demo_queue_payload_type',multiple_consumers => TRUE);
  DBMS_AQADM.CREATE_QUEUE (queue_name => 'demo_queue', queue_table => 'demo_queue_table');
  DBMS_AQADM.START_QUEUE('demo_queue');
end;
/

CREATE or replace PROCEDURE demo_queue_callback_procedure(
                 context  RAW,
                 reginfo  SYS.AQ$_REG_INFO,
                 descr    SYS.AQ$_DESCRIPTOR,
                 payload  RAW,
                 payloadl NUMBER
                 ) AS

   r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN

   r_dequeue_options.msgid := descr.msg_id;
   r_dequeue_options.consumer_name := descr.consumer_name;

   DBMS_AQ.DEQUEUE(
      queue_name         => descr.queue_name,
      dequeue_options    => r_dequeue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

   INSERT INTO demo_queue_message_table ( message )
   VALUES ( 'Message [' || o_payload.message || '] ' ||
            'dequeued at [' || TO_CHAR( SYSTIMESTAMP,
                                        'DD-MON-YYYY HH24:MI:SS.FF3' ) || ']' );
   COMMIT;

END;
/


BEGIN

   DBMS_AQADM.ADD_SUBSCRIBER (
      queue_name => 'demo_queue',
      subscriber => SYS.AQ$_AGENT(
                       'demo_queue_subscriber',
                       NULL,
                       NULL )
      );

    DBMS_AQ.REGISTER (
       SYS.AQ$_REG_INFO_LIST(
          SYS.AQ$_REG_INFO(
             'DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER',
             DBMS_AQ.NAMESPACE_AQ,
             'plsql://DEMO_QUEUE_CALLBACK_PROCEDURE',
             HEXTORAW('FF')
             )
          ),
       1
       );
END;
/

And finally test the queue

最后测试队列

DECLARE

   r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN

   o_payload := demo_queue_payload_type(
                   TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3' )
                   );

   DBMS_AQ.ENQUEUE(
      queue_name         => 'demo_queue',
      enqueue_options    => r_enqueue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

  COMMIT;

END;
/

#1


At first glance, it appears you're neither starting the queue (dbms_aqadm.start_queue), neither are you enqueueing anything to it (dbms_aq.enqueue).

乍一看,您似乎既没有启动队列(dbms_aqadm.start_queue),也没有将任何内容排入队列(dbms_aq.enqueue)。

I'd recommend following this demo.

我建议按照这个演示。

#2


You need to be careful with the database version. some bugs has been reported about issues with Oracle Aq. In particular I've followed this link to built my own sample, executing the demo in a Oracle 11gR2 enterprise database. I was abled to enqueue, dequeue, purge the queue but the listener created with Dbms_Aq.Register didn't work. I ran the same example downloading a Oracle 11g R2 xe database and it worked.

您需要小心数据库版本。有关Oracle Aq问题的一些错误已被报道。特别是我按照这个链接构建了自己的示例,在Oracle 11gR2企业数据库中执行演示。我能够排队,出队,清除队列,但用Dbms_Aq.Register创建的监听器不起作用。我运行了下载Oracle 11g R2 xe数据库的相同示例,但它确实有效。

The same example was runned in a Oracle 10gR2 instance and it works perfectly.

在Oracle 10gR2实例中运行了相同的示例,它运行良好。

There are some things that you need to be careful on using aq:

使用aq时需要注意一些事项:

  • use the appropriate parameters adding the subscriber
  • 使用添加订户的适当参数

  • use the appropriate namespace registering the listener with Dbms_Aq.Register
  • 使用适当的命名空间向Dbms_Aq.Register注册侦听器

  • use the multiple consumers flag declaring the queue table
  • 使用多个consumer标志声明队列表

  • use the appropriate permissions to packages and to handle the queues
  • 对包使用适当的权限并处理队列

  • use the qualified name of the queues in some cases if it didn't works.
  • 如果它不起作用,在某些情况下使用队列的限定名称。

' First create the schema

'首先创建架构

connect / as sysdba
-- @?/rdbms/admin/dbmsaqad.sql --(install if you don't have aq installed yet)

-- create the user and permissions
create user aqadmin identified by aqadmin default tablespace users temporary tablespace temp;
GRANT create session TO aqadmin;
grant connect, resource to aqadmin;
GRANT aq_administrator_role TO aqadmin IDENTIFIED BY aqadmin;
GRANT execute ON dbms_aq TO aqadmin;
GRANT execute ON dbms_aqadm TO aqadmin;

Create the ddl objects

创建ddl对象

CREATE TABLE demo_queue_message_table
( message VARCHAR2(4000) );

Create the aq-objects

创建aq对象

create or replace type demo_queue_payload_type as object(message varchar2(4000)) ;
/

begin
  DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table => 'demo_queue_table', queue_payload_type => 'demo_queue_payload_type',multiple_consumers => TRUE);
  DBMS_AQADM.CREATE_QUEUE (queue_name => 'demo_queue', queue_table => 'demo_queue_table');
  DBMS_AQADM.START_QUEUE('demo_queue');
end;
/

CREATE or replace PROCEDURE demo_queue_callback_procedure(
                 context  RAW,
                 reginfo  SYS.AQ$_REG_INFO,
                 descr    SYS.AQ$_DESCRIPTOR,
                 payload  RAW,
                 payloadl NUMBER
                 ) AS

   r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN

   r_dequeue_options.msgid := descr.msg_id;
   r_dequeue_options.consumer_name := descr.consumer_name;

   DBMS_AQ.DEQUEUE(
      queue_name         => descr.queue_name,
      dequeue_options    => r_dequeue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

   INSERT INTO demo_queue_message_table ( message )
   VALUES ( 'Message [' || o_payload.message || '] ' ||
            'dequeued at [' || TO_CHAR( SYSTIMESTAMP,
                                        'DD-MON-YYYY HH24:MI:SS.FF3' ) || ']' );
   COMMIT;

END;
/


BEGIN

   DBMS_AQADM.ADD_SUBSCRIBER (
      queue_name => 'demo_queue',
      subscriber => SYS.AQ$_AGENT(
                       'demo_queue_subscriber',
                       NULL,
                       NULL )
      );

    DBMS_AQ.REGISTER (
       SYS.AQ$_REG_INFO_LIST(
          SYS.AQ$_REG_INFO(
             'DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER',
             DBMS_AQ.NAMESPACE_AQ,
             'plsql://DEMO_QUEUE_CALLBACK_PROCEDURE',
             HEXTORAW('FF')
             )
          ),
       1
       );
END;
/

And finally test the queue

最后测试队列

DECLARE

   r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN

   o_payload := demo_queue_payload_type(
                   TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3' )
                   );

   DBMS_AQ.ENQUEUE(
      queue_name         => 'demo_queue',
      enqueue_options    => r_enqueue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

  COMMIT;

END;
/