IO密集型任务与CPU密集型任务

发布时间 2023-07-29 09:17:11作者: Allen_Hao

IO密集型

1. IO密集型任务是指在执行过程中主要涉及到输入输出(IO)操作的任务。这些任务通常需要与外部资源进行交互,如读写文件、网络请求、数据库查询等,而实际的计算量相对较小。

2. 在IO密集型任务中,CPU的使用率相对较低,大部分时间都花费在等待IO操作完成上。因此,多线程在这种情况下能够充分利用等待IO操作的时间,执行其他任务,从而提高程序的并发性和响应能力。

 

3. 当一个线程在执行IO操作时,它会将CPU资源释放给其他可执行的线程。这是因为IO操作通常涉及等待外部资源响应或完成,例如从磁盘读取数据、网络请求等,这些操作需要较长的时间来完成。

在Python中,当一个线程执行IO操作时,它会暂时释放GIL(全局解释器锁),让其他等待执行的线程获取GIL并继续执行。这样,其他线程就有机会利用CPU资源进行计算密集型任务或执行其他IO操作,以提高程序的并发性和响应能力。

在IO密集型任务中,多线程可以通过充分利用等待IO操作的时间来执行其他任务,有效地利用了系统资源。这种方式称为"异步"或"非阻塞"IO,即一个线程在等待IO操作完成时不会阻塞整个程序的执行,而是允许其他线程继续执行。

需要注意的是,当IO操作完成后,线程将重新获得CPU资源,并继续执行后续的操作。这包括处理IO返回的结果、更新程序状态等。因此,在IO密集型任务中,多线程的切换和等待IO操作的时间可以被充分利用,以提高程序的整体效率和响应速度。

常见的IO密集型任务的示例:

1. 文件读写:读取大量的数据或写入大量的数据到文件中  

1 with open('input.txt', 'r') as file:
2     data = file.read()
3 
4 with open('output.txt', 'w') as file:
5     file.write('Hello, World!')

2. 网络请求:通过HTTP协议与远程服务器进行通信,获取数据或发送数据。

1 import requests
2 
3 response = requests.get('https://www.example.com')
4 print(response.text)

3. 数据库操作:与数据库进行连接、查询、插入、更新等操作

import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()

for row in rows:
    print(row)

conn.close()

4. 图像处理:读取图像文件、处理图像数据、保存图像等操作

from PIL import Image

image = Image.open('image.jpg')

# 图像处理操作...

image.save('processed_image.jpg')

在这些任务中,多线程的优势在于,当一个线程进行IO操作时,其他线程可以继续执行其他的IO操作或计算任务,从而充分利用CPU和系统资源,提高程序的性能和效率。

最佳实践:

1. 使用异步IO:使用异步编程模型(如异步IO、协程等)可以在等待IO操作的同时执行其他任务,提高程序的并发性和响应能力。Python提供了asyncio库来支持异步IO操作。

 1 '''
 2 1. 使用异步IO:使用异步编程模型(如异步IO、协程等)可以在等待IO操作的同时执行其他任务,提高程序的并发性和响应能力。
 3    Python提供了asyncio库来支持异步IO操作。
 4 '''
 5 
 6 import asyncio
 7 
 8 
 9 async def fetch_data(url):
10     # 模拟网络请求延迟
11     await asyncio.sleep(1)
12     return f"Data from {url}"
13 
14 
15 async def main():
16     urls = ['http://example.com', 'http://example.org']
17     tasks = []
18 
19     for url in urls:
20         task = asyncio.create_task(fetch_data(url))
21         tasks.append(task)
22 
23     results = await asyncio.gather(*tasks)
24 
25     for result in results:
26         print(result)
27 
28 
29 if __name__ == '__main__':
30     asyncio.run(main())

输出:

Data from http://example.com
Data from http://example.org

 

这段代码演示了如何使用asyncio库来进行异步IO操作。以下是对代码的详细解释:

  1. 导入asyncio模块。

  2. 定义一个名为fetch_data的异步函数,该函数接受一个URL作为参数,在模拟网络请求延迟后返回一个带有URL信息的字符串。

  3. 定义一个名为main的异步函数,用于执行主要的逻辑。

  4. main函数中:

    a. 创建一个包含两个URL的列表urls,代表要获取数据的网址。

    b. 创建一个空列表tasks,用于存储创建的任务。

    c. 使用for循环遍历urls列表中的每个URL:

    • 调用asyncio.create_task()方法将fetch_data函数和当前URL作为参数,返回一个任务对象。

    • 将任务对象添加到tasks列表中。

    d. 使用asyncio.gather()方法等待所有任务完成,并将结果存储在results列表中。

    e. 使用for循环遍历results列表中的每个结果,并打印出来。

  5. if __name__ == '__main__':条件下执行以下操作:

    a. 使用asyncio.run()方法运行main函数,它会自动创建一个事件循环并执行异步任务。

当程序运行时,main函数会创建多个任务对象,每个任务对象都会调用fetch_data函数来获取相应的数据。由于使用了await asyncio.sleep(1),每个任务会模拟1秒的网络请求延迟。使用asyncio.gather()等待所有任务完成后,将结果存储在results列表中,并依次打印出来。

需要注意的是,在使用asyncio库时,需要定义异步函数(使用async关键字),并使用await关键字来等待异步操作的完成。通过使用异步IO操作,可以在等待IO操作的同时执行其他任务,提高程序的并发性和响应能力。最后,使用asyncio.run()方法来运行主函数,它会自动创建事件循环并执行异步任务。

 

 

 

使用异步IO操作可以提高程序的并发性和响应能力,因为在等待IO操作完成的同时,可以执行其他任务,而不是被阻塞等待IO操作的结果。这种并发执行任务的方式可以使得程序更加高效地利用系统资源,从而提升整体的性能。

 1 '''
 2 使用异步IO操作可以提高程序的并发性和响应能力,因为在等待IO操作完成的同时,可以执行其他任务(释放cpu资源,让cpu执行其他线程任务),而不是被阻塞等待IO操作的结果。
 3 这种并发执行任务的方式可以使得程序更加高效地利用系统资源,从而提升整体的性能。
 4 '''
 5 import asyncio
 6 import aiohttp
 7 
 8 
 9 async def fetch_data(url):
10     async with aiohttp.ClientSession() as session:
11         async with session.get(url) as response:
12             data = await response.text()
13             return f"Data from {url}: {data} \n \n 下一个数据:\n"
14 
15 
16 async def main():
17     urls = ['https://www.baidu.com/', 'http://example.org']
18     tasks = []
19 
20     for url in urls:
21         task = asyncio.create_task(fetch_data(url))
22         tasks.append(task)
23 
24     results = await asyncio.gather(*tasks)
25 
26     for result in results:
27         print(result)
28 
29 
30 if __name__ == '__main__':
31     asyncio.run(main())

输出:

Data from https://www.baidu.com/: <html>
<head>
	<script>
		location.replace(location.href.replace("https://","http://"));
	</script>
</head>
<body>
	<noscript><meta http-equiv="refresh" content="0;url=http://www.baidu.com/"></noscript>
</body>
</html> 
 下一个数据:

Data from http://example.org: <!doctype html>
<html>
<head>
    <title>Example Domain</title>

    <meta charset="utf-8" />
    <meta http-equiv="Content-type" content="text/html; charset=utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1" />
    <style type="text/css">
    body {
        background-color: #f0f0f2;
        margin: 0;
        padding: 0;
        font-family: -apple-system, system-ui, BlinkMacSystemFont, "Segoe UI", "Open Sans", "Helvetica Neue", Helvetica, Arial, sans-serif;
        
    }
    div {
        width: 600px;
        margin: 5em auto;
        padding: 2em;
        background-color: #fdfdff;
        border-radius: 0.5em;
        box-shadow: 2px 3px 7px 2px rgba(0,0,0,0.02);
    }
    a:link, a:visited {
        color: #38488f;
        text-decoration: none;
    }
    @media (max-width: 700px) {
        div {
            margin: 0 auto;
            width: auto;
        }
    }
    </style>    
</head>

<body>
<div>
    <h1>Example Domain</h1>
    <p>This domain is for use in illustrative examples in documents. You may use this
    domain in literature without prior coordination or asking for permission.</p>
    <p><a href="https://www.iana.org/domains/example">More information...</a></p>
</div>
</body>
</html>
 
 下一个数据:

 

在这个示例中,fetch_data函数使用aiohttp库进行异步的HTTP请求,并返回获取到的数据。在main函数中,我们创建了多个任务对象来执行这些网络请求,并使用asyncio.gather等待所有任务完成。最后,我们打印出每个请求的结果。

通过使用异步IO操作,这段代码可以同时执行多个网络请求,而不需要等待每个请求的响应。这样可以大大缩短程序的运行时间,并充分利用系统资源。另外,由于异步IO操作是非阻塞的,所以可以提高程序的响应能力,使其能够更快地响应用户的请求。

需要注意的是,在使用异步IO操作时,必须使用async关键字定义异步函数,并使用await关键字等待异步操作的完成。最后,使用asyncio.run()方法来运行主函数,它会自动创建事件循环并执行异步任务。

 

2. 使用线程池或进程池:对于一些IO操作,可以使用线程池或进程池来创建多个线程或进程,以便并发地执行IO任务。这样可以充分利用系统资源,并减少IO操作的等待时间。

 1 '''
 2 2. 使用线程池或进程池:对于一些IO操作,可以使用线程池或进程池来创建多个线程或进程,以便并发地执行IO任务。
 3    这样可以充分利用系统资源,并减少IO操作的等待时间。
 4 '''
 5 
 6 import concurrent.futures
 7 import time
 8 
 9 
