高效 Python (Effective Python)

发布时间 2023-08-20 19:05:01作者: 软匠

有效的 Python

Effective Python

Python 思维方式

第1条 知道你使用的 Python 版本

python --version
# Python 3.8.10

Python 3 通常是 python3

python3 --version
#  Python 3.8.10

在运行中,可以通过 sys.version 来获取 python 的版本

>>> import sys
>>> print(sys.version_info)
sys.version_info(major=3, minor=8, micro=10, releaselevel='final', serial=0)
>>> print(sys.version)
3.8.10 (tags/v3.8.10:3d8993a, May  3 2021, 11:48:03) [MSC v.1928 64 bit (AMD64)]

第2条 遵循 PEP 8 编程风格

第3条 知道 bytes 和 str 的区别

bytes 实例包含的是原始数据,即8位无符号值
str 包含的是 Unicode 码点
如果需要从文件中读写二进制数据,那么需要用二进制的模式打开文件 "wb" 或 "rb"
如果需要从文件中读写文本数据,需要注意操作系统默认的编码影响,可以指定 encoding 编码来避免影响。

第4条 用 f-string 取代 C 风格的格式化或者 str.format 方法

三种方式对别如下

key = 'my_var'
f_string = f'{key:<10} = {value:.2f}'
str_args = '{:<10} = {:.2f}'.format(key, value)
str_kw = '{key:<10} = {value:.2f}'.format(key=key, value=value)
c_tuple = '%-10s = %.2f' % (key, value)
c_dict = '%(key)-10s = %(value).2f' % {'key': key, 'value': value}
assert c_tuple == c_dict == f_string
assert str_args == str_kw == f_string

第5条 用辅助函数取代复杂表达式

  • Python 的语法灵活导致可以写复杂且难以阅读的单行表达式
  • 当重复用到同一逻辑时,可以把逻辑移到辅助函数中
  • if else 表达式比 or/and 布尔运算更易读
from urllib.parse import parse_qs


def test_complex_expressions():
    my_values = parse_qs('red=5&blue=0&green=', keep_blank_values=True)
    red = int(my_values.get('red', [''])[0] or 0)
    blue = int(my_values.get('blue', [''])[0] or 0)
    green = int(my_values.get('green', [''])[0] or 0)
    assert red == 5 and blue == 0 and green == 0


def test_if_else():
    my_values = parse_qs('red=5&blue=0&green=', keep_blank_values=True)
    red_str = my_values.get('red', [''])
    red = int(red_str[0]) if red_str[0] else 0
    blue_str = my_values.get('blue', [''])
    blue = int(blue_str[0]) if blue_str[0] else 0
    green_str = my_values.get('green', [''])
    green = int(green_str[0]) if green_str[0] else 0
    assert red == 5 and blue == 0 and green == 0


def get_first_int(values, key, default=0):
    found = values.get(key, [''])
    if found[0]:
        return int(found[0])
    return default


def test_helper_function():
    my_values = parse_qs('red=5&blue=0&green=', keep_blank_values=True)
    red = get_first_int(my_values, "red", 0)
    blue = get_first_int(my_values, "blue", 0)
    green = get_first_int(my_values, "green", 0)
    assert red == 5 and blue == 0 and green == 0

第6条 把数据结构直接拆分到多个变量里

snacks = [('bacon', 350), ('donut', 240), ('muffin', 190)]
for i in range(len(snacks)):
    item = snacks[i]
    name = item[0]
    calories = item[1]
    print(f'#{i+1}: {name} has {calories} calories')

# 使用 enumerate 加上解包赋值方式简化代码
for rank, (name, calories) in enumerate(snacks, 1):
    print(f'#{rank}: {name} has {calories} calories')

第7条 用 enumerate 取代 range

flavor_list = ['vanilla', 'chocolate', 'pecan', 'strawberry']
for i in range(len(flavor_list)):
    flavor = flavor_list[i]
    print(f'{i + 1}: {flavor}')

# 使用 enumerate 比 range 代码更简洁
for i, flavor in enumerate(flavor_list):
 print(f'{i + 1}: {flavor}')

第8条 用 zip 同时遍历多个迭代器

zip 可以同时遍历多个迭代器,zip 只迭代到最短的迭代器结束,如果需要迭代到最长的迭代器结束,可以使用 itertools.zip_longest

names = ['Cecilia', 'Lise', 'Marie']
counts = [len(n) for n in names]

longest_name = None
max_count = 0

for i, name in enumerate(names):
    count = counts[i]
    if count > max_count:
        longest_name = name
        max_count = count

# 使用 zip 同时遍历多个迭代器
for name, count in zip(names, counts):
    if count > max_count:
        longest_name = name
        max_count = count

# 遍历最长的迭代器
import itertools
for name, count in itertools.zip_longest(names, counts):
    print(f'{name}: {count}')

第9条 避免在 for 或 while 循环后使用 else

else 块只在循环没有被 break 时才会执行,容易引起混乱,尽量避免使用

第10条 用赋值表达式减少重复代码

count = fresh_fruit.get('lemon', 0)
if count:
    make_lemonade(count)
else:
    out_of_stock()

# 使用赋值表达式 := 简化为
if count := fresh_fruit.get('lemon', 0):
    make_lemonade(count)
else:
    out_of_stock()
count = fresh_fruit.get('banana', 0)
if count >= 2:
    pieces = slice_bananas(count)
    to_enjoy = make_smoothies(pieces)
else:
    count = fresh_fruit.get('apple', 0)
    if count >= 4:
        to_enjoy = make_cider(count)
    else:
        count = fresh_fruit.get('lemon', 0)
        if count:
            to_enjoy = make_lemonade(count)
        else:
            to_enjoy‘= 'Nothing'

