多线程

发布时间 2023-09-27 13:50:54作者: donfag

一、什么是线程
线程(Thread)也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。
线程有就绪、阻塞、运行三种基本状态。
多进程主要用于计算密集型、多线程主要用于I/O密集型(输入输出),下在举例子说明:

1、计算密集型任务-多进程示例代码:

 1 from multiprocessing import Process
 2 import os,time
 3 
 4 #计算密集型任务:
 5 def work():
 6     res = 0
 7     for i in range(100000000):
 8         res *= i
 9 if __name__ == "__main__":
10     lst = []
11     print("本机为",os.cpu_count(),"核心CPU")
12     start = time.time()
13     for i in range(4):
14         p = Process(target=work)
15         lst.append(p)
16         p.start()
17     for p in lst:
18         p.join()
19     stop = time.time()
20     print("计算密集型任务,多进程耗时 %s" % (stop-start))

返回结果:
本机为 4 核心CPU
计算密集型任务,多进程耗时 14.821362972259521

2、计算密集型任务-多线程示例代码:

 1 from threading import Thread
 2 import os,time
 3 
 4 #计算密集型任务
 5 def work():
 6     res = 0
 7     for i in range(100000000):
 8         res *= i
 9 
10 if __name__ == "__main__":
11     lst = []
12     print("本机为",os.cpu_count(),"核心CPU")
13     start = time.time()
14     for i in range(4):
15         p = Thread(target=work) #多线程
16         lst.append(p)
17         p.start()
18     for p in lst:
19         p.join()
20     stop = time.time()
21     print("计算密集型任务,多线程耗时 %s" % (stop - start))

返回结果:
本机为 4 核心CPU
计算密集型任务,多线程耗时 29.50564193725586

3、I/O密集型任务-多进程示例代码:

 1 from multiprocessing import Process
 2 import os,time
 3 
 4 #I/O密集型任务
 5 def work():
 6     time.sleep(2)
 7     print("===>",file=open("tmp.txt","w"))
 8 
 9 if __name__ == "__main__":
10     lst = []
11     print("本机为",os.cpu_count(),"核心CPU")
12     start = time.time()
13     for i in range(400):
14         p = Process(target=work) #多进程
15         lst.append(p)
16         p.start()
17     for p in lst:
18         p.join()
19     stop = time.time()
20     print("I/O密集型任务,多进程耗时 %s" % (stop - start))

返回结果:
本机为 4 核心CPU
I/O密集型任务,多进程耗时 25.822606086730957

4、I/O密集型任务-多线程示例代码:

 1 from threading import Thread
 2 import os,time
 3 
 4 #I/O密集型任务
 5 def work():
 6     time.sleep(2)
 7     print("===>",file=open("tmp.txt","w"))
 8 
 9 if __name__ == "__main__":
10     lst = []
11     print("本机为",os.cpu_count(),"核心CPU")
12     start = time.time()
13     for i in range(400):
14         p = Thread(target=work) #多线程
15         lst.append(p)
16         p.start()
17     for p in lst:
18         p.join()
19     stop = time.time()
20     print("I/O密集型任务,多线程耗时 %s" % (stop - start))

返回结果;
本机为 4 核心CPU
I/O密集型任务,多线程耗时 2.1557388305664062