10 def fetch_data(url):
11     # 模拟网络请求延迟
12     time.sleep(1)
13     return f"Data from {url}"
14 
15 
16 def main():
17     urls = ['http://example.com', 'http://example.org']
18     results = []
19 
20     with concurrent.futures.ThreadPoolExecutor() as executor:
21         futures = [executor.submit(fetch_data, url) for url in urls]
22 
23         for future in concurrent.futures.as_completed(futures):
24             result = future.result()
25             results.append(result)
26 
27     for result in results:
28         print(result)
29 
30 
31 if __name__ == '__main__':
32     main()

输出:

Data from http://example.com
Data from http://example.org

 

3. 使用缓存和批处理:对于一些重复的IO操作,可以使用缓存来存储已经获取的数据,以减少IO操作次数。此外,可以将多个小的IO请求合并成一个大的IO请求,以减少IO操作的开销。

 1 '''
 2 3. 使用缓存和批处理:对于一些重复的IO操作,可以使用缓存来存储已经获取的数据,以减少IO操作次数。
 3    此外,可以将多个小的IO请求合并成一个大的IO请求,以减少IO操作的开销。
 4 '''
 5 
 6 import requests
 7 from functools import lru_cache
 8 
 9 
10 @lru_cache(maxsize=128)
11 def get_data(url):
12     response = requests.get(url)
13     return response.text
14 
15 
16 def main():
17     urls = ['https://www.baidu.com/', 'https://www.baidu.com/']
18     results = []
19 
20     for url in urls:
21         data = get_data(url)
22         results.append(data)
23 
24     for result in results:
25         print(result)
26 
27 
28 if __name__ == '__main__':
29     main()

 

 

这段代码演示了如何使用缓存和批处理来优化重复的IO操作。以下是对代码的详细解释:

  1. 导入requests模块用于发送HTTP请求,并导入functools模块中的lru_cache装饰器。

  2. 使用@lru_cache(maxsize=128)装饰器将get_data函数标记为一个带有缓存功能的函数。maxsize参数指定了缓存的大小,即最多存储128个不同的URL结果。

  3. get_data函数中,首先使用requests.get(url)发送HTTP GET请求获取数据,并将结果存储在response变量中。然后,返回响应内容的文本形式。

  4. 定义main函数作为程序的入口点。

  5. main函数中,创建一个包含两个URL的列表urls,代表要获取数据的网址。

  6. 创建一个空列表results,用于存储每个URL获取到的数据。

  7. 使用for循环遍历urls列表中的每个URL,对于每个URL,调用get_data函数来获取数据,并将结果添加到results列表中。

  8. 使用for循环遍历results列表中的每个结果,并打印出来。

通过使用缓存和批处理的方式,可以减少重复的IO操作次数。当多次请求同一个URL时,get_data函数会首先检查缓存中是否已经存在该URL的结果。如果存在,则直接返回缓存中的结果,避免发送重复的IO请求。如果缓存中不存在,则发送IO请求获取数据,并将结果存储在缓存中,以便后续使用。

另外,通过将多个小的IO请求合并成一个大的IO请求,可以减少IO操作的开销。在这段代码中,每次调用get_data函数时,实际上是对一个URL进行了一次IO请求。但由于使用了缓存,相同的URL只会进行一次IO请求,后续的调用都会直接从缓存中获取结果。

需要注意的是,在使用缓存时,要确保被缓存的函数是纯函数,即对于相同的输入,始终返回相同的输出。这样才能保证缓存的正确性和一致性。

最后,在if __name__ == '__main__':条件下执行main函数来运行程序。

 

 

CPU密集型

    CPU密集型任务是指在执行过程中主要涉及到大量的计算和处理,而涉及到的IO操作相对较少的任务。这些任务通常需要大量的CPU资源和运算能力。

    在CPU密集型任务中,程序的大部分时间都用于执行复杂的计算、算法或逻辑操作,而IO操作相对较少。因此,多线程在这种情况下可能无法提高程序的性能,因为在单个CPU核心上同时执行多个线程时,由于存在GIL(全局解释器锁)的限制,每个线程只能依次获取CPU时间片执行,无法实现真正的并行执行。

 在python种一般使用多进程处理CPU密集型的任务。CPU密集型任务的最佳实践:

