基于client-go实现pod 交互式terminal

发布时间: 2023-04-23 14:08:27 作者: 众里寻它千百度

基于client-go实现pod 交互式terminal

后端实现逻辑(golang)

package main

import (
	"errors"
	"fmt"
	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/remotecommand"
	"log"
	"net/http"
	"strconv"
	"strings"
	"unicode/utf8"
)

// initialClientSet builds a Kubernetes clientset and the REST config it was
// created from, using the kubeconfig file at path.
//
// On failure the error is logged and returned (wrapped with context) together
// with a nil clientset and a nil config. The process is deliberately not
// terminated here, so a request handler can reject one request without
// taking the whole server down.
func initialClientSet(path string) (*kubernetes.Clientset, *rest.Config, error) {
	config, err := clientcmd.BuildConfigFromFlags("", path)
	if err != nil {
		err = fmt.Errorf("loading kubeconfig %q: %w", path, err)
		log.Println(err)
		return nil, nil, err
	}

	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		err = fmt.Errorf("creating kubernetes clientset: %w", err)
		log.Println(err)
		return nil, nil, err
	}
	return clientSet, config, nil
}

// initialWS upgrades the HTTP request carried by c to a WebSocket connection.
//
// The upgrader uses 1024-byte read and write buffers. On failure the error is
// printed to standard output and returned together with a nil connection.
func initialWS(c *gin.Context) (*websocket.Conn, error) {
	// NOTE(review): this callback accepts every request, so no origin
	// filtering is applied to incoming upgrade requests.
	acceptAnyOrigin := func(*http.Request) bool { return true }

	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		CheckOrigin:     acceptAnyOrigin,
	}

	// Switch the connection from plain HTTP to the WebSocket protocol.
	conn, upgradeErr := upgrader.Upgrade(c.Writer, c.Request, nil)
	if upgradeErr == nil {
		return conn, nil
	}
	fmt.Println(upgradeErr)
	return nil, upgradeErr
}

// streamHandler adapts a WebSocket connection to the interfaces expected by
// remotecommand.StreamOptions: its methods let it serve as Stdin, Stdout,
// Stderr and TerminalSizeQueue.
type streamHandler struct {
	ws          *websocket.Conn                 // WebSocket connection to the client
	inputMsg    chan []byte                     // data typed by the client, in arrival order
	resizeEvent chan remotecommand.TerminalSize // terminal window size changes
	pending     []byte                          // unread tail of the last input message; used only by Read
}

// Next blocks until a terminal resize event is available and returns it.
//
// It returns nil once resizeEvent has been closed, which is how a
// TerminalSizeQueue reports that size monitoring has stopped.
func (handler *streamHandler) Next() *remotecommand.TerminalSize {
	resize, ok := <-handler.resizeEvent
	if !ok {
		return nil
	}
	return &resize
}

// Read copies client input received over the WebSocket into p and returns the
// number of bytes copied, which never exceeds len(p).
//
// When a message is larger than p, the remainder is kept in handler.pending
// and handed out by subsequent calls, so no input is dropped. Read blocks
// while no input is available and returns an error once inputMsg has been
// closed and all buffered bytes have been delivered. It keeps state in
// handler.pending without locking, so it must not be called concurrently.
func (handler *streamHandler) Read(p []byte) (size int, err error) {
	if len(p) == 0 {
		return 0, nil
	}

	// Skip empty messages so a successful Read always makes progress.
	for len(handler.pending) == 0 {
		data, ok := <-handler.inputMsg
		if !ok {
			return 0, errors.New("I/O data reading failed")
		}
		handler.pending = data
	}

	size = copy(p, handler.pending)
	handler.pending = handler.pending[size:]
	return size, nil
}

// Write forwards standard output / standard error data to the client as one
// WebSocket text message.
//
// Every byte that is not part of a valid UTF-8 sequence is replaced by '@'
// before sending, because a WebSocket text frame must carry valid UTF-8.
// Correctly encoded runes, including a genuine U+FFFD, are passed through
// unchanged. The caller's slice is never modified.
//
// It returns len(p) on success and 0 together with the error when the
// message could not be sent.
//
// NOTE(review): a multi-byte rune split across two Write calls is seen as
// invalid bytes in both halves and is replaced; fixing that would require
// carrying state between calls.
func (handler *streamHandler) Write(p []byte) (int, error) {
	msg := p
	if !utf8.Valid(p) {
		clean := make([]byte, 0, len(p))
		for rest := p; len(rest) > 0; {
			r, width := utf8.DecodeRune(rest)
			if r == utf8.RuneError && width == 1 {
				// width 1 identifies an invalid byte, as opposed to an
				// encoded U+FFFD, which is three bytes wide.
				clean = append(clean, '@')
			} else {
				clean = append(clean, rest[:width]...)
			}
			rest = rest[width:]
		}
		msg = clean
	}

	if err := handler.ws.WriteMessage(websocket.TextMessage, msg); err != nil {
		return 0, err
	}
	return len(p), nil
}