# 使用赋值表达式 := 来减少缩进
if (count := fresh_fruit.get('banana', 0)) >= 2:
    pieces = slice_bananas(count)
    to_enjoy = make_smoothies(pieces)
elif (count := fresh_fruit.get('apple', 0)) >= 4:
    to_enjoy = make_cider(count)
elif count := fresh_fruit.get('lemon', 0):
    to_enjoy = make_lemonade(count)
else:
    to_enjoy = 'Nothing'

列表与字典

第11条 学会对序列做切片

a = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
print('Middle two: ', a[3:5])
print('All but ends:', a[1:7])

# 从开头开始切片不需要写 0
assert a[:5] == a[0:5]

# 切片到结尾时,不要写结束索引
assert a[5:] == a[5:len(a)]

# 当需要相对于尾巴进行切片时,使用负数索引
a[:-1]

# 当索引超出范围时,切片会忽略缺失值
first_twenty_items = a[:20]
last_twenty_items = a[-20:]

# list 的切片返回的是新的 list 对象,对切片的修改不会影响原来的 list
b = a[3:]
print('Before: ', b)
# ['d', 'e', 'f', 'g', 'h']
b[1] = 99
print('After: ', b)
# ['d', 99, 'f', 'g', 'h']
print('No change:', a)
# ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']

# 对切片范围的赋值,会使原来的 list 收缩或变长
print('Before ', a)
# ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
a[2:7] = [99, 22, 14]
print('After ', a)
# ['a', 'b', 99, 22, 14, 'h']
a[2:3] = [47, 11]
print('After ', a)
# ['a', 'b', 47, 11, 22, 14, 'h']

# 复制 list
b = a[:]
assert b == a and b is not a

第12条 不要在切片里同时指定起止下标与步进

同时指定起止下标与步进会导致奇怪的问题
尽量避免负步进

x = '寿司'
y = x[::-1]
print(y)
# 输出 司寿

第13条 用带星号的 unpacking 捕获多个元素

比如只需要使用前面两个元素时,可以使用 *other 捕获剩余元素

car_ages = [0, 9, 4, 8, 7, 20, 19, 1, 6, 15]
oldest, second_oldest, *others = car_ages_descending
print(oldest, second_oldest, others)

星号表达式可以用在任意地方

oldest, *others, youngest = car_ages_descending
print(oldest, youngest, others)
*others, second_youngest, youngest = car_ages_descending
print(youngest, second_youngest, others)

星号表达式更好的表达逻辑

it = read_csv()
header, *rows = it
print('CSV Header:', header)
print('Row count: ', len(rows))

第14条 用 sort 方法的 key 参数进行复杂排序逻辑

对于对象或者复杂数据时,可以用 key 指定排序的属性

class Tool:
    def __init__(self, name, weight):
        self.name = name
        self.weight = weight
tools = [
    Tool('level', 3.5),
    Tool('hammer', 1.25),
    Tool('screwdriver', 0.5),
    Tool('chisel', 0.25),
]
# 根据名称排序
tools.sort(key=lambda x: x.name)
# 根据重量排序
tools.sort(key=lambda x: x.weight)

对于字符串基础类型,可以用 key 来做转换后再排序

places = ['home', 'work', 'New York', 'Paris']
places.sort(key=lambda x: x.lower())

多条件排序

tools.sort(key=lambda x: (x.weight, x.name))

第15条 不要过分依赖给字典添加条目时所用的顺序

在 python 3.7 之前的版本,遍历 dict 会随机返回 key
python 3.7 和以上的版本,遍历 dict 会按插入顺序返回 key

baby_names = {
 'cat': 'kitten',
 'dog': 'puppy',
}
print(baby_names)
# python 3.5
# 输出 {'dog': 'puppy', 'cat': 'kitten'}

第16条 用 get 取代 KeyError 来处理键不在字典中的情况

# 使用 not in 来处理 key 不存在的情况
if key not in counters:
    counters[key] = 0
counters[key] += 1
# 使用 in 来处理 key 不存在的情况
if key in counters:
    counters[key] += 1
else:
    counters[key] = 1
# 使用捕获 KeyError 来处理 key 不存在的情况
try:
    counters[key] += 1
except KeyError:
    counters[key] = 1
# 使用 setdefault 来处理 key 不存在的情况
count = counters.setdefault(key, 0)
counters[key] = count + 1

# 使用 get 来处理 key 不存在的情况
count = counters.get(key, 0)
counters[key] = count+1

第17条 用 defaultdict 取代 setdefault 处理内部状态中缺失的元素

使用 defaultdict 时只需要指定缺失时的默认值

# 使用 setdefault
class Visits:
    def __init__(self):
        self.data = {}
    
    def add(self, country, city):
        city_set = self.data.setdefault(country, set())
        city_set.add(city)

# 使用 defaultdict
class Visits:
    def __init__(self):
        self.data = defaultdict(set)
    
    def add(self, country, city):
        self.data[country].add(city)

第18条 学会用 missing 构造依赖键的默认值

当继承 dict 时,使用 missing 来自定义 key 缺失的逻辑处理

class Pictures(dict):
    def __missing__(self, key):
        value = open_picture(key)
        self[key] = value
        return value

函数

第19条 不要把函数返回值拆分到三个以上

不要把函数返回值拆分到三个以上,可以使用 class 或 namedtuple 来处理3个返回值以上的场景