1. 使用多进程:由于每个进程都有自己独立的解释器和GIL,可以利用多个CPU核心进行并行执行。使用multiprocessing模块创建多个进程来执行任务,并通过进程间的通信机制(如队列、管道等)来传递数据。

 1 '''
 2 1. 使用多进程处理cpu密集型任务:由于每个进程都有自己独立的解释器和GIL,可以利用多个CPU核心进行并行执行。
 3    使用multiprocessing模块创建多个进程来执行任务,并通过进程间的通信机制(如队列、管道等)来传递数据
 4 '''
 5 
 6 from multiprocessing import Process
 7 
 8 
 9 def calculate_square(number):
10     result = number * number
11     print(f"Square of {number} is {result}")
12 
13 
14 if __name__ == '__main__':
15     numbers = [1, 2, 3, 4, 5]
16     processes = []
17 
18     for number in numbers:
19         process = Process(target=calculate_square, args=(number,))
20         processes.append(process)
21         process.start()
22 
23     for process in processes:
24         process.join()

输出:

Square of 1 is 1
Square of 2 is 4
Square of 3 is 9
Square of 4 is 16
Square of 5 is 25

2. 使用并行计算库:利用一些专门用于执行并行计算的库,如NumPyPandasDask等,这些库使用底层的C/C++实现,能够更好地利用CPU资源

 1 '''
 2 2. 使用并行计算库:利用一些专门用于执行并行计算的库,如NumPy、Pandas和Dask等,这些库使用底层的C/C++实现,能够更好地利用CPU资源
 3 '''
 4 import numpy as np
 5 
 6 
 7 def calculate_square(number):
 8     result = number * number
 9     print(f"Square of {number} is {result}")
10 
11 
12 if __name__ == '__main__':
13     numbers = np.array([1, 2, 3, 4, 5])
14     square_results = np.square(numbers)
15 
16     for number, result in zip(numbers, square_results):
17         print(f"Square of {number} is {result}")

输出:

Square of 1 is 1
Square of 2 is 4
Square of 3 is 9
Square of 4 is 16
Square of 5 is 25

 

3. 使用并发框架:一些并发框架(如concurrent.futures)提供了高级接口,可以方便地进行并发执行。通过将任务分解为多个小任务,并使用线程池或进程池来执行这些小任务,以充分利用CPU资源。

 1 '''
 2 3. 使用并发框架:一些并发框架(如concurrent.futures)提供了高级接口,可以方便地进行并发执行。
 3 通过将任务分解为多个小任务,并使用线程池或进程池来执行这些小任务,以充分利用CPU资源。
 4 '''
 5 from concurrent.futures import ThreadPoolExecutor
 6 
 7 
 8 def calculate_square(number):
 9     result = number * number
10     print(f"Square of {number} is {result}")
11 
12 
13 if __name__ == '__main__':
14     numbers = [1, 2, 3, 4, 5]
15 
16     with ThreadPoolExecutor() as executor:
17         futures = [executor.submit(calculate_square, number) for number in numbers]
18 
19         for future in futures:
20             future.result()

输出:

Square of 1 is 1
Square of 2 is 4
Square of 3 is 9
Square of 4 is 16Square of 5 is 25

 

这段代码演示了如何使用ThreadPoolExecutor来创建线程池并执行calculate_square函数。以下是对代码的详细解释:

  1. 导入concurrent.futures模块中的ThreadPoolExecutor类。

  2. 定义一个名为calculate_square的函数,该函数接受一个数字作为参数,计算其平方,并打印结果。

  3. if __name__ == '__main__':条件下执行以下操作:

    a. 创建一个包含数字1到5的列表numbers,代表要计算平方的数字。

    b. 使用with语句创建一个ThreadPoolExecutor对象,并将其赋值给变量executor。这会自动管理线程池的生命周期。

    c. 使用列表推导式创建一个包含多个Future对象的列表futures。每个Future对象代表一个任务的未来结果。

    • 对于每个数字,使用executor.submit()方法提交calculate_square函数和当前数字作为参数,返回一个Future对象。

    d. 使用for循环遍历futures列表中的每个Future对象:

    • 调用future.result()方法获取任务的结果。这会阻塞主线程,直到该任务完成并返回结果。

当程序运行时,将会创建一个线程池,并将calculate_square函数提交给线程池来执行。每个数字都会被传递给calculate_square函数,并在不同的线程中进行计算和打印结果。由于线程池管理了多个线程,可以同时执行多个任务,并充分利用系统资源。

需要注意的是,在使用ThreadPoolExecutor时,不需要显式地创建和启动线程。线程池会自动管理线程的创建、执行和回收。使用future.result()方法可以获取每个任务的结果,并确保主线程等待所有任务完成后再继续执行

 

 

在CPU密集型任务中,多线程并不能实现真正的并行执行,但可以通过使用多进程、并行计算库或并发框架来充分利用CPU资源,提高程序的性能和效率。根据具体的任务和需求,选择适合的方法进行并行计算可以帮助最大化地利用系统资源和提升计算速度。