使用MySQL表作为Python中的线程队列

时间:2022-04-06 20:43:33

I have a DB with a queue table, new entries are inserted continuously in the queue.

我有一个带有队列表的DB,新的条目不断地插入到队列中。

I want a Python script to execute the queue as fast as possible, and I think I need some threaded code to do so, running like a daemon.

我想要一个Python脚本以尽可能快的速度执行队列,并且我认为我需要一些线程代码来执行队列,就像一个守护进程一样运行。

But I can't figure out how to use the DB as the queue.

但是我不知道如何使用DB作为队列。

I am looking at this example:

我正在看这个例子:

import MySQLdb
from Queue import Queue
from threading import Thread

def do_stuff(q):
    while True:
        print q.get()
        q.task_done()

q = Queue(maxsize=0)
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=do_stuff, args=(q,))
    worker.setDaemon(True)
    worker.start()

// TODO:  Use the DB
db = MySQLdb.connect(...)
cursor = db.cursor()
q = cursor.execute("SELECT * FROM queue")

for x in range(100):
    q.put(x)
q.join()

2 个解决方案

#1


3  

2 quick points :

2快速点:

  1. Assuming you are using cPython, The GIL will effectively render threading useless, allowing only 1 thread through the interpreter at one time. Couple of workarounds are :

    假设您使用的是cPython, GIL将有效地使线程无效,一次只允许1个线程通过解释器。有两种解决方法:

    • The Gevent library [source]

      Gevent库[源]

      gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.

      gevent是一个基于关联的Python网络库,它使用greenlet在libev事件循环之上提供高级同步API。

    • The multiprocessing module, you can spawn multiple processes - this is true concurrency in python.

      多处理模块,您可以产生多个进程——这是python中真正的并发。

    • The concurrent.futures module - new in python 3, port available for python 2. [source]

      并发。期货模块——python 3中的新模块,适用于python 2的端口。(来源)

      This is a new high-level library that operates only at a “job” level, which means that you no longer have to fuss with
      synchronization, or managing threads or processes. you just specify a thread or process pool with a certain number of “workers,” submit
      jobs, and collate the results. It’s new in Python 3.2, but a port for Python 2.6+ is available at http://code.google.com/p/pythonfutures.

      这是一个新的高级库,只在“作业”级别上操作,这意味着您不必再为同步、或管理线程或进程而烦恼。您只需指定具有一定数量的“worker”的线程或进程池,提交作业,并对结果进行排序。它在Python 3.2中是新的,但是可以在http://code.google.com/p/pythonfutures获得Python 2.6+的端口。

You can use the SSDictCursor() of MySQLdb and do a fetchone().This is a streaming cursor and you can run this in an infinite while() loop to resemble a queue:

您可以使用MySQLdb的SSDictCursor()并执行fetchone()。这是一个流光标,您可以在无限while()循环中运行它,以类似于队列:

cur = MySQLdb.cursors.SSDictCursor()

cur.execute(query)

while True:

row = cursor.fetchone()

if not row : break # (or sleep()!)

else: # other
  1. Having said all that, I would suggest you look at implementing tools like celery or mongodb to emulate queues and workers. Relational databases are just not cut out for that kind of a job and suffer unnecessary fragmentation. Here's a great source if you want to know more about fragmentation in mysql.
  2. 话虽如此,我还是建议您考虑使用芹菜或mongodb之类的实现工具来模拟队列和员工。关系数据库不适合这种工作,并且会遭受不必要的碎片化。如果你想了解更多关于mysql的分段,这里有一个很好的资源。

#2


1  

I am not sure if its the best solution but I think of a structure of a main-thread which reads the db and fill the Queue. Make sure to avoid doublets. Maybe by using primary key of increasing numbers would be easy to check.

我不确定这是否是最好的解决方案,但我想到了一个主线程结构,它读取db并填充队列。一定要避免紧身裤。也许用主键增加数字会很容易检查。

The Worker-Structure is nice, but like mentioned in comments: the GIL will avoid any boost. But you could use multiprocessing if your "do_stuff" is independent from the script himself (f.e. the tasks are pictures and the "do_stuff" is "rotate ervery picture 90°"). Afaik it doesn't suffer from GIL

