优化Python中的I/O工作负载:逐步指南以及常用分析工具和优化策略

发布时间 2023-12-28 10:57:30作者: Daijn

原文地址:https://www.toymoban.com/diary/python/625.html

在Python中优化I/O工作负载通常涉及了解瓶颈所在,然后应用策略来减少或管理这些瓶颈。分析是这个过程中至关重要的一步,因为它有助于确定代码中资源使用最多的部分。

优化Python中的I/O工作负载

以下是通过分析来优化Python中的I/O工作负载的逐步指南:

确定I/O工作负载

首先,理解你的I/O工作负载的类型是很重要的。它们是否涉及磁盘I/O,例如文件读写操作,网络I/O,包括通过网络传输数据,或者数据库I/O,包括数据库交互?每个类别都适用不同的优化技术。本文将介绍与网络和文件读写操作相关的I/O瓶颈。

使用分析工具

有几种可用于分析Python代码的工具:

cProfile

cProfile是Python中最常用的分析器。由于它是一个带有可管理开销的C扩展,因此通常建议大多数用户使用它,适用于运行时间较长的程序的分析。它被广泛使用,原因有以下几点:

  • 内置且标准:cProfile是Python标准库的一部分,这意味着它在任何标准Python安装中都可以直接使用,无需额外的包。

  • 开销较低:作为一个C扩展,与一些纯Python分析器相比,cProfile的开销相对较低。这个特性使得它适用于需要长时间运行的应用程序的分析,其中分析器对性能的影响是一个关注点。

  • 通用分析:cProfile适用于大多数分析需求,平衡了详细程度和可用性。它可以给出函数执行时间的逐个函数的详细分解,主要用于识别性能瓶颈。

  • 广泛接受和社区支持:由于它是标准库的一部分并且易于使用,cProfile拥有广泛的用户群体和社区支持。

尽管cProfile是最常用的分析器,但重要的是要注意,对于给定的任务,最佳的分析器可能取决于项目的具体需求。例如,line_profiler适用于逐行分析,memory_profiler适用于需要关注内存使用的情况。选择分析器通常取决于您想要优化应用程序的特定方面。

Line_profiler

Line_profiler是Python中的一个工具,提供对代码逐行进行分析,以便查看每行的性能。当您试图优化代码并需要了解瓶颈所在时,这种精细度的分析非常有益。

  • Memory_profiler:如果您怀疑内存使用与I/O效率有关,这个分析器将非常有帮助。

分析分析数据

运行完分析器后,分析数据以找出大部分时间花在哪里。通常,分析输出会指示长时间运行的I/O操作、可批处理的重复I/O操作和可以消除的不必要的I/O操作。

应用优化策略

根据分析结果,您可以应用不同的策略:

  • 缓存:将数据存储在内存中,以避免重复的I/O操作。

  • 批处理:将多个I/O操作合并为一个,以减少开销。

  • 异步I/O:使用asyncio或其他异步编程技术进行I/O操作,而无需阻塞主线程。

  • 缓冲:对于磁盘I/O,使用缓冲区来减少I/O调用的次数。

  • 数据压缩:减小读取或写入的数据大小可以提高I/O性能,特别适用于网络和磁盘I/O。

  • 并行处理:使用多线程或多进程并行执行I/O操作,特别适用于网络I/O。

测试和迭代

应用优化后,再次对代码进行分析以查看影响。

继续按以下过程进行迭代:

  • 优化 - 分析 - 修改

其他注意事项

确保硬件不是限制因素。对于数据库I/O,研究如何优化数据库查询和索引。对于文件I/O,请考虑文件系统和运行该文件系统的硬件。

文档和社区资源

阅读您所使用的分析工具的文档,以获得更详细的指导。与Python社区或论坛互动,获取专业建议和最佳实践。

记住,优化通常涉及权衡,重点放在能够产生最大改进的代码部分上是至关重要的。

气象站数据分析和分析优化

我将以分析气象站数据的示例为例。气象站记录每小时的温度,并具有以下列。

"STATION","DATE","SOURCE","LATITUDE","LONGITUDE","ELEVATION","NAME","REPORT_TYPE","CALL_SIGN","QUALITY_CONTROL","WND","CIG","VIS","TMP","DEW","SLP","AA1","AA2","AA3","AJ1","KA1","KA2","OC1","OD1","OD2","REM"

我们的分析中,需要关注"STATION"和"TMP"这两列。

我将按照以下步骤进行操作:

  1. 创建一个Python程序,接受参数(气象站列表(用逗号分隔),年份范围(开始年份和结束年份,用连字符分隔))。

  2. 下载气象站数据作为CSV文件。

  3. 解析CSV文件,并获取所提供参数中气象站列表和年份范围内的所有温度。

  4. 找到年份范围内各个气象站的最高温度、最低温度和平均温度。

  5. 对代码进行分析优化。

  6. 分析I/O瓶颈。

  7. 实现本地缓存。

  8. 分析输出和运行时间。

通过以上步骤,可以对气象站数据进行分析,并找到给定年份范围内各个气象站的温度统计信息。同时,在分析过程中也可以进行代码优化,以减少I/O瓶颈,并实现本地缓存以提高运行效率。

没有本地缓存的代码

此程序下载指定气象站的天气数据,并计算给定年份的低温和高温天气:

示例一:

import csv
import sys
import requests
import collections
from statistics import mean


# 此功能下载站点/年的天气数据,并将输出写入csv文件
 def download_weather_station_data(station, year):
my_url = generic_url.format(station=station, year=year)
req = requests.get(my_url)
if req.status_code != 200:
return

 with open(generic_file.format(station=station, year=year), 'w') as sf:
sf.write(req.text)


# 此父功能下载给定电台列表和年份范围的天气数据
 def download_all_weather_station_data(stations_list, start_year, end_year):
for station in stations_list:
for year in range(start_year, end_year + 1):
download_weather_station_data(station, year)


# 此函数从文件中获取温度详细信息
 def get_file_temperature(file_name):
with open(file_name, 'r') as tf:
reader = csv.reader(tf)
header = next(reader)

for row in reader:
station = row[header.index("STATION")]
temp = row[header.index("TMP")]
temperature, status = temp.split(",")
if int(status) != 1:
continue
             temperature = int(temperature) / 10

yield temperature


# 此父函数获取给定站点和年份的所有温度
 def get_temperatures_all(stations_list, start_year, end_year):
temperatures = collections.defaultdict(list)
for station in stations_list:
for year in range(start_year, end_year + 1):
for temperature in get_file_temperature(generic_file.format(station=station, year=year)):
temperatures[station].append(temperature)
return temperatures


# 此函数用于获取给定年份内电站的最高/最低/平均温度
 def get_temperatures(lst_temperatures, calc_mode):
result = {}
for mode in calc_mode:
if mode == 'max':
result[mode] = {station: max(temperatures) for station, temperatures in lst_temperatures.items()}
elif mode == 'min':
result[mode] = {station: min(temperatures) for station, temperatures in lst_temperatures.items()}
else:
result[mode] = {station: mean(temperatures) for station, temperatures in lst_temperatures.items()}
return result


# 主要功能
 if __name__ := "__main__":
stations = sys.argv[1].split(",")
years = [int(year) for year in sys.argv[2].split("-")]
first_year = years[0]
last_year = years[1]

generic_url = "https://www.ncei.noaa.gov/data/global-hourly/access/{year}/{station}.csv"
     generic_file = "Weather_station_{station}_{year}.csv"

     download_all_weather_station_data(stations, first_year, last_year)
temperatures_all = get_temperatures_all(stations, first_year, last_year)
temperatures_values = get_temperatures(temperatures_all, ['max', 'min', 'avg'])

print(f"温度为 {temperatures_values}")

示例二

import csv
import requests

def download_weather_data(stations, start_year, end_year):
    url = "https://example.com/weatherdata.csv"  # Replace with the actual URL to download the weather data
    response = requests.get(url)
    
    with open("weather_data.csv", "wb") as file:
        file.write(response.content)

def parse_weather_data(stations, start_year, end_year):
    temperatures = []
    
    with open("weather_data.csv", "r") as file:
        reader = csv.DictReader(file)
        for row in reader:
            station = row["STATION"]
            year = int(row["DATE"][:4])
            temperature = float(row["TMP"])
            
            if station in stations and start_year <= year <= end_year:
                temperatures.append(temperature)
    
    return temperatures

def calculate_statistics(temperatures):
    min_temp = min(temperatures)
    max_temp = max(temperatures)
    avg_temp = sum(temperatures) / len(temperatures)
    
    return min_temp, max_temp, avg_temp

