What is the best way to implement a message queue table in MySQL?

Date: 2021-04-24 07:37:55

It's probably the tenth time I'm implementing something like this, and I've never been 100% happy about solutions I came up with.

The reason using a MySQL table instead of a "proper" messaging system is attractive is primarily that most applications already use some relational database for other stuff (which tends to be MySQL for most of the stuff I've been doing), while very few applications use a messaging system. Also, relational databases have very strong ACID properties, while messaging systems often don't.

The first idea is to use:

create table jobs(
  id int unsigned not null auto_increment primary key,
  message text not null,
  process_id varbinary(255) null default null,
  key jobs_key(process_id)
);

And then enqueue looks like this:

insert into jobs(message) values('blah blah');

And dequeue looks like this:

begin;
select * from jobs where process_id is null order by id asc limit 1;
update jobs set process_id = ? where id = ?; -- whatever i just got
commit;
-- return (id, message) to application, cleanup after done

Table and enqueue look nice, but dequeue kinda bothers me. How likely is it to roll back? Or to get blocked? What keys should I use to make it O(1)-ish?

Or is there any better solution than what I'm doing?

6 answers

#1


6  

I've built a few message queuing systems and I'm not certain what type of message you're referring to, but in the case of the dequeuing (is that a word?) I've done the same thing you've done. Your method looks simple, clean and solid. Not that my work is the best, but it's proven very effective for large-scale monitoring across many sites (error logging, mass email marketing campaigns, social networking notices).

My vote: no worries!

#2


24  

Your dequeue could be more concise. Rather than relying on the transaction rollback, you could do it in one atomic statement without an explicit transaction:

UPDATE jobs SET process_id = ? WHERE process_id IS NULL ORDER BY ID ASC LIMIT 1;

Then you can pull jobs with (brackets [] mean optional, depending on your particulars):

SELECT * FROM jobs WHERE process_id = ? [ORDER BY ID LIMIT 1];
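
The claim-then-fetch idea above can be tried out end to end with a small script. This is only a sketch under stated assumptions: it uses Python's built-in sqlite3 as a stand-in for MySQL, the helper names (`make_queue`, `enqueue`, `claim_one`, `finish`) are made up for illustration, and because stock SQLite does not accept `UPDATE ... ORDER BY ... LIMIT`, the claim is spelled with a `MIN(id)` subquery instead of the statement shown in the answer. It also assumes a worker finishes one job before claiming the next.

```python
import sqlite3

def make_queue():
    # In-memory SQLite database standing in for the MySQL jobs table.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE jobs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " message TEXT NOT NULL,"
        " process_id TEXT NULL DEFAULT NULL)"
    )
    conn.execute("CREATE INDEX jobs_key ON jobs(process_id)")
    return conn

def enqueue(conn, message):
    with conn:  # commits on success, rolls back on error
        conn.execute("INSERT INTO jobs(message) VALUES (?)", (message,))

def claim_one(conn, worker_id):
    """Claim the oldest unclaimed job; return (id, message) or None."""
    with conn:
        # One statement marks the row, so two workers cannot both claim it.
        # SQLite spelling; the MySQL form is the UPDATE ... LIMIT 1 above.
        cur = conn.execute(
            "UPDATE jobs SET process_id = ? "
            "WHERE id = (SELECT MIN(id) FROM jobs WHERE process_id IS NULL)",
            (worker_id,),
        )
        if cur.rowcount == 0:
            return None  # nothing left to claim
        return conn.execute(
            "SELECT id, message FROM jobs WHERE process_id = ? "
            "ORDER BY id LIMIT 1",
            (worker_id,),
        ).fetchone()

def finish(conn, job_id):
    # Cleanup after the application has processed the job.
    with conn:
        conn.execute("DELETE FROM jobs WHERE id = ?", (job_id,))
```

A worker would call `claim_one(conn, "worker-1")`, process the returned message, and then call `finish` with the returned id. A `None` result means the queue is currently empty.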

#3


6  

Brian Aker talked about a queue engine a while ago. There's been talk about a SELECT table FROM DELETE syntax, too.

If you're not worried about throughput, you can always use SELECT GET_LOCK() as a mutex. For example:

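SELECT GET_LOCK('READQUEUE', 10);  -- second argument is the timeout in seconds; returns 1 once the lock is held
SELECT * FROM jobs ORDER BY ID ASC LIMIT 1;
DELETE FROM jobs WHERE ID = ?;
SELECT RELEASE_LOCK('READQUEUE');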

And if you want to get really fancy, wrap it in a stored procedure.

#4


1  

I would suggest using Quartz.NET

It has providers for SQL Server, Oracle, MySQL, SQLite and Firebird.

#5


1  

This thread has design information that should be mappable.

To quote:

Here's what I've used successfully in the past:

MsgQueue table schema

MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL
SourceCode varchar(20) -- process inserting the message -- NULLable
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL
CreateTime datetime -- default GETDATE() -- NOT NULL
Msg varchar(255) -- NULLable

Your message types are what you'd expect - messages that conform to a contract between the process(es) inserting and the process(es) reading, structured with XML or your other choice of representation (JSON would be handy in some cases, for instance).

Then 0-to-n processes can be inserting, and 0-to-n processes can be reading and processing the messages. Each reading process typically handles a single message type. Multiple instances of a process type can be running for load-balancing.

The reader pulls one message and changes the state to "A"ctive while it works on it. When it's done it changes the state to "C"omplete. It can delete the message or not depending on whether you want to keep the audit trail. Messages of State = 'N' are pulled in MsgType/Timestamp order, so there's an index on MsgType + State + CreateTime.

Variations:
State for "E"rror.
Column for Reader process code.
Timestamps for state transitions.

This has provided a nice, scalable, visible, simple mechanism for doing a number of things like you are describing. If you have a basic understanding of databases, it's pretty foolproof and extensible. There's never been an issue with locks, rollbacks, etc. because each state transition is a single atomic transaction.
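
The state transitions described in this answer can be sketched as guarded updates. This is a minimal illustration, not the quoted author's code: it uses Python's sqlite3 in place of the original database, keeps the column names from the schema above, and the function and index names (`make_msg_queue`, `transition`, `pull_next`, `ix_msgqueue`) are invented for the example. The point is that each `UPDATE` names the state it expects, so a reader that loses a race sees zero affected rows and simply tries the next message.

```python
import sqlite3

def make_msg_queue():
    # SQLite stand-in for the MsgQueue table; types are simplified.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE MsgQueue ("
        " MsgId INTEGER PRIMARY KEY AUTOINCREMENT,"
        " MsgTypeCode TEXT NOT NULL,"
        " SourceCode TEXT,"
        " State TEXT NOT NULL DEFAULT 'N',"
        " CreateTime TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,"
        " Msg TEXT)"
    )
    # Index matching the pull order: message type, then state, then age.
    conn.execute(
        "CREATE INDEX ix_msgqueue ON MsgQueue(MsgTypeCode, State, CreateTime)"
    )
    return conn

def transition(conn, msg_id, old_state, new_state):
    """Move one message between states; True only if this caller did it."""
    with conn:
        cur = conn.execute(
            "UPDATE MsgQueue SET State = ? WHERE MsgId = ? AND State = ?",
            (new_state, msg_id, old_state),
        )
    return cur.rowcount == 1

def pull_next(conn, msg_type):
    """Activate the oldest 'N' message of msg_type; return (MsgId, Msg) or None."""
    while True:
        row = conn.execute(
            "SELECT MsgId, Msg FROM MsgQueue "
            "WHERE MsgTypeCode = ? AND State = 'N' "
            "ORDER BY CreateTime, MsgId LIMIT 1",
            (msg_type,),
        ).fetchone()
        if row is None:
            return None  # no new messages of this type
        if transition(conn, row[0], "N", "A"):
            return row
        # Another reader activated it first; look for the next candidate.
```

When the reader is done it would call `transition(conn, msg_id, "A", "C")`, or delete the row if no audit trail is needed.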

#6


1  

Here is a solution I used, working without the process_id of the current thread, or locking the table.

SELECT * from jobs ORDER BY ID ASC LIMIT 0,1;

Get the result in a $row array, and execute:

DELETE from jobs WHERE ID=$row['ID'];

Then get the affected rows(mysql_affected_rows). If there are affected rows, process the job in the $row array. If there are 0 affected rows, it means some other process is already processing the selected job. Repeat the above steps until there are no rows.

I've tested this with a 'jobs' table having 100k rows, and spawning 20 concurrent processes that do the above. No race conditions happened. You can modify the above queries to update a row with a processing flag, and delete the row after you actually processed it:

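// Pseudocode: the SQL statements stand in for calls through your MySQL client library.
while (time() - $startTime < $timeout)
{
    SELECT * from jobs WHERE processing is NULL ORDER BY ID ASC LIMIT 0,1;
    if (count($row) == 0) break;            // queue is empty
    // Guard on "processing is NULL" so only one worker can claim the row.
    UPDATE jobs set processing=1 WHERE ID=$row['ID'] AND processing is NULL;
    if (mysql_affected_rows == 0) continue; // another process claimed it first
    // process your job here
    DELETE from jobs WHERE ID=$row['ID'];
}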
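
For anyone who wants to run the loop above, here is a rough runnable version. It is a sketch rather than the code that was actually tested: it uses Python's sqlite3 in place of MySQL and PHP, the names `make_jobs` and `drain` are hypothetical, and the timeout from the pseudocode is left out for brevity. The update is guarded with `processing IS NULL` so the affected-row count reliably tells a worker whether it won the job.

```python
import sqlite3

def make_jobs(messages):
    # In-memory stand-in for the jobs table with a nullable processing flag.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE jobs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " message TEXT NOT NULL,"
        " processing INTEGER NULL)"
    )
    with conn:
        conn.executemany(
            "INSERT INTO jobs(message) VALUES (?)", [(m,) for m in messages]
        )
    return conn

def drain(conn, handle):
    """Process unclaimed jobs until none remain; return how many were handled."""
    done = 0
    while True:
        row = conn.execute(
            "SELECT id, message FROM jobs "
            "WHERE processing IS NULL ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            break  # queue is empty
        job_id, message = row
        with conn:
            cur = conn.execute(
                "UPDATE jobs SET processing = 1 "
                "WHERE id = ? AND processing IS NULL",
                (job_id,),
            )
        if cur.rowcount == 0:
            continue  # another worker claimed this job first
        handle(message)  # process the job
        with conn:
            conn.execute("DELETE FROM jobs WHERE id = ?", (job_id,))
        done += 1
    return done
```

Calling `drain(conn, print)` on a fresh table prints each message in id order and leaves the table empty. Rows already flagged by another worker are skipped and stay in place.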

Needless to say, you should use a proper message queue (ActiveMQ, RabbitMQ, etc) for this kind of work. We had to resort to this solution though, as our host regularly breaks things when updating software, so the less stuff to break the better.
