并发使用(多线程 多进程 进程池 线程池 协程和异步编程) 队列

发布时间 2023-12-13 15:17:21作者: 善战者求之于势

一、多线程、多进程、进程池、线程池、协程与异步区别

多线程、多进程、进程池和线程池都是并发编程中常用的工具,它们在实现并发和并行执行任务时有一些不同的特点。以下是它们的主要区别:

多线程(Multithreading)

  • 特点: 多线程是在同一进程中创建多个线程,这些线程共享同一进程的内存空间。每个线程是独立执行的,但它们共享相同的资源,如变量和文件句柄。
  • 适用场景: 适用于 I/O 密集型任务,因为线程可以在等待 I/O 操作时释放 GIL(全局解释器锁)。
  • 缺点: 受 GIL 限制,不适用于 CPU 密集型任务。线程之间需要进行同步和协调,可能涉及到锁和其他同步机制。

多进程(Multiprocessing)

  • 特点: 多进程是在同一计算机上创建多个独立的进程,每个进程都有自己的内存空间。进程之间通过进程间通信(IPC)来进行数据交换。
  • 适用场景: 适用于 CPU 密集型任务,因为每个进程有自己的 GIL,不受全局解释器锁的限制。
  • 缺点: 进程之间的通信复杂,开销较大。不如多线程节省资源,因为每个进程都需要独立的内存空间。

进程池(Multiprocessing Pool)

  • 特点: 进程池是一种使用固定数量的进程来执行任务的机制。每个进程独立执行任务,任务分配给不同的进程并行执行。
  • 适用场景: 适用于需要并行执行多个相似任务的情况,例如批量处理数据。
  • 优点: 可以有效利用多核处理器,不受 GIL 限制。相对于手动创建和管理进程,进程池提供了更高层次的抽象,更容易使用。

线程池(Multithreading Pool)

  • 特点: 线程池是一种使用固定数量的线程来执行任务的机制。每个线程独立执行任务,任务分配给不同的线程并行执行。

  • 适用场景: 适用于需要并行执行多个相似任务的情况,例如批量处理数据。但由于 GIL 的存在,不适用于 CPU 密集型任务。

  • 优点: 相对于手动创建和管理线程,线程池提供了更高层次的抽象,更容易使用。可以在一定程度上缓解 GIL 的影响。

虽然线程池和多线程都涉及到使用多个线程来实现并发,但它们是不同的概念,有一些关键的区别。

多线程: 多线程是一种编程模型,指的是在同一进程中创建多个线程,这些线程共享同一进程的内存空间。多线程的目标是通过并发执行来提高程序的性能,但在某些情况下,由于全局解释器锁(GIL)的存在,Python 中的多线程并不能充分利用多核处理器。

线程池: 线程池是一种并发编程的机制,它是对多线程的一种组织和管理方式。线程池在应用程序启动时创建一组预先初始化的线程,并将它们放入一个池中,这些线程可以等待执行任务。与手动创建和管理线程相比,线程池提供了更高层次的抽象,使得可以重复使用这些线程来执行多个任务,而不需要频繁地创建和销毁线程,从而降低了线程创建和销毁的开销。

虽然线程池涉及到使用多线程,但线程池更侧重于组织和管理线程的方式,以提高并发执行的效率。多线程是一种更一般的概念,指的是在同一进程中创建多个线程。

协程(Coroutine)

  • 定义: 协程是一种轻量级的并发编程结构,用于在单一线程中实现并发。协程允许在程序执行的特定点挂起和恢复执行,而无需阻塞整个线程。在 Python 中,协程是通过使用 asyncawait 关键字定义的。

  • 特点: 协程允许在执行过程中主动挂起自己,并在稍后的时间点恢复执行。这使得可以在一个线程中交替执行多个协程,提高程序的效率和响应性。

  • 示例:

    pythonCopy codeasync def my_coroutine():
        print("Start Coroutine")
        await asyncio.sleep(2)  # 挂起协程
        print("Coroutine resumed")
    

异步编程(Asynchronous Programming)

  • 定义: 异步编程是一种编程模型,用于处理可能引起等待的操作而不阻塞整个程序的执行。异步编程通常与协程一起使用,通过非阻塞的方式处理 I/O 操作,例如读写文件、网络通信等。

  • 特点: 在异步编程中,任务在等待潜在的等待操作时不会阻塞,而是切换到执行其他任务。这样可以提高程序的效率,充分利用等待时的空闲时间。

  • 示例:

    pythonCopy codeimport asyncio
    
    async def main():
        await asyncio.gather(my_coroutine(), my_other_coroutine())
    

总的来说,协程是一种并发编程的结构,而异步编程是一种处理可能引起等待的操作的编程模型。在异步编程中,协程通常被用作处理异步任务的工具,通过使用异步事件循环(如 asyncio)来协调和执行这些协程。通过使用协程和异步编程,可以实现高效的并发处理,特别适用于处理大量的 I/O 操作。

