streamlit 网络批量ping和snmp检查

发布时间: 2023-12-17 22:37:35 作者: 小菜鸟起飞ing
import time
import netaddr
from netaddr import *
import pandas as pd
import streamlit as st
from concurrent.futures import ThreadPoolExecutor
import subprocess
import shlex
from threading import Lock
import re

class compute(object):
    """Sidebar IP calculator.

    The constructor renders the page title and a sidebar text input for a
    network in CIDR notation.  ``ip_info`` shows the usable address range,
    host mask and netmask; ``subnet`` optionally splits the network into
    smaller subnets and lists them.
    """

    def __init__(self):
        st.title(':red[Welcome] :sunglasses:')  # page title
        st.sidebar.header(":red[IP计算器]")
        # Network to inspect / split, as typed by the user.
        self.net_iput = st.sidebar.text_input("请输入ip地址", help="示例:1.1.1.1/24")
        self.subnet_list = []  # string form of every subnet produced by subnet()

    def _parse_network(self):
        """Return the entered text as an ``IPNetwork``, or ``None`` if it does not parse."""
        try:
            return IPNetwork(self.net_iput)
        except netaddr.core.AddrFormatError:
            return None

    def subnet(self):
        """Offer to split the entered network into subnets and list the result.

        Nothing is rendered when the input does not parse; ``ip_info`` is the
        method that reports malformed input, so it is not repeated here.
        """
        net = self._parse_network()
        if net is None:
            return
        if not st.sidebar.checkbox("是否进行子网划分", key="key123"):
            return

        cidr = int(st.sidebar.number_input("请选择划分的子网", min_value=1, max_value=128))

        # The widget accepts up to 128 for every input, but an IPv4 network
        # cannot have a prefix longer than 32 bits.
        max_prefix = 32 if net.version == 4 else 128
        if cidr > max_prefix:
            st.sidebar.warning("子网掩码长度超出范围,请重新输入")
            return

        # Refuse splits that would generate 2**20 or more subnets.
        if cidr - int(net.prefixlen) >= 20:
            st.sidebar.warning("子网数量过多,请重新输入")
            return

        self.subnet_list = [str(snet) for snet in net.subnet(cidr)]
        with st.sidebar.expander(":red[查看子网详情]"):
            df_subnet = pd.DataFrame(data=self.subnet_list, columns=["子网详情"])
            st.dataframe(df_subnet, width=1000)

    def ip_info(self):
        """Show the usable address range, host mask and netmask in three sidebar tabs.

        The tabs are always created; they stay empty when the input cannot be
        parsed, and a warning is shown if the user actually typed something.
        """
        tab1, tab2, tab3 = st.sidebar.tabs([":red[可用地址]", ":red[反掩码]", ":red[掩码]"])
        ip = self._parse_network()
        if ip is None:
            if self.net_iput.strip():
                st.sidebar.warning("ip地址格式不正确,请重新输入")
            return

        # Networks with one or two addresses have no separate network /
        # broadcast address to skip; indexing [1] / [-2] on them would raise
        # IndexError or give a reversed range, so show first..last instead.
        if ip.size > 2:
            first, last = ip[1], ip[-2]
        else:
            first, last = ip[0], ip[-1]

        with tab1:
            st.write(str(first) + "---" + str(last))
        with tab2:
            st.write(str(ip.hostmask))
        with tab3:
            st.write(str(ip.netmask))


