forked from Wangler2333/tcp_udp_web_tools-pyqt5
-
Notifications
You must be signed in to change notification settings - Fork 0
/
udp_logic.py
118 lines (109 loc) · 3.82 KB
/
udp_logic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from PyQt5 import QtWidgets
import tcp_udp_web_ui
import stopThreading
import socket
import threading
import sys
class UdpLogic(tcp_udp_web_ui.ToolsUi):
def __init__(self, num):
super(UdpLogic, self).__init__(num)
self.udp_socket = None
self.address = None
self.sever_th = None
def udp_server_start(self):
"""
开启UDP服务端方法
:return:
"""
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
port = int(self.lineEdit_port.text())
address = ('', port)
self.udp_socket.bind(address)
except Exception as ret:
msg = '请检查端口号\n'
self.signal_write_msg.emit(msg)
else:
self.sever_th = threading.Thread(target=self.udp_server_concurrency)
self.sever_th.start()
msg = 'UDP服务端正在监听端口:{}\n'.format(port)
self.signal_write_msg.emit(msg)
def udp_server_concurrency(self):
"""
用于创建一个线程持续监听UDP通信
:return:
"""
while True:
recv_msg, recv_addr = self.udp_socket.recvfrom(1024)
msg = recv_msg.decode('utf-8')
msg = '来自IP:{}端口:{}:\n{}\n'.format(recv_addr[0], recv_addr[1], msg)
self.signal_write_msg.emit(msg)
def udp_client_start(self):
"""
确认UDP客户端的ip及地址
:return:
"""
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
self.address = (str(self.lineEdit_ip_send.text()), int(self.lineEdit_port.text()))
except Exception as ret:
msg = '请检查目标IP,目标端口\n'
self.signal_write_msg.emit(msg)
else:
msg = 'UDP客户端已启动\n'
self.signal_write_msg.emit(msg)
def udp_send(self):
"""
功能函数,用于UDP客户端发送消息
:return: None
"""
if self.link is False:
msg = '请选择服务,并点击连接网络\n'
self.signal_write_msg.emit(msg)
else:
try:
send_msg = (str(self.textEdit_send.toPlainText())).encode('utf-8')
if self.comboBox_tcp.currentIndex() == 2:
msg = 'UDP服务端无法发送,请切换为UDP客户端\n'
self.signal_write_msg.emit(msg)
if self.comboBox_tcp.currentIndex() == 3:
self.udp_socket.sendto(send_msg, self.address)
msg = 'UDP客户端已发送\n'
self.signal_write_msg.emit(msg)
except Exception as ret:
msg = '发送失败\n'
self.signal_write_msg.emit(msg)
def udp_close(self):
"""
功能函数,关闭网络连接的方法
:return:
"""
if self.comboBox_tcp.currentIndex() == 2:
try:
self.udp_socket.close()
if self.link is True:
msg = '已断开网络\n'
self.signal_write_msg.emit(msg)
except Exception as ret:
pass
if self.comboBox_tcp.currentIndex() == 3:
try:
self.udp_socket.close()
if self.link is True:
msg = '已断开网络\n'
self.signal_write_msg.emit(msg)
except Exception as ret:
pass
try:
stopThreading.stop_thread(self.sever_th)
except Exception:
pass
try:
stopThreading.stop_thread(self.client_th)
except Exception:
pass
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
ui = UdpLogic(1)
ui.show()
sys.exit(app.exec_())