paramiko实例

发布时间 2023-11-14 17:22:41作者: 贪吃极了
import paramiko

class HandleSSH:
    def init(self, ssh_ip=None, ssh_port=None, ssh_user="", ssh_password=""):
        self.ssh_ip = ssh_ip
        self.ssh_port = ssh_port
        self.ssh_user = ssh_user
        self.ssh_password = ssh_password
        try:
            self.transport = paramiko.Transport(self.ssh_ip, self.ssh_port)
            self.transport.connect(user_name=self.ssh_user, password=self.ssh_password)
            self.sftp_session = paramiko.SFTPClient.from_transport(self.transport)
            self.session = paramiko.SSHClient()
            self.session.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.session.transport = self.transport
        except Exception as e:
            raise e

    def close(self):
        """
        关闭会话信息
        """

    def byte_to_str(one_byte):
        if isinstance(one_byte, bytes):
            noe_byte = one_byte.decode('utf-8')
        return one_byte

    def remote_host_to_excute(self, cmd_str, passwd=None, timeout=None, judge=None):
        """
        使用HandleSSH对象()会自动调用此方法
        :param cmd_str: 待执行的命令字符串
        :param passwd: 密码
        :param judge: 判断返回值
        :return: 执行结果字符串
        """
        pass
        do_log.debug(f"在主机【{self.ssh_ip}】上,执行【{cmd_str}】命令开始\n")
        stdin, stdout, stderr = self.session.exec_command("sudo " + cmd_str, get_pty=True, bufsize=-1, timeout=timeout)
        sudo_password = passwd if passwd else self.ssh_password
        if self.ssh_username != 'root':
            try:
                stdin.write(sudo_password + "\n")
            except Exception as e:
                do_log.error(f'输入管理员密码失败:{e}')
            stdin.flush()
        result = self.byte_to_str(stdout.read())
        if result.__contains__('输入密码'):
            result = result.split('输入密码\r\n')[1]
        error = self.byte_to_str(stderr.read())
        if error:
            do_log.error(f"执行【{cmd_str}】命令,出现异常:\n{error}")
        do_log.debug(f"在主机【{self.ssh_ip}】上,执行【{cmd_str}】命令结束\n")
        if judge:
            try:
                res = re.search(r'\[(\d)\]', result).group(1)
            except Exception as e:
                res = '1'
            if res == '0':
                return result
            else:
                raise AssertionError(result)
        else:
            return result
        
    # 关闭防火墙
    def close_firewall(self):
        if self.DP.common.sys_type == 'sp':
            self.remote_host_to_excute("service firewalld stop")
            time.sleep(5)
        else:
            self.remote_host_to_excute("systemctl stop firewalld")
            time.sleep(5)

    # 开启防火墙
    def open_firewall(self):
        if self.DP.common.sys_type == 'sp':
            self.remote_host_to_excute("service firewalld restart")
            time.sleep(5)
        else:
            self.remote_host_to_excute("systemctl restart firewalld")
            time.sleep(5)
            
    async def sftp_upload(self, local_path='', remote_path=''):
        """
        sftp上传
        :param local_path: 本地待上传的文件路径或者目录路径
        :param remote_path: 远程路径
        :return:
        """
        if not local_path:
            local_path = SAVE_FILES_DIR
            # 设置本地上传文件的根路径
            setattr(self, 'local_file_root_path', SAVE_FILES_DIR)
        else:
            # 设置本地上传文件的根路径
            setattr(self, 'local_file_root_path', os.path.basename(local_path))

        # 将本地文件或者目录中的文件加载到self.__upload_file_path_list列表中
        if os.path.isfile(local_path):
            self.__add_one_file(local_path)
        elif os.path.isdir(local_path):
            self.__add_one_dir(local_path)
        else:
            do_log.error(f"本地路径【{local_path}】不存在!\n")
            sys.exit(1)

        # 开始上传文件
        tasks_list = [asyncio.create_task(self.__sftp_upload_one_file(one_file, remote_path))
                      for one_file in self.__upload_file_path_list
                      if one_file.endswith('handle_udisk.sh')]
        await asyncio.gather(*tasks_list)

    def sftp_upload_one_file(self, local_path='', remote_path=''):
        try:
            # 执行文件上传
            self.sftp_session.put(local_path, remote_path)
            # self.sftp_session.chmod(remote_path, do_yaml.upload.path_chmod)
            # 给文件设置访问权限为755
            self.sftp_session.chmod(remote_path, 0o755)
            # 将上传成功的文件,添加至self.__upload_history_file列表
            self.__upload_history_file.append(f"host: {self.ssh_ip}\nfrom: {local_path}\n"
                                              f"to: {remote_path}")
        except Exception as e:
            do_log.error(f"文件上传出现异常:\n{e}")
        else:
            do_log.info("文件上传成功!\n")

    def sftp_download_one_file(self, local_path='', remote_path=''):
        try:
            # 执行文件下载
            self.sftp_session.get(remote_path, local_path)
            # 给文件设置访问权限为755
            # 将下载成功的文件,添加至self.__download_history_file
            self.__download_history_file.append(f"host: {self.ssh_ip}\nfrom: {remote_path}\n"
                                                f"to: {local_path}")
        except Exception as e:
            do_log.error(f"文件下载出现异常:\n{e}")
        else:
            do_log.info("文件下载成功!\n")

    async def __sftp_upload_one_file(self, local_file_path, remote_path='', is_save_dirs=True):
        """
        sftp上传某一个文件
        :param local_file_path: 本地文件路径
        :param remote_path: 远程目录路径
        :return:
        """
        if not os.path.isfile(local_file_path):
            do_log.error(f"本地文件【{local_file_path}】不存在!\n")
            sys.exit(1)

        remote_path = REMOTE_FILES_DIR if not remote_path else remote_path

        # 判断是否保留本地文件所在的文件夹结构,True为需要,False为不需要
        if is_save_dirs:
            _parent_path = os.path.dirname(local_file_path)
            _tmp_path = _parent_path.replace(getattr(self, 'local_file_root_path'), '').replace('\\', '/').lstrip('/')
            remote_path = remote_path + _tmp_path if remote_path.endswith('/') else remote_path + '/' + _tmp_path

        # 如果远程路径不存在,则创建
        if not self.is_exist_path(remote_path):
            cmd_str = f'mkdir -p {remote_path}'
            self(cmd_str)

        # 获取待上传的文件名
        filename = os.path.basename(local_file_path)
        # 拼接远程文件路径
        remote_path = remote_path + filename if remote_path.endswith('/') else remote_path + '/' + filename
        do_log.info(f"开始将本地【{local_file_path}】文件上传至服务器【{remote_path}】 "
                    f"【服务器】地址:{self.ssh_ip} 端口:{self.ssh_port}")
        try:
            # 执行文件上传
            self.sftp_session.put(local_file_path, remote_path)
            # self.sftp_session.chmod(remote_path, do_yaml.upload.path_chmod)
            # 给文件设置访问权限为755
            self.sftp_session.chmod(remote_path, 0o755)
            # 将上传成功的文件,添加至self.__upload_history_file列表
            self.__upload_history_file.append(f"host: {self.ssh_ip}\nfrom: {local_file_path}\n"
                                              f"to: {remote_path}")
        except Exception as e:
            do_log.error(f"文件上传出现异常:\n{e}")
        else:
            do_log.info("文件上传成功!\n")

    def __add_one_file(self, local_file_path):
        """
        将待上传的文件添加至self.__upload_file_path_list列表中
        :param local_file_path: 文件路径字符串
        :return:
        """
        self.__upload_file_path_list.append(local_file_path)

    def __add_one_dir(self, local_dir_path):
        """
        将待上传的目录下所有文件添加至self.__upload_file_path_list列表中
        :param local_dir_path: 目录路径字符串
        :return:
        """
        for root, dirs, files in os.walk(local_dir_path):
            for filename in files:
                self.__upload_file_path_list.append(os.path.join(root, filename))

    async def _more_host_to_upload(self):
        """
        多主机执行上传
        :return:
        """
        await self.sftp_upload()

    def more_host_to_upload(self):
        """
        多主机执行上传
        :return:
        """
        start_time = time.perf_counter()

        do_log.info("开始上传文件")
        asyncio.run(self._more_host_to_upload())

        end_time = time.perf_counter()
        total_time = end_time - start_time

        do_log.info(f"上传文件结束,总耗时:{round(total_time, 4)}s")