import socket
import threading
import time
import struct
"""時間:2021-01-24 版本V1.0"""
def send_out(udp_socket, recv_ip, sn, functionCode):
"""回復數據"""
dLen = bytes([0,1]) #數據長度
state = bytes([0]) # 狀態
oOptions = bytes([0,0]) # 操作選項
currentTime = bytes(list(map(int, time.strftime("%y,%m,%d,%H,%M,%S", time.localtime()).split(",")))) # 實時時間
HDReporT = bytes([0, 0]) # 歷史數據上報時刻
RTReporInterval = 1 # 實時數據上報間隔(分鐘,1-1440)
RecorInterval = 1 # 數據記錄間隔(分鐘,1-1440)
HDRepor = 0 # 歷史數據上報間隔(小時,0代表隨實時數據上報)
HTAlarm = 35 # 高溫告警值(℃)
LTAlarm = 0 # 低溫告警值(℃)
TBuffer = 0.1 # 溫度緩沖值(℃)
HHAlarm = 75 # 高濕告警值(rh%)
LHAlarm = 35 # 低濕告警值(rh%)
HBuffer = 0.1 # 濕度緩沖值(rh%)
# 根據SN號判斷設備類型回復配置數據,SN號C0開頭為,80開頭為單溫度
if sn[0:1] == b'\xC0':
cdata = (oOptions + currentTime + (RTReporInterval).to_bytes(2, byteorder = 'big') + (RecorInterval).to_bytes(2, byteorder = 'big') + HDReporT +
(HDRepor).to_bytes(1, byteorder = 'big') + struct.pack('>h',int(HTAlarm*10)) + struct.pack('>h',int(LTAlarm*10)) + (int(TBuffer*10)).to_bytes(2, byteorder = 'big') +
(int(HHAlarm*10)).to_bytes(2, byteorder = 'big') + (int(LHAlarm*10)).to_bytes(2, byteorder = 'big') + (int(HBuffer*10)).to_bytes(2, byteorder = 'big'))
elif sn[0:1] == b'\x80':
cdata = (oOptions + currentTime + (RTReporInterval).to_bytes(2, byteorder = 'big') + (RecorInterval).to_bytes(2,byteorder = 'big') + HDReporT +
(HDRepor).to_bytes(1, byteorder='big') + struct.pack('>h',int(HTAlarm*10)) + struct.pack('>h',int(LTAlarm*10)) + (int(TBuffer * 10)).to_bytes(2,byteorder='big'))
# 判斷功能碼,如果是實時數據帶配置參數回復
if functionCode == b'\x01' or functionCode == b'\x0B':
cLen = bytes([0, len(cdata)]) # 配置長度
CRCM = bytearray.fromhex((calc_crc((b'\x7e' + sn + functionCode + dLen + state + cLen + cdata).hex()))[2:6]) #計算CRC校驗碼
sendBytes = b'\x7e'+ sn + functionCode + dLen + state + cLen + cdata + CRCM + b'\x0D'
else:
cLen = bytes([0, 0]) # 配置長度
CRCM = bytearray.fromhex((calc_crc((b'\x7e' + sn + functionCode + dLen + state + cLen).hex()))[2:6]) #計算CRC校驗碼
sendBytes = b'\x7e' + sn + functionCode + dLen + state + cLen + CRCM + b'\x0D'
dest_ip = recv_ip[0]
dest_port = recv_ip[1]
udp_socket.sendto(sendBytes, (dest_ip, dest_port))
print("回復數據:",sendBytes.hex())
def recv_data(udp_socket):
"""接收數據"""
while True:
recv_data = udp_socket.recvfrom(1024)
recv_ip = recv_data[1] #設備IP 端口
data = recv_data[0] #設備上報的數據
if data[0:1] == b'\x7e' and data[len(data)-1:len(data)] == b'\x0D': # 判斷是否包頭為7E,包尾為0D
print("接收數據:", data.hex())
sn = data[1:7] #設備SN號
functionCode = data[7:8] #功能碼
# 功能碼01和0B為實時數據
if functionCode == b'\x01' or functionCode == b'\x0B':
temperature = struct.unpack('>h',data[10:12])[0] / 10 #溫度值
# 根據SN號判斷設備類型,SN號C0開頭為,80開頭為單溫度
if sn[0:1] == b'\xC0':
humidity = int.from_bytes(data[12:14], byteorder='big') / 10 # 濕度值
print("接收解析:","SN號:"+sn.hex(), "溫度:"+str(temperature), "濕度:"+str(humidity))
elif sn[0:1] == b'\x80':
print("接收解析:","SN號:"+sn.hex(), "溫度:"+str(temperature))
# 功能碼02和0C為歷史數據
elif functionCode == b'\x02' or functionCode == b'\x0C':
dLen = int.from_bytes(data[8:10], byteorder='big') # 數據項長度
# 溫濕度記錄解析
if sn[0:1] == b'\xC0':
for i in range(10,dLen+1,10):
HTemperature = struct.unpack('>h', data[i:i+2])[0] / 10 # 記錄溫度值
HHumidity = struct.unpack('>h', data[i+2:i+4])[0] / 10 # 記錄濕度值
#HTime = data[i+4:i+10].hex() # 記錄時間
# 記錄時間
HTime = ("20"+str(ord(data[i+4:i+5])).zfill(2) + str(ord(data[i+5:i+6])).zfill(2) + str(ord(data[i+6:i+7])).zfill(2) + " " +
str(ord(data[i+7:i+8])).zfill(2)+ ":" + str(ord(data[i+8:i+9])).zfill(2) + ":" + str(ord(data[i+9:i+10])).zfill(2))
print("歷史記錄"+str(int(i/10)),HTemperature,HHumidity,HTime)
# 單溫度記錄解析
elif sn[0:1] == b'\x80':
for i in range(10,dLen+1,8):
HTemperature = struct.unpack('>h', data[i:i+2])[0] / 10 # 記錄溫度值
#HHumidity = struct.unpack('>h', data[i+2:i+4])[0] / 10 # 記錄濕度值
# 記錄時間
HTime = ("20"+str(ord(data[i+2:i+3])).zfill(2) + str(ord(data[i+3:i+4])).zfill(2) + str(ord(data[i+4:i+5])).zfill(2) + " " +
str(ord(data[i+5:i+6])).zfill(2)+ ":" + str(ord(data[i+6:i+7])).zfill(2) + ":" + str(ord(data[i+7:i+8])).zfill(2))
print("歷史記錄"+str(int(i/10)),HTemperature,HTime)
# 功能碼03和0D為告警數據
elif functionCode == b'\x03' or functionCode == b'\x0D':
print("告警記錄")
send_out(udp_socket, recv_ip, sn, functionCode) #回復設備
def calc_crc(string):
"""CRCM計算"""
data = bytearray.fromhex(string)
crc = 0xFFFF
for pos in data:
crc ^= pos
for i in range(8):
if ((crc & 1) != 0):
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return hex(((crc & 0xff) << 8) + (crc >> 8) + 4660)
def main():
# 1. 創建套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 2. 綁定本地信息
udp_socket.bind(("", 6666))
# 3. 創建一個子線程用來接收數據
t = threading.Thread(target=recv_data, args=(udp_socket,))
t.start()
if __name__ == "__main__":
main()
2021年03月01日
上一篇
下一篇
4代TH4XW/TH4XG儀表二次開發UDP通訊python demo
添加時間: