subprocess模块

发布时间 2023-06-20 16:37:30作者: lxd670

subprocess说明

subprocess.call()为python3.5以前版本使用(call已经被run取代了)
subprocess.run()此方法为python3.5版本后的推荐方法,可以获取执行结果、返回内容等一些常用的信息

1.run用法

基本用法

默认输出的内容(stdout和stdin)都在终端

如果args是个字符串的话,会被当做是可执行文件的路径,会报FileNotFoundError错误

import shlex
import subprocess
cmd = "ls /Users/lxd670/go_project/t2"

cmd_list = shlex.split(cmd)
print(f"cmd_list: {cmd_list}")

res = subprocess.run(args=cmd_list)
print(f"res: {res}")
cmd_list: ['ls', '/Users/lxd670/go_project/t2']
go.mod  http    http.go main.go src
res: CompletedProcess(args=['ls', '/Users/lxd670/go_project/t2'], returncode=0)

shell参数

shell控制args是否是字符串还是列表,False(默认)时,Popen使用os.execvp()来执行子程序。

当True时,"ls /Users/lxd670"相当于["/bin/bash", "-c" , "ls", "/Users/lxd670"]

尽量使用shell=False来执行命令,避免安全隐患

import subprocess
cmd = "ls /Users/lxd670/go_project/t2"

res = subprocess.run(args=cmd, shell=True)
print(f"res: {res}")
go.mod  http    http.go main.go src
res: CompletedProcess(args='ls /Users/lxd670/go_project/t2', returncode=0)

capture_output参数

capture_output设为truestdoutstderr将会被捕获。

在使用时,内置的 Popen 对象将自动用stdout=PIPEstderr=PIPE创建。

stdoutstderr参数不应当与capture_output同时提供。如果你希望捕获并将两个流合并在一起,使用stdout=PIPEstderr=STDOUT来代替capture_output

默认输出的是byte类型

import shlex
import subprocess
cmd = "ls /Users/lxd670/go_project/t2"

res = subprocess.run(args=shlex.split(cmd), capture_output=True)
print(res)
print(res.stdout)
CompletedProcess(args=['ls', '/Users/lxd670/go_project/t2'], returncode=0, stdout=b'go.mod\nhttp\nhttp.go\nmain.go\nsrc\n', stderr=b'')
b'go.mod\nhttp\nhttp.go\nmain.go\nsrc\n'

字符编码问题

字符编码问题可以使用encoding='utf=8'errors='ignore|replace|xmlcharrefreplace'text=True来解决,它们会影响stdinstdoutstderr的编码,通常使用encodingtext来设置,

import shlex
import subprocess
cmd = "ls /Users/lxd670/go_project/t2"

# res = subprocess.run(args=shlex.split(cmd), capture_output=True, encoding='utf-8')效果是一样的
res = subprocess.run(args=shlex.split(cmd), capture_output=True, text=True)
print(res)
print(f"args: {res.args}")
print(f"code: {res.returncode}")
print(f"out: {res.stdout}")
print(f"err: {res.stderr}")
CompletedProcess(args=['ls', '/Users/lxd670/go_project/t2'], returncode=0, stdout='go.mod\nhttp\nhttp.go\nmain.go\nsrc\n', stderr='')
args: ['ls', '/Users/lxd670/go_project/t2']
code: 0
out: go.mod
http
http.go
main.go
src

err: 

errors作用