// ToInt converts the decimal string str to an int using strconv.Atoi.
//
// When the conversion fails, the error is printed to standard output and the
// value reported by strconv.Atoi alongside that error is returned.
func ToInt(str string) int {
	value, convErr := strconv.Atoi(str)
	if convErr == nil {
		return value
	}
	fmt.Println(convErr)
	return value
}

// executeTask reads messages from the WebSocket until reading fails and
// dispatches each one:
//
//   - "ping" is a client heartbeat and is discarded;
//   - "resize:<rows>:<cols>" is pushed onto handler.resizeEvent;
//   - everything else is forwarded as terminal input on handler.inputMsg.
//
// A message that merely contains the word "resize", or whose rows/cols are
// not valid 16-bit unsigned integers, is treated as ordinary input rather
// than being parsed as a control message.
//
// When the read loop ends, handler.inputMsg is closed so that a pending Read
// on the handler returns instead of blocking forever.
func executeTask(handler *streamHandler) {
	// Within this file executeTask is the only sender on inputMsg, so it is
	// the one that closes it.
	defer close(handler.inputMsg)

	for {
		_, msg, err := handler.ws.ReadMessage()
		if err != nil {
			return
		}

		text := string(msg)
		// Heartbeat: nothing to forward.
		if text == "ping" {
			continue
		}
		// Window size change.
		if size, ok := parseResize(text); ok {
			handler.resizeEvent <- size
			continue
		}
		handler.inputMsg <- msg
	}
}

// parseResize decodes a "resize:<rows>:<cols>" control message. It reports
// false when msg does not have exactly that shape or when either number does
// not fit in a uint16.
func parseResize(msg string) (remotecommand.TerminalSize, bool) {
	parts := strings.Split(msg, ":")
	if len(parts) != 3 || parts[0] != "resize" {
		return remotecommand.TerminalSize{}, false
	}
	rows, err := strconv.ParseUint(parts[1], 10, 16)
	if err != nil {
		return remotecommand.TerminalSize{}, false
	}
	cols, err := strconv.ParseUint(parts[2], 10, 16)
	if err != nil {
		return remotecommand.TerminalSize{}, false
	}
	return remotecommand.TerminalSize{Width: uint16(cols), Height: uint16(rows)}, true
}

// podTerminal is a gin handler that opens an interactive bash session in a
// pod container and bridges it to the client over a WebSocket.
//
// Query parameters:
//   - podName, namespace: target pod (both required);
//   - containerName: container inside the pod;
//   - cols, rows: initial terminal size.
//
// Failures that happen before the protocol upgrade are answered with an HTTP
// error status. After the upgrade, the handler blocks until the exec stream
// ends and then closes the WebSocket.
func podTerminal(c *gin.Context) {
	podName := c.Query("podName")
	namespace := c.Query("namespace")
	containerName := c.Query("containerName")
	cols := c.Query("cols")
	rows := c.Query("rows")

	// Without a pod name and namespace the exec URL cannot address a pod.
	if podName == "" || namespace == "" {
		c.AbortWithStatus(http.StatusBadRequest)
		return
	}

	clientSet, config, err := initialClientSet("./kube/config")
	if err != nil {
		c.AbortWithStatus(http.StatusInternalServerError)
		return
	}

	// Build the exec request for the target container.
	req := clientSet.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec").
		VersionedParams(&corev1.PodExecOptions{
			Container: containerName,
			Command:   []string{"bash"},
			Stdin:     true,
			Stdout:    true,
			Stderr:    true,
			TTY:       true, // allocate a terminal
		}, scheme.ParameterCodec)

	// Create the SPDY executor that will carry the exec streams.
	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		log.Println(err)
		c.AbortWithStatus(http.StatusInternalServerError)
		return
	}

	ws, err := initialWS(c)
	if err != nil {
		// There is no connection to work with; continuing would dereference
		// a nil *websocket.Conn.
		return
	}
	defer func() {
		// Recover first so that a panic in the session is logged and the
		// connection is still closed afterwards.
		if r := recover(); r != nil {
			log.Println(r)
		}
		ws.Close()
	}()

	handler := &streamHandler{
		ws:          ws,
		inputMsg:    make(chan []byte, 1024),
		resizeEvent: make(chan remotecommand.TerminalSize, 1),
	}
	// Queue the initial window size; the buffer of one makes this non-blocking.
	handler.resizeEvent <- remotecommand.TerminalSize{Width: uint16(ToInt(cols)), Height: uint16(ToInt(rows))}

	// Pump client input and resize messages from the WebSocket.
	go executeTask(handler)

	if err := exec.Stream(remotecommand.StreamOptions{
		Stdin:             handler,
		Stdout:            handler,
		Stderr:            handler,
		Tty:               true,
		TerminalSizeQueue: handler,
	}); err != nil {
		log.Println(err)
	}
}