工人结构很好,但是正如评论中提到的:GIL将避免任何提升。但是你可以使用多处理如果你“do_stuff”是独立于自己的脚本(远东的任务是图片和“do_stuff”是“ervery图片旋转90°”)。但它不受吉尔的影响

https://docs.python.org/2/library/subprocess.html get you some informations about that.

https://docs.python.org/2/library/subprocess.html为您提供相关信息。

PS: English isn't my native language.

PS:英语不是我的母语。

#1


3  

2 quick points :

2快速点:

  1. Assuming you are using cPython, The GIL will effectively render threading useless, allowing only 1 thread through the interpreter at one time. Couple of workarounds are :

    假设您使用的是cPython, GIL将有效地使线程无效,一次只允许1个线程通过解释器。有两种解决方法:

    • The Gevent library [source]

      Gevent库[源]

      gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.

      gevent是一个基于关联的Python网络库,它使用greenlet在libev事件循环之上提供高级同步API。

    • The multiprocessing module, you can spawn multiple processes - this is true concurrency in python.

      多处理模块,您可以产生多个进程——这是python中真正的并发。

    • The concurrent.futures module - new in python 3, port available for python 2. [source]

      并发。期货模块——python 3中的新模块,适用于python 2的端口。(来源)

      This is a new high-level library that operates only at a “job” level, which means that you no longer have to fuss with
      synchronization, or managing threads or processes. you just specify a thread or process pool with a certain number of “workers,” submit
      jobs, and collate the results. It’s new in Python 3.2, but a port for Python 2.6+ is available at http://code.google.com/p/pythonfutures.

      这是一个新的高级库,只在“作业”级别上操作,这意味着您不必再为同步、或管理线程或进程而烦恼。您只需指定具有一定数量的“worker”的线程或进程池,提交作业,并对结果进行排序。它在Python 3.2中是新的,但是可以在http://code.google.com/p/pythonfutures获得Python 2.6+的端口。

You can use the SSDictCursor() of MySQLdb and do a fetchone().This is a streaming cursor and you can run this in an infinite while() loop to resemble a queue:

您可以使用MySQLdb的SSDictCursor()并执行fetchone()。这是一个流光标,您可以在无限while()循环中运行它,以类似于队列:

cur = MySQLdb.cursors.SSDictCursor()

cur.execute(query)

while True:

row = cursor.fetchone()

if not row : break # (or sleep()!)

else: # other
  1. Having said all that, I would suggest you look at implementing tools like celery or mongodb to emulate queues and workers. Relational databases are just not cut out for that kind of a job and suffer unnecessary fragmentation. Here's a great source if you want to know more about fragmentation in mysql.
  2. 话虽如此,我还是建议您考虑使用芹菜或mongodb之类的实现工具来模拟队列和员工。关系数据库不适合这种工作,并且会遭受不必要的碎片化。如果你想了解更多关于mysql的分段,这里有一个很好的资源。

#2


1  

I am not sure if its the best solution but I think of a structure of a main-thread which reads the db and fill the Queue. Make sure to avoid doublets. Maybe by using primary key of increasing numbers would be easy to check.

我不确定这是否是最好的解决方案,但我想到了一个主线程结构,它读取db并填充队列。一定要避免紧身裤。也许用主键增加数字会很容易检查。

The Worker-Structure is nice, but like mentioned in comments: the GIL will avoid any boost. But you could use multiprocessing if your "do_stuff" is independent from the script himself (f.e. the tasks are pictures and the "do_stuff" is "rotate ervery picture 90°"). Afaik it doesn't suffer from GIL

工人结构很好,但是正如评论中提到的:GIL将避免任何提升。但是你可以使用多处理如果你“do_stuff”是独立于自己的脚本(远东的任务是图片和“do_stuff”是“ervery图片旋转90°”)。但它不受吉尔的影响

https://docs.python.org/2/library/subprocess.html get you some informations about that.

https://docs.python.org/2/library/subprocess.html为您提供相关信息。

PS: English isn't my native language.

PS:英语不是我的母语。