errors 是一个可选的字符串,它指明编码格式和编码格式错误的处理方式。 传入 'strict' 将在出现编码格式错误时引发 ValueError (默认值 None 具有相同的效果),传入 'ignore' 将忽略错误。 (请注意忽略编码格式错误会导致数据丢失。) 'replace' 会在出现错误数据时插入一个替换标记 (例如 '?')。 'backslashreplace' 将把错误数据替换为一个反斜杠转义序列。在写入时,还可以使用 xmlcharrefreplace (替换为适当的 XML 字符引用) 或 namereplace (替换为 \\N{...} 转义序列)任何其他通过 codecs.register_error() 注册的错误处理方式名称也可以被接受。
比如,若要将某个String对象s从gbk内码转换为UTF-8,可以如下操作
s.decode('gbk').encode('utf-8')
可是,在实际开发中,我发现,这种办法经常会出现异常:
UnicodeDecodeError: ‘gbk' codec can't decode bytes in position 30664-30665: illegal multibyte sequence
这 是因为遇到了非法字符——尤其是在某些用C/C++编写的程序中,全角空格往往有多种不同的实现方式,比如\xa3\xa0,或者\xa4\x57,这些 字符,看起来都是全角空格,但它们并不是“合法”的全角空格(真正的全角空格是\xa1\xa1),因此在转码的过程中出现了异常。
这样的问题很让人头疼,因为只要字符串中出现了一个非法字符,整个字符串——有时候,就是整篇文章——就都无法转码。
解决办法:
s.decode('gbk', 'ignore').encode('utf-8')
因为decode的函数原型是decode([encoding], [errors='strict']),可以用第二个参数控制错误处理的策略,默认的参数就是strict,代表遇到非法字符时抛出异常;
如果设置为ignore,则会忽略非法字符;
如果设置为replace,则会用?取代非法字符;
如果设置为xmlcharrefreplace,则使用XML的字符引用。

check

check用于判断returncode是否为0,不是的话会报subprocess.CalledProcessError

import shlex
import subprocess
cmd = "ls /Users/lxd670/xxx/t2"

subprocess.run(args=shlex.split(cmd), check=True, capture_output=True)
# 报错
subprocess.CalledProcessError: Command '['ls', '/Users/lxd670/xxx/t2']' returned non-zero exit status 1.

cwd

函数在执行子进程前改变当前工作目录为cwd指定的目录

import shlex
import subprocess
cmd = "ls"

res = subprocess.run(args=shlex.split(cmd), cwd="/Users/lxd670/go_project/t2", capture_output=True, text=True)
print(f"res: {res}")
res: CompletedProcess(args=['ls'], returncode=0, stdout='go.mod\nhttp\nhttp.go\nmain.go\nsrc\n', stderr='')

input

调用其它py文件时,需要通过sys.stdin.read()取值,本质是echo "xxx" | xxx,通过管道传输数据

import shlex
import subprocess
cmd = "grep go"

res = subprocess.run(args=shlex.split(cmd), input="cccgo\n1231\n434", capture_output=True, text=True)
print(f"res: {res}")
# 相当于echo "cccgo\n1231\n434" | grep go
res: CompletedProcess(args=['grep', 'go'], returncode=0, stdout='cccgo\n', stderr='')

CompletedProcess对象

subprocess.run返回CompletedProcess对象,如: CompletedProcess(args=['ls'], returncode=0, stdout='go.mod\nhttp\nhttp.go\nmain.go\nsrc\n', stderr='')

CompletedProcess最多有4个参数
args: 执行参cmd命令
returncode: 直接结果code
stdout: 正常输出内容(stdout)
stderr:	错误输出内容(stderr)

2.getoutput用法

底层调用了getstatusoutput(cmd)[1],获取了返回值

import shlex
import subprocess
cmd = "ls /Users/lxd670/go_project/t2"

stdout = subprocess.getoutput(cmd=cmd)
print(f"getoutput: {stdout}")
getoutput: go.mod
http
http.go
main.go
src

3.getstatusoutput用法

返回执行状态码和执行结果(stdout和stderr)

import shlex
import subprocess
cmd = "ls /Users/lxd670/go_project/t2"

res_code, stdout = subprocess.getstatusoutput(cmd=cmd)
print(f"res_code: {res_code}")
print(f"getoutput: {stdout}")
res_code: 0
getoutput: go.mod
http
http.go
main.go
src

3.Popen

说明

subprocess.run 是在Python3.5中添加的,目的简化subprocess.Popen您何时只想执行命令并等待其结束的时间,但同时您也不想执行其他任何操作。对于其他情况,您仍然需要使用subprocess.Popen

区别run

  • subprocess.run执行命令并 等待,实际所做的是为Popen和调用communicate也无需等待过程完成
  • subprocess.Popen执行命令不会等待结果(异步)

参数说明

