多线程理解--互斥和同步

时间:2022-10-31 23:36:30

最近做了一个数据割接的项目,简单来说就是把数据从一个数据库迁移到另一个数据库,中间还有一些数据计算等。

背景:在不停机的情况下做割接,数据量千万级。
要在不停机的情况下做割接,则需要把做割接的过程中的数据变化想办法记录下来(用时间戳或在数据表上加触发器(trigger)),根据实际情况我们采用加trigger的方式。
什么是加trigger?
所谓trigger就是安装在某张表上的触发器,当这张表上有增删改时就会自动触发这个触发器,然后把增删改的数据记录到一个临时表中,方便后面对这些数据进行访问和处理。
这么做的目的是记录下需要进行割接的数据表的变化(insert/update/delete),将这些变化的记录存储到一张临时表,并且严格按照这些数据变化的顺序进行记录。

那么整个项目就可以分为两个部分:迁移大块的数据(数据块B)和迁移中间由触发器记录下来的数据(数据块D)。
将这两种数据分开处理是因为,既然是在不停机的情况下做割接,那么原始数据表里的数据就是在不断变化的,我们要把每一条数据都迁移完,且必须保证数据的完整性和一致性,对于数据块B,我们总有一秒会把它搬完(就是个用时长短的问题),而对于数据块D来说,我们想把它搬完就必须保证搬的速度比它增长的速度快,也就是要使得这部分数据量最终要达到一个收敛的状态。
要想达到搬数据块B用时短和搬数据块D最终收敛的效果,可以采用多线程处理这两个问题(单线程也可以搞定,那就割接几天而已嘛,并且线上的流量如果比你程序跑的快,那数据块D你就永远也搬不完了,这个太悲剧了= =!)。

言归正传,讲讲这个项目中用到的多线程吧。

这里涉及到的计算机操作有:
- I/O操作(数据库读写)
- CPU运算(数据计算和处理)

对于计算机来说,I/O操作是很慢的,而CPU运算是很快的,所以我们要使程序跑得快,就要最大限度地利用计算机的CPU,不要让I/O操作耽误了CPU运算。
所以我们把数据库读写和数据计算用不同的线程去处理:一个线程从从数据表读数据,一个线程处理这些数据,还有一个线程把处理完的数据写到另一个数据表。这里就需要用到经典的生产者和消费者模型,读-处理(生产者是读线程,消费者是处理线程),处理-写(生产者是处理线程,消费者是写线程)。对于读-处理的过程,读线程把数据读出来,依次扔到一个共享队列A,处理线程从这个共享队列A拿数据出来处理;对于处理-写的过程,处理线程处理完数据之后把它们扔到另一个共享队列B,写线程从这个共享队列拿数据出来写到另一个数据库。

有了共享队列,读线程&处理线程&写线程就可以专注于干自己的事,处理数据不需要等到数据被读出来,直接从队列A拿就可以了,写数据也不需要等到数据被处理完,直接从队列B拿就可以了,没有了I/O和CPU的相互等待,程序干活的速度自然就加快了。

多线程自然就需要考虑同步互斥问题,这里的同步和互斥是通过条件变量来实现的。
对共享队列A和B的访问需要进行互斥,也就是读线程在往队列A写数据的时候,处理线程就不应该在访问队列A,等读线程写完了再通知其他线程,这个时候处理线程才可能访问队列A,对队列B的访问同理;往队列中写数据(生产)和从队列中拿数据(消费)需要进行同步,对于写线程来说,只有当队列A没写满的时候才能继续往里面写,对于处理线程来说,只有当队列A中还有数据的时候才能从里面拿数据。
写线程想往队列A里面写数据(生产),首先需要能够访问队列A,因此要acquire一个条件变量,如果这个条件变量被其他正在访问队列A的线程占用没有释放,那么写线程就会阻塞在这里,直到有线程释放这个条件变量才再次尝试去获取;当acquire到这个条件变量后,还要先判断队列A是否满了,如果满了就应该wait(阻塞)并释放条件变量,等待处理线程从队列中把数据拿走(消费),如果还没满,就往里面写入数据,然后通知其他线程并释放此条件变量;以上写线程要acquire一个条件变量体现了互斥的思想,而获取到条件变量之后进行队列判满则体现了同步的思想。处理线程和写线程的同步与互斥同上。

下面附上经典的生产者和消费者模型的代码(python实现)。假设有一群生产者(Producer)和一群消费者(Consumer)通过一个市场(count)来交互产品,对于生产者来说,如果市场上剩余的产品少于1000个,就生产100个产品放到市场上;对于消费者来说,如果市场上剩余的产品多于100个,就消费3个产品。

import threading
import time   

class Producer(threading.Thread):
    def run(self):
        global count
        while True:
            if con.acquire():      # 对全局变量count的访问要互斥(互斥访问)
                if count > 1000:     # 对全局变量count的读写要同步(同步读写)
                    con.wait()
                else:
                    count = count+100
                    msg = self.name+' produce 100, count=' + str(count)
                    print msg
                    con.notify()
                con.release()
                time.sleep(1)

class Consumer(threading.Thread):
    def run(self):
        global count
        while True:
            if con.acquire():    # 对全局变量count的访问要互斥
                if count < 100:    # 对全局变量count的读写要同步
                    con.wait()
                else:
                    count = count-3
                    msg = self.name+' consume 3, count='+str(count)
                    print msg
                    con.notify()
                con.release()
                time.sleep(1)

count = 500
con = threading.Condition()

def test():
    for i in range(2):    # 2个生产者
        p = Producer()
        p.start()
    for i in range(5):    # 5个消费者
        c = Consumer()
        c.start()

if __name__ == '__main__':
    test()