一、什么是线程
线程(Thread)也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。
线程有就绪、阻塞、运行三种基本状态。
多进程主要用于计算密集型、多线程主要用于I/O密集型(输入输出),下在举例子说明:
1、计算密集型任务-多进程示例代码:
1 from multiprocessing import Process 2 import os,time 3 4 #计算密集型任务: 5 def work(): 6 res = 0 7 for i in range(100000000): 8 res *= i 9 if __name__ == "__main__": 10 lst = [] 11 print("本机为",os.cpu_count(),"核心CPU") 12 start = time.time() 13 for i in range(4): 14 p = Process(target=work) 15 lst.append(p) 16 p.start() 17 for p in lst: 18 p.join() 19 stop = time.time() 20 print("计算密集型任务,多进程耗时 %s" % (stop-start))
返回结果:
本机为 4 核心CPU
计算密集型任务,多进程耗时 14.821362972259521
2、计算密集型任务-多线程示例代码:
1 from threading import Thread 2 import os,time 3 4 #计算密集型任务 5 def work(): 6 res = 0 7 for i in range(100000000): 8 res *= i 9 10 if __name__ == "__main__": 11 lst = [] 12 print("本机为",os.cpu_count(),"核心CPU") 13 start = time.time() 14 for i in range(4): 15 p = Thread(target=work) #多线程 16 lst.append(p) 17 p.start() 18 for p in lst: 19 p.join() 20 stop = time.time() 21 print("计算密集型任务,多线程耗时 %s" % (stop - start))
返回结果:
本机为 4 核心CPU
计算密集型任务,多线程耗时 29.50564193725586
3、I/O密集型任务-多进程示例代码:
1 from multiprocessing import Process 2 import os,time 3 4 #I/O密集型任务 5 def work(): 6 time.sleep(2) 7 print("===>",file=open("tmp.txt","w")) 8 9 if __name__ == "__main__": 10 lst = [] 11 print("本机为",os.cpu_count(),"核心CPU") 12 start = time.time() 13 for i in range(400): 14 p = Process(target=work) #多进程 15 lst.append(p) 16 p.start() 17 for p in lst: 18 p.join() 19 stop = time.time() 20 print("I/O密集型任务,多进程耗时 %s" % (stop - start))
返回结果:
本机为 4 核心CPU
I/O密集型任务,多进程耗时 25.822606086730957
4、I/O密集型任务-多线程示例代码:
1 from threading import Thread 2 import os,time 3 4 #I/O密集型任务 5 def work(): 6 time.sleep(2) 7 print("===>",file=open("tmp.txt","w")) 8 9 if __name__ == "__main__": 10 lst = [] 11 print("本机为",os.cpu_count(),"核心CPU") 12 start = time.time() 13 for i in range(400): 14 p = Thread(target=work) #多线程 15 lst.append(p) 16 p.start() 17 for p in lst: 18 p.join() 19 stop = time.time() 20 print("I/O密集型任务,多线程耗时 %s" % (stop - start))
返回结果;
本机为 4 核心CPU
I/O密集型任务,多线程耗时 2.1557388305664062
二、使用threading进行多线程操作有以下两种方法:
1、创建threading.Thread类的实例,调用其start()方法:
示例代码:
1 import threading 2 import time 3 4 def task_thread(counter): 5 print(f'线程名称:{threading.current_thread().name} 参数:{counter} 开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') 6 num = counter 7 while num: 8 time.sleep(3) 9 num -= 1 10 print(f'线程名称:{threading.current_thread().name} 参数:{counter} 结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') 11 12 if __name__ == "__main__": 13 print(f'主线程开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') 14 #初始化三个线程,传递不同的参数 15 t1 = threading.Thread(target=task_thread,args=(3,)) 16 t2 = threading.Thread(target=task_thread, args=(2,)) 17 t3 = threading.Thread(target=task_thread, args=(1,)) 18 #开启三个进程 19 t1.start() 20 t2.start() 21 t3.start() 22 #等待运行结束 23 t1.join() 24 t2.join() 25 t3.join() 26 print(f'主线程结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
返回结果:
主线程开始时间:2023-09-24 16:14:35
线程名称:Thread-1 参数:3 开始时间:2023-09-24 16:14:35
线程名称:Thread-2 参数:2 开始时间:2023-09-24 16:14:35
线程名称:Thread-3 参数:1 开始时间:2023-09-24 16:14:35
线程名称:Thread-3 参数:1 结束时间:2023-09-24 16:14:38
线程名称:Thread-2 参数:2 结束时间:2023-09-24 16:14:41
线程名称:Thread-1 参数:3 结束时间:2023-09-24 16:14:44
主线程结束时间:2023-09-24 16:14:44
2、继承Thread类,在子类中重写run()和init()方法:
示例代码:
1 import threading 2 import time 3 4 5 class MyThead(threading.Thread): 6 def __init__(self,counter): 7 super().__init__() 8 self.counter = counter 9 def run(self): 10 print(f'线程名称:{threading.current_thread().name} 参数:{self.counter} 开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') 11 counter = self.counter 12 while counter: 13 time.sleep(3) 14 counter -= 1 15 print(f'线程名称:{threading.current_thread().name} 参数:{self.counter} 结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') 16 17 if __name__ == "__main__": 18 print(f'主线程开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') 19 # 初始化三个线程,传递不同的参数 20 t1 = MyThead(3) 21 t2 = MyThead(2) 22 t3 = MyThead(1) 23 # 开启三个进程 24 t1.start() 25 t2.start() 26 t3.start() 27 # 等待运行结束 28 t1.join() 29 t2.join() 30 t3.join() 31 print(f'主线程结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
返回结果:
主线程开始时间:2023-09-24 16:34:27
线程名称:Thread-1 参数:3 开始时间:2023-09-24 16:34:27
线程名称:Thread-2 参数:2 开始时间:2023-09-24 16:34:27
线程名称:Thread-3 参数:1 开始时间:2023-09-24 16:34:27
线程名称:Thread-3 参数:1 结束时间:2023-09-24 16:34:30
线程名称:Thread-2 参数:2 结束时间:2023-09-24 16:34:33
线程名称:Thread-1 参数:3 结束时间:2023-09-24 16:34:36
主线程结束时间:2023-09-24 16:34:36
3、如果继承Thread类,想调用外部传入函数,请看下面示例:
1 import threading 2 import time 3 4 def task_thread(counter): 5 print(f'线程名称:{threading.current_thread().name} 参数:{counter} 开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') 6 num = counter 7 while num: 8 time.sleep(3) 9 num -= 1 10 print(f'线程名称:{threading.current_thread().name} 参数:{counter} 结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') 11 12 class MyThead(threading.Thread): 13 def __init__(self,target,args): 14 super().__init__() 15 self.target = target 16 self.args = args 17 def run(self): 18 self.target(*self.args) 19 20 if __name__ == "__main__": 21 print(f'主线程开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') 22 # 初始化三个线程,传递不同的参数 23 t1 = MyThead(target=task_thread,args=(3,)) 24 t2 = MyThead(target=task_thread,args=(2,)) 25 t3 = MyThead(target=task_thread,args=(1,)) 26 # 开启三个进程 27 t1.start() 28 t2.start() 29 t3.start() 30 # 等待运行结束 31 t1.join() 32 t2.join() 33 t3.join() 34 print(f'主线程结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
返回结果:
主线程开始时间:2023-09-24 19:35:26
线程名称:Thread-1 参数:3 开始时间:2023-09-24 19:35:26
线程名称:Thread-2 参数:2 开始时间:2023-09-24 19:35:26
线程名称:Thread-3 参数:1 开始时间:2023-09-24 19:35:26
线程名称:Thread-3 参数:1 结束时间:2023-09-24 19:35:29
线程名称:Thread-2 参数:2 结束时间:2023-09-24 19:35:32
线程名称:Thread-1 参数:3 结束时间:2023-09-24 19:35:35
主线程结束时间:2023-09-24 19:35:35
三、多线程同步之Lock(互斥锁)
如果多个线程共同对某个数据修改,刚可能出现不可预料的结果,这个时候就需要使用互斥锁来进行同步。例如,在三个线程对共同变量num进行100万次加减操作之后,其结果不为0.
示例代码-不加锁的情况:
1 import threading 2 import time 3 num = 0 4 def task_thread(n): 5 global num 6 for i in range(1000000): 7 num = num + n 8 num = num - n 9 t1 = threading.Thread(target=task_thread,args=(6,)) 10 t2 = threading.Thread(target=task_thread,args=(17,)) 11 t3 = threading.Thread(target=task_thread,args=(11,)) 12 t1.start();t2.start();t3.start() 13 t1.join();t2.join();t3.join() 14 print(num)
返回结果:每次都不一样
示例代码-加锁的情况:
1 import threading 2 import time 3 num = 0 4 lock = threading.Lock() 5 def task_thread(n): 6 global num 7 #获取锁用于线程同步 8 lock.acquire() 9 for i in range(1000000): 10 num = num + n 11 num = num - n 12 #释放锁,开启下一个线程 13 lock.release() 14 t1 = threading.Thread(target=task_thread,args=(6,)) 15 t2 = threading.Thread(target=task_thread,args=(17,)) 16 t3 = threading.Thread(target=task_thread,args=(11,)) 17 t1.start();t2.start();t3.start() 18 t1.join();t2.join();t3.join() 19 print(num)
返回结果:每次都是0
四、多线程同步之Semaphore(信号量):
信号量是同时允许一定数量的线程访问共享数据,比如银行柜台有五个窗口,允许同时有五个办理业务,后面的人只能等待,待柜台有人办理完业务后才可以进入相应的柜台办理。
示例代码:
1 import threading 2 import time 3 #同时只有五个人办理业务 4 semaphore = threading.BoundedSemaphore(5) 5 #模拟银行业务办理 6 def yewubanli(name): 7 semaphore.acquire() 8 time.sleep(3) 9 print(f'{time.strftime("%Y-%m-%d %H:%M:%S")} {name} 正在办理业务') 10 semaphore.release() 11 12 thread_lst = [] 13 for i in range(12): 14 t = threading.Thread(target=yewubanli,args=(i,)) 15 thread_lst.append(t) 16 for thread in thread_lst: 17 thread.start() 18 for thread in thread_lst: 19 thread.join()
返回结果:
2023-09-24 20:01:59 4 正在办理业务
2023-09-24 20:01:59 0 正在办理业务
2023-09-24 20:01:59 3 正在办理业务
2023-09-24 20:01:59 1 正在办理业务
2023-09-24 20:01:59 2 正在办理业务
2023-09-24 20:02:02 8 正在办理业务
2023-09-24 20:02:02 6 正在办理业务
2023-09-24 20:02:02 9 正在办理业务
2023-09-24 20:02:02 7 正在办理业务
2023-09-24 20:02:02 5 正在办理业务
2023-09-24 20:02:05 11 正在办理业务
2023-09-24 20:02:05 10 正在办理业务
五、多线程同步之Condition
条件对象Condition能让一个线程A停下来,等待其它线程B,线程B满足了某个条件后通知(notify)线程A继续运行.线程首先获取一个条件变量锁,如果条件不足,则该线程等待(wait)并释放条件变量锁;如果条件满足,就继续执行线程,执行完成后可以通知(notify)其它状态为wait的线程执行。其它处于wait状态的线程接到通知后会重新判读条件以确定是否继续执行。
示例代码:
1 import threading 2 3 class Boy(threading.Thread): 4 def __init__(self,cond,name): 5 super(Boy,self).__init__() 6 self.cond = cond 7 self.name = name 8 def run(self): 9 self.cond.acquire() 10 print(self.name +":嫁给我吧!") 11 self.cond.notify() #唤醒一个挂起的进程让Haimeme表态 12 self.cond.wait() #释放内部所占用的锁,同时线程被挂起,直到接收到通知被唤醒或超时,等待Haimeme回答 13 print(self.name + ":我手捧鲜花,送上戒指!") 14 self.cond.notify() 15 self.cond.wait() 16 print(self.name + ":Li太太,你的选择太明智了!") 17 self.cond.release() 18 19 class Girl(threading.Thread): 20 def __init__(self,cond,name): 21 super(Girl,self).__init__() 22 self.cond = cond 23 self.name = name 24 def run(self): 25 self.cond.acquire() 26 self.cond.wait() 27 print(self.name +":没有情调,不够浪漫,不答应!") 28 self.cond.notify() 29 self.cond.wait() 30 print(self.name + ":好吧,答应你了!") 31 self.cond.notify() 32 self.cond.release() 33 34 if __name__ == '__main__': 35 cond = threading.Condition() 36 boy = Boy(cond,'LiLei') 37 gril = Girl(cond,'Haimeimei') 38 gril.start() 39 boy.start()
程序实例化了一个Conditon对象cond,一个Boy对象,一个girl对象,程序先启动了girl线程,girl虽然获取到了条件变量锁cond,但又执行了wait并释放条件变量锁,自身进入阻塞状态;boy线程启动后,就获得了变量锁cond并发出了消息,之后通过notify唤醒一个挂起的线程,并释放条件变量锁等待girl的回答...
返回结果:
LiLei:嫁给我吧!
Haimeimei:没有情调,不够浪漫,不答应!
LiLei:我手捧鲜花,送上戒指!
Haimeimei:好吧,答应你了!
LiLei:Li太太,你的选择太明智了!
六、线程同步之Event
Event用于线程这间的通信。一个线程发出一个信号,其它一个或多个线程等待,调用Event对象的wait方法,线程则会阻塞等待,直到别的线程set之后才会被唤醒。示例代码:
1 import threading,time 2 3 class Boy(threading.Thread): 4 def __init__(self,event,name): 5 super(Boy,self).__init__() 6 self.event = event 7 self.name = name 8 def run(self): 9 print(self.name +":嫁给我吧!") 10 self.event.set() #唤醒一个挂起的进程让Haimeme表态 11 time.sleep(0.5) 12 self.event.wait() #释放内部所占用的锁,同时线程被挂起,直到接收到通知被唤醒或超时,等待Haimeme回答 13 print(self.name + ":我手捧鲜花,送上戒指!") 14 self.event.set() 15 time.sleep(0.5) 16 self.event.wait() 17 #self.event.clear() 18 print(self.name + ":Li太太,你的选择太明智了!") 19 20 21 class Girl(threading.Thread): 22 def __init__(self,event,name): 23 super(Girl,self).__init__() 24 self.event= event 25 self.name = name 26 def run(self): 27 self.event.wait() 28 self.event.clear() 29 print(self.name +":没有情调,不够浪漫,不答应!") 30 self.event.set() 31 time.sleep(0.5) 32 self.event.wait() 33 print(self.name + ":好吧,答应你了!") 34 self.event.set() 35 36 if __name__ == '__main__': 37 event = threading.Event() 38 boy = Boy(event,'LiLei') 39 gril = Girl(event,'Haimeimei') 40 gril.start() 41 boy.start()
Event内部默认内置了一个标志,初始值为False。上述代码中对象girl通过wait()方法进入等待状态,直到对象boy调用该Event的set()方法将内置标志设为True时,对象girl再继续运行。对象boy最后调用Event的clear()方法再将内置标志设为False,恢复初始状态。
返回结果:
LiLei:嫁给我吧!
Haimeimei:没有情调,不够浪漫,不答应!
LiLei:我手捧鲜花,送上戒指!
Haimeimei:好吧,答应你了!
LiLei:Li太太,你的选择太明智了!
七、线程优先级队列(queue)
queue模块中提供了同步的、线程安全的队列类,包括先进先出队列(Queue)、后进先出队列(LifoQueue)、和优先级队列(PriorityQueue)。
示例代码:
1 import threading,time 2 import queue 3 4 #先进先出 5 q = queue.Queue(maxsize=5) 6 #q = queue.LifoQueue(maxsize=3) 7 #q = queue.PriorityQueue(maxsize=3) 8 9 def ProducerA(): 10 count = 1 11 while True: 12 q.put(f"冷饮 {count}") 13 print(f'{time.strftime("%Y-%m-%d %H:%M:%S")} A 放入:[冷饮 {count}]') 14 count += 1 15 time.sleep(1) 16 def ConsumerB(): 17 while True: 18 print(f'{time.strftime("%Y-%m-%d %H:%M:%S")} B 取出:[{q.get()}]') 19 time.sleep(5) 20 21 if __name__ == '__main__': 22 p = threading.Thread(target=ProducerA) 23 c = threading.Thread(target=ConsumerB) 24 c.start() 25 p.start()
返回结果:
2023-09-27 11:34:00 A 放入:[冷饮 1]
2023-09-27 11:34:00 B 取出:[冷饮 1]
2023-09-27 11:34:01 A 放入:[冷饮 2]
2023-09-27 11:34:02 A 放入:[冷饮 3]
2023-09-27 11:34:03 A 放入:[冷饮 4]
2023-09-27 11:34:04 A 放入:[冷饮 5]
2023-09-27 11:34:05 B 取出:[冷饮 2]
2023-09-27 11:34:05 A 放入:[冷饮 6]
2023-09-27 11:34:06 A 放入:[冷饮 7]
2023-09-27 11:34:10 B 取出:[冷饮 3]
2023-09-27 11:34:10 A 放入:[冷饮 8]
2023-09-27 11:34:15 B 取出:[冷饮 4]
2023-09-27 11:34:15 A 放入:[冷饮 9]
2023-09-27 11:34:20 B 取出:[冷饮 5]
八、多线程之线程池pool
在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或其它更多资源。虚拟机也将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。同样的道理,多任务情况下每次都会产生一个新进程,执行任务后资源再被回收就显得非常低效,因此线程池就是解决这个问题的办法。类似的还有连接池、进程池等。
将任务添加到线程池中,线程池会自动指定一个空闲的线程去执行任务,当超过线程池的最大线程时,任务需要等待有新的空闲线程后才会被执行。
from multiprocessing import Pool表示的是进程池;from multiprocessing.dummy import Pool表示的线程池。
示例代码:
1 from multiprocessing.dummy import Pool as ThreadPool 2 import time 3 4 def fun(n): 5 time.sleep(2) 6 start = time.time() 7 for i in range(5): 8 fun(i) 9 print("单线程顺序执行耗时:",time.time() - start) 10 11 start2 = time.time() 12 #开5个worker,没有参数时默认是cpu的核心数 13 pool = ThreadPool(processes=5) 14 res = pool.map(fun,range(5)) 15 pool.close() 16 pool.join() 17 print("线程池(5)并发执行耗时:",time.time() - start2)
返回结果:
单线程顺序执行耗时: 10.041308164596558
线程池(5)并发执行耗时: 2.1530473232269287
以上内容来自郑征的《Python自动化运维快速入门》,清华大学出版社。