O_K210_WifiLte远程抓拍图传
O_嵌入式专题目录
O_ESP_智能锁WifiLte控制模块
问题
- Wifi、Lte未实现切换。
- Lte连接服务器流程可优化。
- 由于LCD显示摄像头画面占用资源过多,多线程会阻塞,而单线程无法在显示视频的同时流畅抓拍,拍照时会有明显的断帧卡顿;可考虑移植迷你摄像机方案。
- 代码基于Dock M1W模块,具体引脚配置可在 BoardConfig 板级初始化函数中修改。LCD为320×240、2.4英寸 ST7789V。
WxMiniPro
index.wxml
<!--index.wxml-->
<!-- Single centered column: status text, snapshot preview, then the action buttons. -->
<view style="text-align: center;">
  <view style="text-align: center; margin-bottom: 20px;">{{connectionState}}</view>
  <!-- Preview area sized 320x240; shows whatever URL is bound to imgSrc. -->
  <image style="width: 320px; height: 240px; background-color: #eeeeee; margin-bottom: 20px;" mode="aspectFit" src="{{imgSrc}}"></image>
  <view>
    <!-- Disabled while lockDisconnectState is true. -->
    <button style="margin-bottom: 10px;" bindtap="btnOpenClose" disabled="{{lockDisconnectState}}">{{currentState}}</button>
    <button style="margin-bottom: 10px;" bindtap="btnClean">Clean</button>
    <button style="margin-bottom: 10px;" bindtap="btnDisconnect">Disconnect</button>
  </view>
</view>
index.js
// index.js
const mqtt = require("../../utils/mqtt.js");

// The same key is used as the MQTT client id and as the `uid` of the image query.
const BEMFA_UID = '7d54f85af42976ee3c2693e6xxxxxxxx';
// Topic used for every lock command / status message.
const LOCK_TOPIC = 'K210T1';
// Topic the snapshot images are queried from.
const IMAGE_TOPIC = 'K210IMG';
// Image shown when no snapshot is available.
const PLACEHOLDER_IMG = 'https://res.wx.qq.com/wxdoc/dist/assets/img/0.4cb08bb4.jpg';

const url = 'wxs://bemfa.com:9504/wss';
const client = mqtt.connect(url, {
  connectTimeout: 2000,
  clientId: BEMFA_UID,
  username: '',
  password: '',
});
// Set to 1 once the subscription to LOCK_TOPIC has been acknowledged.
let clientConnectFlag = 0;

// Application instance.
const app = getApp();

/**
 * Show a short icon-less toast.
 * @param {string} title - Text to display for two seconds.
 */
function showToast(title) {
  wx.showToast({
    title,
    icon: 'none',
    duration: 2000,
  });
}

Page({
  data: {
    lockDisconnectState: true,
    currentState: 'Open',
    connectionState: 'DEBUG',
    imgUrl: '',
    //imgSrc: 'https://img.bemfa.com/d1ad640cdf287c2e0084edd5de8f7606-d48b0349a3a65cd333bda8bfc212c6ca-1616576435.jpg',
    imgSrc: PLACEHOLDER_IMG,
  },

  // NOTE: the former `observers` entry was removed. `observers` belongs to
  // Component(), so inside Page() it never ran; `imgSrc` is now set directly
  // wherever `imgUrl` changes.

  // Event handler: open the logs page.
  bindViewTap() {
    wx.navigateTo({
      url: '../logs/logs',
    });
  },

  /** Page entry point: wire up the MQTT client and the message handler. */
  onLoad() {
    showToast('No lock online!');
    this.initSocket();
    this.getMessage();
  },

  /**
   * Register the connection life-cycle listeners, subscribe to the lock topic
   * and publish a `lockCheck` probe.
   */
  initSocket() {
    client.on('connect', () => {
      console.log('Connect mqtt success!');
    });
    client.subscribe(LOCK_TOPIC, { qos: 1 }, (err) => {
      if (!err) {
        clientConnectFlag = 1;
        console.log("Subscribe success: K210T1");
      }
    });
    client.on('disconnect', () => {
      console.log('Connection disconnet!');
    });
    client.on('close', () => {
      console.log('Connection close!');
    });
    client.on('offline', () => {
      console.log('Connection offline!');
    });
    client.on('error', (err) => {
      // Log only. The earlier version called initSocket() again from here,
      // which did not reconnect anything: it stacked another copy of every
      // listener and re-subscribed / re-published on each error.
      // NOTE(review): mqtt.js is expected to retry by itself (reconnectPeriod);
      // confirm for the bundled build.
      console.log('Cannot connect!', err);
    });
    client.publish(LOCK_TOPIC, 'lockCheck', { qos: 1 });
  },

  /**
   * Listen for lock messages and update the page accordingly.
   * The payload is decoded to a string once and compared strictly.
   */
  getMessage() {
    console.log('Waiting message ...');
    client.on('message', (topic, payload) => {
      const message = payload.toString();
      console.log('$Get: ', message);
      /*if((payload == 'K210T1 Online') || (payload == 'Air724ugT1')) {
        this.setData({lockDisconnectState: false})
        wx.showToast({ title: 'Lock online!', icon: 'none', duration: 2000 })
      }*/
      switch (message) {
        case 'Door-Od':
          // Door opened: lock the button again and drop the snapshot.
          this.setData({
            lockDisconnectState: true,
            currentState: 'Open',
            imgSrc: PLACEHOLDER_IMG,
          });
          showToast('Door opened!');
          break;
        case 'Door-Cd':
          showToast('Door closed!');
          break;
        case 'Door-ASK':
          this.showLatestSnapshot();
          showToast('Door ASK!');
          break;
        case 'Door-P':
          showToast('Door people!');
          break;
        case 'Door-B':
          showToast('Door bell!');
          break;
        default:
          if (message.startsWith('imgUrl:')) {
            //console.log(message.slice(7))
            this.setData({
              imgSrc: message.slice(7),
            });
            console.log('Get image!');
            showToast('Get image!');
          }
      }
    });
  },

  /**
   * Fetch the newest uploaded snapshot, display it and enable the open button.
   * An empty or malformed response is logged and leaves the page unchanged.
   */
  showLatestSnapshot() {
    wx.request({
      url: `https://images.bemfa.com/cloud/v1/get/?uid=${BEMFA_UID}&topic=${IMAGE_TOPIC}&num=1`,
      success: (res) => {
        const records = res.data && res.data.data;
        const latest = Array.isArray(records) ? records[0] : undefined;
        if (!latest || !latest.url) {
          console.log('No image record in response:', res.data);
          return;
        }
        const imgUrl = String(latest.url);
        console.log(imgUrl);
        this.setData({
          imgUrl,
          imgSrc: imgUrl,
          lockDisconnectState: false,
        });
      },
      fail: (err) => {
        console.log(err);
      },
    });
  },

  /** Toggle variant: alternately publishes the open and close commands. */
  btnOpenCloseA(e) {
    if (this.data.currentState === 'Open') {
      this.setData({ currentState: 'Close' });
      console.log("Open door ...");
      showToast('Open door ...');
      client.publish(LOCK_TOPIC, 'Door-Oc', { qos: 1 });
    } else {
      this.setData({ currentState: 'Open' });
      console.log("Close door ...");
      showToast('Close door ...');
      client.publish(LOCK_TOPIC, 'Door-Cc', { qos: 1 });
    }
  },

  /** Bound to the main button: publish the open command. */
  btnOpenClose(e) {
    console.log("Open door ...");
    this.setData({ currentState: 'Opening ...' });
    showToast('Open door ...');
    client.publish(LOCK_TOPIC, 'Door-Oc', { qos: 1 });
  },

  /** Reset the page to its initial, locked state. */
  btnClean(e) {
    this.setData({
      lockDisconnectState: true,
      currentState: 'Open',
      imgSrc: PLACEHOLDER_IMG,
    });
  },

  /** Unsubscribe from the lock topic and close the MQTT connection. */
  btnDisconnect(e) {
    client.unsubscribe(LOCK_TOPIC);
    console.log('Dissubscribe');
    client.end();
  }

  /*
  // Self-hosted server variant (kept for reference)
  wx.connectSocket({
    url: 'ws://192.168.1.232:2202',
  })
  wx.onSocketOpen((result) => {
    console.log("WS opend!");
    wx.sendSocketMessage({
      data: '{"type":"bind", "data":"bind", "fromId":"wxT1", "toId":"serverT1"}'
    })
    wx.sendSocketMessage({
      data: '{"type":"say", "data":"online", "fromId":"wxT1", "toId":"k210T1"}'
    })
  })
  wx.onSocketMessage((result) => {
    console.log("$Geted: " + result.data)
    if((result.data.indexOf('k210T1&online') >= 0) || (result.data.indexOf('k210T1&connect') >= 0)) {
      console.log("Lock online.");
      this.setData({lockOnlineState: false})
    }
    if(result.data.indexOf('k210T1&opened') >= 0) {
      console.log("Open success!");
      wx.showToast({ title: 'Open success!', icon: 'none', duration: 2000 })
    }
    if(result.data.indexOf('k210T1&closed') >= 0) {
      console.log("Close success!");
      wx.showToast({ title: 'Close success!', icon: 'none', duration: 2000 })
    }
    if(result.data.indexOf('k210T1&bell') >= 0) {
      console.log("BELL!");
      wx.showToast({ title: 'BELL!', icon: 'none', duration: 2000 })
    }
    if(result.data.indexOf('k210T1&people') >= 0) {
      console.log("PEOPEL!");
      wx.showToast({ title: 'PEOPLE!', icon: 'none', duration: 2000 })
    }
  })
  },
  btnGET(e) {
    console.log("Start GET...");
    wx.request({
      url: 'http://192.168.1.232:8080?requestId=wxT1',
      data: {},
      header: {},
      method: 'GET',
      dataType: '其他',
      responseType: 'text',
      success: function(res) { console.log(res.data) },
      fail: function(err) { console.log(err) },
      complete: function(res) {
        //console.log(res.data)
      },
    })
  },
  btnOpenClose(e) {
    if(this.data.currentState == 'Open') {
      this.setData({ currentState: 'Close' });
      console.log("Open door ...");
      wx.showToast({ title: 'Open door ...', icon: 'none', duration: 2000 })
      wx.sendSocketMessage({
        data: '{"type":"say", "data":"open", "fromId":"wxT1", "toId":"k210T1"}'
      })
    } else {
      this.setData({ currentState: 'Open' });
      console.log("Close door ...");
      wx.showToast({ title: 'Close door ...', icon: 'none', duration: 2000 })
      wx.sendSocketMessage({
        data: '{"type":"say", "data":"close", "fromId":"wxT1", "toId":"k210T1"}'
      })
    }
  }
  */
})
Embedded Code
main.py(如果SD卡加载失败,自动运行芯片内部的main.py)
# Fallback main.py kept in on-chip flash: it runs when loading from the SD card
# fails, draws an error banner on the LCD and then resets the board.
# gc and machine are imported before the try block so that the cleanup below can
# always run; previously a failing import inside the try left `gc` unbound and
# the NameError in `finally` prevented the reset.
import gc
import machine

