• <blockquote id="eygoe"></blockquote>
  • <menu id="eygoe"></menu>
    <tt id="eygoe"></tt>
  • import socket

    import threading

    import time

    import struct

    """時間:2021-01-24 版本V1.0"""


    def send_out(udp_socket, recv_ip, sn, functionCode):

        """回復數據"""

        dLen = bytes([0,1]) #數據長度

        state = bytes([0]) # 狀態

        oOptions = bytes([0,0])  # 操作選項

        currentTime = bytes(list(map(int, time.strftime("%y,%m,%d,%H,%M,%S", time.localtime()).split(","))))  # 實時時間

        HDReporT = bytes([0, 0])  # 歷史數據上報時刻


        RTReporInterval = 1  # 實時數據上報間隔(分鐘,1-1440)

        RecorInterval = 1  # 數據記錄間隔(分鐘,1-1440)

        HDRepor = 0  # 歷史數據上報間隔(小時,0代表隨實時數據上報)

        HTAlarm = 35  # 高溫告警值(℃)

        LTAlarm = 0  # 低溫告警值(℃)

        TBuffer = 0.1  # 溫度緩沖值(℃)

        HHAlarm = 75  # 高濕告警值(rh%)

        LHAlarm = 35  # 低濕告警值(rh%)

        HBuffer = 0.1  # 濕度緩沖值(rh%)


        # 根據SN號判斷設備類型回復配置數據,SN號C0開頭為,80開頭為單溫度

        if sn[0:1] == b'\xC0':

            cdata = (oOptions + currentTime + (RTReporInterval).to_bytes(2, byteorder = 'big') + (RecorInterval).to_bytes(2, byteorder = 'big') + HDReporT +

                     (HDRepor).to_bytes(1, byteorder = 'big') + struct.pack('>h',int(HTAlarm*10)) + struct.pack('>h',int(LTAlarm*10)) + (int(TBuffer*10)).to_bytes(2, byteorder = 'big') +

                     (int(HHAlarm*10)).to_bytes(2, byteorder = 'big') + (int(LHAlarm*10)).to_bytes(2, byteorder = 'big') + (int(HBuffer*10)).to_bytes(2, byteorder = 'big'))

        elif sn[0:1] == b'\x80':

            cdata = (oOptions + currentTime + (RTReporInterval).to_bytes(2, byteorder = 'big') + (RecorInterval).to_bytes(2,byteorder = 'big') + HDReporT +

                    (HDRepor).to_bytes(1, byteorder='big') + struct.pack('>h',int(HTAlarm*10)) + struct.pack('>h',int(LTAlarm*10)) + (int(TBuffer * 10)).to_bytes(2,byteorder='big'))

        # 判斷功能碼,如果是實時數據帶配置參數回復

        if functionCode == b'\x01' or functionCode == b'\x0B':

            cLen = bytes([0, len(cdata)])  # 配置長度

            CRCM = bytearray.fromhex((calc_crc((b'\x7e' + sn + functionCode + dLen + state + cLen + cdata).hex()))[2:6]) #計算CRC校驗碼

            sendBytes = b'\x7e'+ sn + functionCode + dLen + state + cLen + cdata + CRCM + b'\x0D'

        else:

            cLen = bytes([0, 0]) # 配置長度

            CRCM = bytearray.fromhex((calc_crc((b'\x7e' + sn + functionCode + dLen + state + cLen).hex()))[2:6]) #計算CRC校驗碼

            sendBytes = b'\x7e' + sn + functionCode + dLen + state + cLen + CRCM + b'\x0D'

        dest_ip = recv_ip[0]

        dest_port = recv_ip[1]

        udp_socket.sendto(sendBytes, (dest_ip, dest_port))

        print("回復數據:",sendBytes.hex())

     

    def recv_data(udp_socket):

        """接收數據"""

        while True:

            recv_data = udp_socket.recvfrom(1024)

            recv_ip = recv_data[1] #設備IP 端口

            data = recv_data[0] #設備上報的數據

            if data[0:1] == b'\x7e' and data[len(data)-1:len(data)] == b'\x0D': # 判斷是否包頭為7E,包尾為0D

                print("接收數據:", data.hex())

                sn = data[1:7] #設備SN號

                functionCode = data[7:8] #功能碼

                # 功能碼01和0B為實時數據

                if functionCode == b'\x01' or functionCode == b'\x0B':

                    temperature = struct.unpack('>h',data[10:12])[0] / 10 #溫度值

                    # 根據SN號判斷設備類型,SN號C0開頭為,80開頭為單溫度

                    if sn[0:1] == b'\xC0':

                        humidity = int.from_bytes(data[12:14], byteorder='big') / 10  # 濕度值

                        print("接收解析:","SN號:"+sn.hex(), "溫度:"+str(temperature), "濕度:"+str(humidity))

                    elif sn[0:1] == b'\x80':

                        print("接收解析:","SN號:"+sn.hex(), "溫度:"+str(temperature))

                # 功能碼02和0C為歷史數據

                elif functionCode == b'\x02' or functionCode == b'\x0C':

                    dLen = int.from_bytes(data[8:10], byteorder='big') # 數據項長度

                    # 溫濕度記錄解析

                    if sn[0:1] == b'\xC0':

                        for i in range(10,dLen+1,10):

                            HTemperature = struct.unpack('>h', data[i:i+2])[0] / 10  # 記錄溫度值

                            HHumidity = struct.unpack('>h', data[i+2:i+4])[0] / 10  # 記錄濕度值

                            #HTime = data[i+4:i+10].hex()  # 記錄時間

                            # 記錄時間

                            HTime = ("20"+str(ord(data[i+4:i+5])).zfill(2) + str(ord(data[i+5:i+6])).zfill(2) + str(ord(data[i+6:i+7])).zfill(2) + " " +

                                      str(ord(data[i+7:i+8])).zfill(2)+ ":" + str(ord(data[i+8:i+9])).zfill(2) + ":" + str(ord(data[i+9:i+10])).zfill(2))

                            print("歷史記錄"+str(int(i/10)),HTemperature,HHumidity,HTime)

                    # 單溫度記錄解析

                    elif sn[0:1] == b'\x80':

                        for i in range(10,dLen+1,8):

                            HTemperature = struct.unpack('>h', data[i:i+2])[0] / 10  # 記錄溫度值

                            #HHumidity = struct.unpack('>h', data[i+2:i+4])[0] / 10  # 記錄濕度值

                            # 記錄時間

                            HTime = ("20"+str(ord(data[i+2:i+3])).zfill(2) + str(ord(data[i+3:i+4])).zfill(2) + str(ord(data[i+4:i+5])).zfill(2) + " " +

                                     str(ord(data[i+5:i+6])).zfill(2)+ ":" + str(ord(data[i+6:i+7])).zfill(2) + ":" + str(ord(data[i+7:i+8])).zfill(2))

                            print("歷史記錄"+str(int(i/10)),HTemperature,HTime)

                # 功能碼03和0D為告警數據

                elif functionCode == b'\x03' or functionCode == b'\x0D':

                    print("告警記錄")

                send_out(udp_socket, recv_ip, sn, functionCode) #回復設備


    def calc_crc(string):

        """CRCM計算"""

        data = bytearray.fromhex(string)

        crc = 0xFFFF

        for pos in data:

            crc ^= pos

            for i in range(8):

                if ((crc & 1) != 0):

                    crc >>= 1

                    crc ^= 0xA001

                else:

                    crc >>= 1

        return hex(((crc & 0xff) << 8) + (crc >> 8) + 4660)


    def main():

        # 1. 創建套接字

        udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        # 2. 綁定本地信息

        udp_socket.bind(("", 6666))

        # 3. 創建一個子線程用來接收數據

        t = threading.Thread(target=recv_data, args=(udp_socket,))

        t.start()


    if __name__ == "__main__":

        main()


    2021年03月01日

    溫濕度變送器 MODBUS RTU java demo
    4代TH4XW/TH4XG 調用易維聯云平臺api的C# demo

    上一篇

    下一篇

    4代TH4XW/TH4XG儀表二次開發UDP通訊python demo

    th40w-e, th40g-e溫濕度記錄儀二次開發,跟儀表直接對接的例子

    添加時間:

    米奇在线777在线精品视频,米奇影院888奇米色99在线,米奇777免费视频手机版