Python 优雅的使用 paramiko 进行交互式输入输出

发布时间 2023-08-14 19:14:21作者: 可乐芬达

目的:需要ssh链接到Linux主机,执行telnet 命令,抓回显匹配制定内容。

ssh.exec_command(cmd,bufsize,timeout) #exec_command参数使用只需要执行一次的命令,因为执行完该命令以后,shell会自动回到ssh初始连接的shell状态下

ssh.invoke_shell() #在SSH server端创建一个交互式的shell,且可以按自己的需求配置伪终端,可以在invoke_shell()函数中添加参数配置

也就是command就是发送完指令,连接就会断开
invoke_shell则是长连接,保持状态的那种

import logging
import paramiko
import re
import select
import socket


PROMPT_PATTERN = r'\S+#'

logging.basicConfig(level=logging.DEBUG)
log = logging

paramiko.util.log_to_file('demo.log')

class Channel(object):
    def __init__(self, host, commands, creds=None,
                 prompt_pattern=PROMPT_PATTERN, init_commands=None):
        if creds is None:
            raise RuntimeError('You must supply username and password!!')
        self.host = host
        self.commands = commands

        username, password = creds
        self.creds = creds
        self.username = username
        self.password = password

        self.prompt = re.compile(prompt_pattern)
        if init_commands is None:
            init_commands = []
        self.init_commands = init_commands

        self.results = []

        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        self.ssh = ssh
        self.initialized = False
        self.record = False

    def run(self):
        """
        This is what we call do actually connect and start the event loop
        """
        # Establish the SSH connection
        self.ssh.connect(self.host, username=self.username,
                         password=self.password)

        # Create an SSH shell descriptor. This is how we're going to interact
        # with the remote device.
        shell = self.ssh.invoke_shell()
        shell.settimeout(0.0)
        self.shell = shell

        # Turn the list of commands into an iterator, so we know when we've run
        # out of commands
        self.cmditer = iter(self.commands)

        # Establish the data buffer we'll use to store output between commands
        self.data = ''

        # And start the event loop to store the results of the commands and
        # return them when we're done.
        results = self.interact()
        return results

    def interact(self):
        """Interact with the device using the SSH shell."""
        shell = self.shell

        # Start an infinite while loop, and use the select.select async I/O
        # handler to detect when data has been received by the SSH shell.
        while True:
            # The ``r`` variable names the object that has received data. See:
            # http://docs.python.org/2/library/select.html#select.select
            r, w, e = select.select([shell], [], [])
            # If 'shell' has received data, try to retreive it:
            if shell in r:
                #log.debug("HEY LET'S DO SOMETHING WITH SHELL")
                try:
                    # Fetch 1K off the socket.
                    bytes = shell.recv(1024).decode('utf-8', 'ignore').strip()

                    # If it's no data, we're done. 
                    if len(bytes) == 0:
                        break
                    
                    # Try to process the data we received.
                    self.data_received(bytes)

                # If we timeout or get an error, log it and carry on.
                except (socket.timeout, socket.error) as err:
                    log.error(str(err))

            # If the socket has not received any data after we sent somethign,
            # disconnect.
            else:
                break

        # The explicit call to disconnect
        shell.close()

        # And finally return the output of the results.
        return self.results

    def data_received(self, bytes):
        """
        This is what we do when data is received from the socket.
        :param bytes:
            Bytes that are received.
        """
        # This is our buffer. Until we get a result we want, we keep appending
        # bytes to the data buffer.

        log.debug('Got bytes: %r' % bytes)
        self.data += bytes
        log.debug(' Buffered: %r' % self.data)

        # Check if the prompt pattern matches. Return None if it doesn't so the
        # event loop continues to try to receive data. 
        # 
        # Basicaly this means:
        # - Loop until the prompt matches
        # - Trim off the prompt
        # - Store the buffer as the result of the last command sent
        # - Zero out the buffer
        # - Rinse/repeat til all commands are sent and results stored
        m = self.prompt.search(self.data)
        if not m:
            return None
        # log.debug('STATE: prompt %r' % m.group())

        # The prompt matched! Strip the prompt from the match result so we get
        # the data received withtout the prompt. This is our result.
        # 
        result = self.data[:m.start()]
        # result = result[result.find('\n')+1:]
        result = result.splitlines()

        # Only keep results once we've started sending commands
        if self.initialized:
            self.results.append(result)

        # And send the next command in the stack.
        self.send_next()

    def send_next(self):
        """
        Send the next command in the command stack.
        """
        # We're sending a new command, so we zero out the data buffer.
        self.data = ''

        # Check if we can safely initialize. This is a chance to do setup, such
        # as turning off console paging, or changing up CLI settings. 
        if not self.initialized:
            if self.init_commands:
                next_init = self.init_commands.pop(0)
                self.shell.send(next_init)
                return None
            else:
                log.debug('Successfully initialized for command execution')
                self.initialized = True

        # Try to fetch the next command in the stack. If we're out of commands,
        # close the channel and disconnect.
        try:
            next_command = next(self.cmditer) # Get the next command
        except StopIteration:
            self.close() # Or disconnect
            return None

        # Try to send the next command
        if next_command is None:
            self.results.append(None) # Store a null command w/ null result
            self.send_next() # Fetch the next command
        else:
            log.debug('sending %r' % next_command)
            self.shell.send(next_command + '\n')  # Send this command

    def close(self):
        """Close the SSH connection."""
        self.ssh.close()

if __name__ == '__main__':
    commands = ['df -h\n', 'top -b -n 1\n']
    host = '192.168.211.100'
    username = 'root'
    password = 'root'
    creds = (username, password)
    prompt_pattern = r'#|\S+ $|\S+ #'
    init_commands = ['dtach -a /tmp/vmm\n', 'su\n']

    c = Channel(host, commands, creds, prompt_pattern, init_commands)
    results = c.run()
    print(results)