try:
    import lcd
    gc.collect()
    lcd.init()
    lcd.rotation(1)
    lcd.clear()
    lcd.draw_string(66, 130, "***************", lcd.RED, lcd.BLACK)
    lcd.draw_string(66, 150, "SYS ERROR: 'M0'", lcd.WHITE, lcd.BLACK)
    lcd.draw_string(66, 170, "***************", lcd.RED, lcd.BLACK)
    gc.collect()
finally:
    # Reset whether or not the banner could be drawn.
    gc.collect()
    machine.reset()
eye.py
#------------------------------------------------------------------------------# import gc, sys, os, time import lcd, sensor, image import json, ubinascii, machine from machine import Timer, UART #------------------------------------------------------------------------------# from Maix import FPIOA, GPIO from fpioa_manager import fm from board import board_info #------------------------------------------------------------------------------# gc.collect() #from Maix import freq #print(freq.get()) #CPU KPU frequence print("54d06ff5ada345f385d90ca58c59c81a\r\n") print(ubinascii.hexlify(machine.unique_id())) print(os.listdir()) LCD_DEBUG = 1 #------------------------------------------------------------------------------# ''' def onLedGTimer(timerL): global ledGState #print('Timer ledG') if ledGState == 0: #print('ledG 1') ledGState = 1 ledGDock.value(1) ledG.value(1) else: #print('ledG 0') ledGState = 0 ledGDock.value(0) ledG.value(0) fm.register(12, fm.fpioa.GPIO0) ledGDock = GPIO(GPIO.GPIO0, GPIO.OUT) ledGDock.value(0) fm.register(9, fm.fpioa.GPIO1) ledG = GPIO(GPIO.GPIO1, GPIO.OUT) ledG.value(0) ledGState = 0 ledGTimer = Timer(Timer.TIMER1, Timer.CHANNEL0, mode=Timer.MODE_PERIODIC, period=1, unit=Timer.UNIT_S, callback=onLedGTimer, arg=None, start=True, priority=1, div=0) ''' #------------------------------------------------------------------------------# def LcdInit(): lcd.init(freq=15000000) lcd.rotation(1) lcd.clear() if LCD_DEBUG == 1: lcd.draw_string(66, 150, "Lcd init done.", lcd.WHITE, lcd.BLACK) if LCD_DEBUG == 1: LcdInit() lcd.draw_string(56, 170, "Init from flash.", lcd.WHITE, lcd.BLACK) #------------------------------------------------------------------------------# def BoardConfig(): import json config = { "type": "dock", "board_info": { 'BOOT_KEY': 16, 'LED_R': 14, 'LED_G': 13, 'LED_B': 12, 'WIFI_TX': 6, 'WIFI_RX': 7, 'WIFI_EN': 8, 'MIC0_WS': 30, 'MIC0_DATA': 20, 'MIC0_BCK': 32, 'I2S_WS': 33, 'I2S_DA': 34, 'I2S_BCK': 35, } } cfg = json.dumps(config) 
print(cfg) try: with open('/flash/config.json', 'rb') as f: tmp = json.loads(f.read()) print(tmp) if tmp["type"] != config["type"]: raise Exception('config.json no exist') except Exception as e: with open('/flash/config.json', "w") as f: f.write(cfg) import machine machine.reset() BoardConfig() #------------------------------------------------------------------------------# def CamInit(): if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(66, 150, "Cam initing ...", lcd.WHITE, lcd.BLACK) # Reset and initialize the sensor. It will # run automatically, call sensor.run(0) to stop sensor.reset(freq=22000000, dual_buff=True) # Set pixel format to RGB565 (or GRAYSCALE) sensor.set_pixformat(sensor.RGB565) # Set frame size to QVGA (320x240) sensor.set_framesize(sensor.QVGA) #sensor.set_windowing((224,224)) # Sensor flip #sensor.set_hmirror(True) sensor.set_vflip(True) # Wait for settings take effect. sensor.skip_frames() #sensor.set_auto_gain(True) #sensor.set_auto_whitebal(True) #sensor.set_auto_expuosure(True) if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(66, 150, "Cam init done.", lcd.WHITE, lcd.BLACK) #------------------------------------------------------------------------------# ''' from network_espat import wifi def EspatInit(): if LCD_DEBUG == 1: lcd.clear() lcd.rotation(1) lcd.draw_string(50, 150, "Network initing ...", lcd.WHITE, lcd.BLACK) SSID = "K210T" PASW = "00000000" if wifi.isconnected() == False: for i in range(5): try: wifi.reset() print(wifi.at_cmd("AT\r\n")) print(wifi.at_cmd("AT+GMR\r\n")) print("STA mode: ", wifi._at_cmd("AT+CWMODE_DEF=1\r\n")) print("Auto connect: ", wifi._at_cmd("AT+CWAUTOCONN=1\r\n")) print(wifi.at_cmd("AT+CIPSTATUS\r\n")) print("Try AT connect wifi ...", wifi._at_cmd()) if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(30, 150, "Try AT connect wifi ...", lcd.WHITE, lcd.BLACK) lcd.draw_string(110, 170, "{}".format(5-i), lcd.WHITE, lcd.BLACK) wifi.connect(SSID, PASW) if wifi.isconnected(): break except Exception as e: print(e) if 
wifi.isconnected() == True: print("Network state: {} - {} - {}".format(wifi.isconnected(), wifi.ifconfig()[6], wifi.ifconfig()[0])) if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(65, 130, "Network state:", lcd.WHITE, lcd.BLACK) lcd.draw_string(65, 150, "{}".format(wifi.ifconfig()[6]), lcd.WHITE, lcd.BLACK) lcd.draw_string(65, 170, "{}".format(wifi.ifconfig()[0]), lcd.WHITE, lcd.BLACK) ''' #------------------------------------------------------------------------------# def onCamTimer(timerC): global camCount camCount += 1 print('S: ', camCount) camCount = 0 camTimer = Timer(Timer.TIMER0, Timer.CHANNEL0, mode=Timer.MODE_PERIODIC, period=1, unit=Timer.UNIT_S, callback=onCamTimer, arg=None, start=False, priority=1, div=0) last_cap_time = 0 def CamCapture(img): global last_cap_time if time.ticks_ms() - last_cap_time > 30000 or last_cap_time == 0: #30s for test last_cap_time = time.ticks_ms() f_name = "/flash/oPic.jpg" img.save(f_name, quality=90) print("Will save to {}".format(f_name)) def CamDisplay(): global camCount LcdInit() CamInit() last_cap_time = 0 camTimer.start() while 1: if camCount >= 5: del img lcd.rotation(1) if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(50, 150, "Close cam & lcd ...", lcd.WHITE, lcd.BLACK) print("Close cam & lcd ...") if LCD_DEBUG != 1: lcd.clear() lcd.deinit() camTimer.stop() camCount = 0 sensor.run(0) sensor.shutdown(False) break img = sensor.snapshot() lcd.rotation(0) lcd.display(img) CamCapture(img) #------------------------------------------------------------------------------# fm.register(12, fm.fpioa.UART2_TX, force=True) fm.register(13, fm.fpioa.UART2_RX, force=True) uart4G = UART(UART.UART2, 9600, 8, None, 1, timeout=1000, read_buf_len=4096) sendStep = 5 ipAddress = '' def Uart4GSendImg(sendImg, url): global sendStep, ipAddress imgUrlJson = "" responseJson = "" sleepTime = 0 while 1: if sendStep == 0: #Check 4G model uart4G.write('AT\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT\r\n\r\nOK\r\n' in 
readData4G: sendStep = 1 break if readData4G != None: print("@0:", readData4G) break time.sleep(sleepTime) elif sendStep == 1: #Check SIM card uart4G.write('AT+CPIN?\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+CPIN?\r\n\r\n+CPIN: READY\r\n\r\nOK\r\n' in readData4G: sendStep = 2 break if readData4G != None: print("@1:", readData4G) break time.sleep(sleepTime) elif sendStep == 2: #Check regist uart4G.write('AT+CGATT?\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+CGATT?\r\n\r\n+CGATT: 1\r\n\r\nOK\r\n' in readData4G: sendStep = 3 break if readData4G != None: print("@2:", readData4G) break time.sleep(sleepTime) elif sendStep == 3: #Set APN uart4G.write('AT+CSTT\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+CSTT\r\n\r\nOK\r\n' in readData4G: sendStep = 4 break elif readData4G != None and 'AT+CSTT\r\n\r\n' in readData4G and '+CME ERROR' in readData4G: sendStep = 5 break if readData4G != None: print("@3:", readData4G) break time.sleep(sleepTime) elif sendStep == 4: #Active network uart4G.write('AT+CIICR\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+CIICR\r\n\r\nOK\r\n' in readData4G: sendStep = 5 break elif readData4G != None and 'AT+CIICR\r\n\r\n' in readData4G and '+CME ERROR' in readData4G: sendStep = 0 break if readData4G != None: print("@4:", readData4G) break time.sleep(sleepTime) elif sendStep == 5: #Check ip uart4G.write('AT+CIFSR\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+CIFSR\r\n\r\n' in readData4G and not ('+CME ERROR' in readData4G): ipAddress = readData4G[12:-2] sendStep = 6 break elif readData4G != None and 'AT+CIFSR\r\n\r\n' in readData4G and '+CME ERROR' in readData4G: uart4G.write('AT+CIPSHUT\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+CIPSHUT\r\n\r\nSHUT OK\r\n' in readData4G: sendStep = 3 break if readData4G != None: print("@5:", readData4G) break if readData4G != None: print("@5:", 
readData4G) break time.sleep(sleepTime) elif sendStep == 6: uart4G.write('AT+SAPBR=3,1,"CONTYPE","GPRS"\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+SAPBR=3,1,"CONTYPE","GPRS"\r\n\r\nOK\r\n' in readData4G: sendStep = 7 break if readData4G != None: print("@6:", readData4G) break time.sleep(sleepTime) elif sendStep == 7: uart4G.write('AT+SAPBR=3,1,"APN",""\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+SAPBR=3,1,"APN",""\r\n\r\nOK\r\n' in readData4G: sendStep = 8 break if readData4G != None: print("@7:", readData4G) break time.sleep(sleepTime) elif sendStep == 8: uart4G.write('AT+SAPBR=1,1\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+SAPBR=1,1\r\n\r\nOK\r\n' in readData4G: sendStep = 9 break elif readData4G != None and 'AT+SAPBR=1,1\r\n\r\n' in readData4G and '+CME ERROR' in readData4G: sendStep = 9 break if readData4G != None: print("@8:", readData4G) break time.sleep(sleepTime) elif sendStep == 9: uart4G.write('AT+SAPBR=2,1\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+SAPBR=2,1\r\n\r\n' in readData4G and '+SAPBR: 1,1,' in readData4G: sendStep = 10 break elif readData4G != None and 'AT+SAPBR=2,1\r\n\r\n' in readData4G and not ('+SAPBR: 1,1,' in readData4G): sendStep = 8 break if readData4G != None: print("@9:", readData4G) break time.sleep(sleepTime) elif sendStep == 10: #Init http uart4G.write('AT+HTTPINIT\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+HTTPINIT\r\n\r\nOK\r\n' in readData4G: sendStep = 11 break elif readData4G != None and 'AT+HTTPINIT\r\n\r\n' in readData4G and '+CME ERROR: 3' in readData4G: sendStep = 11 break if readData4G != None: print("@10:", readData4G) break time.sleep(sleepTime) elif sendStep == 11: #Set http CID uart4G.write('AT+HTTPPARA="CID",1\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+HTTPPARA="CID",1\r\n\r\nOK\r\n' in readData4G: sendStep = 12 break if readData4G != None: 
print("@11:", readData4G) break time.sleep(sleepTime) elif sendStep == 12: #Set http url if sendImg == 1: uart4G.write('AT+HTTPPARA="URL","http://images.bemfa.com/upload/v1/upimages.php"\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+HTTPPARA="URL",' in readData4G and 'OK\r\n' in readData4G: sendStep = 13 break if readData4G != None: print("@12:", readData4G) break else: sendStep = 19 time.sleep(sleepTime) elif sendStep == 13: #Set http user header uart4G.write('AT+HTTPPARA="USERDATA","User-Agent:MaixPy\\r\\nAuthorization:7d54f85af42976ee3c2693e6xxxxxxxx\\r\\nAuthtopic:K210IMG"\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+HTTPPARA="USERDATA",' in readData4G and 'OK\r\n' in readData4G: sendStep = 15 break if readData4G != None: print("@13:", readData4G) break time.sleep(sleepTime) elif sendStep == 14: uart4G.write('AT+HTTPPARA?\r\n') while 1: readData4G = uart4G.read() if readData4G != None: print("@14:", readData4G) sendStep = 15 break else: break time.sleep(sleepTime) elif sendStep == 15: #Send file/image img = image.Image("/flash/oPic.jpg") img = img.compress(quality=60) imgBytes = img.to_bytes() #uart4G.write('AT+HTTPDATA=1024,100000\r\n') print('AT+HTTPDATA=%d,10000\r\n' % img.size()) uart4G.write('AT+HTTPDATA=%d,10000\r\n' % img.size()) time.sleep(1) while 1: readData4G = uart4G.read() if readData4G != None and 'DOWNLOAD' in readData4G: if imgBytes: print("Write ...") uart4G.write(img) print('Write', len(imgBytes)) while 1: readData4G = uart4G.read() if readData4G != None and '\r\nOK\r\n' in readData4G: sendStep = 16 break if readData4G != None: print("@15:", readData4G) break #elif readData4G == None: #print("Add") #uart4G.write('\r\n') break time.sleep(sleepTime) elif sendStep == 16: #Send http uart4G.write('AT+HTTPACTION=1\r\n') while 1: readData4G = uart4G.read() #if readData4G != None and 'AT+HTTPACTION=1\r\n\r\nOK\r\n' in readData4G: #sendStep = 17 #break if readData4G != None and '+HTTPACTION: 1,200,' 
in readData4G: sendStep = 17 break if readData4G != None: print("@16:", readData4G) #break time.sleep(sleepTime) elif sendStep == 17: #Get response uart4G.write('AT+HTTPREAD\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+HTTPREAD\r\n\r\n+HTTPREAD:' in readData4G: if sendImg: imgUrlJson = readData4G[readData4G.index(b"\r\n{")+2:readData4G.index(b"}\r\n")+1] responseJson = None else: responseJson = readData4G[readData4G.index(b"\r\n{")+2:readData4G.index(b"}\r\n")+1] imgUrlJson = None print(readData4G) sendStep = 18 break if readData4G != None: print("@17:", readData4G) break time.sleep(sleepTime) elif sendStep == 18: #Terminate http uart4G.write('AT+HTTPTERM\r\n') while 1: readData4G = uart4G.read() if readData4G != None and 'AT+HTTPTERM\r\n\r\nOK\r\n' in readData4G: sendStep = 20 break if readData4G != None: print("@18:", readData4G) break time.sleep(sleepTime) elif sendStep == 19: #Send message uart4G.write('AT+HTTPPARA="URL","{}"\r\n'.format(url)) while 1: readData4G = uart4G.read() if readData4G != None and 'AT+HTTPPARA="URL",' in readData4G and 'OK\r\n' in readData4G: sendStep = 16 break if readData4G != None: print("@19:", readData4G) break time.sleep(sleepTime) else: if imgUrlJson: sendStep = 5 return imgUrlJson elif responseJson: sendStep = 5 return responseJson else: sendStep = 17 #return None time.sleep(1) print(sendStep) #------------------------------------------------------------------------------# try: import usocket as socket except: import socket class Response: def __init__(self, f): self.raw = f self.encoding = "utf-8" self._cached = None def close(self): if self.raw: self.raw.close() self.raw = None self._cached = None @property def content(self): if self._cached is None: try: self._cached = self.raw.read() finally: self.raw.close() self.raw = None return self._cached @property def text(self): return str(self.content, self.encoding) def json(self): import ujson return ujson.loads(self.content) def request(method, url, 
data=None, json=None, headers={}, stream=None, parse_headers=True): redir_cnt = 1 if json is not None: assert data is None import ujson data = ujson.dumps(json) while True: try: proto, dummy, host, path = url.split("/", 3) except ValueError: proto, dummy, host = url.split("/", 2) path = "" if proto == "http:": port = 80 elif proto == "https:": port = 443 else: raise ValueError("Unsupported protocol: " + proto) if ":" in host: host, port = host.split(":", 1) port = int(port) ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM) ai = ai[0] resp_d = None if parse_headers is not False: resp_d = {} s = socket.socket(ai[0], ai[1], ai[2]) try: s.connect(ai[-1]) if proto == "https:": s = ssl.wrap_socket(s, server_hostname=host) s.write(b"%s /%s HTTP/1.0\r\n" % (method, path)) if not "Host" in headers: s.write(b"Host: %s\r\n" % host) # Iterate over keys to avoid tuple alloc for k in headers: s.write(k) s.write(b": ") s.write(headers[k]) s.write(b"\r\n") if json is not None: s.write(b"Content-Type: application/json\r\n") if data: s.write(b"Content-Length: %d\r\n" % len(data)) s.write(b"Connection: close\r\n\r\n") if data: s.write(data) l = s.readline() print("lA: ", l) if not l or l == b"\r\n": status = 0 else: l = l.split(None, 2) status = int(l[1]) reason = "" if len(l) > 2: reason = l[2].rstrip() while True: l = s.readline() if not l or l == b"\r\n": break print("lB: ", l) if l.startswith(b"Transfer-Encoding:"): if b"chunked" in l: raise ValueError("Unsupported " + l) elif l.startswith(b"Location:") and 300 <= status <= 399: if not redir_cnt: raise ValueError("Too many redirects") redir_cnt -= 1 url = l[9:].decode().strip() #print("redir to:", url) status = 300 break if parse_headers is False: pass elif parse_headers is True: l = l.decode() k, v = l.split(":", 1) resp_d[k] = v.strip() else: parse_headers(l, resp_d) except OSError: s.close() raise if status != 300: break resp = Response(s) resp.status_code = status resp.reason = reason if resp_d is not None: 
resp.headers = resp_d return resp def head(url, **kw): return request("HEAD", url, **kw) def get(url, **kw): return request("GET", url, **kw) def post(url, **kw): return request("POST", url, **kw) def put(url, **kw): return request("PUT", url, **kw) def patch(url, **kw): return request("PATCH", url, **kw) def delete(url, **kw): return request("DELETE", url, **kw) imgHeaders = { "Content-Type": "image/jpg", "Authorization": "7d54f85af42976ee3c2693e6xxxxxxxx", "Authtopic": "K210IMG", #"Wechatmsg": "IMG", "User-Agent": "MaixPy" } textHeaders = { "User-Agent": "MaixPy" } def HttpSend(imgPath): print("Sending: '%s'" % imgPath) img = image.Image(imgPath) lcd.rotation(0) lcd.display(img) img = img.compress(quality=60) imgBytes = img.to_bytes() if img == "": print("No file") if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(10, 150, "No file!", lcd.WHITE, lcd.BLACK) else: print("Img size: %d" % len(imgBytes)) res = post("http://images.bemfa.com/upload/v1/upimages.php", data=imgBytes, headers=imgHeaders) print("Response: ", res.status_code) #print("Content: ", res.content) imgUrl = json.loads(res.content)["url"] #del res print("ImgUrl: ", imgUrl) resB = get("http://api.bemfa.com/api/device/v1/data/3/push/get/?uid=7d54f85af42976ee3c2693e6xxxxxxxx&topic=K210T1&msg=imgUrl:%s" % imgUrl, headers=textHeaders) print("Response: ", resB.status_code) #print("Content: ", resB.content) #del resB if LCD_DEBUG == 1: lcd.clear() lcd.rotation(1) lcd.draw_string(10, 150, "Send img!", lcd.WHITE, lcd.BLACK) #------------------------------------------------------------------------------# bellFlag = 0 askDoorFlag = 0 openDoorCFlag = 0 openedDoorFlag = 0 closedDoorFlag = 0 def BellTestIrq(pin_num): global bellFlag, askDoorFlag bellFlag = 1 askDoorFlag = 1 print("Key", pin_num, "\n") fm.register(19, fm.fpioa.GPIOHS10) bellPin = GPIO(GPIO.GPIOHS10, GPIO.IN, GPIO.PULL_NONE) bellPin.irq(BellTestIrq, GPIO.IRQ_RISING, GPIO.WAKEUP_NOT_SUPPORT, 7) fm.register(16, fm.fpioa.GPIOHS11) bootPin = 
GPIO(GPIO.GPIOHS11, GPIO.IN, GPIO.PULL_NONE) bootPin.irq(BellTestIrq, GPIO.IRQ_RISING, GPIO.WAKEUP_NOT_SUPPORT, 7) ''' def UartInitDeal(): global bellFlag, askDoorFlag, openDoorCFlag, openedDoorFlag, closedDoorFlag askDoorCount = 0 #fm.register(14, fm.fpioa.UART1_TX, force=True) fm.register(15, fm.fpioa.UART1_RX, force=True) uartA = UART(UART.UART1, 9600, 8, None, 1, timeout=1000, read_buf_len=4096) while 1: readData = uartA.read() if readData != None and readData != "" and readData != 0: print(readData) if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(10, 130, ubinascii.hexlify(readData), lcd.WHITE, lcd.BLACK) if b"\xf5\x01\x04\x09\x03\xfa\x0a" in readData: print("U - Bell!") if LCD_DEBUG == 1: lcd.fill_rectangle(10, 130, lcd.width()-10, 40, lcd.BLACK) lcd.draw_string(10, 130, ubinascii.hexlify(b"\xf5\x01\x04\x09\x03\xfa\x0a"), lcd.WHITE, lcd.BLACK) lcd.draw_string(10, 150, "L - BELL!", lcd.WHITE, lcd.BLACK) bellFlag = 1 askDoorFlag = 1 elif b"\xf5\x01\x05\x02\x01\x01\xf3\x0a" in readData: print("U - Door opened!") if LCD_DEBUG == 1: lcd.fill_rectangle(10, 130, lcd.width()-10, 40, lcd.BLACK) lcd.draw_string(10, 130, ubinascii.hexlify(b"\xf5\x01\x05\x02\x01\x01\xf3\x0a"), lcd.WHITE, lcd.BLACK) lcd.draw_string(10, 150, "L - Door opened!", lcd.WHITE, lcd.BLACK) openedDoorFlag = 1 elif b"\xf5\x01\x05\x02\x01\x00\xf2\x0a" in readData: print("U - Door closed!") if LCD_DEBUG == 1: lcd.fill_rectangle(10, 130, lcd.width()-10, 40, lcd.BLACK) lcd.draw_string(10, 130, ubinascii.hexlify(b"\xf5\x01\x05\x02\x01\x00\xf2\x0a"), lcd.WHITE, lcd.BLACK) lcd.draw_string(10, 150, "L - Door closed!", lcd.WHITE, lcd.BLACK) closedDoorFlag = 1 elif readData[0] == 0xaa and readData[1] == 0x55 and readData[2] == 0x03: uartA.write(b"\xaa\x55\x02\x00\x00\x00\x00\x00\x00\x01") print("U - State get!") if readData[3] == 0x00: if LCD_DEBUG == 1: lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(10, 150, "L - Door closed!", lcd.WHITE, lcd.BLACK) elif readData[3] == 0x01: if 
LCD_DEBUG == 1: lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(10, 150, "L - Door opened!", lcd.WHITE, lcd.BLACK) elif readData[3] == 0x02: if LCD_DEBUG == 1: lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(10, 150, "L - Door alarm!", lcd.WHITE, lcd.BLACK) if readData[4] == 0x00: if LCD_DEBUG == 1: lcd.fill_rectangle(10, 170, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(10, 170, "L - Lock danger!", lcd.WHITE, lcd.BLACK) else: if LCD_DEBUG == 1: lcd.fill_rectangle(10, 170, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(10, 170, "L - Lock freeze!", lcd.WHITE, lcd.BLACK) elif readData[3] == 0x03: if LCD_DEBUG == 1: lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(10, 150, "L - Door ask!", lcd.WHITE, lcd.BLACK) uartA.write(b"\xaa\x55\x13\x00\x00\x00\x00\x00\x00\x12") #bellFlag = 1 #askDoorFlag = 1 elif readData[3] == 0x04: if readData[4] == 0x08: if LCD_DEBUG == 1: lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(10, 150, "L - Low power!", lcd.WHITE, lcd.BLACK) if readData[4] == 0x0c: if LCD_DEBUG == 1: lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(10, 150, "L - BELL!", lcd.WHITE, lcd.BLACK) elif readData[0] == 0xaa and readData[1] == 0x55 and readData[2] == 0x1c: uartA.write(b"\xaa\x55\x1d\x00\x00\x00\x00\x00\x00\x1c") if LCD_DEBUG == 1: lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(10, 150, "L - Img start!", lcd.WHITE, lcd.BLACK) else: print("U - Wrong command!") if LCD_DEBUG == 1: lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(10, 150, "L - Wrong command!", lcd.WHITE, lcd.BLACK) #lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) if bellFlag == 1: bellFlag = 0 print("F - Bell!") if LCD_DEBUG == 1: lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(100, 150, "BELL!", lcd.WHITE, lcd.BLACK) CamDisplay() EspatInit() if wifi.isconnected() == True: 
HttpSend("/flash/oPic.jpg") else: askDoorFlag = 0 if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(90, 150, "No wifi!", lcd.RED, lcd.BLACK) if askDoorFlag == 1: print("F - Door ask!") if wifi.isconnected() == True: res = get("http://api.bemfa.com/api/device/v1/data/3/push/get/?uid=7d54f85af42976ee3c2693e6xxxxxxxx&topic=K210T1&msg=Door-ASK", headers=textHeaders) print("Response: ", res.status_code) #print("Content: ", res.content) #del res askDoorFlag = 2 if askDoorFlag == 2: print("Ask door count: %d" % askDoorCount) if askDoorCount > 30: askDoorCount = 0 askDoorFlag = 0 print("Ask door failed!") if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(60, 150, "Ask door failed!", lcd.WHITE, lcd.BLACK) else: if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(65, 150, "Asking door ...", lcd.WHITE, lcd.BLACK) lcd.draw_string(110, 170, "{}".format(31-askDoorCount), lcd.WHITE, lcd.BLACK) askDoorCount += 1 res = get("http://api.bemfa.com/api/device/v1/data/3/get/?uid=7d54f85af42976ee3c2693e6xxxxxxxx&topic=K210T1&num=1", headers=textHeaders) print("Response: ", res.status_code) #print("Content: ", res.content) resJson = json.loads(res.content) resJson = resJson["data"][0]["msg"] #del res print("Msg: ", resJson) if resJson == "Door-Oc": askDoorFlag = 0 openDoorCFlag = 1 fm.register(14, fm.fpioa.UART1_TX, force=True) uartA.write(b"\xF5\x01\x05\x02\x01\x01\xF3\x0A") print("Ask back 'Open' command!") if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(55, 150, "Get open command!", lcd.WHITE, lcd.BLACK) #print(openDoorCFlag) if openDoorCFlag != 0 and openedDoorFlag == 0: openDoorCFlag += 1 if openDoorCFlag >= 500: uartA.write(b"\xF5\x01\x05\x02\x01\x01\xF3\x0A") print("Send open uart.") openDoorCFlag = 1 if openedDoorFlag == 1 and wifi.isconnected() == True: openedDoorFlag = 0 openDoorCFlag = 0 fm.unregister(14) print("F - Door opened!") if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(70, 150, "Door opened!", lcd.WHITE, lcd.BLACK) res = 
get("http://api.bemfa.com/api/device/v1/data/3/push/get/?uid=7d54f85af42976ee3c2693e6xxxxxxxx&topic=K210T1&msg=Door-Od", headers=textHeaders) print("Response: ", res.status_code) #print("Content: ", res.content) #del res if closedDoorFlag == 1 and wifi.isconnected() == True: closedDoorFlag = 0 print("F - Door closed!") if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(70, 150, "Door closed!", lcd.WHITE, lcd.BLACK) res = get("http://api.bemfa.com/api/device/v1/data/3/push/get/?uid=7d54f85af42976ee3c2693e6xxxxxxxx&topic=K210T1&msg=Door-Cd", headers=textHeaders) print("Response: ", res.status_code) #print("Content: ", res.content) #del res uartA.deinit() del uartA #UartInitDeal() ''' fm.register(11, fm.fpioa.GPIOHS12) airPin = GPIO(GPIO.GPIOHS12, GPIO.OUT, GPIO.PULL_NONE) airPin.irq(BellTestIrq, GPIO.IRQ_RISING, GPIO.WAKEUP_NOT_SUPPORT, 7) def AirIsConnect(): for i in range(5): uart4G.write('AT\r\n') readData4GTemp = uart4G.read() if readData4GTemp != None and 'AT\r\n\r\nOK\r\n' in readData4GTemp: del readData4GTemp return 1 time.sleep(1) del readData4GTemp return 0 def UartInitDeal4G(): global bellFlag, askDoorFlag, openDoorCFlag, openedDoorFlag, closedDoorFlag if LCD_DEBUG == 1: #EspatInit() #Check 4G model connect4gFlag = 0 if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(10, 150, "Air connecting ...", lcd.WHITE, lcd.BLACK) uart4G.write('AT\r\n') while 1: airIsConnectFlag = AirIsConnect() if airIsConnectFlag: print("Air connected.") if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(10, 150, "Air connected!", lcd.WHITE, lcd.BLACK) del airIsConnectFlag break else: airPin.value(0) time.sleep(2) airPin.value(1) askDoorCount = 0 #fm.register(14, fm.fpioa.UART1_TX, force=True) fm.register(15, fm.fpioa.UART1_RX, force=True) uartA = UART(UART.UART1, 9600, 8, None, 1, timeout=1000, read_buf_len=4096) while 1: readData = uartA.read() if readData != None and readData != "" and readData != 0: print(readData) if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(10, 130, 
ubinascii.hexlify(readData), lcd.WHITE, lcd.BLACK) if b"\xf5\x01\x04\x09\x03\xfa\x0a" in readData: print("U - Bell!") if LCD_DEBUG == 1: lcd.fill_rectangle(10, 130, lcd.width()-10, 40, lcd.BLACK) lcd.draw_string(10, 130, ubinascii.hexlify(b"\xf5\x01\x04\x09\x03\xfa\x0a"), lcd.WHITE, lcd.BLACK) lcd.draw_string(10, 150, "L - BELL!", lcd.WHITE, lcd.BLACK) bellFlag = 1 askDoorFlag = 1 elif b"\xf5\x01\x05\x02\x01\x01\xf3\x0a" in readData: print("U - Door opened!") if LCD_DEBUG == 1: lcd.fill_rectangle(10, 130, lcd.width()-10, 40, lcd.BLACK) lcd.draw_string(10, 130, ubinascii.hexlify(b"\xf5\x01\x05\x02\x01\x01\xf3\x0a"), lcd.WHITE, lcd.BLACK) lcd.draw_string(10, 150, "L - Door opened!", lcd.WHITE, lcd.BLACK) openedDoorFlag = 1 elif b"\xf5\x01\x05\x02\x01\x00\xf2\x0a" in readData: print("U - Door closed!") if LCD_DEBUG == 1: lcd.fill_rectangle(10, 130, lcd.width()-10, 40, lcd.BLACK) lcd.draw_string(10, 130, ubinascii.hexlify(b"\xf5\x01\x05\x02\x01\x00\xf2\x0a"), lcd.WHITE, lcd.BLACK) lcd.draw_string(10, 150, "L - Door closed!", lcd.WHITE, lcd.BLACK) closedDoorFlag = 1 else: print("U - Wrong command!") if LCD_DEBUG == 1: lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(10, 150, "L - Wrong command!", lcd.WHITE, lcd.BLACK) #lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) if bellFlag == 1: bellFlag = 0 print("F - Bell!") if LCD_DEBUG == 1: lcd.fill_rectangle(10, 150, lcd.width()-20, 20, lcd.BLACK) lcd.draw_string(100, 150, "BELL!", lcd.WHITE, lcd.BLACK) CamDisplay() resJson = Uart4GSendImg(1, "") imgUrl = json.loads(resJson)["url"] print("ImgUrl:", imgUrl) time.sleep(1) resJson = Uart4GSendImg(0, "http://api.bemfa.com/api/device/v1/data/3/push/get/?uid=7d54f85af42976ee3c2693e6xxxxxxxx&topic=K210T1&msg=imgUrl:%s" % imgUrl) print("Response:", resJson) if askDoorFlag == 1: print("F - Door ask!") resJson = Uart4GSendImg(0, "http://api.bemfa.com/api/device/v1/data/3/push/get/?uid=7d54f85af42976ee3c2693e6xxxxxxxx&topic=K210T1&msg=Door-ASK") 
print("Response: ", resJson) askDoorFlag = 2 if askDoorFlag == 2: print("Ask door count: %d" % askDoorCount) if askDoorCount > 30: askDoorCount = 0 askDoorFlag = 0 print("Ask door failed!") if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(60, 150, "Ask door failed!", lcd.WHITE, lcd.BLACK) else: if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(65, 150, "Asking door ...", lcd.WHITE, lcd.BLACK) lcd.draw_string(110, 170, "{}".format(31-askDoorCount), lcd.WHITE, lcd.BLACK) time.sleep(1) askDoorCount += 1 resJson = Uart4GSendImg(0, "http://api.bemfa.com/api/device/v1/data/3/get/?uid=7d54f85af42976ee3c2693e6xxxxxxxx&topic=K210T1&num=1") if resJson: print("Response: ", resJson) resJson = json.loads(resJson) resJson = resJson["data"][0]["msg"] print("Msg: ", resJson) if resJson == "Door-Oc": askDoorFlag = 0 openDoorCFlag = 1 fm.register(14, fm.fpioa.UART1_TX, force=True) uartA.write(b"\xF5\x01\x05\x02\x01\x01\xF3\x0A") time.sleep(1) uartA.write(b"\xF5\x01\x05\x02\x01\x01\xF3\x0A") time.sleep(1) print("Ask back 'Open' command!") if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(55, 150, "Get open command!", lcd.WHITE, lcd.BLACK) #print(openDoorCFlag) if openDoorCFlag != 0 and openedDoorFlag == 0: openDoorCFlag += 1 if openDoorCFlag >= 600: uartA.write(b"\xF5\x01\x05\x02\x01\x01\xF3\x0A") print("Send open uart.") openDoorCFlag = 1 if openedDoorFlag == 1: openedDoorFlag = 0 openDoorCFlag = 0 fm.unregister(14) print("F - Door opened!") if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(70, 150, "Door opened!", lcd.WHITE, lcd.BLACK) resJson = Uart4GSendImg(0, "http://api.bemfa.com/api/device/v1/data/3/push/get/?uid=7d54f85af42976ee3c2693e6xxxxxxxx&topic=K210T1&msg=Door-Od") print("Response: ", resJson) if closedDoorFlag == 1: closedDoorFlag = 0 print("F - Door closed!") if LCD_DEBUG == 1: lcd.clear() lcd.draw_string(70, 150, "Door closed!", lcd.WHITE, lcd.BLACK) resJson = Uart4GSendImg(0, 
"http://api.bemfa.com/api/device/v1/data/3/push/get/?uid=7d54f85af42976ee3c2693e6xxxxxxxx&topic=K210T1&msg=Door-Cd") print("Response: ", resJson) #uartA.deinit() #del uartA UartInitDeal4G()network_espat.py(Backup)
# This file is part of MaixPY
# Copyright (c) sipeed.com
#
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
#