总体而言,选择多线程、多进程、进程池或线程池取决于具体的应用场景和任务类型。多线程适用于 I/O 密集型任务,多进程适用于 CPU 密集型任务,而进程池和线程池则提供了更高层次的抽象,使得并发编程更加方便。

协程是异步编程的一种实现方式。异步编程是一种广义的概念,而协程是异步编程的一种具体技术。

二、并发简单使用案例

2.1 多线程案例

使用多线程打印数字:

pythonCopy codeimport threading
import time

def print_numbers():
    for i in range(5):
        time.sleep(1)
        print(f"Number: {i}")

# 创建两个线程对象,传入相同的执行函数
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_numbers)

# 启动线程
thread1.start()
thread2.start()

# 等待两个线程执行完成
thread1.join()
thread2.join()

print("Main thread exiting.")

在这个例子中,我们创建了两个线程,每个线程执行 print_numbers 函数,该函数会在每秒打印一个数字。两个线程并发执行,可以看到它们交替地输出数字。

需要注意的是,多线程的使用需要谨慎处理共享资源的同步问题,以避免竞争条件等并发问题。在实际应用中,可能需要使用锁或其他同步机制来确保线程之间的安全访问共享资源。

2.2 多进程案例

多进程是一种并发编程的方式,它允许程序同时执行多个独立的进程,每个进程都有自己独立的内存空间。在 Python 中,可以使用 multiprocessing 模块来创建和管理多进程。以下是多进程的一般使用方式和一个简单的案例分析。

import multiprocessing

def square(numbers, result):
    for num in numbers:
        result.put(num * num)

# 创建一个进程安全的队列用于存放结果
result_queue = multiprocessing.Queue()

# 创建两个进程对象,传入相同的执行函数和共享的结果队列
process1 = multiprocessing.Process(target=square, args=([1, 2, 3], result_queue))
process2 = multiprocessing.Process(target=square, args=([4, 5, 6], result_queue))

# 启动进程
process1.start()
process2.start()

# 等待两个进程执行完成
process1.join()
process2.join()

# 从队列中获取结果
results = []
while not result_queue.empty():
    results.append(result_queue.get())

print("Results:", results)

2.3 进程池案例

pythonCopy codeimport multiprocessing
import time

def task(number):
    print(f"Processing {number}")
    time.sleep(2)
    print(f"Task {number} completed")

# 创建进程池,设置最大进程数为2
pool = multiprocessing.Pool(processes=2)
# 定义要执行的任务列表
tasks = [1, 2, 3, 4, 5]
# 使用进程池并发执行任务
pool.map(task, tasks)
# 关闭进程池
pool.close()
# 等待所有进程执行完成
pool.join()
print("All tasks completed")

在这个例子中,我们直接创建了一个进程池对象 pool,而没有使用 with 语句。在任务执行完毕后,我们调用了 pool.close() 来关闭进程池,然后使用 pool.join() 等待所有进程执行完成。

2.4 线程池案例

线程池是一种管理和复用线程的机制,它可以提高多线程应用程序的性能和效率。线程池中包含一组预先创建的线程,这些线程在任务到达时执行任务,而不是为每个任务都创建一个新的线程。

import concurrent.futures
import time

def task(number):
    print(f"Processing {number}")
    time.sleep(2)
    print(f"Task {number} completed")

# 创建线程池,设置最大线程数为2
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # 定义要执行的任务列表
    tasks = [1, 2, 3, 4, 5]

    # 使用线程池并发执行任务
    executor.map(task, tasks)

print("All tasks completed")

在这个例子中,我们使用 ThreadPoolExecutor 创建了一个最大线程数为 2 的线程池。然后,我们定义了一个简单的任务函数 task,该函数会模拟任务的执行,包括打印任务编号和等待 2 秒。最后,我们使用 executor.map 并发地执行任务列表中的每个任务。

线程池的优势之一是它可以在不断地重用线程的情况下执行多个任务,而不需要为每个任务都创建和销毁线程。这提高了性能,并减少了线程创建和销毁的开销。需要注意的是,在 Python 中由于全局解释器锁(GIL)的存在,线程池在处理 CPU 密集型任务时可能并不会带来性能的提升,而在 I/O 密集型任务中则可能更为有效。

2.5 协程(异步)案例

协程是一种轻量级的并发编程方式,它允许程序在执行过程中暂停和恢复,而不是一直占用执行线程。协程可以用于提高程序的并发性能和资源利用率。以下是一些关键点、案例以及协程中的一些重要参数的解释:

协程的关键点:

暂停和恢复: 协程允许程序在执行到一定点时暂停,并在稍后的时候恢复执行,而不是一直占用执行线程。这使得程序可以更高效地利用计算资源。

协作式多任务: 协程是一种协作式多任务的形式,不同于传统的抢占式多任务。在协程中,任务协作地自愿让出执行权,而不是被系统强制切换。

提高并发性能: 协程可以在一个线程中实现并发,避免了线程切换的开销,提高了并发性能。

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 completed after 2 seconds")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 completed after 1 second")