def get_stats(numbers):
    minimum = min(numbers)
    maximum = max(numbers)
    count = len(numbers)
    average = sum(numbers) / count
    sorted_numbers = sorted(numbers)
    middle = count // 2
    if count % 2 == 0:
        lower = sorted_numbers[middle - 1]
        upper = sorted_numbers[middle]
        median = (lower + upper) / 2
    else:
        median = sorted_numbers[middle]
    return minimum, maximum, average, median, count

minimum, maximum, average, median, count = get_stats(lengths)

第20条 用抛出异常取代返回 None

def careful_divide(a: float, b: float) -> float:
    try:
        return a / b
    except ZeroDivisionError as e:
        raise ValueError('Invalid inputs')

第21条 了解闭包与变量作用域

当引用一个变量时,Python 解释器按以下顺序查找变量

  • 当前函数
  • 任何包含范围 (比如被其他函数包含)
  • 所在的模块
  • 内置的范围,比如 len, str 等
    默认情况下闭包无法赋值包含范围的变量
    nonlocal 表示搜索父作用域,nonlocal 不会搜索模块作用域,防止污染模块作用域
def sort_priority2(numbers, group):
    found = False # Scope: 'sort_priority2'
    def helper(x):
        if x in group:
            found = True # Scope: 'helper' -- Bad!
            return (0, x)
        return (1, x)
    numbers.sort(key=helper)
    return found
found = sort_priority2(numbers, group)
print('Found:', found)
# Found: False

def sort_priority3(numbers, group):
    found = False
    def helper(x):
        nonlocal found
        if x in group:
            found = True
            return (0, x)
        return (1, x)
    numbers.sort(key=helper)
    return found

第22条 使用 *args 简化代码

*args 可变长参数,可以简化调用时的代码

def log(message, *values): # The only difference
    if not values:
        print(message)
    else:
        values_str = ', '.join(str(x) for x in values)
        print(f'{message}: {values_str}')
log('My numbers are', 1, 2)
log('Hi there')

第23条 使用 **kwargs 表示可选行为

def print_parameters(**kwargs):
    for key, value in kwargs.items():
        print(f'{key} = {value}')

print_parameters(alpha=1.5, beta=9, gamma=4)

第24条 使用 None 和 Docstrings 描述默认值会变的参数

def log(message, when=None):
    """Log a message with a timestamp.
    Args:
    message: Message to print.
    when: datetime of when the message occurred.
    Defaults to the present time.
    """
    if when is None:
        when = datetime.now()
    print(f'{when}: {message}')

第25条 设计清晰的参数列表

  • *号后面的是关键字参数,调用时必须传入关键字,表明参数用途
  • / 前面的是位置参数,调用时只能按位置传参,避免耦合
  • / 和 * 之间的参数,调用时可以按位置或者关键字传参
def safe_division_e(numerator, denominator, /, ndigits=10, *, ignore_overflow=False, ignore_zero_division=False):

第26条 用 functools.wrap 定义函数装饰器

functools.wrap 会把内部函数重要的 metadata 复制到外部函数