import time, network
from Maix import GPIO
from machine import UART
from fpioa_manager import fm
from board import board_info


class wifi():
    """Namespace-style helper around the ESP8285 AT-firmware WiFi module.

    The functions below take no ``self``; they are called directly on the
    class (``wifi.reset()``, ``wifi.connect(...)``) and keep their state in
    class attributes reached through ``__class__``.
    """

    __is_m1w__ = True
    uart = None  # UART object used to talk to the WiFi module
    eb = None
    nic = None  # network.ESP8285 instance once reset() has succeeded

    def init():
        """Map the WiFi pins and open UART2 at 115200 baud."""
        if __class__.__is_m1w__:
            fm.register(0, fm.fpioa.GPIOHS1, force=True)
            power_pin = GPIO(GPIO.GPIOHS1, GPIO.OUT)
            # b'\r\n ets Jan 8 2013,rst cause:1, boot mode:(7,6)\r\n\r\nwaiting for host\r\n'
            power_pin.value(0)

        # board_info.WIFI_EN == IO 8
        fm.register(board_info.WIFI_EN, fm.fpioa.GPIOHS0)
        __class__.en = GPIO(GPIO.GPIOHS0, GPIO.OUT)

        # board_info.WIFI_RX == IO 7
        fm.register(board_info.WIFI_RX, fm.fpioa.UART2_TX)
        # board_info.WIFI_TX == IO 6
        fm.register(board_info.WIFI_TX, fm.fpioa.UART2_RX)
        __class__.uart = UART(UART.UART2, 115200, timeout=1000, read_buf_len=8192)

    def enable(en):
        """Drive the WiFi enable pin to ``en``."""
        enable_pin = __class__.en
        enable_pin.value(en)

    def _at_cmd(cmd="AT\r\n", resp="OK\r\n", timeout=20):
        """Send ``cmd`` and return True when the reply ends with ``resp``.

        ``timeout`` is the delay in milliseconds between writing the command
        and reading whatever the UART has buffered.
        """
        port = __class__.uart
        port.write(cmd)  # e.g. "AT+GMR\r\n"
        time.sleep_ms(timeout)
        answer = port.read()
        # print(answer)
        return bool(answer) and bool(answer.endswith(resp))

    def at_cmd(cmd="AT\r\n", timeout=20):
        """Send ``cmd``, wait ``timeout`` ms, and return the raw UART reply."""
        port = __class__.uart
        port.write(cmd)  # e.g. "AT+GMR\r\n"
        time.sleep_ms(timeout)
        return port.read()

    def reset(force=False, reply=5):
        """Re-initialise the module and create the network interface.

        Returns True immediately when ``force`` is False and the interface is
        already connected. Otherwise the enable pin is pulsed up to ``reply``
        times until a plain ``AT`` is acknowledged, the link is switched to
        921600 baud, and ``network.ESP8285`` is created. Returns False when
        creating the interface raises.
        """
        if force == False and __class__.isconnected():
            return True

        __class__.init()
        for _attempt in range(reply):
            print('reset...')
            __class__.enable(False)
            time.sleep_ms(50)
            __class__.enable(True)
            time.sleep_ms(500)  # at start > 500ms
            if __class__._at_cmd(timeout=500):
                break

        __class__._at_cmd()
        __class__._at_cmd('AT+UART_CUR=921600,8,1,0,0\r\n', "OK\r\n")
        # important! baudrate too low or read_buf_len too small will loose data
        __class__.uart = UART(UART.UART2, 921600, timeout=1000, read_buf_len=10240)
        # print(__class__._at_cmd())
        try:
            __class__.nic = network.ESP8285(__class__.uart)
            time.sleep_ms(500)  # wait at ready to connect
        except Exception as err:
            print(err)
            return False
        else:
            return True

    def connect(ssid="wifi_name", pasw="pass_word"):
        """Join ``ssid``; returns None when no interface has been created."""
        if __class__.nic is None:
            return None
        return __class__.nic.connect(ssid, pasw)

    def ifconfig():
        """Return the interface configuration, or None without an interface."""
        # should check ip != 0.0.0.0
        if __class__.nic is None:
            return None
        return __class__.nic.ifconfig()

    def isconnected():
        """Return the interface's connection state, False without an interface."""
        if __class__.nic is None:
            return False
        return __class__.nic.isconnected()


