asyncio的subprocess使用

发布时间 2023-08-03 16:09:42作者: lxd670

1.asyncio的subprocess

asyncio提供了两个开箱即用的协程函数来创建子进程,这些协程函数都返回一个Process实例
  1.asyncio.create_subprocess_exec(),用于直接运行命令(如ls、pwd、who、python3、go等)。
  2.asyncio.create_subprocess_shell(),用于通过 shell 运行命令。
  
create_subprocess_exec相比create_subprocess_shell更安全,应用程序有责任确保所有空格和特殊字符都被适当地引用,以避免 shell injection 漏洞。这个 shlex.quote() 函数可用于正确转义将用于构造shell命令的字符串中的空白字符和特殊shell字符。

2.使用create_subprocess_exec

2-1.FileNotFoundError报错

create_subprocess_exec会把第一个参数当做命令(如bash、python、go这些),后面更命令的参数

2-1-1.报错原因

# 1.源码参数
async def create_subprocess_exec(
  program, 
  *args, 
  stdin=None, 
  stdout=None,
  stderr=None, loop=None,
  limit=streams._DEFAULT_LIMIT, **kwds
)

# 2.调用时入参
asyncio.create_subprocess_exec('ls -l', 
                               stdout=asyncio.subprocess.PIPE, 
                               stdin=asyncio.subprocess.PIPE,
                               stderr=asyncio.subprocess.PIPE)

# 3.相当于program 指向了 'ls -l'命令
# 4.系统中只有ls、pwd等这些命令,所以'ls -l'这个命令会报错说找不到这个命令文件(FileNotFoundError)
# 5.正确的解法应该是asyncio.create_subprocess_exec('ls', '-l', xxx),把命令和参数分开。

2-1-2.案列

import shlex
import asyncio