前端实现逻辑(vue3)

<!-- Container element for the web terminal; the component script looks it up by the id "terminal". -->
<template>
    <div id="terminal"></div>
</template>

<script>
  import { Terminal } from 'xterm'
  import { FitAddon } from 'xterm-addon-fit'
  import { AttachAddon } from 'xterm-addon-attach'
  import {onBeforeUnmount, onMounted } from "vue"
  import 'xterm/css/xterm.css'
   export default {
     name: 'web-terminal',
     setup(){
       //初始化ws连接
       let ws = new WebSocket("ws://localhost:9090?podName=xxxx?namespace=xxxx?containerName=xxx")
       ws.onopen = ()=>{
         console.log(Date(), 'onopen')
         heartCheck.start()
       }
       ws.onclose = ()=>{
         console.log(Date(), 'onclose')
         heartCheck.stop()
       }
       ws.onerror = ()=> {
         console.log(Date(), 'onerror')
       }

       //心跳检查
       const heartCheck = {
         timeout: 5000, // 5s发一次心跳
         //关闭心跳检查
         stop: function() {
           clearInterval(this.timer)
         },
         //开启心跳检查
         start: function() {
           this.timer = setInterval(function() {
             if (ws !== null && ws.readyState === 1) {
               ws.send('ping')
             }
           }, this.timeout)
         }
       }
       //页面挂载后初始化terminal功能
       onMounted(()=>{
         let webTerminal = document.getElementById('terminal')
         let terminal = new Terminal(
             {
               fontSize: 16
             }
         )
         let fitAddon = new FitAddon()
         terminal.loadAddon(fitAddon)
         terminal.open(webTerminal)
         try {
           fitAddon.fit()
         } catch (e) {
           console.error(e)
         }

         //加载attach插件,通过ws实现web终端与远程终端进行实时交互
         terminal.loadAddon(new AttachAddon(ws))

         //增加滚轮事件监听,用于调整web终端字体大小
         webTerminal.addEventListener("wheel", (e) => {
           if (e.ctrlKey) {
             e.preventDefault()
             if (e.deltaY < 0) {
               terminal.options.fontSize = ++self.fontSize
             } else {
               terminal.options.fontSize = --self.fontSize
             }
             try { fitAddon.fit() } catch (e) {/**/}
             if (ws !== null && ws.readyState === 1) {
               ws.send(`resize:${terminal.rows}:${terminal.cols}`)
             }
           }

           //为window添加窗口大小调整事件,用于实时调整终端窗口
           window.addEventListener('resize', () => {
             webTerminal.style.height = document.documentElement.clientHeight + 'px'
             try { fitAddon.fit() } catch (e) {/**/}
             if (ws !== null && ws.readyState === 1) {
               ws.send(`resize:${terminal.rows}:${terminal.cols}`)
             }
           })
         })
         
         onBeforeUnmount(()=>{
           if (ws !== null) {
             ws.close()
           }
           if (terminal !== null) {
             terminal.dispose()
           }
         })
       })

     }
   }
</script>

<style>
   /* Absolutely position the terminal container with all four edges pinned to 0. */
   #terminal{
     position: absolute;
     top: 0; right: 0; bottom: 0; left: 0;
   }
</style>