import tkinter as tk
from tkinter import ttk
import threading
import queue
import json
import time
import websocket
from SignUtil import SignUtil # 请确保SignUtil.py在同一目录
class WebSocketClient:
def init(self, params, msg_queue, stop_event):
self.params = params
self.msg_queue = msg_queue
self.stop_event = stop_event
self.ws = None
self.heartbeat_interval = 30
def on_message(self, message): self.msg_queue.put(("info", f"接收消息: {message}")) def run(self): try: # 生成签名 sign_util = SignUtil() timestamp = str(int(time.time() * 1000)) sign = sign_util.get_sign( self.params["system_id"], self.params["system_key"], timestamp ) # 构造连接URL url = f"{self.params['url']}systemId={self.params['system_id']}×tamp={timestamp}&sign={sign}" self.ws = websocket.create_connection(url) # 发送订阅请求 topics = [t.strip() for t in self.params["topics"].split(",")] subscription = json.dumps({"cmd": "subscribe", "topics": topics}) self.ws.send(subscription) self.msg_queue.put(("info", "订阅请求已发送")) # 主循环 last_heartbeat = time.time() while not self.stop_event.is_set(): try: message = self.ws.recv() self.on_message(message) # 心跳处理 if time.time() - last_heartbeat > self.heartbeat_interval: self.ws.send(json.dumps({"cmd": "keepAlive", "data": "heartbeat"})) last_heartbeat = time.time() except websocket.WebSocketConnectionClosedException: self.msg_queue.put(("error", "连接已关闭")) break except Exception as e: self.msg_queue.put(("error", f"接收错误: {str(e)}")) break except Exception as e: self.msg_queue.put(("error", f"连接失败: {str(e)}")) finally: if self.ws: self.ws.close() self.stop_event.set()
复制
class WebSocketGUI:
def init(self, root):
self.root = root
self.root.title(“WebSocket 自动化平台”)
# 控制变量 self.is_connected = False self.stop_event = threading.Event() self.msg_queue = queue.Queue() # 创建UI self.create_widgets() # 启动消息检查 self.root.after(100, self.process_messages) def create_widgets(self): # 输入框框架 input_frame = ttk.LabelFrame(self.root, text="连接参数") input_frame.pack(padx=10, pady=5, fill=tk.X) # URL ttk.Label(input_frame, text="URL:").grid(row=0, column=0, sticky=tk.W) self.url_entry = ttk.Entry(input_frame, width=50) self.url_entry.grid(row=0, column=1, padx=5, pady=2) # System ID ttk.Label(input_frame, text="System ID:").grid(row=1, column=0, sticky=tk.W) self.system_id_entry = ttk.Entry(input_frame, width=50) self.system_id_entry.grid(row=1, column=1, padx=5, pady=2) # System Key ttk.Label(input_frame, text="System Key:").grid(row=2, column=0, sticky=tk.W) self.system_key_entry = ttk.Entry(input_frame, width=50) self.system_key_entry.grid(row=2, column=1, padx=5, pady=2) # Topics ttk.Label(input_frame, text="Topics (逗号分隔):").grid(row=3, column=0, sticky=tk.W) self.topics_entry = ttk.Entry(input_frame, width=50) self.topics_entry.grid(row=3, column=1, padx=5, pady=2) # 按钮框架 btn_frame = ttk.Frame(self.root) btn_frame.pack(pady=5) self.start_btn = ttk.Button(btn_frame, text="开始", command=self.start_connection) self.start_btn.pack(side=tk.LEFT, padx=5) self.stop_btn = ttk.Button(btn_frame, text="停止", command=self.stop_connection, state=tk.DISABLED) self.stop_btn.pack(side=tk.LEFT, padx=5) # 日志输出 log_frame = ttk.LabelFrame(self.root, text="日志输出") log_frame.pack(padx=10, pady=5, fill=tk.BOTH, expand=True) self.log_text = tk.Text(log_frame, height=15, state=tk.DISABLED) self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) def start_connection(self): params = { "url": self.url_entry.get(), "system_id": self.system_id_entry.get(), "system_key": self.system_key_entry.get(), "topics": self.topics_entry.get() } if not all(params.values()): self.display_message("error", "所有字段必须填写") return self.stop_event.clear() self.client_thread = threading.Thread( target=WebSocketClient( params, self.msg_queue, self.stop_event ).run, daemon=True ) self.client_thread.start() self.start_btn.config(state=tk.DISABLED) self.stop_btn.config(state=tk.NORMAL) self.display_message("info", "正在建立连接...") def stop_connection(self): self.stop_event.set() self.start_btn.config(state=tk.NORMAL) self.stop_btn.config(state=tk.DISABLED) self.display_message("info", "正在断开连接...") def process_messages(self): while not self.msg_queue.empty(): msg_type, message = self.msg_queue.get() self.display_message(msg_type, message) self.root.after(100, self.process_messages) def display_message(self, msg_type, message): self.log_text.config(state=tk.NORMAL) tag = "error" if msg_type == "error" else "info" self.log_text.insert(tk.END, message + "\n", tag) self.log_text.see(tk.END) self.log_text.config(state=tk.DISABLED) # 配置标签样式 self.log_text.tag_config("error", foreground="red") self.log_text.tag_config("info", foreground="black")
复制
if name == “main”:
root = tk.Tk()
app = WebSocketGUI(root)
root.geometry(“800x600”)
root.mainloop()
打包命令:pyinstaller --onefile --windowed WebsocketTest.py