二、使用threading进行多线程操作有以下两种方法:
1、创建threading.Thread类的实例,调用其start()方法:
示例代码:

 1 import threading
 2 import time
 3 
 4 def task_thread(counter):
 5     print(f'线程名称:{threading.current_thread().name} 参数:{counter} 开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
 6     num = counter
 7     while num:
 8         time.sleep(3)
 9         num -= 1
10     print(f'线程名称:{threading.current_thread().name} 参数:{counter} 结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
11 
12 if __name__ == "__main__":
13     print(f'主线程开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
14     #初始化三个线程,传递不同的参数
15     t1 = threading.Thread(target=task_thread,args=(3,))
16     t2 = threading.Thread(target=task_thread, args=(2,))
17     t3 = threading.Thread(target=task_thread, args=(1,))
18     #开启三个进程
19     t1.start()
20     t2.start()
21     t3.start()
22     #等待运行结束
23     t1.join()
24     t2.join()
25     t3.join()
26     print(f'主线程结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')

返回结果:
主线程开始时间:2023-09-24 16:14:35
线程名称:Thread-1 参数:3 开始时间:2023-09-24 16:14:35
线程名称:Thread-2 参数:2 开始时间:2023-09-24 16:14:35
线程名称:Thread-3 参数:1 开始时间:2023-09-24 16:14:35
线程名称:Thread-3 参数:1 结束时间:2023-09-24 16:14:38
线程名称:Thread-2 参数:2 结束时间:2023-09-24 16:14:41
线程名称:Thread-1 参数:3 结束时间:2023-09-24 16:14:44
主线程结束时间:2023-09-24 16:14:44

2、继承Thread类,在子类中重写run()和init()方法:
示例代码:

 1 import threading
 2 import time
 3 
 4 
 5 class MyThead(threading.Thread):
 6     def __init__(self,counter):
 7         super().__init__()
 8         self.counter = counter
 9     def run(self):
10         print(f'线程名称:{threading.current_thread().name} 参数:{self.counter} 开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
11         counter = self.counter
12         while counter:
13             time.sleep(3)
14             counter -= 1
15         print(f'线程名称:{threading.current_thread().name} 参数:{self.counter} 结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
16 
17 if __name__ == "__main__":
18     print(f'主线程开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
19     # 初始化三个线程,传递不同的参数
20     t1 = MyThead(3)
21     t2 = MyThead(2)
22     t3 = MyThead(1)
23     # 开启三个进程
24     t1.start()
25     t2.start()
26     t3.start()
27     # 等待运行结束
28     t1.join()
29     t2.join()
30     t3.join()
31     print(f'主线程结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')

返回结果:
主线程开始时间:2023-09-24 16:34:27
线程名称:Thread-1 参数:3 开始时间:2023-09-24 16:34:27
线程名称:Thread-2 参数:2 开始时间:2023-09-24 16:34:27
线程名称:Thread-3 参数:1 开始时间:2023-09-24 16:34:27
线程名称:Thread-3 参数:1 结束时间:2023-09-24 16:34:30
线程名称:Thread-2 参数:2 结束时间:2023-09-24 16:34:33
线程名称:Thread-1 参数:3 结束时间:2023-09-24 16:34:36
主线程结束时间:2023-09-24 16:34:36

3、如果继承Thread类,想调用外部传入函数,请看下面示例:

 1 import threading
 2 import time
 3 
 4 def task_thread(counter):
 5     print(f'线程名称:{threading.current_thread().name} 参数:{counter} 开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
 6     num = counter
 7     while num:
 8         time.sleep(3)
 9         num -= 1
10     print(f'线程名称:{threading.current_thread().name} 参数:{counter} 结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
11 
12 class MyThead(threading.Thread):
13     def __init__(self,target,args):
14         super().__init__()
15         self.target = target
16         self.args = args
17     def run(self):
18         self.target(*self.args)
19 
20 if __name__ == "__main__":
21     print(f'主线程开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
22     # 初始化三个线程,传递不同的参数
23     t1 = MyThead(target=task_thread,args=(3,))
24     t2 = MyThead(target=task_thread,args=(2,))
25     t3 = MyThead(target=task_thread,args=(1,))
26     # 开启三个进程
27     t1.start()
28     t2.start()
29     t3.start()
30     # 等待运行结束
31     t1.join()
32     t2.join()
33     t3.join()
34     print(f'主线程结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')

返回结果:
主线程开始时间:2023-09-24 19:35:26
线程名称:Thread-1 参数:3 开始时间:2023-09-24 19:35:26
线程名称:Thread-2 参数:2 开始时间:2023-09-24 19:35:26
线程名称:Thread-3 参数:1 开始时间:2023-09-24 19:35:26
线程名称:Thread-3 参数:1 结束时间:2023-09-24 19:35:29
线程名称:Thread-2 参数:2 结束时间:2023-09-24 19:35:32
线程名称:Thread-1 参数:3 结束时间:2023-09-24 19:35:35
主线程结束时间:2023-09-24 19:35:35

三、多线程同步之Lock(互斥锁)
如果多个线程共同对某个数据修改,刚可能出现不可预料的结果,这个时候就需要使用互斥锁来进行同步。例如,在三个线程对共同变量num进行100万次加减操作之后,其结果不为0.
示例代码-不加锁的情况:

 1 import threading
 2 import time
 3 num = 0
 4 def task_thread(n):
 5     global num
 6     for i in range(1000000):
 7         num = num + n
 8         num = num - n
 9 t1 = threading.Thread(target=task_thread,args=(6,))
10 t2 = threading.Thread(target=task_thread,args=(17,))
11 t3 = threading.Thread(target=task_thread,args=(11,))
12 t1.start();t2.start();t3.start()
13 t1.join();t2.join();t3.join()
14 print(num)

返回结果:每次都不一样

示例代码-加锁的情况:

 1 import threading
 2 import time
 3 num = 0
 4 lock = threading.Lock()
 5 def task_thread(n):
 6     global num
 7     #获取锁用于线程同步
 8     lock.acquire()
 9     for i in range(1000000):
10         num = num + n
11         num = num - n
12     #释放锁,开启下一个线程
13     lock.release()
14 t1 = threading.Thread(target=task_thread,args=(6,))
15 t2 = threading.Thread(target=task_thread,args=(17,))
16 t3 = threading.Thread(target=task_thread,args=(11,))
17 t1.start();t2.start();t3.start()
18 t1.join();t2.join();t3.join()
19 print(num)

返回结果:每次都是0

四、多线程同步之Semaphore(信号量):
信号量是同时允许一定数量的线程访问共享数据,比如银行柜台有五个窗口,允许同时有五个办理业务,后面的人只能等待,待柜台有人办理完业务后才可以进入相应的柜台办理。
示例代码:

 1 import threading
 2 import time
 3 #同时只有五个人办理业务
 4 semaphore = threading.BoundedSemaphore(5)
 5 #模拟银行业务办理
 6 def yewubanli(name):
 7     semaphore.acquire()
 8     time.sleep(3)
 9     print(f'{time.strftime("%Y-%m-%d %H:%M:%S")} {name} 正在办理业务')
10     semaphore.release()
11 
12 thread_lst = []
13 for i in range(12):
14     t = threading.Thread(target=yewubanli,args=(i,))
15     thread_lst.append(t)
16 for thread in thread_lst:
17     thread.start()
18 for thread in thread_lst:
19     thread.join()

返回结果:
2023-09-24 20:01:59 4 正在办理业务
2023-09-24 20:01:59 0 正在办理业务
2023-09-24 20:01:59 3 正在办理业务
2023-09-24 20:01:59 1 正在办理业务
2023-09-24 20:01:59 2 正在办理业务
2023-09-24 20:02:02 8 正在办理业务
2023-09-24 20:02:02 6 正在办理业务
2023-09-24 20:02:02 9 正在办理业务
2023-09-24 20:02:02 7 正在办理业务
2023-09-24 20:02:02 5 正在办理业务
2023-09-24 20:02:05 11 正在办理业务
2023-09-24 20:02:05 10 正在办理业务

五、多线程同步之Condition
条件对象Condition能让一个线程A停下来,等待其它线程B,线程B满足了某个条件后通知(notify)线程A继续运行.线程首先获取一个条件变量锁,如果条件不足,则该线程等待(wait)并释放条件变量锁;如果条件满足,就继续执行线程,执行完成后可以通知(notify)其它状态为wait的线程执行。其它处于wait状态的线程接到通知后会重新判读条件以确定是否继续执行。
示例代码:

 1 import threading
 2 
 3 class Boy(threading.Thread):
 4     def __init__(self,cond,name):
 5         super(Boy,self).__init__()
 6         self.cond = cond
 7         self.name = name
 8     def run(self):
 9         self.cond.acquire()
10         print(self.name +":嫁给我吧!")
11         self.cond.notify() #唤醒一个挂起的进程让Haimeme表态
12         self.cond.wait() #释放内部所占用的锁,同时线程被挂起,直到接收到通知被唤醒或超时,等待Haimeme回答
13         print(self.name + ":我手捧鲜花,送上戒指!")
14         self.cond.notify()
15         self.cond.wait()
16         print(self.name + ":Li太太,你的选择太明智了!")
17         self.cond.release()
18 
19 class Girl(threading.Thread):
20     def __init__(self,cond,name):
21         super(Girl,self).__init__()
22         self.cond = cond
23         self.name = name
24     def run(self):
25         self.cond.acquire()
26         self.cond.wait()
27         print(self.name +":没有情调,不够浪漫,不答应!")
28         self.cond.notify()
29         self.cond.wait()
30         print(self.name + ":好吧,答应你了!")
31         self.cond.notify()
32         self.cond.release()
33 
34 if __name__ == '__main__':
35     cond = threading.Condition()
36     boy = Boy(cond,'LiLei')
37     gril = Girl(cond,'Haimeimei')
38     gril.start()
39     boy.start()

程序实例化了一个Conditon对象cond,一个Boy对象,一个girl对象,程序先启动了girl线程,girl虽然获取到了条件变量锁cond,但又执行了wait并释放条件变量锁,自身进入阻塞状态;boy线程启动后,就获得了变量锁cond并发出了消息,之后通过notify唤醒一个挂起的线程,并释放条件变量锁等待girl的回答...
返回结果:
LiLei:嫁给我吧!
Haimeimei:没有情调,不够浪漫,不答应!
LiLei:我手捧鲜花,送上戒指!
Haimeimei:好吧,答应你了!
LiLei:Li太太,你的选择太明智了!

六、线程同步之Event
Event用于线程这间的通信。一个线程发出一个信号,其它一个或多个线程等待,调用Event对象的wait方法,线程则会阻塞等待,直到别的线程set之后才会被唤醒。示例代码:

 1 import threading,time
 2 
 3 class Boy(threading.Thread):
 4     def __init__(self,event,name):
 5         super(Boy,self).__init__()
 6         self.event = event
 7         self.name = name
 8     def run(self):
 9         print(self.name +":嫁给我吧!")
10         self.event.set() #唤醒一个挂起的进程让Haimeme表态
11         time.sleep(0.5)
12         self.event.wait() #释放内部所占用的锁,同时线程被挂起,直到接收到通知被唤醒或超时,等待Haimeme回答
13         print(self.name + ":我手捧鲜花,送上戒指!")
14         self.event.set()
15         time.sleep(0.5)
16         self.event.wait()
17         #self.event.clear()
18         print(self.name + ":Li太太,你的选择太明智了!")
19 
20 
21 class Girl(threading.Thread):
22     def __init__(self,event,name):
23         super(Girl,self).__init__()
24         self.event= event
25         self.name = name
26     def run(self):
27         self.event.wait()
28         self.event.clear()
29         print(self.name +":没有情调,不够浪漫,不答应!")
30         self.event.set()
31         time.sleep(0.5)
32         self.event.wait()
33         print(self.name + ":好吧,答应你了!")
34         self.event.set()
35 
36 if __name__ == '__main__':
37     event = threading.Event()
38     boy = Boy(event,'LiLei')
39     gril = Girl(event,'Haimeimei')
40     gril.start()
41     boy.start()

Event内部默认内置了一个标志,初始值为False。上述代码中对象girl通过wait()方法进入等待状态,直到对象boy调用该Event的set()方法将内置标志设为True时,对象girl再继续运行。对象boy最后调用Event的clear()方法再将内置标志设为False,恢复初始状态。
返回结果:
LiLei:嫁给我吧!
Haimeimei:没有情调,不够浪漫,不答应!
LiLei:我手捧鲜花,送上戒指!
Haimeimei:好吧,答应你了!
LiLei:Li太太,你的选择太明智了!

七、线程优先级队列(queue)
queue模块中提供了同步的、线程安全的队列类,包括先进先出队列(Queue)、后进先出队列(LifoQueue)、和优先级队列(PriorityQueue)。
示例代码:

 1 import threading,time
 2 import queue
 3 
 4 #先进先出
 5 q = queue.Queue(maxsize=5)
 6 #q = queue.LifoQueue(maxsize=3)
 7 #q = queue.PriorityQueue(maxsize=3)
 8 
 9 def ProducerA():
10     count = 1
11     while True:
12         q.put(f"冷饮 {count}")
13         print(f'{time.strftime("%Y-%m-%d %H:%M:%S")} A 放入:[冷饮 {count}]')
14         count += 1
15         time.sleep(1)
16 def ConsumerB():
17     while True:
18         print(f'{time.strftime("%Y-%m-%d %H:%M:%S")} B 取出:[{q.get()}]')
19         time.sleep(5)
20 
21 if __name__ == '__main__':
22     p = threading.Thread(target=ProducerA)
23     c = threading.Thread(target=ConsumerB)
24     c.start()
25     p.start()

返回结果:
2023-09-27 11:34:00 A 放入:[冷饮 1]
2023-09-27 11:34:00 B 取出:[冷饮 1]
2023-09-27 11:34:01 A 放入:[冷饮 2]
2023-09-27 11:34:02 A 放入:[冷饮 3]
2023-09-27 11:34:03 A 放入:[冷饮 4]
2023-09-27 11:34:04 A 放入:[冷饮 5]
2023-09-27 11:34:05 B 取出:[冷饮 2]
2023-09-27 11:34:05 A 放入:[冷饮 6]
2023-09-27 11:34:06 A 放入:[冷饮 7]
2023-09-27 11:34:10 B 取出:[冷饮 3]
2023-09-27 11:34:10 A 放入:[冷饮 8]
2023-09-27 11:34:15 B 取出:[冷饮 4]
2023-09-27 11:34:15 A 放入:[冷饮 9]
2023-09-27 11:34:20 B 取出:[冷饮 5]

八、多线程之线程池pool
在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或其它更多资源。虚拟机也将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。同样的道理,多任务情况下每次都会产生一个新进程,执行任务后资源再被回收就显得非常低效,因此线程池就是解决这个问题的办法。类似的还有连接池、进程池等。
将任务添加到线程池中,线程池会自动指定一个空闲的线程去执行任务,当超过线程池的最大线程时,任务需要等待有新的空闲线程后才会被执行。
from multiprocessing import Pool表示的是进程池;from multiprocessing.dummy import Pool表示的线程池。
示例代码:

 1 from multiprocessing.dummy import Pool as ThreadPool
 2 import time
 3 
 4 def fun(n):
 5     time.sleep(2)
 6 start = time.time()
 7 for i in range(5):
 8     fun(i)
 9 print("单线程顺序执行耗时:",time.time() - start)
10 
11 start2 = time.time()
12 #开5个worker,没有参数时默认是cpu的核心数
13 pool = ThreadPool(processes=5)
14 res = pool.map(fun,range(5))
15 pool.close()
16 pool.join()
17 print("线程池(5)并发执行耗时:",time.time() - start2)

返回结果:
单线程顺序执行耗时: 10.041308164596558
线程池(5)并发执行耗时: 2.1530473232269287

以上内容来自郑征的《Python自动化运维快速入门》,清华大学出版社。