How to handle PostgreSQL triggers in a distributed environment

Time: 2022-11-25 12:08:07

We're in the process of implementing PostgreSQL triggers to monitor inserts/updates/deletes on several tables, so that another app listening for these events can keep our relational database in sync with our full-text search database.

Here's what the trigger function looks like:

CREATE FUNCTION notification() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify('search', TG_TABLE_NAME || ',id,' || NEW.id);
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

And here's how we're adding the trigger to each table:

CREATE TRIGGER foo_trigger AFTER INSERT OR UPDATE OR DELETE ON foo
FOR EACH ROW EXECUTE PROCEDURE notification();

And here is a very basic example of how we would have a node app (worker) listening for these trigger events:

var pg  = require('pg');

var connString = "postgres://user@localhost/foo_local";

pg.connect(connString, function(err, client, done) {

  client.on('notification', function(msg) {
    //get the added / updated / deleted record
    //sync it with the search database
  });

  var query = client.query('LISTEN search');
});

Here's my three-part question:

Part 1: Our app is load balanced across several instances. What happens when the node worker app, which is also distributed, receives an event? Will all listening instances of the worker app receive the triggered event?

If so, that's bad - we don't want all instances of the worker app to process every event because they'd all be doing the same work and that would negate the benefits of having multiple listeners to distribute the load. How do we mitigate this?

Part 2: What happens if the worker receives a trigger event but processing it is long-running? Will PostgreSQL queue the events that have been triggered until the listeners receive them?

Part 3: We've got about 5 tables that we want to fire triggers on for INSERT / UPDATE / DELETE. We get a lot of requests, so this would fire a lot of events in a short period of time. We need a worker to listen for these events and process the changed records so that it can send them along to the full-text search database. Is there a better way to architect this to handle the volume?

The other solution our team is considering is abandoning SQL Triggers and just using a message queuing system to shove messages in a data store (SQS or Redis) and then just have workers pick off messages from the queue. We want to avoid this route if we can as it adds more architecture to our platform; however, we're prepared to do it if it's our only option.

Your thoughts would be much appreciated.

1 Solution

#1


First of all, in your trigger function, you might want to make life easier for your listeners, by providing more specific details of exactly what changed (e.g. in an UPDATE).

You could do something like this:

CREATE OR REPLACE FUNCTION notification() RETURNS trigger AS $$
DECLARE
  id bigint;
BEGIN
  -- NEW is only populated for INSERT and UPDATE; a DELETE only has OLD.
  IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
    id = NEW.id;
  ELSE
    id = OLD.id;
  END IF;

  -- UPDATE: send only the columns whose values changed.
  -- hstore() and hstore_to_json() require the hstore extension.
  IF TG_OP = 'UPDATE' THEN
    PERFORM pg_notify('table_update', json_build_object('schema', TG_TABLE_SCHEMA, 'table', TG_TABLE_NAME, 'id', id, 'type', TG_OP, 'changes', hstore_to_json(hstore(NEW) - hstore(OLD)))::text);
    RETURN NEW;
  END IF;

  -- INSERT: send the whole new row.
  -- Note: in the default configuration a NOTIFY payload must be shorter
  -- than 8000 bytes, so very wide rows may not fit.
  IF TG_OP = 'INSERT' THEN
    PERFORM pg_notify('table_update', json_build_object('schema', TG_TABLE_SCHEMA, 'table', TG_TABLE_NAME, 'id', id, 'type', TG_OP, 'row', row_to_json(NEW))::text);
    RETURN NEW;
  END IF;

  -- DELETE: send the row as it was before deletion.
  IF TG_OP = 'DELETE' THEN
    PERFORM pg_notify('table_update', json_build_object('schema', TG_TABLE_SCHEMA, 'table', TG_TABLE_NAME, 'id', id, 'type', TG_OP, 'row', row_to_json(OLD))::text);
    RETURN OLD;
  END IF;

END;
$$ LANGUAGE plpgsql;
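
On the listening side, a handler for this payload might look like the sketch below. It assumes the 'table_update' channel and the JSON shape built by the function above. The helper name parseTableUpdate and the 'action' values it returns are hypothetical, made up for illustration; the msg object mirrors the channel and payload fields that node-postgres passes to a 'notification' handler.

```javascript
// Hypothetical helper: turn a NOTIFY message into a description of the
// work needed in the search index. Returns null for messages to skip.
function parseTableUpdate(msg) {
  if (msg.channel !== 'table_update' || !msg.payload) {
    return null;
  }
  let event;
  try {
    event = JSON.parse(msg.payload);
  } catch (err) {
    return null; // malformed payload: ignore rather than crash the worker
  }
  const base = { schema: event.schema, table: event.table, id: event.id };
  switch (event.type) {
    case 'INSERT':
      return { ...base, action: 'index', doc: event.row };
    case 'UPDATE':
      return { ...base, action: 'patch', doc: event.changes };
    case 'DELETE':
      return { ...base, action: 'remove' };
    default:
      return null;
  }
}

// Example message shaped like an UPDATE notification from the trigger.
const sample = {
  channel: 'table_update',
  payload: JSON.stringify({
    schema: 'public', table: 'foo', id: 42, type: 'UPDATE',
    changes: { title: 'new title' },
  }),
};
console.log(parseTableUpdate(sample));
```

In a real worker you would call something like this from inside client.on('notification', ...) and hand the result to whatever code talks to the search database.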

Now for your questions, or at least Part 1: I believe every instance of the worker app that is listening will receive each triggered event. That is useful for pub/sub-style real-time notification to multiple listeners, but for your use case it sounds like you would need to add some kind of queue package on top of the basic PostgreSQL LISTEN/NOTIFY, such as queue_classic (for Ruby) or perhaps pg-jobs for node.js.
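
To illustrate the difference between broadcast and queue behaviour, here is a minimal in-memory sketch: every listener is woken by each notification, but only the one that wins a claim does the work. The claimed set is a stand-in for shared state that a real setup would need to keep somewhere all workers can see (for example a jobs table with row locking in PostgreSQL). The names makeWorker, tryClaim, claimed and the event shape are all hypothetical, and this is not a description of how any particular queue package is implemented.

```javascript
const { EventEmitter } = require('events');

// Stand-in for the NOTIFY channel: every subscriber sees every event.
const channel = new EventEmitter();

// Stand-in for shared state visible to all workers (hypothetical).
const claimed = new Set();
const processedBy = {};

// Returns true only for the first caller to claim a given event.
function tryClaim(eventId) {
  if (claimed.has(eventId)) return false;
  claimed.add(eventId);
  return true;
}

function makeWorker(name) {
  let received = 0;
  channel.on('notification', (event) => {
    received += 1; // every worker is notified...
    if (tryClaim(event.eventId)) {
      processedBy[event.eventId] = name; // ...but only one does the work
    }
  });
  return { name, receivedCount: () => received };
}

const workers = ['w1', 'w2', 'w3'].map(makeWorker);
for (let eventId = 1; eventId <= 4; eventId++) {
  channel.emit('notification', { eventId, table: 'foo' });
}

for (const w of workers) {
  console.log(w.name, 'received', w.receivedCount());
}
console.log(processedBy);
```

Each worker receives all four events, but each event ends up processed exactly once. With separate processes the claim step has to be atomic in the shared store, which is the part a queue package would take care of.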

Anyway, since it's been several months since you asked this, I'm wondering which path you took in the end and how it worked out. Can you share your experience and insights?
