Bläddra i källkod

add recv broadcast test

xuqiang 3 månader sedan
förälder
incheckning
c3d0a9b04e
1 ändrade filer med 47 tillägg och 0 borttagningar
  1. 47 0
      tests/broadcast.py

+ 47 - 0
tests/broadcast.py

@@ -0,0 +1,47 @@
+import socket
+import struct
+
+def receive_broadcast(port=9999, timeout=10):
+    """
+    接收UDP广播数据
+    
+    Args:
+        port: 监听的端口号
+        timeout: 超时时间(秒)
+    """
+    # 创建UDP socket
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+    
+    # 允许接收广播
+    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
+    
+    # 绑定到所有网络接口
+    sock.bind(('', port))
+    sock.settimeout(timeout)
+    
+    print(f"正在监听广播端口 {port}...")
+    
+    try:
+        while True:
+            try:
+                # 接收数据(最大缓冲区大小)
+                data, addr = sock.recvfrom(1024)
+                print(f"收到来自 {addr} 的广播消息:")
+                print(f"  数据: {data.decode('utf-8', errors='ignore')}")
+                # print(f"  原始数据: {data.hex()}")
+                
+            except socket.timeout:
+                print("等待广播超时")
+                break
+            except KeyboardInterrupt:
+                print("\n用户中断")
+                break
+                
+    finally:
+        sock.close()
+        print("Socket已关闭")
+
+# 运行示例
+if __name__ == "__main__":
+    receive_broadcast(port=37000, timeout=30)