if __name__ == "__main__":
    # It is recommended to call as a class library (upload network_espat.py)
    # from network_espat import wifi
    SSID = "LSJ"
    PASW = "95898858"

    def check_wifi_net(reply=5):
        """Try up to ``reply`` times to reset the module and join SSID."""
        if wifi.isconnected() == True:
            return wifi.isconnected()
        for _attempt in range(reply):
            try:
                wifi.reset()
                print('try AT connect wifi...', wifi._at_cmd())
                wifi.connect(SSID, PASW)
                if wifi.isconnected():
                    break
            except Exception as err:
                print(err)
        return wifi.isconnected()

    if wifi.isconnected() == False:
        check_wifi_net()
    print('network state:', wifi.isconnected(), wifi.ifconfig())

    # The network is no longer configured repeatedly
    import socket
    sock = socket.socket()
    # your send or recv
    # see other demo_socket_tcp.py / udp / http / mqtt
    sock.close()

# Sample REPL output, kept as a no-op string literal.
'''ouput >>> raw REPL; CTRL-B to exit >OK [Warning] function is used by fm.fpioa.GPIOHS1(pin:17) [Warning] function is used by fm.fpioa.GPIOHS0(pin:16) reset... try AT connect wifi... True could not connect to ssid=Sipeed_2.4G reset... try AT connect wifi... True network state: True ('192.168.0.165', '255.255.255.0', '192.168.0.1', '0', '0', 'b0:b9:8a:5b:be:7f', 'Sipeed_2.4G') > MicroPython v0.5.1-136-g039f72b6c-dirty on 2020-11-18; Sipeed_M1 with kendryte-k210 Type "help()" for more information. >>> >>> >>> raw REPL; CTRL-B to exit >OK network state: True ('192.168.0.165', '255.255.255.0', '192.168.0.1', '0', '0', 'b0:b9:8a:5b:be:7f', 'Sipeed_2.4G') > '''
# umqtt_simple.py(Backup)
try:
    import usocket as socket
    import ustruct as struct
    from ubinascii import hexlify
