WebSocket测试方法
在线测试工具
http://www.jsons.cn/websocket/
使用python 编程作为客户端测试
安装
Install with pip
pip install websocket-server
使用接口进行通信
我们使用python写一个简单的websocket的服务端
#! -*- coding: utf-8 -*-
"""
Info: Websocket 的使用示例
"""
import asyncio
import websockets
websocket_users = set()
# 检测客户端权限,用户名密码通过才能退出循环
async def check_user_permit(websocket):
print("new websocket_users:", websocket)
websocket_users.add(websocket)
print("websocket_users list:", websocket_users)
while True:
recv_str = await websocket.recv()
cred_dict = recv_str.split(":")
if cred_dict[0] == "admin" and cred_dict[1] == "123456":
response_str = "Congratulation, you have connect with server..."
await websocket.send(response_str)
print(cred_dict)
print("Password is ok...")
return True
else:
print(cred_dict)
response_str = "Sorry, please input the username or password..."
print("Password is wrong...")
await websocket.send(response_str)
# 接收客户端消息并处理,这里只是简单把客户端发来的返回回去
async def recv_user_msg(websocket):
while True:
recv_text = await websocket.recv()
print("recv_text:", websocket.pong, recv_text)
response_text = f"Server return: {recv_text}"
print("response_text:", response_text)
await websocket.send(response_text)
# 服务器端主逻辑
async def run(websocket, path):
while True:
try:
await check_user_permit(websocket)
await recv_user_msg(websocket)
except websockets.ConnectionClosed:
print("ConnectionClosed...", path) # 链接断开
print("websocket_users old:", websocket_users)
websocket_users.remove(websocket)
print("websocket_users new:", websocket_users)
break
except websockets.InvalidState:
print("InvalidState...") # 无效状态
break
except Exception as e:
print("Exception:", e)
if __name__ == '__main__':
print("127.0.0.1:8281 websocket...")
asyncio.get_event_loop().run_until_complete(websockets.serve(run, "127.0.0.1", 8281))
asyncio.get_event_loop().run_forever()
运行服务端后可使用在线工具调试
使用python封装一个WebsocketUtil
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/3 16:24
# @File : web_socket_util.py.py
# @Software: win10 Tensorflow1.13.1 python3.9
import logging
import json
from json import JSONDecodeError
from websocket import create_connection, WebSocketTimeoutException
logger = logging.getLogger(__name__)
class WebsocketUtil():
def conn(self, url, timeout=3):
'''
短连接方法(使用create_connection链接,此方法不建议使用,链接不稳定,容易断,并且连接很耗时):
连接web服务器
:param url: 服务的url
:param timeout: 超时时间
:return:
'''
'''一直链接,直到连接上就退出循环'''
while True:
try:
self.wss = create_connection(url, timeout=timeout)
print(self.wss)
break
except Exception as e:
print('连接异常:', e)
continue
def send(self, message):
'''
发送请求数据体
:param message: 待发送的数据信息
:return:
'''
if not isinstance(message, str):
message = json.dumps(message)
return self.wss.send(message)
def load_json(self, base_str):
'''
进行数据体处理
:param base_str: 待处理的数据
:return:
'''
if isinstance(base_str, str):
try:
res = json.loads(base_str)
return base_str
except JSONDecodeError:
return base_str
elif isinstance(base_str, list):
res = []
for i in base_str:
res.append(self.load_json(i))
return res
elif isinstance(base_str, str):
for key, value in base_str.items():
base_str[key] = self.load_json(value)
return base_str
return base_str
def recv(self, timeout=3):
'''
接收数据体信息,并调用数据体处理方法处理响应体
:param timeout: 超时时间
:return:
'''
if isinstance(timeout, dict):
timeout = timeout["timeout"]
try:
self.settimeout(timeout)
recv_json = self.wss.recv()
all_json_recv = self.load_json(recv_json)
self._set_response(all_json_recv)
return all_json_recv
except WebSocketTimeoutException:
logger.error(f'已经超过{timeout}秒没有接收数据啦')
def settimeout(self, timeout):
'''
设置超时时间
:param timeout: 超时时间
:return:
'''
self.wss.settimeout(timeout)
def recv_all(self, timeout=3):
'''
接收多个数据体信息,并调用数据体处理方法处理响应体
:param timeout: 超时时间
:return:
'''
if isinstance(timeout, dict):
timeout = timeout['timeout']
recv_list = []
while True:
try:
self.settimeout(timeout)
recv_list = self.wss.recv()
all_json_recv = self.load_json(recv_list)
recv_list.append(all_json_recv)
logger.info(f'all::::: {all_json_recv}')
except WebSocketTimeoutException:
logger.error(f'已经超过{timeout}秒没有接收数据啦')
break
self._set_response(recv_list)
return recv_list
def close(self):
'''
关闭连接
:return:
'''
return self.wss.close()
def _set_response(self, response):
self.response = response
def _get_response(self) -> list:
return self.response
编写测试用例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/3 16:25
# @Author : shisuiyi
# @File : test_case.py.py
# @Software: win10 Tensorflow1.13.1 python3.9
from web_socket_util import WebsocketUtil
class TestWsDemo:
def setup(self):
url = 'ws://127.0.0.1:8281'
self.wss = WebsocketUtil()
self.wss.conn(url)
data = 'admin:123456'
self.wss.send(data)
res = self.wss.recv()
print(res)
def teardown(self):
self.wss.close()
def test_send_01(self):
data = 'shisuiyi'
self.wss.send(data)
res = self.wss.recv()
print(res)
assert data in res
补充WebSocket长连接
- 安装
pip install websocket-client
- 长连接的调用方法
ws = websocket.WebSocketApp("ws://echo.websocket.org/",
on_message = on_message,
on_error = on_error,
on_close = on_close)
ws.on_open = on_open
ws.run_forever()
- 长连接,参数介绍
- url: websocket的地址。
- header: 客户发送websocket握手请求的请求头,{‘head1:value1’,‘head2:value2’}。
- on_open:在建立Websocket握手时调用的可调用对象,这个方法只有一个参数,就是该类本身。
- on_message:这个对象在接收到服务器返回的消息时调用。有两个参数,一个是该类本身,一个是我们从服务器获取的字符串(utf-8格式)。
- on_error:这个对象在遇到错误时调用,有两个参数,第一个是该类本身,第二个是异常对象。
- on_close:在遇到连接关闭的情况时调用,参数只有一个,就是该类本身。
- on_cont_message:这个对象在接收到连续帧数据时被调用,有三个参数,分别是:类本身,从服务器接受的字符串(utf-8),连续标志。
- on_data:当从服务器接收到消息时被调用,有四个参数,分别是:该类本身,接收到的字符串(utf-8),数据类型,连续标志。
- keep_running:一个二进制的标志位,如果为True,这个app的主循环将持续运行,默认值为True。
- get_mask_key:用于产生一个掩码。
- subprotocols:一组可用的子协议,默认为空。
- 长连接关键方法:
ws.run_forever(ping_interval=60,ping_timeout=5)
如果不断开关闭websocket连接,会一直阻塞下去。另外这个函数带两个参数,如果传的话,启动心跳包发送。
ping_interval:自动发送“ping”命令,每个指定的时间(秒),如果设置为0,则不会自动发送。
ping_timeout:如果没有收到pong消息,则为超时(秒)。
ws.run_forever(ping_interval=60,ping_timeout=5)
#ping_interval心跳发送间隔时间#ping_timeout 设置,发送ping到收到pong的超时时间
- 长连接示例
import websocket
import json
import time
try:
import thread
except ImportError:
import _thread as thread
def on_message(ws, message):
'''服务器有数据更新时,主动推送过来的数据'''
print(message)
def on_error(ws, error):
'''程序报错时,就会触发on_error事件'''
print(error)
def on_close(ws):
'''关闭websocket连接后,打印此消息'''
print("### closed ###")
def on_open(ws):
'''连接到服务器之后就会触发on_open事件,这里用于send数据'''
def run(*args):
Sign = {
"name": 1,
"nam2": 2
}
ws.send(json.dumps(Sign)) # 发送数据(必须为str类型)
time.sleep(1)
ws.close() # 关闭连接
thread.start_new_thread(run, ())
def run_long():
websocket.enableTrace(True)
ws = websocket.WebSocketApp("ws://echo.websocket.org/",
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.on_open = on_open
ws.run_forever(ping_timeout=5)
# ws.run_forever()
if __name__ == "__main__":
run_long()