参数 含义
args 字符串或者列表
bufsize 0 无缓冲 1 行缓冲 其他正值 缓冲区大小 负值代表采用默认系统缓冲(一般是全缓冲)
executable 指定可执行程序。一般情况下我们通过args参数来设置所要运行的程序。 如果将参数shell设为 True,executable将指定程序使用的shell
stdin stdout stderr 分别表示程序的标准输入、输出、错误句柄 他们可以设置成文件对象,文件描述符或者PIPE和None None 没有任何重定向,继承父进程 PIPE 创建管道
preexec_fn 用于指定一个可执行对象(callable object),它将在子进程运行之前被调用(unix)
close_fds 在windows平台下,如果close_fds被设置为True,则新创建的子进程将不会继承父进程的输入、输出和错误。 我们不能将close_fds设置为True的同时重定向子进程的标准输入、输出与错误。
shell 如果参数shell设为true,程序将通过shell来执行,且args输入应当是字符串形式,同shell窗口执行的命令 如果不设置,默认为false,则输入的args应当是字符串列表
cwd 设置子进程的当前目录
env 字典类型,设置子进程的环境变量。如果env = None,子进程的环境变量将从父进程中继承。
universal_newlines windows下文本换行符用’\r\n’,而Linux下用 ‘\n’。若设置为True,Python统一把这些换行符当作’\n’来处理。
startupinfo createionflags 只在windows下用效,被传递给底层的CreateProcess()函数,设置子进程的一些属性 如:主窗口的外观,进程的优先级等等

属性说明

方法 含义
Popen.poll() 用于检查子进程是否已经结束,返回returncode。
Popen.wait() 等待子进程结束,返回returncode。 如果子进程输出了大量数据到stdout或者stderr的管道,并达到了系统pipe的缓存大小的话,子进程会等待父进程读取管道,而父进程此时正wait着的话,将会产生传说中的死锁,后果非常严重。 因此通常建议使用communicate读取缓存中的数据从而让进程正常释放缓存之后结束。
Popen.communicate(input=None) 与子进程进行交互。向stdin发送数据,或从stdout和stderr中读取数据。直到收到EOF,等待子进程结束。 可选参数input指定发送到子进程的参数。 返回一个元组:(stdoutdata, stderrdata)。 如果希望通过进程的stdin向其发送数据,在创建Popen对象的时候,参数stdin必须被设置为PIPE。 如果希望从stdout和stderr获取数据,必须将stdout和stderr设置为PIPE
Popen.send_signal(signal) 向子进程发送信号。
Popen.terminate() 停止(stop)子进程。在windows平台下,该方法将调用Windows API TerminateProcess()来结束子进程。
Popen.kill() 杀死子进程。
Popen.stdin 如果参数stdin被设置为PIPE,Popen.stdin将返回一个文件对象,否则返回None。
Popen.stdout 如果参数stdout被设置为PIPE,Popen.stdout将返回一个文件对象,否则返回 None。
Popen.stderr 如果参数stdout被设置为PIPE,Popen.stderr将返回一个文件对象,否则返回 None。
Popen.pid 获取子进程的进程ID。
Popen.returncode 获取进程的返回值。 如果进程还没有结束,返回None如果进程结束则返回值为0。 如果返回值是其他数值,则执行错误,可打印报错信息查看(stderr)。

wait和communicate区别

即当stdout/stdin设置为PIPE时,使用wait()可能会导致死锁。因而建议使用communicate而对于communicate

使用wait

import shlex
import subprocess

def test(size):
    print('start')
    cmd = 'dd if=/dev/zero bs=1 count={} 2>/dev/null'.format(size)
    p = subprocess.Popen(args=cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
    p.wait()
    print('end')

# 64KB
print('size==64K')
test(64 * 1024)

# 64KB + 1B
print('size>64K')
test(64 * 1024 + 1)
size==64K
start
end
size>64K
start
# 一直卡着了

使用communicate

import shlex
import subprocess

def test(size):
    print('start')
    cmd = 'dd if=/dev/zero bs=1 count={} 2>/dev/null'.format(size)
    p = subprocess.Popen(args=cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
    p.communicate()
    print('end')

# 64KB
print('size==64K')
test(64 * 1024)

# 64KB + 1B
print('size>64K')
test(64 * 1024 + 1)
size==64K
start
end
size>64K
start
end

案列

1.异步

b.py

import time
time.sleep(2)
print(f"BBBBB")

a.py

import shlex
import subprocess
cmd = "python3 b.py"

# 需要开启stdin=subprocess.PIPE
p = subprocess.Popen(args=shlex.split(cmd), text=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)

# 1.Popen是不会等待结果(异步),返回的Popen对象
print("next")
print(p.pid)
print(p)

# 使用stdout.read或者communicate可以获取内,会等2秒获取输出内容
# print(p.stdout.read())
# print(p.communicate())
next
30193
<subprocess.Popen object at 0x100d8cc70>

# BBBBB 通过p.stdout.read()获取
# ('BBBBB\n', '') 通过p.communicate()获取

2.输入信息

需要主要textencoding,否者需要输入byte类型

使用 communicate() 而非 .stdin.write.stdout.read 或者 .stderr.read 来避免由于任意其他 OS 管道缓冲区被子进程填满阻塞而导致的死锁。

communicate
import shlex
import subprocess
cmd = "python3 b.py"

# 需要开启stdin=subprocess.PIPE
p = subprocess.Popen(args=shlex.split(cmd), text=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)

# 使用communicate输入信息,返回的(stdout, stderr)
res = p.communicate(input="yes")
print(res)
print(p.returncode)	# 获取状态码
('请输入内容:\n你选择了: yes\n', '')
0
stdin.write
import shlex
import subprocess
cmd = "python3 b.py"

# 需要开启stdin=subprocess.PIPE
p = subprocess.Popen(args=shlex.split(cmd), text=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)

# 使用p.stdin.write写入数据
p.stdin.write("yes\n")
p.stdin.flush() # 必须刷入一下
print(p.stdout.read())  # 获取结果(str),read会等待程序执行完成拆会进入下一行运行

# 需要使用wait等待
p.wait()
print(p.returncode)
请输入内容:
你选择了: yes

0

3.read/readline/readlines区别

p.stdout.read()  			# 获取结果(str),read会【等待】程序执行完成
p.stdout.readline() 	# 获取结果(str),只会获取提示信息,readline【不会等待】命令执行完成
p.stdout.readlines() 	# 会把屏幕输出的内容变为list,readlines会【等待】程序执行完成

4.多行输入

b.py

import time
for i in range(1, 3):
    res = input(f"请输入内容[{i}]:\n")
    print(f"你选择了[{i}]: {res}")
    time.sleep(2)
print("完成选择")

使用run输入

import shlex
import subprocess
cmd = "python3 b.py"

p = subprocess.run(args=shlex.split(cmd), text=True, input="yes\nno", stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(p)
CompletedProcess(args=['python3', 'b.py'], returncode=0, stdout='请输入内容[1]:\n你选择了[1]: yes\n请输入内容[2]:\n你选择了[2]: no\n完成选择\n', stderr='')

使用communicate输入

import shlex
import subprocess
cmd = "python3 b.py"

p = subprocess.Popen(args=shlex.split(cmd), text=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
res = p.communicate(input="yes\nno")
print(p.returncode)
print(res)
0
('请输入内容[1]:\n你选择了[1]: yes\n请输入内容[2]:\n你选择了[2]: no\n完成选择\n', '')

使用stdin.write完成输入

import shlex
import subprocess
cmd = "python3 b.py"

p = subprocess.Popen(args=shlex.split(cmd), text=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.stdin.write("yes\n")
p.stdin.write("no\n")
p.stdin.flush() # 刷入
print(p.stdout.read())  # 获取结果正确结果
print(p.stderr.read())	# 获取结果错误信息
p.wait()
print(p.returncode)
请输入内容[1]:
你选择了[1]: yes
请输入内容[2]:
你选择了[2]: no
完成选择


0

4.shlex的split有什么特别的

shlex.split对命令解析为list友好

import shlex

cmd = "this is 'my string' that --has=arguments -or=something"

print(f"shlex.split: {shlex.split(cmd)}")

print(f"cmd.split: {cmd.split(' ')}")
shlex.split: ['this', 'is', 'my string', 'that', '--has=arguments', '-or=something']
cmd.split: ['this', 'is', "'my", "string'", 'that', '--has=arguments', '-or=something']