async def run_asyncio_subprocess_exec(cmd):
    proc = await asyncio.create_subprocess_exec(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate()
    # 获取返回值
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "ls -l"
    asyncio.run(run_asyncio_subprocess_exec(cmd))
FileNotFoundError: [Errno 2] No such file or directory: 'ls -l'

2-2.单个命令

import shlex
import asyncio

async def run_asyncio_subprocess_exec(cmd):
    proc = await asyncio.create_subprocess_exec(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate()
    # 获取返回值
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "ls"
    asyncio.run(run_asyncio_subprocess_exec(cmd))
0
stdout: '1.html\nPie.html\n'
stderr: ''

2-3.命令+参数

通过*shlex.split(cmd)解决(类似于create_subprocess_exec('ls', '-l'))

import shlex
import asyncio

async def run_asyncio_subprocess_exec(cmd):
  	# 解决方法
    proc = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate()
    # 获取返回值
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "ls -l"
    asyncio.run(run_asyncio_subprocess_exec(cmd))
0
stdout: 'total 30272\n-rw-r--r--@  1 lxd670  staff  3704145  7 25  2022 1.html\n-rw-r--r--   1 lxd670  staff  3690539  7 26  2022 Pie.html\n'
stderr: ''

2-4.wait和communicate

2-4-1.使用wait获取内容

等待子进程终结。设置并返回 returncode属性。

当使用 stdout=PIPEstderr=PIPE 并且子进程产生了足以阻塞 OS 管道缓冲区等待接收更多的数据的输出时,此方法会发生死锁。 当使用管道时请使用 communicate() 方法来避免这种情况。

import shlex
import asyncio

async def run_asyncio_subprocess_exec(cmd):
    proc = await asyncio.create_subprocess_exec(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
   
    # 获取输出内容
    stdout = await proc.stdout.read()
    stderr = await proc.stderr.read()
    
    # 等待子进程终结。设置并返回returncode属性。
    returncode = await proc.wait()
    print(returncode)
    print(stdout)
    print(stderr)

if __name__ == "__main__":
    cmd = "ls"
    asyncio.run(run_asyncio_subprocess_exec(cmd))
0
b'1.html\nPie.html\n'
b''

2-4-2.使用communicate获取内容

与进程交互:

  1. 发送数据到 stdin (如果 input 不为 None);
  2. stdoutstderr 读取数据,直至到达 EOF;
  3. 等待进程终结。

可选的 input 参数为将被发送到子进程的数据 (bytes 对象)。

返回一个元组 (stdout_data, stderr_data)

import shlex
import asyncio

async def run_asyncio_subprocess_exec(cmd):
    proc = await asyncio.create_subprocess_exec(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
   	# 使用communicate就可以获取输出内容
    stdout, stderr = await proc.communicate()
    # 获取返回值
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "ls"
    asyncio.run(run_asyncio_subprocess_exec(cmd))
0
stdout: '1.html\nPie.html\n'
stderr: ''

命令执行失败

import shlex
import asyncio

async def run_asyncio_subprocess_exec(cmd):
    proc = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate()
    # 获取返回值
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "ls -l /asdad"
    asyncio.run(run_asyncio_subprocess_exec(cmd))
1
stdout: ''
stderr: 'ls: /asdad: No such file or directory\n'

3.使用create_subprocess_shell

3-1.初步使用

使用communicate来获取信息

import shlex
import asyncio

async def run_asyncio_subprocess_shell(cmd):
    proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    # 使用communicate接收返回值
    stdout, stderr = await proc.communicate()
    # 获取返回值状态
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "cd $HOME && pwd"
    asyncio.run(run_asyncio_subprocess_shell(cmd))
0
stdout: '/Users/lxd670\n'
stderr: ''

3-2.命令执行失败


import shlex
import asyncio

async def run_asyncio_subprocess_shell(cmd):
    proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    # 使用communicate接收返回值
    stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
    
    # 获取返回值
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "cd /Users/lxd670/ccc && lsx"
    asyncio.run(run_asyncio_subprocess_shell(cmd))
127
stdout: ''
stderr: '/bin/sh: lsx: command not found\n'

4.执行操作异步

在async中使用subprocess.Popen执行会是同步效果

4-1.被操作的文件test_cmd_1.py

import time
for i in range(2):
    res = input(f"请输入内容[{i}]:\n")
    print(f"你选择了[{i}]: {res}")
    time.sleep(1)
print("完成选择")

4-2.使用create_subprocess_shell

import time
import shlex
import asyncio
from functools import wraps

def totle_time(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        res = await func(*args, **kwargs)
        print(f'RUN TIME: {time.time() - start}')
        return res
    return wrapper

async def run_asyncio_subprocess_shell(cmd):
    proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
    print(proc.returncode)
    print(f'stdout: {repr(stdout.decode())}')
    print(f'stderr: {repr(stderr.decode())}')

@totle_time
async def run():
    cmd = "python3 test_cmd_1.py"
    task = []
    # 循环10个
    for i in range(10):
        t = asyncio.create_task(run_asyncio_subprocess_shell(cmd=cmd))
        task.append(t)
    await asyncio.gather(*task)

if __name__ == "__main__":
    asyncio.run(run())
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
...
...
RUN TIME: 2.062711000442505

4-3.使用create_subprocess_exec

import time
import shlex
import asyncio
from functools import wraps

def totle_time(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        res = await func(*args, **kwargs)
        print(f'RUN TIME: {time.time() - start}')
        return res
    return wrapper

async def run_asyncio_subprocess_exec(cmd):
    proc = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
    print(proc.returncode)
    print(f'stdout: {repr(stdout.decode())}')
    print(f'stderr: {repr(stderr.decode())}')

@totle_time
async def run():
    cmd = "python3 test_cmd_1.py"
    task = []
    # 循环10个
    for i in range(10):
        t = asyncio.create_task(run_asyncio_subprocess_exec(cmd=cmd))
        task.append(t)
    await asyncio.gather(*task)

if __name__ == "__main__":
    asyncio.run(run())
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
...
...
RUN TIME: 2.0569019317626953

4-4.使用subprocess(同步)

如果参数shell设为true,程序将通过shell来执行,且args输入应当是字符串形式,同shell窗口执行的命令 如果不设置,默认为false,则输入的args应当是字符串列表

import time
import shlex
import asyncio
import subprocess
from functools import wraps

def totle_time(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        res = await func(*args, **kwargs)
        print(f'RUN TIME: {time.time() - start}')
        return res
    return wrapper

async def run_subprocess_shell(cmd):
    p = subprocess.Popen(cmd, text=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    stdout, stderr = p.communicate(input="yes\nno")
    print(p.returncode)
    print(repr(stdout))
    print(repr(stderr))

@totle_time
async def run():
    cmd = "python3 test_cmd_1.py"
    task = []
    for i in range(2):
        t = asyncio.create_task(run_subprocess_shell(cmd=cmd))
        task.append(t)
    await asyncio.gather(*task)

if __name__ == "__main__":
    asyncio.run(run())
0
'请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
''
0
'请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
''
RUN TIME: 4.050457000732422

5.使用同步锁

5-1.使用create_subprocess_shell

import time
import shlex
import asyncio
from functools import wraps

def totle_time(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        res = await func(*args, **kwargs)
        print(f'RUN TIME: {time.time() - start}')
        return res
    return wrapper

async def run_asyncio_subprocess_shell(cmd, lock):
    async with lock:
        proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
    print(proc.returncode)
    print(f'stdout: {repr(stdout.decode())}')
    print(f'stderr: {repr(stderr.decode())}')
    
@totle_time
async def run():
  	# 使用lock锁
    lock = asyncio.Lock()
    cmd = "python3 test_cmd_1.py"
    task = []
    for i in range(2):
        t = asyncio.create_task(run_asyncio_subprocess_shell(cmd=cmd, lock=lock))
        task.append(t)
    await asyncio.gather(*task)

if __name__ == "__main__":
    asyncio.run(run())
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
RUN TIME: 4.0933310985565186

5-2.使用run_asyncio_subprocess_exec

import time
import shlex
import asyncio
from functools import wraps

def totle_time(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        res = await func(*args, **kwargs)
        print(f'RUN TIME: {time.time() - start}')
        return res
    return wrapper

async def run_asyncio_subprocess_exec(cmd, lock):
    async with lock:
        proc = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
    print(proc.returncode)
    print(f'stdout: {repr(stdout.decode())}')
    print(f'stderr: {repr(stderr.decode())}')

@totle_time
async def run():
    # 使用Lock锁
    lock = asyncio.Lock()
    cmd = "python3 test_cmd_1.py"
    task = []
    for i in range(2):
        t = asyncio.create_task(run_asyncio_subprocess_exec(cmd=cmd, lock=lock))
        task.append(t)
    await asyncio.gather(*task)

if __name__ == "__main__":
    asyncio.run(run())
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
RUN TIME: 4.0468339920043945