【python基础】7.生产者消费者

发布时间 2024-01-04 11:19:48作者: asdio

生产者消费者模型

线程

  • 线程的创建

    • 继承Thread类,重写run方法
    • 创建Thread类的实例,传入一个可调用的类实例
  • 线程的启动

    • 调用start方法
    • 调用run方法
  • 线程的结束

    • 线程执行完毕
    • 线程抛出未捕获的异常
    • 线程调用stop方法
import threading, time, random

class MyThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name=name

    def run(self):
        print('线程{0}开始执行'.format(self.name))
        time.sleep(random.randint(1,5))
        print('线程{0}执行完毕'.format(self.name))


print('线程的创建和启动:')
t1=MyThread('t1')
t2=MyThread('t2')
t1.start()
t2.start()
t1.join()
t2.join()
print('主线程执行完毕')

线程同步

线程同步的目的是为了防止多个线程访问同一资源时,对资源造成破坏。

  • 互斥锁

    • 互斥锁是一种独占锁,同一时刻只有一个线程可以访问共享资源

    • 互斥锁的使用

      • 创建锁对象
      • 调用acquire方法获取锁
      • 调用release方法释放锁
import threading, time, random

class MyThread(threading.Thread):
    def __init__(self, name, lock):
        threading.Thread.__init__(self)
        self.name=name
        self.lock=lock

    def run(self):
        self.lock.acquire()
        print('线程{0}开始执行'.format(self.name))
        time.sleep(random.randint(1,5))
        print('线程{0}执行完毕'.format(self.name))
        self.lock.release()

  • 信号量
    • 信号量是一种计数器,用来控制多个线程对共享资源的访问

    • 信号量的使用

      • 创建信号量对象
      • 调用acquire方法获取信号量
      • 调用release方法释放信号量
import threading, time, random

class MyThread(threading.Thread):
    def __init__(self, name, sem):
        threading.Thread.__init__(self)
        self.name=name
        self.sem=sem

    def run(self):
        self.sem.acquire()
        print('线程{0}开始执行'.format(self.name))
        time.sleep(random.randint(1,5))
        print('线程{0}执行完毕'.format(self.name))
        self.sem.release()

  • 事件
    • 事件是一种线程间通信方式,用来实现线程同步

    • 事件的使用

      • 创建事件对象
      • 调用wait方法等待事件
      • 调用set方法设置事件
      • 调用clear方法清除事件

import threading, time, random

class MyThread(threading.Thread):
    def __init__(self, name, event):
        threading.Thread.__init__(self)
        self.name=name
        self.event=event

    def run(self):
        print('线程{0}开始等待事件'.format(self.name))
        self.event.wait()
        print('线程{0}收到事件'.format(self.name))
        print('线程{0}开始执行'.format(self.name))
        time.sleep(random.randint(1,5))
        print('线程{0}执行完毕'.format(self.name))

Print('线程同步:')
event=threading.Event()
t1=MyThread('t1', event)
t2=MyThread('t2', event)
t1.start()
t2.start()

生产者消费者模型

import threading, time, random
class Container():                      #基于同步和通信
    lock=threading.Lock()
    def __init__(self,amount):                   #构造函数
        self.amount=amount

    def put(self, value):                 #生产函数
        self.lock.acquire()
        self.amount+=value
        t = threading.current_thread()
        print('{0}生产{1},当前剩余{2}'.format(t.name, value, self.amount))
        self.lock.release()

    def get(self, value):                     #消费函数
        self.lock.acquire()
        if value<= self.amount:
            self.amount-=value
            t = threading.current_thread()
            print('{0}消费{1},当前剩余{2}'.format(t.name, value, self.amount))
        else:
            print('当前余量不足,无法消费')
        self.lock.release()

class Producer(threading.Thread):
    def __init__(self, container):
        threading.Thread.__init__(self)
        self.container=container
        self.interval=2

    def run(self):
        while True:
            time.sleep(random.choice(range(self.interval)))  # 随机睡眠interval秒
            self.container.put(random.randint(0,3))



class Consumer(threading.Thread):
    def __init__(self, container):
        threading.Thread.__init__(self)
        self.container=container
        self.interval = 2

    def run(self):
        while True:
            time.sleep(random.choice(range(self.interval)))  # 随机睡眠interval秒
            self.container.get(random.randint(0,10))


def test1():
    print('基本同步和通信的生产者消费者模型:')
    container = Container(0)        #创建容器
    Producer(container).start()      #创建消费者线程并启动
    Consumer(container).start()     #创建消费者线程并启动
if __name__=='__main__':
    test1()