class mult_ping(object):
    """Batch ping / SNMP checker driven by an uploaded Excel workbook.

    ``all_function`` renders the UI: for every sheet the user picks a device
    name column and an IP column, then may run ``ping`` and/or ``snmp`` for
    every row on a thread pool.  Results are collected in the parallel
    ``*_list`` / ``*_list_name`` attributes (ping) and their ``*_1``
    counterparts (SNMP).
    """

    # NOTE(security): SNMP credentials are hard-coded in the source.  They are
    # kept here so behaviour is unchanged, but they should be moved to
    # configuration / a secret store and rotated.
    _SNMP_V3_ARGS = ("snmpwalk", "-a", "sha", "-A", "muS7kKrZTAFbzPa4Uu56",
                     "-v", "3", "-l", "authPriv", "-x", "aes",
                     "-X", "muS7kKrZTAFbzPa4Uu56", "-u", "yunqiao")
    _SNMP_V2C_ARGS = ("snmpwalk", "-v", "2c", "-c", "AXQfPuJs")

    def __init__(self):
        # One lock shared by all workers; a lock created per call would not
        # protect anything.
        self._lock = Lock()

        # ping results
        self.log_list = []
        self.log_list_name = []
        self.success_list = []
        self.success_list_name = []
        self.failed_list = []
        self.failed_list_name = []

        # snmp results
        self.log_list_1 = []
        self.log_list_name_1 = []
        self.success_list_1 = []
        self.success_list_name_1 = []
        self.failed_list_1 = []
        self.failed_list_name_1 = []

    @staticmethod
    def _run(args):
        """Run *args* and return ``(returncode, decoded stdout)``.

        If the program cannot be started the return code is ``None`` and the
        text is the OS error message, so the failure shows up in the log tab
        instead of being lost inside the worker thread.
        """
        try:
            proc = subprocess.run(args, stdout=subprocess.PIPE)
        except OSError as err:
            return None, str(err)
        # "gbk" is kept from the original; errors="replace" keeps undecodable
        # bytes from raising inside the worker.
        return proc.returncode, proc.stdout.decode("gbk", errors="replace")

    def _record_ping(self, name, reachable, log):
        """Store one ping outcome; the lock keeps the parallel lists aligned."""
        with self._lock:
            self.log_list.append(log)
            self.log_list_name.append(str(name))
            if reachable:
                self.success_list.append("正常")
                self.success_list_name.append(str(name))
            else:
                self.failed_list.append("不通")
                self.failed_list_name.append(str(name))

    def _record_snmp(self, name, ok, status, log):
        """Store one SNMP outcome; the lock keeps the parallel lists aligned."""
        with self._lock:
            if ok:
                self.success_list_1.append(status)
                self.success_list_name_1.append(str(name))
            else:
                self.failed_list_1.append(status)
                self.failed_list_name_1.append(str(name))
            self.log_list_1.append(log)
            self.log_list_name_1.append(str(name))

    def ping(self, name, ip):
        """Ping *ip* twice and record the outcome under *name*.

        A zero exit status counts as reachable; anything else as unreachable.
        """
        # The host is passed as its own argv element so whitespace inside the
        # cell cannot add extra arguments.
        code, output = self._run(["ping", "-c", "2", str(ip).strip()])
        self._record_ping(name, code == 0, output)

    def snmp(self, name, ip):
        """Walk ``ifindex`` on *ip* via SNMPv3, falling back to v2c, and record the outcome.

        The v3 exit status selects the recorded status text: 0 is success,
        1 triggers a v2c retry, 2 and anything else are recorded as failures.
        """
        host = str(ip).strip()
        v3_code, v3_out = self._run(list(self._SNMP_V3_ARGS) + [host, "ifindex"])

        if v3_code == 0:
            self._record_snmp(name, True, "snmpv3正常", v3_out)
        elif v3_code == 1:
            v2_code, v2_out = self._run(list(self._SNMP_V2C_ARGS) + [host, "ifindex"])
            if v2_code == 0:
                self._record_snmp(name, True, "snmpv2正常", v2_out)
            else:
                # As in the original, the v3 output is what gets logged here.
                self._record_snmp(name, False, "认证失败或超时", v3_out)
        elif v3_code == 2:
            self._record_snmp(name, False, "终端禁止接入", v3_out)
        else:
            self._record_snmp(name, False, "其他问题", v3_out)

    @staticmethod
    def _clear(*lists):
        """Empty every given list in place."""
        for lst in lists:
            lst.clear()

    @staticmethod
    def _run_batch(check, targets):
        """Run ``check(name, ip)`` for every target on 8 threads and wait for all of them."""
        with st.spinner("wait for it"):
            # Leaving the ``with`` block waits for every submitted task.
            with ThreadPoolExecutor(8) as pool:
                for name, ip in targets:
                    pool.submit(check, name, ip)

    @staticmethod
    def _show_results(succeeded, failed, logs):
        """Render three tabs; each argument is a ``(names, results)`` pair of lists."""
        tabs = st.tabs([":red[succeed]", ":red[failed]", ":red[log]"])
        for tab, (names, results) in zip(tabs, (succeeded, failed, logs)):
            with tab:
                st.write(pd.DataFrame(data={"name": names, "result": results}))

    def _render_sheet(self, df_sheet, num):
        """Render column pickers and the ping / SNMP controls for one sheet.

        *num* is the 1-based sheet index and is only used to build widget keys.
        """
        st.write(df_sheet)
        col1, col2 = st.columns(2)
        with col1:
            # Column holding the device names shown in the result tables.
            result_name = st.selectbox("请选择设备名称列", df_sheet.keys(),
                                       key="name_col_%d" % num)
            if result_name is None:
                st.stop()
            df_result_name = df_sheet[result_name]
            st.write(df_result_name)
            dev_name_list = df_result_name.to_list()
        with col2:
            # Column holding the addresses to check.
            ip_info = st.selectbox("请选择ip地址列", df_sheet.keys(),
                                   key="ip_col_%d" % num)
            if ip_info is None:
                st.stop()
            df_ip_info = df_sheet[ip_info]
            st.write(df_ip_info)
            ip_host_list = df_ip_info.to_list()

        # Materialised as a list: a bare zip() would be exhausted by the ping
        # run and leave nothing for the SNMP run.
        targets = list(zip(dev_name_list, ip_host_list))

        ping_tab1, snmp_tab2 = st.tabs(["ping检查", "snmp检查"])
        with ping_tab1:
            if st.checkbox("是否执行ping", key="run_ping_%d" % num):
                # Start from empty lists so one sheet's table does not include
                # rows recorded for a previous sheet in the same run.
                self._clear(self.log_list, self.log_list_name,
                            self.success_list, self.success_list_name,
                            self.failed_list, self.failed_list_name)
                self._run_batch(self.ping, targets)
                st.snow()
                self._show_results(
                    (self.success_list_name, self.success_list),
                    (self.failed_list_name, self.failed_list),
                    (self.log_list_name, self.log_list),
                )
        with snmp_tab2:
            if st.checkbox("是否执行snmp_walk", key="run_snmp_%d" % num):
                self._clear(self.log_list_1, self.log_list_name_1,
                            self.success_list_1, self.success_list_name_1,
                            self.failed_list_1, self.failed_list_name_1)
                self._run_batch(self.snmp, targets)
                st.snow()
                self._show_results(
                    (self.success_list_name_1, self.success_list_1),
                    (self.failed_list_name_1, self.failed_list_1),
                    (self.log_list_name_1, self.log_list_1),
                )

    def all_function(self):
        """Render the upload widget and, per sheet, the batch ping / SNMP UI.

        Stops the Streamlit script until an ``.xlsx`` file has been uploaded.
        """
        upload_file = st.file_uploader(label="请上传要进行批量ping的文件",
                                       type="xlsx",
                                       accept_multiple_files=False)
        if upload_file is None:
            st.stop()

        with st.expander(":red[操作栏]"):
            df = pd.read_excel(upload_file, None)  # None -> dict with every sheet
            sheet = list(df)  # sheet names, one tab each
            tabs = st.tabs(sheet)
            for num, (tab, sheet_name) in enumerate(zip(tabs, sheet), start=1):
                with tab:
                    self._render_sheet(df[sheet_name], num)


if __name__ == '__main__':
    # Sidebar: IP calculator (address details first, then subnetting).
    calculator = compute()
    calculator.ip_info()
    calculator.subnet()

    # Main area: batch ping / SNMP checker.  The module-level name ``yc`` is
    # kept for this instance because other code in this file may look it up.
    yc = mult_ping()
    yc.all_function()