except ImportError:
    # Ports without the "u"-prefixed aliases expose the same APIs under the
    # standard module names.
    import socket
    import struct
    from binascii import hexlify


class MQTTException(Exception):
    """Raised when the broker refuses a CONNECT or SUBSCRIBE request."""
    pass


class MQTTClient:
    """Minimal blocking MQTT 3.1.1 client (QoS 0 and 1 only).

    The socket object stored in ``self.sock`` must provide ``read(n)``,
    ``write(buf[, length])``, ``setblocking(flag)`` and ``close()``.
    """

    def __init__(self, client_id, server, port=0, user=None, password=None,
                 keepalive=0, ssl=False, ssl_params={}):
        """Store connection settings; no network traffic happens here.

        ``port=0`` selects 8883 when ``ssl`` is true, otherwise 1883.
        ``ssl_params`` is only unpacked into ``wrap_socket`` and never mutated.
        """
        if port == 0:
            port = 8883 if ssl else 1883
        self.client_id = client_id
        self.sock = None
        self.server = server
        self.port = port
        self.ssl = ssl
        self.ssl_params = ssl_params
        self.pid = 0  # last packet identifier handed out
        self.cb = None  # callback(topic, msg) for incoming PUBLISH packets
        self.user = user
        self.pswd = password
        self.keepalive = keepalive
        self.lw_topic = None
        self.lw_msg = None
        self.lw_qos = 0
        self.lw_retain = False

    def _send_str(self, s):
        """Write ``s`` prefixed with its 16-bit big-endian length."""
        self.sock.write(struct.pack("!H", len(s)))
        self.sock.write(s)

    def _recv_len(self):
        """Read and decode an MQTT variable-length "remaining length" field."""
        n = 0
        sh = 0
        while 1:
            b = self.sock.read(1)[0]
            n |= (b & 0x7f) << sh
            if not b & 0x80:
                return n
            sh += 7

    @staticmethod
    def _put_len(buf, sz):
        """Encode ``sz`` as a variable-length field into ``buf`` from index 1.

        Returns the index of the last length byte written.
        """
        i = 1
        while sz > 0x7f:
            buf[i] = (sz & 0x7f) | 0x80
            sz >>= 7
            i += 1
        buf[i] = sz
        return i

    def _next_pid(self):
        """Advance and return the packet identifier.

        Identifiers are non-zero 16-bit values, so 65535 wraps to 1 instead of
        overflowing the "!H" pack used to serialise them.
        """
        self.pid = self.pid % 0xFFFF + 1
        return self.pid

    def set_callback(self, f):
        """Register ``f(topic, msg)`` to receive subscribed messages."""
        self.cb = f

    def set_last_will(self, topic, msg, retain=False, qos=0):
        """Configure the last-will message sent with the next CONNECT."""
        assert 0 <= qos <= 2
        assert topic
        self.lw_topic = topic
        self.lw_msg = msg
        self.lw_qos = qos
        self.lw_retain = retain

    def connect(self, clean_session=True):
        """Open the socket and perform the CONNECT/CONNACK handshake.

        Returns bit 0 of the CONNACK flags byte. Raises ``MQTTException`` with
        the broker's return code when the connection is refused. If anything
        fails after the socket was created, the socket is closed before the
        exception propagates so a retry loop does not leak sockets.
        """
        self.sock = socket.socket()
        try:
            return self._handshake(clean_session)
        except Exception:
            self.sock.close()
            raise

    def _handshake(self, clean_session):
        """Connect the already-created socket and exchange CONNECT/CONNACK."""
        addr = socket.getaddrinfo(self.server, self.port)[0][-1]
        self.sock.connect(addr)
        if self.ssl:
            import ussl
            self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
        # premsg: packet type, remaining length, then the high (zero) byte of
        # the protocol-name length; msg starts with the low byte (0x04).
        premsg = bytearray(b"\x10\0\0\0\0\0")
        msg = bytearray(b"\x04MQTT\x04\x02\0\0")

        sz = 10 + 2 + len(self.client_id)
        msg[6] = clean_session << 1
        if self.user is not None:
            sz += 2 + len(self.user)
            msg[6] |= 0x80
            # A user name may be sent without a password.
            if self.pswd is not None:
                sz += 2 + len(self.pswd)
                msg[6] |= 0x40
        if self.keepalive:
            assert self.keepalive < 65536
            msg[7] |= self.keepalive >> 8
            msg[8] |= self.keepalive & 0x00FF
        if self.lw_topic:
            sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
            msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
            msg[6] |= self.lw_retain << 5

        i = self._put_len(premsg, sz)
        self.sock.write(premsg, i + 2)
        self.sock.write(msg)
        #print(hex(len(msg)), hexlify(msg, ":"))
        self._send_str(self.client_id)
        if self.lw_topic:
            self._send_str(self.lw_topic)
            self._send_str(self.lw_msg)
        if self.user is not None:
            self._send_str(self.user)
            if self.pswd is not None:
                self._send_str(self.pswd)
        resp = self.sock.read(4)
        assert resp[0] == 0x20 and resp[1] == 0x02
        if resp[3] != 0:
            raise MQTTException(resp[3])
        return resp[2] & 1

    def disconnect(self):
        """Send DISCONNECT and close the socket."""
        self.sock.write(b"\xe0\0")
        self.sock.close()

    def ping(self):
        """Send a PINGREQ; the reply is consumed by ``wait_msg``."""
        self.sock.write(b"\xc0\0")

    def publish(self, topic, msg, retain=False, qos=0):
        """Publish ``msg`` on ``topic``.

        With ``qos=1`` this blocks until the matching PUBACK arrives.
        ``qos=2`` is not implemented and fails an assertion.
        """
        pkt = bytearray(b"\x30\0\0\0")
        pkt[0] |= qos << 1 | retain
        sz = 2 + len(topic) + len(msg)
        if qos > 0:
            sz += 2
        assert sz < 2097152
        i = self._put_len(pkt, sz)
        #print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(pkt, i + 1)
        self._send_str(topic)
        if qos > 0:
            pid = self._next_pid()
            # Reuse pkt as scratch space for the two packet-id bytes.
            struct.pack_into("!H", pkt, 0, pid)
            self.sock.write(pkt, 2)
        self.sock.write(msg)
        if qos == 1:
            while 1:
                op = self.wait_msg()
                if op == 0x40:
                    sz = self.sock.read(1)
                    assert sz == b"\x02"
                    rcv_pid = self.sock.read(2)
                    rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
                    if pid == rcv_pid:
                        return
        elif qos == 2:
            assert 0

    def subscribe(self, topic, qos=0):
        """Subscribe to ``topic`` and block until the SUBACK arrives.

        Requires a callback set via ``set_callback``. Raises ``MQTTException``
        when the broker answers with the failure return code 0x80.
        """
        assert self.cb is not None, "Subscribe callback is not set"
        pid = self._next_pid()
        # 1 type byte + up to 4 length bytes + 2 packet-id bytes.
        pkt = bytearray(b"\x82\0\0\0\0\0\0")
        # Variable-length encoding keeps the header valid for topics that
        # push the remaining length past 127 bytes.
        i = self._put_len(pkt, 2 + 2 + len(topic) + 1)
        struct.pack_into("!H", pkt, i + 1, pid)
        #print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(pkt, i + 3)
        self._send_str(topic)
        self.sock.write(qos.to_bytes(1, "little"))
        while 1:
            op = self.wait_msg()
            if op == 0x90:
                resp = self.sock.read(4)
                #print(resp)
                assert resp[1] == (pid >> 8) and resp[2] == (pid & 0xFF)
                if resp[3] == 0x80:
                    raise MQTTException(resp[3])
                return

    # Wait for a single incoming MQTT message and process it.
    # Subscribed messages are delivered to a callback previously
    # set by .set_callback() method. Other (internal) MQTT
    # messages processed internally.
    def wait_msg(self):
        """Read one packet; return its first byte for non-PUBLISH packets.

        Returns None when nothing was read, after a PINGRESP, and after a
        PUBLISH has been delivered to the callback.
        """
        res = self.sock.read(1)
        # check_msg() switches to non-blocking for the read above; restore
        # blocking mode before reading the rest of the packet.
        self.sock.setblocking(True)
        if res is None:
            return None
        if res == b"":
            return None  # raise OSError(-1)
        if res == b"\xd0":  # PINGRESP
            sz = self.sock.read(1)[0]
            assert sz == 0
            return None
        op = res[0]
        if op & 0xf0 != 0x30:
            return op
        sz = self._recv_len()
        topic_len = self.sock.read(2)
        topic_len = (topic_len[0] << 8) | topic_len[1]
        topic = self.sock.read(topic_len)
        sz -= topic_len + 2
        if op & 6:
            pid = self.sock.read(2)
            pid = pid[0] << 8 | pid[1]
            sz -= 2
        msg = self.sock.read(sz)
        self.cb(topic, msg)
        if op & 6 == 2:
            # QoS 1: acknowledge with PUBACK carrying the same packet id.
            pkt = bytearray(b"\x40\x02\0\0")
            struct.pack_into("!H", pkt, 2, pid)
            self.sock.write(pkt)
        elif op & 6 == 4:
            assert 0

    # Checks whether a pending message from server is available.
    # If not, returns immediately with None. Otherwise, does
    # the same processing as wait_msg.
    def check_msg(self):
        """Non-blocking variant of ``wait_msg``."""
        self.sock.setblocking(False)
        return self.wait_msg()
评论已关闭