使用paramiko上传文件到Linux并且解压

发布时间 2023-04-09 23:16:46作者: 帅胡

思路是,使用zipfile对文件进行压缩,之后使用paramiko提供的SFTPClient()上传文件,之后使用paramiko的SSHClient()执行Linux命令操作文件

import os
import zipfile
import paramiko

# 使用zipfile压缩文件
def zip_code(dir_name, zip_name):
    file_list = []
    if os.path.isfile(dir_name):
        file_list.append(dir_name)
    else:
        for root, dirs, files in os.walk(dir_name):
            for name in files:
                file_list.append(os.path.join(root, name))
    zf = zipfile.ZipFile(zip_name, "w", zipfile.ZIP_DEFLATED)
    for tar in file_list:
        arc_name = tar[len(dir_name):]
        zf.write(tar, arc_name)
    zf.close()

# 使用zipfile解压文件(这是直接在本地解压)
# def unzip_file(zip_file_name, unzip_dir):
#     zf = zipfile.ZipFile(zip_file_name)
#     file_list = zf.namelist()
#     for file in file_list:
#         zf.extract(file, unzip_dir)
#     zf.close()


def upload_code(host, port, username, pwd, local_file, target_path, target_file_name):
    transport = paramiko.Transport((host, port))
    transport.connect(username=username, password=pwd)

    sftp = paramiko.SFTPClient.from_transport(transport)
    sftp.put(local_file, target_path + target_file_name)
    client = paramiko.SSHClient()
    client._transport = transport
    cmd_ls = "ls -l " + target_path + target_file_name
    _, stdout, stderr = client.exec_command(cmd_ls)
    if not stdout.read().decode('utf-8'):
        print("失败: ", stderr.read().decode('utf-8'))
        return
    # 这里使用unzip解压,但是zipfile压缩的时候对中文不友好,所以unzip解压需要指定CP936编码
    cmd = "cd " + target_path + " && sudo rm -rf web_code/* && unzip -O CP936 " + target_file_name + " -d web_code/ && rm " + target_file_name
    print("cmd: ", cmd)
    _, stdout, stderr = client.exec_command(cmd)
    print("stdout:", stdout.read().decode('utf-8'))
    print("stderr:", stderr.read().decode('utf-8'))
    print("finish")


def start():
    username = "root"
    pwd = "12345"
    host = "localhost"
    port = 22
    target_file_name = "dist.zip"
    local_file = "./{}".format(target_file_name)
    target_path = "/test/"
    zip_code(r"D:\工作", "./{}".format(target_file_name))
    upload_code(host, port, username, pwd, local_file, target_path, target_file_name)
    input()


if __name__ == '__main__':
    start()