from functools import wraps
def trace(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        pass
    return wrapper

@trace
def fibonacci(n):

推导与生成

第27条 用推导替代 map 和 filter

列表推导比 map 和 filter 更简洁

a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
alt = map(lambda x: x**2, filter(lambda x: x % 2 == 0, a))
even_squares = [x**2 for x in a if x % 2 == 0]
assert even_squares == list(alt)
###

第28条 避免在推导中使用多余两个的控制子表达式

在推导中使用多余两个控制子表达式时,代码较难读懂,应该避免

my_lists = [
 [[1, 2, 3], [4, 5, 6]]
]
flat = [x for sublist1 in my_lists
    for sublist2 in sublist1
    for x in sublist2]

# 此时使用 for 循环的代码,更易懂
flat = []
for sublist1 in my_lists:
    for sublist2 in sublist1:
        flat.extend(sublist2)

# 在推导使用多余两个控制子表达式时,代码很难读懂
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
filtered = [[x for x in row if x % 3 == 0] for row in matrix if sum(row) >= 10]

第29条 在推导中使用赋值表达式避免重复工作

Python 3.8 引入了赋值表达式 :=

stock = {
    'nails': 125,
    'screws': 35,
    'wingnuts': 8,
    'washers': 24,
}
order = ['screws', 'wingnuts', 'clips']
def get_batches(count, size):
    return count // size

# 常规写法
result = {}
for name in order:
    count = stock.get(name, 0)
    batches = get_batches(count, 8)
    if batches:
        result[name] = batches
    print(batches)
# 使用推导使代码更紧凑,但依然有重复
found = {name: get_batches(stock.get(name, 0), 8) for name in order if get_batches(stock.get(name, 0), 8)}
print(found)

# 使用赋值表达式避免重复代码
found = {name: batches for name in order if (batches := get_batches(stock.get(name, 0), 8))}

第30条 考虑用生成器代替返回列表

  • 生成器的代码比返回列表更清晰
  • 生成器可以为任意多的输入产生输出,因为生成器不会在内存里保存全部的输入和输出
def index_words(text):
    result = []
    if text:
        result.append(0)
    for index, letter in enumerate(text):
        if letter == ' ':
            result.append(index + 1)
    return result

def index_words_iter(text):
    if text:
        yield 0
    for index, letter in enumerate(text):
        if letter == ' ':
            yield index + 1

第31条 迭代参数时注意防御

  • 当需要多次遍历输入参数时注意防御,如果输入参数是迭代器,可能会出现奇怪行为或值丢失
  • 通过实现 iter 来自定义迭代器
  • 通过下面两种方式检测输入参数是迭代器
if iter(numbers) is numbers
# 或者
if isinstance(numbers, Iterator)

第32条 当使用大的列表推导时考虑使用生成器表达式

  • 输入很大时使用列表推导会占用大量内存,生成器表达式通过迭代器每次生成一个结果,所以可以解决内存消耗过多问题
  • 生成器表达式可以把迭代器传入 for 子表达式来实现组合使用
  • 当组合使用时生成器表达式执行很快并且内存使用效率高
# 对于小的输入,使用列表推导
value = [len(x) for x in open('my_file.txt')]

# 对于大的输入,使用生成器表达式
it = (len(x) for x in open('my_file.txt'))

第33条 用 yield from 来组合使用生成式表达式

  • yield from 可以把多个嵌套生成器组合成当个组合生成器
  • yield from 比遍历多个嵌套生成器再产生各自的输出性能更高
def move(period, speed):
    for _ in range(period):
        yield speed

def pause(delay):
    for _ in range(delay):
        yield 0

def animate_composed():
    yield from move(4, 5.0)
    yield from pause(3)
    yield from move(2, 3.0)

第34条 避免用 send 给生成器注入数据

  • send 和 yeild from 一起使用可能导致奇怪的行为,比如返回 None
  • 使用迭代器作为组合生成器的输入参数是比用 send 给组合生成器注入数据更好的方式

第35条 避免在生成器中使用 throw 导致状态转移

在生成器中抛出异常会使调用变得复杂,可以通过自定义迭代器来简化

class Reset(Exception):
    pass

def timer(period):
    current = period
    while current:
        current -= 1
    try:
        yield current
    except Reset:
        current = period

def check_for_reset():
    pass

def announce(remaining):
    print(f'{remaining} ticks remaining')

def run():
    it = timer(4)
    while True:
        try:
            if check_for_reset():
                current = it.throw(Reset())
            else:
                current = next(it)
        except StopIteration:
            break
        else:
            announce(current)

通过自定义迭代器,调用者 run 的代码简化了很多

class Timer:
    def __init__(self, period):
        self.current = period
        self.period = period
    
    def reset(self):
        self.current = self.period
    
    def __iter__(self):
        while self.current:
            self.current -= 1
            yield self.current

def run():
    timer = Timer(4)
    for current in timer:
        if check_for_reset():
            timer.reset()
        announce(current)

第36条 考虑用 itertools 拼装迭代器与生成器

itertools 功能主要分三类

  • 链接迭代器
  • 过滤迭代器输出
  • 产生多个迭代器的组合输出

链接迭代器

it = itertools.chain([1, 2, 3], [4, 5, 6])
print(list(it))

repeat 重复

it = itertools.repeat('hello', 3)
print(list(it))

cycle 循环

it = itertools.cycle([1, 2])
result = [next(it) for _ in range (10)]

tee 生成多个迭代器

it1, it2, it3 = itertools.tee(['first', 'second'], 3)

zip_longest 遍历最长迭代器

keys = ['one', 'two', 'three']
values = [1, 2]
# 最短的列表
normal = list(zip(keys, values))

# zip_longest 遍历最长列表
it = itertools.zip_longest(keys, values, fillvalue='nope')
longest = list(it)

类与接口

第37条 用类的组合替代嵌套的内置类型

当需要嵌套内置类型时,比如在 dict 里面嵌套 list,可以重构为类的组合,使代码更易懂和更易扩展

  • 避免字典的值是字典、长元组、或内置类型的复杂嵌套
  • namedtuple 用于作为轻量的、不可变的数据容器,如果需要更全面功能时,使用类
  • 当内部状态字典变得复杂时,把代码重构到多个类

第38条 让简单的接口接受函数而不是类

  • 相比定义和实例化类,在组件间的接口使用函数更加简化
  • call 方法让实例可以像函数那样调用
  • 当需要一个函数维护状态时,使用带有 call 方法的类来实现,比有状态的闭包更好
# 组件间的接口传递函数
def log_missing():
    print('Key added')
    return 0
result = defaultdict(log_missing, current)

# 有状态的闭包
def increment_with_report(current, increments):
    added_count = 0
    def missing():
        nonlocal added_count # Stateful closure
        added_count += 1
        return 0
    result = defaultdict(missing, current)
    for key, amount in increments:
        result[key] += amount
    return result, added_count

# 带有 __call__ 方法的类
class BetterCountMissing:
    def __init__(self):
        self.added = 0
    def __call__(self):
        self.added += 1
        return 0

counter = BetterCountMissing()
result = defaultdict(counter, current) # 像函数那样使用

第39条 用 @classmethod 多态构建对象

  • 在 Python 中,不仅对象有多态,类也有多态
  • Python 每个类仅有一个构造函数 init
  • 使用 @classmethod 定义更多的构造函数
  • 使用 @classmethod 提供通用方法构建和连接更多具体方法
class GenericInputData:
    @classmethod
    def generate_inputs(cls, config):
        raise NotImplementedError

class PathInputData(GenericInputData):
    @classmethod
    def generate_inputs(cls, config):
        pass

class GenericWorker:
    def __init__(self, input_data):
        self.input_data = input_data
        self.result = None

    @classmethod
    def create_workers(cls, input_class, config):
        workers = []
        for input_data in input_class.generate_inputs(config):
            workers.append(cls(input_data))
        return workers

class LineCountWorker(GenericWorker):
    pass

def mapreduce(worker_class, input_class, config):
    workers = worker_class.create_workers(input_class, config)
    return execute(workers)

result = mapreduce(LineCountWorker, PathInputData, config)

第41条 使用 super 初始化超类

Python MRO 方法解析顺序继承层次中公共的超类只会执行一次,MRO 用的是 C3 算法
调用顺序是
MyBaseClass 的 init
PlusNineCorrect 的 init
TimesSevenCorrect 的 init

class MyBaseClass:
    def __init__(self, value):
        self.value = value

class TimesSevenCorrect(MyBaseClass):
    def __init__(self, value):
        super().__init__(value)
        self.value *= 7

class PlusNineCorrect(MyBaseClass):
    def __init__(self, value):
        super().__init__(value)
        self.value += 9

class GoodWay(TimesSevenCorrect, PlusNineCorrect):
    def __init__(self, value):
        super().__init__(value)

foo = GoodWay(5)
print('Should be 7 * (5 + 9) = 98 and is', foo.value)
# 输出 Should be 7 * (5 + 9) = 98 and is 98

第42条 用 Mix-in 类表示可组合功能

  • 当需要多继承时,可以考虑使用 Min-in 类来替代实现
  • Mix-in 类不定义实例属性,也不需要调用 init 方法,定义了一组方法等子类提供
class ToDictMixin:
    def to_dict(self):
        return self._traverse_dict(self.__dict__)
    def _traverse_dict(self, instance_dict):
        output = {}
        for key, value in instance_dict.items():
            output[key] = self._traverse(key, value)
        return output
    def _traverse(self, key, value):
        if isinstance(value, ToDictMixin):
            return value.to_dict()
        elif isinstance(value, dict):
            return self._traverse_dict(value)
        elif isinstance(value, list):
            return [self._traverse(key, i) for i in value]
        elif hasattr(value, '__dict__'):
            return self._traverse_dict(value.__dict__)
        else:
            return value

class BinaryTree(ToDictMixin):
    def __init__(self, value, left=None, right=None):
        self.value = value
        self.left = left
        self.right = right

Mix-in 类也可以组合使用

import json
class JsonMixin:
    @classmethod
    def from_json(cls, data):
        kwargs = json.loads(data)
        return cls(**kwargs)

    def to_json(self):
        return json.dumps(self.to_dict())

class Switch(ToDictMixin, JsonMixin):
    def __init__(self, ports=None, speed=None):
        self.ports = ports
        self.speed = speed

第42条 优先用 public 属性而不是 private 属性

  • 从开始就规划好让子类更好的使用内部 API 和属性,而不是隐藏他们
  • 用保护属性加上文档说明来指导子类,而不是用私有属性来控制访问
  • 仅在避免子类命名冲突时考虑使用私有属性

第43条 让自定义容器类型继承 collections.abc

直接继承 list 或 dict 时,因为父类是具体类,需要记住哪些方法需要重新实现,这个比较困难,而继承于 collections.abc 时因为继承的是抽象类,需要实现哪些方法直观明了

  • 在简单应用场景时继承 python 容器类型 (list, dict)
  • 需要注意大量的方法需要实现
  • 让自定义容器类型继承 collections.abc 中的接口,可以确保类实现了全部需要的方法

第44条 用属性代替 getter 和 setteer

刚开始直接使用属性,当这个属性被 set 或 get 过程需要特殊处理时,再用 @property 和 setter 来重构

  • 定义新类时直接使用 public 属性而不是 getter 和 setter
  • 如果需要特殊逻辑处理,再使用 @property 和 setter 来实现
  • 确保 @property 执行很快,否则考虑使用普通方法来实现
class VoltageResistance(Resistor):
    def __init__(self, ohms):
        super().__init__(ohms)
        self._voltage = 0
    
    @property
    def voltage(self):
        return self._voltage

    @voltage.setter
    def voltage(self, voltage):
        self._voltage = voltage
        self.current = self._voltage / self.ohms

第45条 用 @property 实现新的属性访问

  • 用 @property 可以给现有字段增加功能
  • 当 @property 过于复杂时,再考虑重构类的字段和调用的代码
class NewBucket:
    def __init__(self, period):
        self.period_delta = timedelta(seconds=period)
        self.reset_time = datetime.now()
        self.max_quota = 0
        self.quota_consumed = 0

    @property
    def quota(self):
        return self.max_quota - self.quota_consumed

第46条 用描述符改写可复用的 @property 方法

通过自定义描述符(get, set)的类来复用 @property 的逻辑

from weakref import WeakKeyDictionary
class Grade:
    def __init__(self):
        self._values = WeakKeyDictionary()
    def __get__(self, instance, instance_type):
        pass
    def __set__(self, instance, value):
        pass

class Exam:
    math_grade = Grade()
    writing_grade = Grade()
    science_grade = Grade()

first_exam = Exam()
first_exam.math_grade = 78
first_exam.writing_grade = 82
second_exam = Exam()
second_exam.writing_grade = 75
second_exam.science_grade = 80

第47条 对于惰性属性使用 getattr, getattribute, setattr

  • 对于懒加载或保存的属性采用 setattrgetattr
  • getattr 只有属性缺失才会调用,而 getattribute 每个属性访问都会调用
  • 使用 super() 来访问属性,以避免 getattributesetattr 无限循环
    访问不存在的属性时,实际调用的是 getattr 方法,而 setattr 实际修改的是实例的 dict
class LazyRecord:
    def __init__(self):
        self.exists = 5
    
    def __getattr__(self, name):
        value = f'Value for {name}'
        setattr(self, name, value)
        return value
class SavingRecord:
    def __setattr__(self, name, value):
        # Save some data for the record
        pass

第48条 用 init__subclass 验证子类是否正确

  • Metaclass 的 new 方法在类所有语句处理完成后才会执行
  • Metaclass 可以用来在类定义但实例创建之前,对类进行检查或修改,但往往过重
  • init_subclass 确保子类在定义时正确处理了
  • 在子类 init_subclass 确保调用了 super().init_subclass
class BetterPolygon:
    sides = None # 子类必须指定
    
    def __init_subclass__(cls):
        super().__init_subclass__()
        if cls.sides < 3:
            raise ValueError('Polygons need 3+ sides')

class Hexagon(BetterPolygon):
    sides = 6

第49条 用 init_subclass 注册类

  • 类注册在构建模块化 Python 程序时是个有效的模式
  • Metaclass 被子类化时可以自动运行注册逻辑
  • 用 Metaclass 可以避免忘记注册类导致错误
  • 优于 Metaclass 层次结构使用 init_subclass 使代码更易懂
class BetterRegisteredSerializable(BetterSerializable):
    def __init_subclass__(cls):
        super().__init_subclass__()
        register_class(cls)

class Vector1D(BetterRegisteredSerializable):
    def __init__(self, magnitude):
        super().__init__(magnitude)
        self.magnitude = magnitude

第50条 用 set_name 注解类属性

class Field:
    def __init__(self):
        self.name = None
        self.internal_name = None
    
    def __set_name__(self, owner, name):
        # Called on class creation for each descriptor
        self.name = name
        self.internal_name = '_' + name
    
    def __get__(self, instance, instance_type):
        if instance is None:
            return self
        return getattr(instance, self.internal_name, '')
    
    def __set__(self, instance, value):
        setattr(instance, self.internal_name, value)

class Customer:
    first_name = Field()
    last_name = Field()
    prefix = Field()
    suffix = Field()

第51条 对于可组合的类扩展优先使用类修饰器而不是 Metaclass

  • 类修饰器接受一个类,返回一个新的类或者修改的类
  • 当需要修改类的每一个方法或属性时,类修改器比较有用
  • Metaclass 不易组合使用,而很多类修改器可以组合使用来扩展类的功能
def my_class_decorator(klass):
    klass.extra_param = 'hello'
    return klass

@my_class_decorator
class MyClass:
    pass

第52条 用 subprocess 管理子进程

import subprocess

result = subprocess.run(['echo', 'Hello from the child!'], capture_output=True, encoding='utf-8')
result.check_returncode() # No exception means clean exit
print(result.stdout)

可以用 poll 查询子进程退出状态

proc = subprocess.Popen(['sleep', '1'])
while proc.poll() is None:
    print('Working...')
print('Exit status', proc.poll())

可以用 communicate 等待子进程输出完成并结束

import time
start = time.time()
sleep_procs = []
for _ in range(10):
    proc = subprocess.Popen(['sleep', '1'])
    sleep_procs.append(proc)
for proc in sleep_procs:
    proc.communicate()
end = time.time()
delta = end - start
print(f'Finished in {delta:.3} seconds')

如果子进程可能永远不退出,可以给 communicate 增加 timeout 参数

proc = subprocess.Popen(['sleep', '10'])
try:
    proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
    proc.terminate()
    proc.wait()
print('Exit status', proc.poll())
# 输出 Exit status -15

可以用 subprocess.PIPE 给子进程传数据和接受子进程的输出

proc = subprocess.Popen(
 ['openssl', 'enc', '-des3', '-pass', 'env:password'],
 env=env,
 stdin=subprocess.PIPE,
 stdout=subprocess.PIPE)

第53条 用线程执行阻塞式 I/O

  • 因为全局解释器锁 Python 线程无法在多核 cpu 并行执行
  • Python 线程可以同时处理阻塞 I/O 和复杂计算
import time
from threading import Thread

class FactorizeThread(Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number
    
    def run(self):
        self.factors = list(factorize(self.number))

numbers = [2139079, 1214759, 1516637, 1852285]
start = time.time()
threads = []
for number in numbers:
    thread = FactorizeThread(number)
    thread.start()
    threads.append(thread)
for thread in threads:
    thread.join()
end = time.time()
delta = end - start
print(f'Took {delta:.3f} seconds')
# 输出 Took 0.446 seconds

第54条 用锁避免数据竞争

对于需要并发访问的数据用锁进行保护

from threading import Lock
class LockingCounter:
    def __init__(self):
        self.lock = Lock()
        self.count = 0
    
    def increment(self, offset):
        with self.lock:
            self.count += offset

第55条 用 Queue 协调线程间的工作

用 Queue 实现生产者消费者模式

from queue import Queue
my_queue = Queue()
def consumer():
    print('Consumer waiting')
    my_queue.get() # Runs after put() below
    print('Consumer done')

thread = Thread(target=consumer)
thread.start()
print('Producer putting')
my_queue.put(object()) # Runs before get() above
print('Producer done')
thread.join()
# 输出
Consumer waiting
Producer putting
Producer done
Consumer done

第56条 判断需要并发的场景

最常见的并发协调时生成一堆并发任务,然后等待这些任务完成

第57条 避免每次 Fan-out 时都创建线程实例

  • 线程创建和启动都有性能消耗,大量线程会消耗内存,线程间的协调会引入复杂度
  • 线程的异常处理复杂,会难以调试

第58条 正确重构代码以便用 Queue 做并发

  • 用 Queue 加上固定数量的线程提高了 Fan-out 和 Fan-in 的可扩展性
  • 用 Queue 做并发时,可能有大量的代码需要重构,特别是需要使用多阶段 Pipeline 的时候

第59条 通过 ThreadPoolExecutor 实现并发

ThreadPoolExecutor 简化了 I/O 并发

with ThreadPoolExecutor(max_workers=10) as pool:
    task = pool.submit(game_logic, ALIVE, 3)
    task.result()

第60条 用协程实现高并发 I/O

  • Python 的协程是通过 async 和 await 来实现
  • 协程简化了大量任务并发执行
  • 协程的启动消耗是个函数调用,当协程活跃时大约消耗 1K 内存
  • 使用 async 定义的函数叫做协程
async def game_logic(state, neighbors):
    # Do some input/output in here:
    data = await my_socket.read(50)

第61条 知道如何用 asyncio 改写用线程实现的 I/O

  • Python 提供了异步版本的 for 循环、with 语句、生成器、推导和库以便在协程中使用
  • 内置的 asyncio 模块方便用来把用线程实现的阻塞式 I/O 修改为异步 I/O

第62条 结合线程和协程将代码迁移到 asyncio

  • 可等待的 run_in_executor 可以使协程在 ThreadPoolExecutor 中执行同步调用
  • run_until_complete 可以执行同步调用并等待完成

第63条 避免 asyncio 的事件循环阻塞以最大化响应

第64条 用 concurrent.futures 实现并行计算

内置的 multiprocessing 模块提供了并行计算的功能

鲁棒性和性能

第65条 合理利用 try,except,else 和 finally

  • try/finally 不管是否发生异常,都会执行清除代码
  • else 使正常逻辑和异常处理分开,代码更清晰

第66条 用 contextlib 和 with 实现可复用的 try/finally

  • with 语句简化了 try/finally 的逻辑复用
  • contextlib 模块中的 contextmanager 修饰器可以让自定义函数用在 with 中
from threading import Lock
lock = Lock()

# 上锁,执行处理,解锁
lock.acquire()
try:
 pass
finally:
 lock.release()

# 改为 with
with lock:
    pass
from contextlib import contextmanager

@contextmanager
def log_level(level, name):
    logger = logging.getLogger(name)
    old_level = logger.getEffectiveLevel()
    logger.setLevel(level)
    try:
        yield logger
    finally:
        logger.setLevel(old_level)

with log_level(logging.DEBUG, 'my-log') as logger:
    logger.debug(f'This is a message for {logger.name}!')

第66条 用 datetime 替代 time 处理本地时间

  • 在多个时区转换时避免使用 time 模块
  • 使用 datetime 模块和社区的 pytz 模块来转换不同时区
  • 总是用 UTC 来表示时间,在展示前再转换为本地时间
import time

now = 1552774475
local_tuple = time.localtime(now)
time_format = '%Y-%m-%d %H:%M:%S'
time_str = time.strftime(time_format, local_tuple)
print(time_str)
# UTC 转本地时间
from datetime import datetime, timezone
now = datetime(2019, 3, 16, 22, 14, 35)
now_utc = now.replace(tzinfo=timezone.utc)
now_local = now_utc.astimezone()

# 本地时间转 UTC
time_str = '2019-03-16 15:14:35'
now = datetime.strptime(time_str, time_format)
time_tuple = now.timetuple()
utc_now = time.mktime(time_tuple)
print(utc_now)

第67条 用 copyreg 实现可靠的 pickle

  • 内置的 pickle 模块用于 python 对象的序列化和反序列化
  • 如果类版本发生变化,pickle 的序列化和反序列化可能无法工作
  • 用 copyreg 模块来实现 pickle 的版本兼容
import copyreg

def pickle_game_state(game_state):
    kwargs = game_state.__dict__
    return unpickle_game_state, (kwargs,)

def unpickle_game_state(kwargs):
    return GameState(**kwargs)

copyreg.pickle(GameState, pickle_game_state)
state = GameState()
state.points += 1000
serialized = pickle.dumps(state)
state_after = pickle.loads(serialized)
print(state_after.__dict__)

版本化类

def pickle_game_state(game_state):
    kwargs = game_state.__dict__
    kwargs['version'] = 2
    return unpickle_game_state, (kwargs,)

def unpickle_game_state(kwargs):
    version = kwargs.pop('version', 1)
    if version == 1:
        del kwargs['lives']
        return GameState(**kwargs)

第69条 当精度很重要时使用 decimal

  • Decimal 类用于高精度计算场景,比如金额
  • 使用 Decimal 字符串的构造函数,而不是 float 构造函数
from decimal import Decimal
rate = Decimal('1.45')
seconds = Decimal(3*60 + 42)
cost = rate * seconds / Decimal(60)
print(cost)
print(Decimal('1.45'))
# 1.45
print(Decimal(1.45))
# 
1.4499999999999999555910790149937383830547332763671875

第70条 先分析性能再进行优化

  • 使用 cProfile 模块而不是 profile 模块来度量性能,因为 cProfile 提供了更准确的性能信息
  • Profile 对象的 runcall 方法提供了性能分析需要的全部信息,包括调用栈
  • Stats 对象用来根据需要输出性能信息
from cProfile import Profile
from pstats import Stats

profiler = Profile()
profiler.runcall(test)

stats = Stats(profiler)
stats.strip_dirs()
stats.sort_stats('cumulative')
stats.print_stats()
# 输出 20003 function calls in 1.320 seconds
Ordered by: cumulative time
 ncalls tottime percall cumtime percall filename:lineno(function)
 1 0.000 0.000 1.320 1.320 main.py:35(<lambda>)
 1 0.003 0.003 1.320 1.320 main.py:10(insertion_sort)
 10000 1.306 0.000 1.317 0.000 main.py:20(insert_value)
 9992 0.011 0.000 0.011 0.000 {method 'insert' of 'list' objects}
 8 0.000 0.000 0.000 0.000 {method 'append' of 'list' objects}

第71条 优先用 deque 实现生产者消费者模式

  • list 的 append 和 pop(0) 可用于实现 FIFO 的队列,但是当 list 长度变长时 pop(0) 性能下降厉害
  • deque 的 append 和 popleft 可用于实现 FIFO 队列,并且性能与长度无关

第72条 用 bitsect 搜索已排序的序列

from bisect import bisect_left
index = bisect_left(data, 91234) # Exact match

第73条 用 heapq 实现优先队列

  • 优先队列可以根据优先级处理元素
  • 用 list 来实现优先队列时,当长度变长时,性能变差
  • 内置的 heapq 模块提供了高效优先队列的全部功能
  • heapq 优先队列的元素需要实现自然排序,比如需要实现 lt 方法

第74条 用 memoryview 和 bytearray 零拷贝处理 bytes

  • memoryview 提供了零拷贝接口来读取和写入对象切片,并且支持 python 缓冲协议
  • bytearray 提供了可变的类似 bytes 功能可用来零拷贝读取,比如 socket.recv_from
  • memoryview 可以包装 btyearray 用来接收数据到任意缓冲位置,且不用拷贝

测试与调试

第75条 用 repr 输出调试信息

class BetterClass:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __repr__(self):
        return f'BetterClass({self.x!r}, {self.y!r})'
obj = BetterClass(2, 'bar')
print(obj)
# 输出 BetterClass(2, 'bar')

第76条 在 TestCase 子类里验证相关行为

  • 继承 unittest 模块中的 TestCase 类来创建测试
from unittest import TestCase, main
from utils import to_str

class UtilsTestCase(TestCase):
    def test_to_str_bytes(self):
        self.assertEqual('hello', to_str(b'hello'))
    def test_to_str_str(self):
        self.assertEqual('hello', to_str('hello'))
    def test_failing(self):
        self.assertEqual('incorrect', to_str('hello'))
if __name__ == '__main__':
    main()

第77条 用 setUp, tearDown, setUpModule, tearDownModule 来隔离测试

  • 单元测试用于模块内独立功能的测试
  • 集成测试用于模块间的交互测试
  • setUp, tearDown 用来隔离单元测试
  • setUpModule, tearDownModule 用来隔离集成测试
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest import TestCase, main
class EnvironmentTest(TestCase):
    def setUp(self):
        self.test_dir = TemporaryDirectory()
        self.test_path = Path(self.test_dir.name)
    
    def tearDown(self):
        self.test_dir.cleanup()
    
    def test_modify_file(self):
        with open(self.test_path / 'data.bin', 'w') as f:
            pass
if __name__ == '__main__':
    main()
from unittest import TestCase, main
def setUpModule():
    print('* Module setup')
def tearDownModule():
    print('* Module clean-up')
class IntegrationTest(TestCase):
    def setUp(self):
        print('* Test setup')
    def tearDown(self):
        print('* Test clean-up')
    def test_end_to_end1(self):
        print('* Test 1')
    def test_end_to_end2(self):
        print('* Test 2')
if __name__ == '__main__':
    main()

第78条 用 Mock 测试复杂依赖的代码

  • unittest.mock 模块中的 Mock 类用来模拟接口行为
  • 使用 Mock 时需要同时注意逻辑代码被测试了,并且依赖的函数被调用了 Mock.assert_called_once_with
  • unittest.mock.patch 和 关键字参数用来给测试代码注入 Mock
from datetime import datetime
from unittest.mock import Mock
mock = Mock(spec=get_animals)
expected = [
 ('Spot', datetime(2019, 6, 5, 11, 15)),
 ('Fluffy', datetime(2019, 6, 5, 12, 30)),
 ('Jojo', datetime(2019, 6, 5, 12, 45)),
]
mock.return_value = expected

第79条 封装依赖以简化模拟和测试

import contextlib
import io
from unittest.mock import patch

with patch('__main__.DATABASE', spec=ZooDatabase):
    now = datetime.utcnow()
    DATABASE.get_food_period.return_value = timedelta(hours=3)
    DATABASE.get_animals.return_value = [
    ('Spot', now - timedelta(minutes=4.5)),
    ('Fluffy', now - timedelta(hours=3.25)),
    ('Jojo', now - timedelta(hours=3)),
    ]
    fake_stdout = io.StringIO()
    with contextlib.redirect_stdout(fake_stdout):
        main(['program name', 'Meerkat'])
    found = fake_stdout.getvalue()
    expected = 'Fed 2 Meerkat(s)\n'
    assert found == expected

第80条 用 pdb 做交互调试

第81条 用 tracemalloc 理解内存使用和泄露

import tracemalloc
tracemalloc.start(10) # Set stack depth
time1 = tracemalloc.take_snapshot() # Before snapshot

import waste_memory
x = waste_memory.run() # Usage to debug
time2 = tracemalloc.take_snapshot() # After snapshot
stats = time2.compare_to(time1, 'lineno') # Compare snapshots
for stat in stats[:3]:
    print(stat)

协作

第82条 知道哪里找社区构建的包

PIPY

第83条 使用虚拟环境来隔离和重建依赖

python3 -m pip freeze > requirements.txt
python3 -m pip install -r requirements.txt

第84条 为每个函数、类和模块写文档

第85条 使用包来组织模块和提供稳定的 API

第86条 考虑使用模块级别的代码来配置不同的部署环境

第87条 为模块定义根异常

第88条 知道如何避免循环依赖关系

第89条 重构时通过 warnings 提醒开发者 API 变化

第90条 使用 typing 做静态分析以消除 bug

静态分析工具

  • mypy
  • pytype
  • pyright
  • pyre