【Python基础】多进程使用

发布时间 2024-01-12 15:47:13作者: 小C学安全

多进程

正常下载和使用多进程下载:

from multiprocessing import Process
import time,random


#模拟下载

def down(text):
    print(f"{text}开始下载开始!!!!!!!")
    time.sleep(5)
    print(f"{text}开始下载完成!!!!!!!")

def multiprocessing_main():
    start_time = time.time()

    #下载任务一
    t1 = Process(target=down,args=("python",))

    #下载任务二
    t2 = Process(target=down,args=("java",))

    t1.start()
    t2.start()
    t1.join()
    t2.join()
    end_time = time.time()
    print(f"任务下载完成,共耗时{end_time-start_time}秒")
def main():
    start_time = time.time()
    down("Pyhton")
    down("Java")
    end_time = time.time()
    print(f"任务下载完成,共耗时{end_time - start_time}秒")
if __name__ == "__main__":
    print("+++++++++++++++++++++正常下载+++++++++++++++++++++")
    main()
    print("+++++++++++++++++++++使用多线程下载+++++++++++++++++++++")
    multiprocessing_main()

结果显示:

+++++++++++++++++++++正常下载+++++++++++++++++++++
Pyhton开始下载开始!!!!!!!
Pyhton开始下载完成!!!!!!!
Java开始下载开始!!!!!!!
Java开始下载完成!!!!!!!
任务下载完成,共耗时10.020031690597534秒
+++++++++++++++++++++使用多线程下载+++++++++++++++++++++
java开始下载开始!!!!!!!
python开始下载开始!!!!!!!
java开始下载完成!!!!!!!
python开始下载完成!!!!!!!
任务下载完成,共耗时5.131924629211426秒

可以用os模块的getpid方法打印进程号
修改down方法

def down(text):
    print(f"{text}开始下载开始!!!!!!!,当前进程号为{os.getpid()}")
    time.sleep(5)
    print(f"{text}开始下载完成!!!!!!!,当前进程号为{os.getpid()}")

通过查看任务管理器可以看到启用了两个进程

去掉join方法,发现主进程没有等待子进程跑完就直接结束了,使用join,主进程跑到join那里,处于挂起状态,看源码,join可以通过设置timeout来决定主进程等待多长时间,若子进程在timeout用完了还没跑完,主进程会杀死子进程。

多进程数据传递

multiprocessing提供两种方式进行进程通行:
Queue:操作系统开辟的一个队列空间,进程可以向队列读写数据
Pipe:一个管道,一端用于接受数据,一段用于写入数据

创建一个队列之后,可以把它作为参数传递给进程执行的函数,实现数据通信,Queue可以使用如下方法:

* qsize():返回队列的大致大小
* empty():判断队列是否为空
* full():判断队列是否满了
* put(obj[, block[, timeout]]):将obj对象写入队列
* put_nowait(obj):等价于put(obj, False)
* get([block[, timeout]]):移除队列中的一个项目并返回
* get_nowait():等价于get(False)
* close():关闭后,表示该进程不再向队列写入数据
import sys,time
from multiprocessing import Process, Queue

#获取url
def get_url(q):
    for i in range(1, 1000):
        print(f"获取URL{i}")
        q.put({"url":i}) #将obj对象写入队列
    q.put('DONE')  # 任务完成后向队列中写入特殊值

#下载数据
def down_url(q):
    print("开始下载———————")
    while True:
        data=q.get() #移除队列中的一个项目并返回
        if data == 'DONE':
            break
        else:
            print(f"下载完成{data}")

#主程序
def main():
    q=Queue()  #创建一个队列
    p1 = Process(target=get_url,args=(q,))
    p2 = Process(target=down_url,args=(q,))
    p1.start()
    p2.start()
    p1.join()  # 等待p1进程结束
    p2.join()  # 等待p2进程结束
if __name__== "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    print(f"任务下载完成,共耗时{end_time - start_time}秒")

进程池

Pool可以一次性创建多个进程:

Pool([processes     # 进程数量,默认使用os.cpu_count
     [, initializer # 初始值
     [, initargs    # 初始数量
     [, maxtasksperchild
     [, context]]]]])

进程池可以方便地创建多个进程,它包含以下常用的方法:

  • apply(self, func, args=(), kwds={}):等价于func(*args, **kwds),进程池必须正在运行
  • apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):apply函数的异步版本,多了一个回调参数callback
  • close(self):关闭进程池
  • join(self):让主进程进入阻塞模式
  • map(self, func, iterable, chunksize=None):将func应用到iterable的每个元素上,并返回一个结果列表。imap_unordered方法的结果是无序的
# -*- coding: utf-8 -*-
# @Project :Python基础
# @File    :进程池使用.py
# @IDE     :PyCharm
# @NAME    :进程池使用
# @Author  :小C学安全
# @Date    :2024/1/12 15:16
from multiprocessing import Process, Pool
import os, time


import os, time
from multiprocessing import Process, Pool

def run_a_sub_proc(name):
    print(f'子进程:{name}({os.getpid()})开始!')
    for i in range(2):
        print(f'子进程:{name}({os.getpid()})运行中...')
        time.sleep(1)
    print(f'子进程:{name}({os.getpid()})结束!')


if __name__ == '__main__':
    print(f'主进程({os.getpid()})开始...')
    p = Pool(3)  #创建3个进程
    for i in range(1, 5):
        p.apply_async(run_a_sub_proc, args=(f"进程-{i}",)) #p.apply_async 异步执行
    p.close()  #关闭进程池
    p.join() #主进程进入阻塞模式