# Example usage
stations = ["station1", "station2", "station3"]
start_year = 2010
end_year = 2020

download_weather_data(stations, start_year, end_year)
temperatures = parse_weather_data(stations, start_year, end_year)
min_temp, max_temp, avg_temp = calculate_statistics(temperatures)

print(f"Min Temperature: {min_temp}")
print(f"Max Temperature: {max_temp}")
print(f"Avg Temperature: {avg_temp}")

这段代码定义了下载天气数据、解析CSV文件、计算统计数据的函数,并提供了一个使用示例。但是,它不包括任何本地缓存机制。

为了优化I/O工作负载,我们可以引入本地缓存,以避免每次程序运行时下载天气数据。这可以通过在发出下载请求之前检查CSV文件是否已经存在来完成。如果文件存在,则可以重用该文件,而不是重新下载。

执行了代码并获得了期望的输出

python load_weather_data.py "01480099999,02110099999,02243099999" 2018-2023

输出结果如下:

The temperatures are {'max': {'01480099999': 33.5, '02110099999': 29.6, '02243099999': 32.0}, 'min': {'01480099999': -20.4, '02110099999': -39.5, '02243099999': -32.1}, 'avg': {'01480099999': 7.145012712693135, '02110099999': 0.23863829994401306, '02243099999': 3.383049058515579}}

使用CProfile分析代码:

python -m cProfile -s cumulative load_weather_data.py "01480099999,02110099999,02243099999" 2018-2023 > load_weather_data_profile.txt

以上命令将使用CProfile对代码进行分析,并将分析结果保存到`load_weather_data_profile.txt`文件中。

The temperatures are {'max': {'01480099999': 33.5, '02110099999': 29.6, '02243099999': 32.0}, 'min': {'01480099999': -20.4, '02110099999': -39.5, '02243099999': -32.1}, 'avg': {'01480099999': 7.1538004828081165, '02110099999': 0.23863829994401306, '02243099999': 3.383049058515579}}
1422783 function calls (1416758 primitive calls) in 17.250 seconds
Ordered by: cumulative time
ncalls   tottime  percall  cumtime   percall filename:lineno(function)
    181/1    0.002     0.000   17.250   17.250 {built-in method builtins.exec}
        1    0.000     0.000   17.250   17.250 load_weather_data.py:1(<module>)
        1    0.003     0.003   16.241   16.241 load_weather_data.py:23(download_all_weather_station_data)
       18    0.003     0.000   16.221    0.901 load_weather_data.py:12(download_weather_station_data)

函数调用download_all_weather_station_data占用了最多的运行时间,有优化I/O的空间。

由于数据是静态的,一旦生成了CSV文件,就没有必要再次生成。

下面的程序经过优化,如果已经生成了文件,则不会再次生成。以下是优化后的代码示例:

"""此程序下载指定电台的天气数据并计算给定年份的低温和高温天气"""

 import os
 import csv
 import sys
 import fnmatch
 import requests
 import collections
 from statistics import mean
 
 
 # #此功能下载站点/年的天气数据,并将输出写入csv文件
 def download_weather_station_data(station, year):
     my_url = generic_url.format(station=station, year=year)
     req = requests.get(my_url)
     if req.status_code != 200:
         return
 
     with open(generic_file.format(station=station, year=year), 'w') as sf:
         sf.write(req.text)
 
 
 # 此父功能下载给定电台列表和年份范围的天气数据
 def download_all_weather_station_data(stations_list, start_year, end_year):
     for station in stations_list:
         for year in range(start_year, end_year + 1):
             if not os.path.exists(generic_file.format(station=station, year=year)):
                 download_weather_station_data(station, year)
 
 
 # 此函数从文件中获取温度详细信息
 def get_file_temperature(file_name):
     with open(file_name, 'r') as tf:
         reader = csv.reader(tf)
         header = next(reader)
 
         for row in reader:
             station = row[header.index("STATION")]
             temp = row[header.index("TMP")]
             temperature, status = temp.split(",")
             if int(status) != 1:
                 continue
             temperature = int(temperature) / 10
 
             yield temperature
 
 
 #此父函数获取给定站点和年份的所有温度
 def get_temperatures_all(stations_list, start_year, end_year):
     temperatures = collections.defaultdict(list)
     for station in stations_list:
         for year in range(start_year, end_year + 1):
             if os.path.exists(generic_file.format(station=station, year=year)):
                 for temperature in get_file_temperature(generic_file.format(station=station, year=year)):
                     temperatures[station].append(temperature)
     return temperatures
 
 
 # 此函数用于获取给定年份内电站的最高/最低/平均温度
 def get_temperatures(lst_temperatures, calc_mode):
     result = {}
     for mode in calc_mode:
         if mode == 'max':
             result[mode] = {station: max(temperatures) for station, temperatures in lst_temperatures.items()}
         elif mode == 'min':
             result[mode] = {station: min(temperatures) for station, temperatures in lst_temperatures.items()}
         else:
             result[mode] = {station: mean(temperatures) for station, temperatures in lst_temperatures.items()}
     return result
 
 
 # 主要功能
 if __name__ := "__main__":
     stations = sys.argv[1].split(",")
     years = [int(year) for year in sys.argv[2].split("-")]
     first_year = years[0]
     last_year = years[1]
 
     generic_url = "https://www.ncei.noaa.gov/data/global-hourly/access/{year}/{station}.csv"
     generic_file = "Weather_station_{station}_{year}.csv"
     current_directory = os.getcwd()
 
     download_all_weather_station_data(stations, first_year, last_year)
 
     count = len(fnmatch.filter(os.listdir(current_directory), '*.csv'))
 
     if count > 0:
         temperatures_all = get_temperatures_all(stations, first_year, last_year)
         temperatures_values = get_temperatures(temperatures_all, ['max', 'min', 'avg'])
         print(f"温度为 {temperatures_values}")
     else:
         print(f"There are no file(s) available for the given stations {sys.argv[1]} and years {sys.argv[2]}")