async def main():
    print("Main coroutine started")
    
    # 同时启动 task1 和 task2
    await asyncio.gather(task1(), task2())
    
    print("Main coroutine completed")

# 在事件循环中运行主协程
asyncio.run(main())

这个例子中有三个协程:task1task2main。在 main 协程中,我们使用 asyncio.gather 来同时运行 task1task2,这表示它们可以并发执行而不互相等待。每个函数内部的await后面会挂起后台执行让出位置给其他函数的执行,对同一个函数的await还是一行行读取会存在阻塞。

输出的结果可能是这样的:

arduinoCopy codeMain coroutine started
Task 1 started
Task 2 started
Task 2 completed after 1 second
Task 1 completed after 2 seconds
Main coroutine completed

await 关键字用于等待一个异步操作的完成,但在等待的过程中,事件循环可以继续执行其他任务。协程在执行到 await 时,会暂时挂起当前任务,让出执行权,让其他任务有机会执行。

在协程中,await 后的代码是异步执行的,不会阻塞整个程序。这使得协程能够充分利用等待异步操作的时间,同时执行其他任务,提高程序的并发性能。

三、队列(queue)

3.1 基本概念

​ 队列(Queue)是一种数据结构,用于存储和管理数据。队列是按照先进先出(FIFO)的原则进行操作的,即最先放入队列的元素最先被取出。队列常常用于在多线程或多进程环境中安全地传递数据。

先进先出(FIFO): 队列是一种按照元素进入的顺序来取出的数据结构。最早进入队列的元素将最早被取出。

安全的数据传递: 队列通常用于在多线程或多进程环境中进行线程安全的数据传递。通过队列,一个线程或进程可以将数据放入队列,而另一个线程或进程可以安全地取出这些数据,而无需担心竞态条件等并发问题。

队列的操作: 常见的队列操作包括将元素放入队列(入队)和从队列中取出元素(出队)。在Python中,可以使用queue模块中的Queue类来创建队列。

3.2 简单案例

import queue

# 创建一个队列
my_queue = queue.Queue()

# 入队
my_queue.put(1)
my_queue.put(2)
my_queue.put(3)

# 出队
while not my_queue.empty():
    item = my_queue.get()
    print(f"取出元素:{item}")

在这个例子中,queue.Queue() 创建了一个队列实例,put 方法用于将元素放入队列,get 方法用于从队列中取出元素。empty 方法用于检查队列是否为空。

总的来说,队列是一种有序的数据结构,提供了线程安全的数据传递方式,对于在多线程或多进程环境中共享数据是很有用的。

3.2.1 多进程结合queue
from multiprocessing import Process, Queue
import time

def producer(queue, data):
    for item in data:
        print(f"生产者产生数据: {item}")
        queue.put(item)
        time.sleep(1)

def consumer(queue):
    while True:
        try:
            item = queue.get(timeout=2)  # 设置timeout以防止无限等待
            print(f"消费者消费数据: {item}")
            time.sleep(2)
        except queue.Empty:
            print("队列为空,消费者终止")
            break

if __name__ == "__main__":
    # 创建队列
    shared_queue = Queue()

    # 定义数据
    data_to_produce = [1, 2, 3, 4, 5]

    # 创建生产者进程和消费者进程
    producer_process = Process(target=producer, args=(shared_queue, data_to_produce))
    consumer_process = Process(target=consumer, args=(shared_queue,))

    # 启动进程
    producer_process.start()
    consumer_process.start()

    # 等待生产者进程完成,然后终止消费者进程
    producer_process.join()
    consumer_process.join()

queue.get(timeout=2) 使用了带有超时参数的 get 操作。这表示在获取队列中的数据时,如果队列为空,它会等待最多 timeout 秒,如果在这段时间内没有数据可用,会引发 queue.Empty 异常。

如果消费者进程尚未完成所有的消费任务,这样的终止操作可能导致数据处理不完整。为了避免这个问题,我们应该在主线程等待生产者进程完成之后再终止消费者进程,所以使用了producer_process.join()与consumer_process.join()进行等待

3.2.2 多线程结合queue
import threading
import queue
import time

def producer(queue, data):
    for item in data:
        print(f"生产者产生数据: {item}")
        queue.put(item)
        time.sleep(1)

def consumer(queue):
    while True:
        try:
            item = queue.get(timeout=2)
            print(f"消费者消费数据: {item}")
            time.sleep(2)
        except queue.Empty:
            print("队列为空,消费者线程终止")
            break

if __name__ == "__main__":
    # 创建队列
    shared_queue = queue.Queue()

    # 定义数据
    data_to_produce = [1, 2, 3, 4, 5]

    # 创建生产者线程和消费者线程
    producer_thread = threading.Thread(target=producer, args=(shared_queue, data_to_produce))
    consumer_thread = threading.Thread(target=consumer, args=(shared_queue,))

    # 启动线程
    producer_thread.start()
    consumer_thread.start()

    # 等待生产者线程完成,然后终止消费者线程
    producer_thread.join()
    consumer_thread.join()