示例二

import os.path

def download_weather_station_data(station, year):
    # Check if the CSV file already exists
    csv_filename = f"{station}_{year}.csv"
    if os.path.isfile(csv_filename):
        print(f"CSV file for {station} and {year} already exists. Skipping download.")
        return
    
    # Download the weather station data as a CSV
    # ... (code to download the data)
    print(f"Downloaded CSV for {station} and {year}")

def download_all_weather_station_data(stations, start_year, end_year):
    years = list(range(start_year, end_year+1))

    for station in stations:
        for year in years:
            download_weather_station_data(station, year)

# Usage example
stations = ["01480099999", "02110099999", "02243099999"]
start_year = 2018
end_year = 2023

download_all_weather_station_data(stations, start_year, end_year)

在上述代码中,我们在download_weather_station_data函数中添加了检查逻辑,以验证是否已经生成了CSV文件。如果文件已存在,则会打印一条消息并跳过下载过程。

这样的优化确保只有在需要时才进行文件下载,避免了重复的I/O操作。

执行了代码并获得了所需的输出

python load_weather_data_cache.py "01480099999,02110099999,02243099999" 2018-2023

输出结果如下:

The temperatures are {'max': {'01480099999': 33.5, '02110099999': 29.6, '02243099999': 32.0}, 'min': {'01480099999': -20.4, '02110099999': -39.5, '02243099999': -32.1}, 'avg': {'01480099999': 7.145012712693135, '02110099999': 0.2386..., '02243099999': 3.383049058515579}}

 

使用CProfile分析代码:

python -m cProfile -s cumulative load_weather_data_cache.py "01480099999,02110099999,02243099999" 2018-2023 > load_weather_data_cache_profile.txt

以上命令将使用CProfile对代码进行分析,并将分析结果保存到`load_weather_data_cache_profile.txt`文件中。

在分析结果中,可以注意到函数调用`download_all_weather_station_data`不再是最耗时的部分。整体运行时间减少了约16倍,这是一个显著的性能提升。

结论

正如本示例所展示的,缓存有能力将代码加速数倍。然而,管理缓存可能会带来一些挑战,并且常常会导致错误。在给定的示例中,文件随时间保持不变,但值得注意的是,在许多情况下,缓存数据可能会发生变化。在这种情况下,负责缓存管理的代码必须能够识别和处理这些变化。

缓存是一个强大的工具,可以提高代码的性能,但在使用时需要权衡其优劣,并考虑到潜在的缓存失效问题。适当地管理缓存并确保其有效性是确保代码正确运行和高效执行的关键。

关键词:Python I/O优化,分析工具,优化策略,瓶颈分析,cProfile,Line_profiler,Memory_profiler,缓存,批处理,异步I/O,数据压缩,并行处理,气象站数据分析