From 0ef84bf14f7d6bd9ec35f28dc2826c0054a744ab Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 26 Nov 2024 21:45:06 +0000 Subject: [PATCH] Copy updated files to the release folder --- software/release/adxl345.py | 53 ++ software/release/files.py | 96 +++ software/release/icons.py | 241 +++++++ software/release/main.py | 45 ++ software/release/networking.py | 1096 +++++++++++++++++++++++++++++++ software/release/sensors.py | 146 ++++ software/release/servo.py | 33 + software/release/smartmotor.py | 444 +++++++++++++ software/release/ssd1306.py | 158 +++++ software/release/variableLED.py | 93 +++ 10 files changed, 2405 insertions(+) create mode 100644 software/release/adxl345.py create mode 100644 software/release/files.py create mode 100644 software/release/icons.py create mode 100644 software/release/main.py create mode 100644 software/release/networking.py create mode 100644 software/release/sensors.py create mode 100644 software/release/servo.py create mode 100644 software/release/smartmotor.py create mode 100644 software/release/ssd1306.py create mode 100644 software/release/variableLED.py diff --git a/software/release/adxl345.py b/software/release/adxl345.py new file mode 100644 index 0000000..034b647 --- /dev/null +++ b/software/release/adxl345.py @@ -0,0 +1,53 @@ +#https://github.com/DFRobot/micropython-dflib/blob/master/ADXL345/user_lib/ADXL345.py +from machine import Pin,I2C +import math +import time + +device = const(0x53) +regAddress = const(0x32) +TO_READ = 6 +buff = bytearray(6) + +class ADXL345: + def __init__(self,i2c,addr=device): + self.addr = addr + self.i2c = i2c + b = bytearray(1) + b[0] = 0 + self.i2c.writeto_mem(self.addr,0x2d,b) + b[0] = 16 + self.i2c.writeto_mem(self.addr,0x2d,b) + b[0] = 8 + self.i2c.writeto_mem(self.addr,0x2d,b) + + @property + def xValue(self): + buff = self.i2c.readfrom_mem(self.addr,regAddress,TO_READ) + x = (int(buff[1]) << 8) | buff[0] + if x > 32767: + x -= 65536 + return x + + @property + def yValue(self): + buff = self.i2c.readfrom_mem(self.addr,regAddress,TO_READ) + y = (int(buff[3]) << 8) | buff[2] + if y > 32767: + y -= 65536 + return y + + @property + def zValue(self): + buff = self.i2c.readfrom_mem(self.addr,regAddress,TO_READ) + z = (int(buff[5]) << 8) | buff[4] + if z > 32767: + z -= 65536 + return z + + def RP_calculate(self,x,y,z): + roll = math.atan2(y , z) * 57.3 + pitch = math.atan2((- x) , math.sqrt(y * y + z * z)) * 57.3 + return roll,pitch + + + diff --git a/software/release/files.py b/software/release/files.py new file mode 100644 index 0000000..38efd56 --- /dev/null +++ b/software/release/files.py @@ -0,0 +1,96 @@ +import sys +def savetofile(pointstosave): # the points to save should have format of [[light, pot],[light,pot]] + import os + if(os.listdir().count('data.py')): + import data + datapoints=[] + del sys.modules["data"] + import data + try: + datapoints=data.points + datapoints.append(pointstosave) + except: + datapoints.append(pointstosave) + del sys.modules["data"] + #getting ready to reimporting data file + else: + datapoints=[] + datapoints.append(pointstosave) + print("new file") + + #writing files to the data.py + f=open("data.py","w") + f.write("points="+str(datapoints)+"\r\n") + f.close() + + +def cleardatafile(): # the points to save should have format of [[light, pot],[light,pot]] + import os + f=open("data.py","w") + f.write("points=[]") + f.close() + + +def replacefile(pointstosave): + import os + if(os.listdir().count('data.py')): + f=open("data.py","w") + f.write("points="+str(pointstosave)+"\r\n") + f.close() + else: + return 0 + + + +def readfile(): + + import os + if(os.listdir().count('data.py')): + import data + if(data.points): + return(data.points) + else: + print("returning blank") + return([]) + else: + return([]) + + #also make this go home + +def resetlog(): + import os + try: + os.remove("log.py") + except: + print("no log file to remove") + +def resetprefs(): + f=open("prefs.py","w") + f.write("log=False\r\n") + f.close() + + + +def setprefs(): + f=open("prefs.py","w") + f.write("log=True\r\n") + f.close() + + +def savetolog(*logtext): # the points to save should have format of [[light, pot],[light,pot]] + import os + if(os.listdir().count('log.py')): + f=open("log.py","a") + try: + print("writing",logtext) + f.write(str(logtext)+"\r\n") + except: + print("errr") + else: + f=open("log.py","w") + f.write(str(logtext)+",") + + #writing files to the data.py + f.close() + + diff --git a/software/release/icons.py b/software/release/icons.py new file mode 100644 index 0000000..f60745b --- /dev/null +++ b/software/release/icons.py @@ -0,0 +1,241 @@ +import framebuf +import ssd1306 +from machine import Pin +import time + + +MAX_BATTERY=2900 +MIN_BATTERY=2600 + + +#define icons here +def createIcons(iconSize, iconFrames, offsetx=0, offsety=0, direction=0): + icons=[] + padding=2 + spacingV=int((screenHeight - iconSize*len(iconFrames))/ (len(iconFrames))-padding) + spacingH=int((screenWidth - iconSize*len(iconFrames))/ (len(iconFrames))-padding) + + for i in range(len(iconFrames)): + Iconx=offsetx + i * (spacingH + iconSize) * ((direction+1)%2) + Icony=offsety# + i * (spacingV + iconSize) * ((direction)%2) + + icons.append([Iconx,Icony,iconFrames[i]]) + return icons + + + + +#Homescreen icons +Icons=[] + +screenWidth=128 +screenHeight=120 + +#logo +fb_SMLOGO = framebuf.FrameBuffer(bytearray(b'\x00\x00\x01\xe0\x00\x00\x00\x00\x00\xe1\xe1\xc0\x00\x00\x00\x00\xff\xff\xc0\x00\x00\x00\x00\xff\xff\xc0\x00\x00\x00\x11\xff\xff\xe2\x00\x00\x00\x7f\xff\xff\xff\x80\x00\x00\x7f\xff\xff\xff\x80\x00\x00?\xff\xff\xff\x00\x00\x00\x7f\xff\xff\xff\x80\x00\x06\xff\xf0\x03\xff\xd8\x00\x07\xff\xc0\x00\xff\xf8\x00\x0f\xff\x00\x00?\xfc\x00\x07\xfe\x00\x00\x1f\xf8\x00\x07\xfc\x00\x00\x0f\xf8\x00\x07\xf8\x00\x00\x07\xf8\x00\x0f\xf0\x00\x00\x03\xfc\x00\x7f\xe0\x00\x00\x01\xff\x80\x7f\xe0\x00\x00\x01\xff\x80\x7f\xc0\x00\x00\x00\xff\x80?\xc0\x00\x00\x00\xff\x00\x1f\xc0\x00\x00\x00\xfe\x00\x1f\x80\x00\x00\x00~\x00?\x80\x00\x00\x00\x7f\x00\xff\x80\x00\x079\xbf\xc0\xff\x80\x00\x07\xbd\xbf\xc0\xff\x80\x00\x079\xbf\xc0\xff\x80\x00\x00\x00\xff\xc0?\x80\x00\x00\x00\x7f\x00\x1f\xc0\x00\x00\x00\xfe\x00\x1f\xc0\x00\x00\x00\xfe\x00?\xc0\x00\x00\x00\xff\x00\x7f\xe0\x00\x00\x01\xff\x80\x7f\xe0\x00\x00\x01\xff\x80\x7f\xf0\x00\x00\x03\xff\x80\x0f\xf0\x00\x00\x03\xfc\x00\x07\xf8\x00\x00\x07\xf8\x00\x03\xfc\x00\x00\x0f\xf0\x00\x07\xff\x00\x00?\xf8\x00\x0f\xff\x80\x00\x7f\xfc\x00\x07\xff\xe0\x01\xff\xf8\x00\x06\x7f\xfe\x1f\xff\x98\x00\x00?\xff\xff\xff\x00\x00\x00?\xff\xff\xff\x00\x00\x00\x7f\xff\xff\xff\x80\x00\x00{\xff\xff\xf7\x80\x00\x00\x10\xff\xff\xc2\x00\x00\x00\x00\xff\xff\xc0\x00\x00\x00\x00\xf1\xe3\xc0\x00\x00\x00\x00\xe1\xe1\xc0\x00\x00\x00\x00\x01\xe0\x00\x00\x00'), 50, 50, framebuf.MONO_HLSB) +#plug +fb_battcharging = framebuf.FrameBuffer(bytearray(b'?\xff\xff\xc0 \x00\x00@ \x06\x00@\xe0\x07\x80@\xe0\x7f\xc0@\xe0<\x00@\xe0\x0c\x00@ \x00\x00@ \x00\x00@?\xff\xff\xc0'), 26, 10, framebuf.MONO_HLSB) +fb_batthigh = framebuf.FrameBuffer(bytearray(b'?\xff\xff\xc0 \x00\x00@/\xef\xef@\xef\xef\xef@\xef\xef\xef@\xef\xef\xef@\xef\xef\xef@/\xef\xef@ \x00\x00@?\xff\xff\xc0'), 26, 10, framebuf.MONO_HLSB) +fb_battmid = framebuf.FrameBuffer(bytearray(b'?\xff\xff\xc0 \x00\x00@ \x0f\xef@\xe0\x0f\xef@\xe0\x0f\xef@\xe0\x0f\xef@\xe0\x0f\xef@ \x0f\xef@ \x00\x00@?\xff\xff\xc0'), 26, 10, framebuf.MONO_HLSB) +fb_battlow = framebuf.FrameBuffer(bytearray(b'?\xff\xff\xc0 \x00\x00@ \x00\x0f@\xe0\x00\x0f@\xe0\x00\x0f@\xe0\x00\x0f@\xe0\x00\x0f@ \x00\x0f@ \x00\x00@?\xff\xff\xc0'), 26, 10, framebuf.MONO_HLSB) + +#HomeScreen Icons +fb_Train = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\xff\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\xff\xff\x01\x80\x80\x01\x01\x80\x80\x01\x01\x80\xe0\x01\x01\x81\xf0\x01\x01\x81\xf8\x01\x01\x83\xf8\x01\x01\x81\xf0\x01\x01\x81\xf0A\x01\x80\xe0\x81\x01\x81\xf1\x81\x01\x83\xfb\x01\x01\x83\xfe\x01\x01\x87\xfc\x01\x01\x87\xfc\x01\x01\x87\xff\xff\x01\x87\xfc\x00\x01\x87\xfc\x00\x01\x87\xfc\x00\x01\x83\xf8\x00\x01\x83\xf8\x00\x01\x81\xf0\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\xff\xff\xff\xff'), 32, 32, framebuf.MONO_HLSB) +fb_Setting = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\xff\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x01\xc3\x01\x81\xc3\xc7\x81\x81\xe3\xc7\x81\x81\xff\xef\x01\x81\xff\xff\x01\x80\xff\xfe\x19\x80\xfc?9\xb9\xf0\x0f\xfd\xbf\xe0\x07\xfd\xbf\xc0\x03\xfd\xbf\x80\x01\xf1\x93\x80\x01\xc1\x83\x80\x01\xc1\x83\x80\x01\xd9\xbf\x80\x01\xfd\xbf\xc0\x03\xfd\xbf\xe0\x07\xfd\xbf\xf0\x0f\x85\x9c\xfc?\x01\x80\x7f\xff\x81\x80\x7f\xff\xc1\x80\xff\xef\x81\x81\xf1\xe3\x81\x81\xe3\xe1\x81\x80\xe3\xe0\x01\x80\x00\x00\x01\x80\x00\x00\x01\xff\xff\xff\xff'), 32, 32, framebuf.MONO_HLSB) +fb_Play = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\xff\x80\x00\x00\x01\x81\x00\x00\x01\x81\xc0\x00\x01\x81\xe0\x00\x01\x81\xf8\x00\x01\x81\xfe\x00\x01\x81\xff\x00\x01\x81\xff\xc0\x01\x81\xff\xe0\x01\x81\xff\xf8\x01\x81\xff\xfe\x01\x81\xff\xff\x01\x81\xff\xff\xc1\x81\xff\xff\xe1\x81\xff\xff\xf9\x81\xff\xff\xe1\x81\xff\xff\xc1\x81\xff\xff\x01\x81\xff\xfe\x01\x81\xff\xf8\x01\x81\xff\xe0\x01\x81\xff\xc0\x01\x81\xff\x00\x01\x81\xfe\x00\x01\x81\xf8\x00\x01\x81\xe0\x00\x01\x81\xc0\x00\x01\x81\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\xff\xff\xff\xff'), 32, 32, framebuf.MONO_HLSB) + +#TrainScreen Icons +fb_add = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x83\xff\xe0\x80\x83\xff\xe0\x80\x83\xff\xe0\x80\x83\xff\xe0\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_delete = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x8c\x00\x18\x80\x9e\x00<\x80\x8f\x00x\x80\x87\x80\xf0\x80\x83\xc1\xe0\x80\x81\xe3\xc0\x80\x80\xf7\x80\x80\x80\x7f\x00\x80\x80>\x00\x80\x80>\x00\x80\x80\x7f\x00\x80\x80\xff\x80\x80\x81\xe3\xc0\x80\x83\xc1\xe0\x80\x87\x80\xf0\x80\x8f\x00x\x80\x9f\x00|\x80\x8c\x00\x18\x80\x8c\x00\x18\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_smallplay = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x83\x00\x00\x80\x83\xc0\x00\x80\x83\xf0\x00\x80\x83\xf8\x00\x80\x83\xfe\x00\x80\x83\xff\x80\x80\x83\xff\xc0\x80\x83\xff\xf0\x80\x83\xff\xf0\x80\x83\xff\xc0\x80\x83\xff\x80\x80\x83\xfe\x00\x80\x83\xf8\x00\x80\x83\xf0\x00\x80\x83\xc0\x00\x80\x83\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_back = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x07\x80\x80\x80\x0f\x80\x80\x80\x1f\x80\x80\x80>\x00\x80\x80~\x00\x80\x80\xf8\x00\x80\x81\xf8\x00\x80\x83\xe0\x00\x80\x83\xe0\x00\x80\x81\xf8\x00\x80\x80\xf8\x00\x80\x80~\x00\x80\x80>\x00\x80\x80\x1f\x80\x80\x80\x0f\x80\x80\x80\x07\x80\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) + +fb_upnobox = framebuf.FrameBuffer(bytearray(b'\x04\x00\x0e\x00\x1f\x00?\x80\x7f\xc0\xff\xe0\x1f\x00\x1f\x00'), 11, 8, framebuf.MONO_HLSB) +fb_downnobox = framebuf.FrameBuffer(bytearray(b'\x1f\x00\x1f\x00\xff\xe0\x7f\xc0?\x80\x1f\x00\x0e\x00\x04\x00'), 11, 8, framebuf.MONO_HLSB) + +#PlayScreen Icons +fb_home = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80>\x00\x80\x80\x7f\x00\x80\x80\xff\x80\x80\x81\xff\xc0\x80\x83\xff\xe0\x80\x87\xff\xf0\x80\x8f\xff\xf8\x80\x9f\xff\xfc\x80\xbf\xff\xfe\x80\x87\xff\xf0\x80\x87\xff\xf0\x80\x87\xc1\xf0\x80\x87\xc1\xf0\x80\x87\xc1\xf0\x80\x87\xc1\xf0\x80\x87\xc1\xf0\x80\x87\xc1\xf0\x80\x87\xc1\xf0\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_pause = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\xc1\x80\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x80\xc1\x80\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_save = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x9f\xfe\x00\x80\x9f\xff\x00\x80\x9c\x03\x80\x80\x9c\x03\xc0\x80\x9c\x03\xe0\x80\x9c\x03\xf0\x80\x9c\x03\xf8\x80\x9f\xff\xf8\x80\x9f\xff\xfc\x80\x9f\xff\xfc\x80\x9f\xfc<\x80\x9f\xf8\x1c\x80\x9f\xf8\x1c\x80\x9f\xf8\x1c\x80\x9f\xf8\x1c\x80\x9f\xfc<\x80\x9f\xff\xfc\x80\x9f\xff\xfc\x80\x9f\xff\xfc\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_toggle = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x87\xf8\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x9c\x0f\xfc\x80\x9c\x0f\xfc\x80\x9c\x0f\xfc\x80\x84\x08\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x87\xf8\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_settings_small = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x87\x00\x00\x80\x9d\xff\xfc\x80\x9d\xff\xfc\x80\x9d\xff\xfc\x80\x87\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x07\x00\x80\x9f\xfd\xfc\x80\x9f\xfd\xfc\x80\x9f\xfd\xfc\x80\x80\x07\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80p\x00\x80\x9f\xdf\xfc\x80\x9f\xdf\xfc\x80\x9f\xdf\xfc\x80\x80p\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_help = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x87\xff\xf0\x80\x87\xff\xf0\x80\x87\xff\xf0\x80\x80\x00p\x80\x80\x00p\x80\x80\x00p\x80\x80\x7f\xf0\x80\x80\x7f\xf0\x80\x80\x7f\xf0\x80\x80p\x00\x80\x80p\x00\x80\x80p\x00\x80\x80p\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80p\x00\x80\x80p\x00\x80\x80p\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) + +#load saved files +fb_prev = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x07\x80\x80\x80\x0f\x80\x80\x80\x1f\x80\x80\x80>\x00\x80\x80~\x00\x80\x80\xf8\x00\x80\x81\xff\xf0\x80\x83\xff\xf0\x80\x83\xff\xf0\x80\x81\xff\xf0\x80\x80\xf8\x00\x80\x80~\x00\x80\x80>\x00\x80\x80\x1f\x80\x80\x80\x0f\x80\x80\x80\x07\x80\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_next = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\xf0\x00\x80\x80\xf8\x00\x80\x80\xfc\x00\x80\x80>\x00\x80\x80?\x00\x80\x80\x0f\x80\x80\x87\xff\xc0\x80\x87\xff\xe0\x80\x87\xff\xe0\x80\x87\xff\xc0\x80\x80\x0f\x80\x80\x80?\x00\x80\x80>\x00\x80\x80\xfc\x00\x80\x80\xf8\x00\x80\x80\xf0\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_load = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x83\xff\xe0\x80\x81\xff\xc0\x80\xb0\xff\x86\x80\xb0\xff\x86\x80\xb0\x7f\x06\x80\xb0>\x06\x80\xb0>\x06\x80\xb0\x1c\x06\x80\xb0\x18\x06\x80\xb0\x08\x06\x80\xbf\xff\xfe\x80\xbf\xff\xfe\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) + +#Settings +fb_BIGHome = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\xff\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x01\x80\x01\x80\x03\xc0\x01\x80\x0f\xf0\x01\x80\x1f\xf8\x01\x80?\xfc\x01\x80\x7f\xfe\x01\x81\xff\xff\x81\x83\xff\xff\xc1\x87\xff\xff\xe1\x8f\xff\xff\xf1\x83\xff\xff\xc1\x83\xff\xff\xc1\x83\xff\xff\xc1\x83\xd7\x1f\xc1\x83\xff\x1f\xc1\x83\xd7\x1f\xc1\x83\xff\x1f\xc1\x83\xff\x1f\xc1\x83\xff\x1f\xc1\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\xff\xff\xff\xff'), 32, 32, framebuf.MONO_HLSB) +fb_Lead = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\xff\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x83\xf8\x0f\xe1\x82\x00\x00!\x82\x00\x00!\x82x\x0f!\x82@\x01!\x82@\x01!\x82L\x19!\x82H\t!\x82I\xc9!\x82I\xc9!\x82I\xc9!\x82H\x89!\x82@\x81!\x82@\x81!\x82p\x87!\x82\x00\x80!\x82\x00\x80!\x83\xf8\x8f\xe1\x80\x00\x80\x01\x80\x00\x80\x01\x80\x00\x80\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\xff\xff\xff\xff'), 32, 32, framebuf.MONO_HLSB) +fb_Follow = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\xff\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x07\xe0\x01\x80\x0c0\x01\x80\x10\x08\x01\x80 \x04\x01\x80@\x02\x01\x80@\x02\x01\x80@\x02\x01\x80@\x02\x01\x80@\x02\x01\x80 \x04\x01\x80\x10\x0c\x01\x80\x0c<\x01\x80\x07\xee\x01\x80\x00\x07\x01\x80\x00\x03\x81\x80\x00\x01\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\xff\xff\xff\xff'), 32, 32, framebuf.MONO_HLSB) + +#Add the icons to the array, add iconsizes add the direction 0 - horizontal , 1 - vertical +#iconFrames=[[fb_Train,fb_Play,fb_Setting],[fb_add,fb_delete,fb_smallplay,fb_home],[fb_save,fb_pause,fb_home,fb_toggle],[fb_next,fb_delete,fb_home,fb_toggle],[fb_Lead,fb_Follow, fb_BIGHome]] +#iconFrames=[[fb_Train,fb_Play],[fb_add,fb_delete,fb_smallplay,fb_home],[fb_save,fb_home],[fb_next,fb_delete,fb_home]] +#iconFrames=[[fb_Train,fb_Play],[fb_smallplay,fb_add,fb_delete,fb_settings_small,fb_help],[fb_next,fb_delete,fb_home]] +iconFrames=[[fb_Train,fb_Play],[fb_smallplay,fb_add,fb_delete],[fb_next,fb_delete,fb_home]] + +iconSize=[32,25,25] +offsets= [(20,20),(102,29),(102,29)] #where you want to display your first icon +direction=[0,1,1] # 0 horizonal and 1 vertical arrangement + +for index,icon in enumerate(iconFrames): + icons = createIcons(iconSize[index], icon, offsetx=offsets[index][0], offsety=offsets[index][1] , direction=direction[index]) + Icons.append(icons) + +class SSD1306_SMART(ssd1306.SSD1306_I2C): + def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False, scale = 8, mode = 0, plotsize = [[3,3],[100,60]]): + self.scale = scale + self.mode = mode # 0 for learn, 1 for repeat + self.plotsize = plotsize + self.ranges = {'light': [0, 4095], 'motor': [0, 180], 'screenx': [4, 96], 'screeny': [59, 5]} # screeny is backwards because it is from top to bottom + #Battery + + super().__init__(width, height, i2c, addr = 0x3C, external_vcc = external_vcc) + self.graphx=27 + self.graphy=0 + + self.iconx=0 + self.icony=29 + + self.color=0 + self.graphwidthx=100 + self.graphwidthy=64 + self.iconwidth=25 + self.updownwidth=10 + self.updownx=self.iconx+int(self.iconwidth/2-self.updownwidth/2) + self.upy=self.icony - self.updownwidth + self.downy=self.icony + self.iconwidth + 1 + + + + + + def displayscreen(self,whereamI): + for IconX,IconY,frame in Icons[whereamI]: + self.blit(frame, IconX, IconY, 0) + self.show() + + def selector(self,whereamI,icon,previcon): + print("icon and previcon and whereamI",icon,previcon, whereamI) + if(direction[whereamI]==0): + padding=3 + width=height=iconSize[whereamI]+2*padding + self.rect(Icons[whereamI][previcon][0]-padding,Icons[whereamI][previcon][1]-padding,width,height, 0) #delete previous selector icon + self.rect(Icons[whereamI][icon][0]-padding,Icons[whereamI][icon][1]-padding,width,height, 1) #display current selector + self.displayscreen(whereamI) + else: + + + self.fill_rect(self.iconx,self.icony,self.iconwidth,self.iconwidth, 0) #delete previous selector icon + self.blit(fb_upnobox,self.updownx,self.upy,0) + self.blit(Icons[whereamI][icon][2], self.iconx, self.icony, 0) + self.blit(fb_downnobox,self.updownx,self.downy,0) + #self.blit(fb_DOWN, 102, 49, 0) + self.show() + + + def showbattery(self, batterylevel): + length=20 + width=6 + gap=2 + # if ( batterylevel>3000): #charging + if(batterylevel=="charging"): #charging + self.blit(fb_battcharging,self.iconx,0,0) + elif(batterylevel=="full"): #full charge + self.blit(fb_batthigh,self.iconx,0,0) + elif(batterylevel=="half"): #medium charge + self.blit(fb_battmid,self.iconx,0,0) + elif(batterylevel=="low"): # low charge + self.blit(fb_battlow,self.iconx,0,0) + else: + pass + self.show() + + + + def transform(self,initial, final, value): + initial = self.ranges[initial] + final = self.ranges[final] + return int((final[1]-final[0]) / (initial[1]-initial[0]) * (value - initial[0]) + final[0]) + + def graph(self, oldpoint,point, points, color): + if color==self.color: + #don't change + pass + else: + self.color=color + #clear the screen with the latest color + self.fill_rect(self.graphx,self.graphy,self.graphwidthx,self.graphwidthy,color) + + + rectsize=8 + dotsize=4 + + + ox,oy=oldpoint + x,y=point + + ox=self.transform('light', 'screenx', ox)+self.graphx + oy=self.transform('motor','screeny',oy)+self.graphy + x=self.transform('light', 'screenx', x)+self.graphx + y=self.transform('motor','screeny',y)+self.graphy + self.rect(self.graphx,self.graphy,self.graphwidthx,self.graphwidthy,(color+1)%2) + + self.line(self.graphx,oy,self.graphx+self.graphwidthx,oy,color%2) + self.line(ox,self.graphy,ox,self.graphy+self.graphwidthy,color%2) + + self.line(self.graphx,y,self.graphx+self.graphwidthx,y,(color+1)%2) + self.line(x,self.graphy,x,self.graphy+self.graphwidthy,(color+1)%2) + + self.rect(ox-int(rectsize/2),oy-int(rectsize/2),rectsize,rectsize,color%2) + self.rect(x-int(rectsize/2),y-int(rectsize/2),rectsize,rectsize,(color+1)%2) + for i in points: + x,y=i + x=self.transform('light', 'screenx', x)+self.graphx + y=self.transform('motor','screeny',y)+self.graphy + self.fill_rect(x-int(dotsize/2),y-int(dotsize/2),dotsize,dotsize,(color+1)%2) + self.show() + + def cleargraph(self): + self.fill_rect(self.graphx+1,self.graphy+1,self.graphwidthx-2,self.graphwidthy-2,0) + self.rect(self.graphx,self.graphy,self.graphwidthx,self.graphwidthy,1) + + + + def showmessage(self,msg): + + self.fill_rect(30,10,90,50,0) + self.text(msg,35,20,1) + self.show() + time.sleep(2) + self.fill_rect(30,10,90,50,0) + #self.fill(0) + self.show() + + + def welcomemessage(self): + self.fill(1) + self.show() + + for a in range(32): + i=2*a + self.fill_rect(64-(2*i),32-i,4*i,2*i,0) + self.blit(fb_SMLOGO,39,7,0) + self.show() + ''' + for i in range(80): + self.fill(0) + self.blit(fb_SMLOGO,38-i,7,0) + self.blit(fb_SMLOGO,40+i,7,0) + self.show() + ''' + self.clear() + self.show() + + def clear(self): + self.fill(0) + self.show() + diff --git a/software/release/main.py b/software/release/main.py new file mode 100644 index 0000000..7b44041 --- /dev/null +++ b/software/release/main.py @@ -0,0 +1,45 @@ +from machine import Pin +import machine +import gc +gc.collect() + +import time + +print("Running pyscript networking tool") + +from networking import Networking + +#Network +networking = Networking(True, False, True) +peer_mac = b'\xff\xff\xff\xff\xff\xff' + +print(f"{(time.ticks_ms() - networking.inittime) / 1000:.3f} Name: {networking.name}, ID: {networking.id}, config: {networking.config}, Sta mac: {networking.sta.mac()}, Ap mac: {networking.ap.mac()}, Version: {networking.version_n}") + +lastPressed = 0 + +message="Boop!" + +def boop(pin): + global lastPressed + if(time.ticks_ms()-lastPressed>1000): + lastPressed = time.ticks_ms() + networking.aen.ping(peer_mac) + networking.aen.echo(peer_mac, message) + networking.aen.send(peer_mac, message) + print(f"{(time.ticks_ms() - networking.inittime) / 1000:.3f} Networking Tool: Sent {message} to {peer_mac}") + print(f"{(time.ticks_ms() - networking.inittime) / 1000:.3f} Networking Tool: RSSI table: {networking.aen.rssi()}") + +switch_select = Pin(9, Pin.IN, Pin.PULL_UP) + +#Buttons +switch_select = Pin(9, Pin.IN, Pin.PULL_UP) +switch_select.irq(trigger=Pin.IRQ_FALLING, handler=boop) + +def heartbeat(timer): + print("") + print(f"{(time.ticks_ms() - networking.inittime) / 1000:.3f} Networking Tool Heartbeat: {gc.mem_free()} bytes") + print("") + gc.collect() + +timer = machine.Timer(0) +timer.init(period=5000, mode=machine.Timer.PERIODIC, callback=heartbeat) diff --git a/software/release/networking.py b/software/release/networking.py new file mode 100644 index 0000000..2ff74cf --- /dev/null +++ b/software/release/networking.py @@ -0,0 +1,1096 @@ +import network +import machine +from config import mysecrets, configname, config, whitelist, i2c_dict, version, msg_codes, msg_subcodes, networking_keys +import time +import ubinascii +import urequests +import espnow +import gc +import asyncio +import struct +import json +import os + + +class Networking: + def __init__(self, infmsg=False, dbgmsg=False, admin=False, inittime=time.ticks_ms()): + gc.collect() + self.inittime = inittime + if infmsg: + print(f"{(time.ticks_ms() - self.inittime) / 1000:.3f} Initialising Networking") + self.master = self + self.infmsg = infmsg + self.dbgmsg = dbgmsg + self.admin = admin + + self._staif = network.WLAN(network.STA_IF) + self._apif = network.WLAN(network.AP_IF) + + self.sta = self.Sta(self, self._staif) + self.ap = self.Ap(self, self._apif) + self.aen = self.Aen(self) + + self.id = ubinascii.hexlify(machine.unique_id()).decode() + self.name = configname + if not self.name: + self.name = str(self.id) + self.config = config + self.version = version + self.version_n = ''.join(str(value) for value in self.version.values()) + if infmsg: + print(f"{(time.ticks_ms() - self.inittime) / 1000:.3f} seconds: Networking initialised and ready") + + def cleanup(self): + self.dprint(".cleanup") + self.aen.cleanup() + self._staif.active(False) + self._apif.active(False) + + def iprint(self, message): + if self.infmsg: + try: + print(f"{(time.ticks_ms() - self.inittime) / 1000:.3f} Networking Info: {message}") + except Exception as e: + print(f"Error printing networking Info: {e}") + return + + def dprint(self, message): + if self.dbgmsg: + try: + print(f"{(time.ticks_ms() - self.inittime) / 1000:.3f} Networking Debug: {message}") + except Exception as e: + print(f"Error printing networking Debug: {e}") + return + + class Sta: + def __init__(self, master, _staif): + self.master = master + self._sta = _staif + self._sta.active(True) + self.master.iprint("STA initialised and ready") + + def scan(self): + self.master.dprint("sta.scan") + scan_result = self._sta.scan() + if self.master.infmsg: + for ap in scan_result: + self.master.iprint(f"SSID:%s BSSID:%s Channel:%d Strength:%d RSSI:%d Auth:%d " % ap) + return scan_result + + def connect(self, ssid, key="", timeout=10): + self.master.dprint("sta.connect") + self._sta.connect(ssid, key) + stime = time.time() + while time.time() - stime < timeout: + if self._sta.ifconfig()[0] != '0.0.0.0': + self.master.iprint("Connected to WiFi") + return + time.sleep(0.1) + self.master.iprint(f"Failed to connect to WiFi: {self._sta.status()}") + + def disconnect(self): + self.master.dprint("sta.disconnect") + self._sta.disconnect() + + def ip(self): + self.master.dprint("sta.ip") + return self._sta.ifconfig() + + def mac(self): + self.master.dprint("sta.mac") + return bytes(self._sta.config('mac')) + + def mac_decoded(self): # Necessary? + self.master.dprint("sta.mac_decoded") + return ubinascii.hexlify(self._sta.config('mac'), ':').decode() + + def channel(self): + self.master.dprint("sta.channel") + return self._sta.config('channel') + + def set_channel(self, number): + self.master.dprint("sta.set_channel") + if number > 14 or number < 0: + number = 0 + self._sta.config(channel=number) + self.master.dprint(f"STA channel set to {number}") + + def get_joke(self): + self.master.dprint("sta.get_joke") + try: + reply = urequests.get('https://v2.jokeapi.dev/joke/Programming') + if reply.status_code == 200: + joke = reply.json() + return joke.get('setup', '') + '\n' + joke.get('delivery', joke.get('joke', '')) + except Exception as e: + print('Error fetching joke:', str(e)) + return None + + class Ap: + def __init__(self, master, _apif): + self.master = master + self._ap = _apif + self._ap.active(True) + self.master.iprint("AP initialised and ready") + + def set_ap(self, name="", password="", max_clients=10): + self.master.dprint("ap.set_ap") + if name == "": + name = self.master.name + self._ap.active(True) + self._ap.config(essid=name) + if password: + self._ap.config(authmode=network.AUTH_WPA_WPA2_PSK, password=password) + self._ap.config(max_clients=max_clients) + self.master.iprint(f"Access Point {name} set with max clients {max_clients}") + + def deactivate(self): + self.master.dprint("ap.deactivate") + self._ap.active(False) + self.master.iprint("Access Point deactivated") + + def ip(self): + self.master.dprint("ap.ip") + return self._ap.ifconfig() + + def mac(self): + self.master.dprint("ap.mac") + return bytes(self._ap.config('mac')) + + def mac_decoded(self): + self.master.dprint("ap.mac_decoded") + return ubinascii.hexlify(self._ap.config('mac'), ':').decode() + + def channel(self): + self.master.dprint("ap.channel") + return self._ap.config('channel') + + def set_channel(self, number): + self.master.dprint("ap.set_channel") + if number > 14 or number < 0: + number = 0 + self._ap.config(channel=number) + self.master.dprint(f"AP channel set to {number}") + + class Aen: + def __init__(self, master): + self.master = master + self._aen = espnow.ESPNow() + self._aen.active(True) + + self._peers = {} + self._received_messages = [] + self._received_messages_size = [] + self._long_buffer = {} + self._long_buffer_size = {} + self.received_sensor_data = {} + self.received_rssi_data = {} + self._irq_function = None + self._pause_function = None + self.boops = 0 + self.ifidx = 0 # 0 sends via sta, 1 via ap + # self.channel = 0 + + self._whitelist = whitelist + + # Flags + self._pairing_enabled = True + self._pairing = False + self._paired = False + self._paired_macs = [] + self._running = True + + self._aen.irq(self._irq) + + self.master.iprint("ESP-NOW initialised and ready") + + def cleanup(self): + self.master.iprint("aen.cleanup") + self.irq(None) + self._aen.active(False) + # add delete buffers and stuff + + def update_peer(self, peer_mac, name=None, channel=None, ifidx=None): + self.master.dprint("aen.update_peer") + if peer_mac in self._peers: + try: + if name is not None: + self._peers[peer_mac]['name'] = name + if channel is not None: + self._peers[peer_mac]['channel'] = channel + if ifidx is not None: + self._peers[peer_mac]['ifidx'] = ifidx + self.master.dprint(f"Peer {peer_mac} updated to channel {channel}, ifidx {ifidx} and name {name}") + except OSError as e: + print(f"Error updating peer {peer_mac}: {e}") + return + self.master.iprint(f"Peer {peer_mac} not found") + + def add_peer(self, peer_mac, name=None, channel=None, ifidx=None): + self.master.dprint("aen.add_peer") + if peer_mac not in self._peers: + try: + self._peers[peer_mac] = {'channel': channel, 'ifidx': ifidx, 'name': name} + self.master.dprint(f"Peer {peer_mac} added with channel {channel}, ifidx {ifidx} and name {name}") + except OSError as e: + print(f"Error adding peer {peer_mac}: {e}") + else: + self.master.dprint(f"Peer {peer_mac} already exists, updating") + self.update_peer(peer_mac, name, channel, ifidx) + + def remove_peer(self, peer_mac): + self.master.dprint("aen.remove_peers") + if peer_mac in self._peers: + try: + del self._peers[peer_mac] + self.master.iprint(f"Peer {peer_mac} removed") + except OSError as e: + print(f"Error removing peer {peer_mac}: {e}") + + def peers(self): + self.master.dprint("aen.peers") + return self._peers + + def peer_name(self, key): + self.master.dprint("aen.name") + if key in self._peers: + return self._peers[key]['name'] + else: + return None + + def rssi(self): + self.master.dprint("aen.rssi") + return self._aen.peers_table + + # Send cmds + def send_command(self, msg_subkey, mac, payload=None, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.send_command") + if sudo and isinstance(payload, list): + payload.append("sudo") + elif sudo and payload is None: + payload = ["sudo"] + else: + payload = [payload, "sudo"] + if (msg_key := "cmd") and msg_subkey in msg_subcodes[msg_key]: + self.master.iprint( + f"Sending {msg_subkey} ({bytes([msg_subcodes[msg_key][msg_subkey]])}) command to {mac} ({self.peer_name(mac)})") + self._compose(mac, payload, msg_codes[msg_key], msg_subcodes[msg_key][msg_subkey], channel, ifidx) + else: + self.master.iprint(f"Command {msg_subkey} not found") + gc.collect() + + def reboot(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.reboot") + self.send_command("Reboot", mac, None, channel, ifidx, sudo) + + def firmware_update(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.firmware_update") + self.send_command("Firmware-Update", mac, None, channel, ifidx, sudo) + + def file_update(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.file_update") + self.send_command("File-Update", mac, None, channel, ifidx, sudo) + + def file_download(self, mac, link, file_list=None, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.file_download") + self.send_command("File-Download", mac, [link, file_list], channel, ifidx, sudo) + + def web_repl(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.web_repl") + self.master.ap.set_ap(ap_name := self.master.name, password := networking_keys["default_ap_key"]) + self.send_command("Web-Repl", mac, [ap_name, password], channel, ifidx, sudo) + # await success message and if success False disable AP or try again + + def file_run(self, mac, filename, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.file_run") + self.send_command("File-Run", mac, filename, channel, ifidx, sudo) + + def admin_set(self, mac, new_bool, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.admin_set") + self.send_command("Admin-Set", mac, new_bool, channel, ifidx, sudo) + + def whitelist_add(self, mac, mac_list=None, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.whitelist_add") + if mac_list is not None: + mac_list = [self.master.sta.mac, self.master.ap.mac] + self.send_command("Whitelist-Add", mac, mac_list, channel, ifidx, sudo) + + def config_change(self, mac, new_config, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.config_chain") + self.send_command("Config-Chain", mac, new_config, channel, ifidx, sudo) + + def ping(self, mac, channel=None, ifidx=None): + self.master.dprint("aen.ping") + if bool(self.ifidx): + send_channel = self.master.ap.channel() + else: + send_channel = self.master.sta.channel() + self.send_command("Ping", mac, [send_channel, self.ifidx, self.master.name], channel, + ifidx) # sends channel, ifidx and name + + def pair(self, mac, key=networking_keys["handshake_key1"], channel=None, ifidx=None): + self.master.dprint("aen.pair") + self.send_command("Pair", mac, key, channel, ifidx) + + def pair_enable(self, mac, pair_bool, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.pair") + self.send_command("Set-Pair", mac, pair_bool, channel, ifidx, sudo) + + def boop(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.boop") + self.send_command("RSSI/Status/Config-Boop", mac, None, channel, ifidx, sudo) + + def directory_get(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.directory_get") + self.send_command("Directory-Get", mac, None, channel, ifidx, sudo) + + def echo(self, mac, message, channel=None, ifidx=None): + self.master.dprint("aen.echo") + try: + self.master.iprint(f"Sending echo ({message}) to {mac} ({self.peer_name(mac)})") + except Exception as e: + self.master.iprint( + f"Sending echo to {mac} ({self.peer_name(mac)}), but error printing message content: {e}") + self._compose(mac, message, 0x01, 0x15, channel, ifidx) + gc.collect() + + # resend cmd + + def wifi_connect(self, mac, name, password, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.wifi_connect") + self.send_command("Wifi-Connect", mac, [name, password], channel, ifidx, sudo) + + def wifi_disconnect(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.wifi_disconnect") + self.send_command("Wifi-Disconnect", mac, None, channel, ifidx, sudo) + + def ap_enable(self, mac, name, password, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.ap_enable") + self.send_command("AP-Enable", mac, [name, password], channel, ifidx, sudo) + + def ap_disable(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.ap_disable") + self.send_command("AP-Disable", mac, None, channel, ifidx, sudo) + + def pause(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.pause") + self.send_command("Pause", mac, None, channel, ifidx, sudo) + + def resume(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.resume") + self.send_command("Resume", mac, None, channel, ifidx, sudo) + + def send(self, mac, message, channel=None, ifidx=None): + self.master.dprint("aen.message") + if len(str(message)) > 241: + try: + self.master.iprint( + f"Sending message ({str(message)[:50] + '... (truncated)'}) to {mac} ({self.peer_name(mac)})") + except Exception as e: + self.master.iprint( + f"Sending message to {mac} ({self.peer_name(mac)}), but error printing message content: {e}") + gc.collect() + else: + self.master.iprint(f"Sending message ({message}) to {mac} ({self.peer_name(mac)})") + self._compose(mac, message, 0x02, 0x22, channel, ifidx) + gc.collect() + self.master.dprint(f"Free memory: {gc.mem_free()}") + + def broadcast(self, message, channel=None, ifidx=None): + self.master.dprint("aen.broadcast") + mac = b'\xff\xff\xff\xff\xff\xff' + self.send(mac, message, channel, ifidx) + + def send_sensor(self, mac, message, channel=None, + ifidx=None): # message is a dict, key is the sensor type and the value is the sensor value + self.master.dprint("aen.message") + try: + self.master.iprint(f"Sending sensor data ({message}) to {mac} ({self.peer_name(mac)})") + except Exception as e: + self.master.iprint( + f"Sending sensor data to {mac} ({self.peer_name(mac)}), but error printing message content: {e}") + self._compose(mac, message, 0x02, 0x21, channel, ifidx) + + def check_messages(self): + self.master.dprint("aen.check_message") + return len(self._received_messages) > 0 + + def return_message(self): + self.master.dprint("aen.return_message") + if self.check_messages(): + self._received_messages_size.pop() + return self._received_messages.pop() + return None, None, None + + def return_messages(self): + self.master.dprint("aen.return_messages") + if self.check_messages(): + messages = self._received_messages[:] + self._received_messages.clear() + self._received_messages_size.clear() + gc.collect() + return messages + return [(None, None, None)] + + def _irq(self): + self.master.dprint("aen._irq") + if self.master.admin: + try: + self._receive() + if self._irq_function and self.check_messages() and self._running: + self._irq_function() + gc.collect() + return + except KeyboardInterrupt: + # machine.disable_irq() #throws errors + self.master.iprint("aen._irq except KeyboardInterrupt") + # self._aen.irq(None) #does not work + self._aen.active(False) + # self.master.cleanup() #network cleanup + # self.cleanup() #aen cleanup + raise SystemExit( + "Stopping networking execution. ctrl-c or ctrl-d again to stop main code") # in thonny stops library code but main code keeps running, same in terminal + else: + self._receive() + if self._irq_function and self.check_messages() and self._running: + self._irq_function() + gc.collect() + return + + def irq(self, func): + self.master.dprint("aen.irq") + self._irq_function = func + + def _send(self, peers_mac, messages, channel, ifidx): + self.master.dprint("aen._send") + + if isinstance(peers_mac, bytes): + peers_mac = [peers_mac] + for peer_mac in peers_mac: + try: + if channel is not None and ifidx is not None: + self._aen.add_peer(peer_mac, channel=channel, ifidx=ifidx) + elif channel is not None: + if peer_mac in self._peers: + self._aen.add_peer(peer_mac, channel=channel, ifidx=self._peers[peer_mac]['ifidx']) + else: + self._aen.add_peer(peer_mac, channel=channel, ifidx=self.ifidx) + elif ifidx is not None: + if peer_mac in self._peers: + self._aen.add_peer(peer_mac, channel=self._peers[peer_mac]['channel'], ifidx=ifidx) + else: + self._aen.add_peer(peer_mac, channel=0, ifidx=ifidx) + elif peer_mac in self._peers: + self._aen.add_peer(peer_mac, channel=self._peers[peer_mac]['channel'], + ifidx=self._peers[peer_mac]['ifidx']) + else: + self._aen.add_peer(peer_mac, channel=0, ifidx=self.ifidx) + self.master.dprint(f"Added {peer_mac} to espnow buffer") + except Exception as e: + self.master.iprint(f"Error adding {peer_mac} to espnow buffer: {e}") + + for m in range(len(messages)): + for i in range(3): + i += i + try: + self._aen.send(peer_mac, messages[m]) + break + except Exception as e: + self.master.iprint(f"Error sending to {peer_mac}: {e}") + self.master.dprint(f"Sent {messages[m]} to {peer_mac} ({self.peer_name(peer_mac)})") + gc.collect() + + try: + self._aen.del_peer(peer_mac) + self.master.dprint(f"Removed {peer_mac} from espnow buffer") + except Exception as e: + print(f"Error removing {peer_mac} from espnow buffer: {e}") + + def _compose(self, peer_mac, payload=None, msg_type=0x02, subtype=0x22, channel=None, ifidx=None): + self.master.dprint("aen._compose") + + if isinstance(peer_mac, list): + for peer_macs in peer_mac: + if peer_macs not in self._peers: + self.add_peer(peer_macs, None, channel, ifidx) + elif peer_mac not in self._peers: + self.add_peer(peer_mac, None, channel, ifidx) + + def __encode_payload(): + self.master.dprint("aen.__encode_payload") + if payload is None: # No payload type + return b'\x00', b'' + elif isinstance(payload, bytearray): # bytearray + return b'\x01', bytes(payload) + elif isinstance(payload, bytes): # bytes + return b'\x01', payload + elif isinstance(payload, bool): # bool + return b'\x02', (b'\x01' if payload else b'\x00') + elif isinstance(payload, int): # int + return b'\x03', struct.pack('>i', payload) + elif isinstance(payload, float): # float + return b'\x04', struct.pack('>f', payload) + elif isinstance(payload, str): # string + return b'\x05', payload.encode('utf-8') + elif isinstance(payload, dict) or isinstance(payload, list): # json dict or list + json_payload = json.dumps(payload) + return b'\x06', json_payload.encode('utf-8') + else: + raise ValueError("Unsupported payload type") + + payload_type, payload_bytes = __encode_payload() + messages = [] + identifier = 0x2a + timestamp = time.ticks_ms() + header = bytearray(8) + header[0] = identifier + header[1] = msg_type + header[2] = subtype + header[3:7] = timestamp.to_bytes(4, 'big') + if len(payload_bytes) < 242: # 250-9=241=max_length + header[7] = payload_type[0] + total_length = 1 + 1 + 1 + 4 + 1 + len(payload_bytes) + 1 + message = bytearray(total_length) + message[:8] = header + message[8:-1] = payload_bytes + message[-1:] = (sum(message) % 256).to_bytes(1, 'big') # Checksum + self.master.dprint(f"Message {1}/{1}; Length: {len(message)}; Free memory: {gc.mem_free()}") + messages.append(message) + else: + self.master.dprint("Long message: Splitting!") + max_size = 238 # 241-3 + total_chunk_number = (-(-len(payload_bytes) // max_size)) # Round up division + lba = b'\x07' + header[7] = lba[0] # Long byte array + if total_chunk_number > 256: + raise ValueError("More than 256 chunks, unsupported") + for chunk_index in range(total_chunk_number): + message = bytearray(9 + 3 + min(max_size, len(payload_bytes) - chunk_index * max_size)) + message[:8] = header + message[8:10] = chunk_index.to_bytes(1, 'big') + total_chunk_number.to_bytes(1, 'big') + message[10] = payload_type[0] + message[11:-1] = payload_bytes[chunk_index * max_size: (chunk_index + 1) * max_size] + message[-1:] = (sum(message) % 256).to_bytes(1, 'big') # Checksum + self.master.dprint(message) + messages.append(bytes(message)) + self.master.dprint( + f"Message {chunk_index + 1}/{total_chunk_number}; length: {len(message)}; Free memory: {gc.mem_free()}") + gc.collect() + gc.collect() + self._send(peer_mac, messages, channel, ifidx) + + def _receive(self): # Processes all the messages in the buffer + self.master.dprint("aen._receive") + + def __decode_payload(payload_type, payload_bytes): + self.master.dprint("aen.__decode_payload") + if payload_type == b'\x00': # None + return None + elif payload_type == b'\x01': # bytearray or bytes + return bytes(payload_bytes) + elif payload_type == b'\x02': # bool + return payload_bytes[0:1] == b'\x01' + elif payload_type == b'\x03': # int + return struct.unpack('>i', payload_bytes)[0] + elif payload_type == b'\x04': # float + return struct.unpack('>f', payload_bytes)[0] + elif payload_type == b'\x05': # string + return payload_bytes.decode('utf-8') + elif payload_type == b'\x06': # json dict or list + return json.loads(payload_bytes.decode('utf-8')) + elif payload_type == b'\x07': # Long byte array + return bytes(payload_bytes) + else: + raise ValueError(f"Unsupported payload type: {payload_type} Message: {payload_bytes}") + + def __process_message(sender_mac, message, receive_timestamp): + self.master.dprint("aen.__process_message") + if message[0] != 0x2a: # Unique Message Identifier Check + self.master.dprint("Invalid message: Message ID Fail") + return None + if len(message) < 9: # Min size + self.master.dprint("Invalid message: too short") + return None + + msg_type = bytes(message[1:2]) + subtype = bytes(message[2:3]) + send_timestamp = int.from_bytes(message[3:7], 'big') + payload_type = bytes(message[7:8]) + payload = message[8:-1] + checksum = message[-1] + self.master.dprint( + f"{type(msg_type)}: {msg_type}, {type(subtype)}: {subtype}, {type(send_timestamp)}: {send_timestamp}, {type(payload_type)}: {payload_type}, {type(payload)}: {payload}, {type(checksum)}: {checksum}") + + # Checksum + if checksum != sum(message[:-1]) % 256: + self.master.dprint("Invalid message: checksum mismatch") + return None + + if sender_mac not in self._peers: + self.add_peer(sender_mac) + + if payload_type == b'\x07': + self.master.dprint("Long message received, processing...") + part_n = int.from_bytes(payload[0:1], 'big') + total_n = int.from_bytes(payload[1:2], 'big') + payload_type = bytes(payload[2:3]) + payload = payload[3:] + + # Create a key as a bytearray: (msg_type, subtype, timestamp, payload_type, total_n) + key = bytearray() + key.extend(msg_type) + key.extend(subtype) + key.extend(message[3:7]) + key.extend(payload_type) + key.append(total_n) + key = bytes(key) + self.master.dprint(f"Key: {key}") + + # Check if the key already exists in the long buffer + if key in self._long_buffer: + # If the part is None, add the payload + if self._long_buffer[key][part_n] is None: + self._long_buffer[key][part_n] = payload + self._long_buffer_size[key] = self._long_buffer_size[key] + len(payload) + self.master.dprint( + f"Long message: Key found, message added to entry in long_message_buffer, {sum(1 for item in self._long_buffer[key] if item is not None)} out of {total_n} packages received") + # If there are still missing parts, return + if any(value is None for value in self._long_buffer[key]): + gc.collect() + return + else: + # Initialize the long message buffer for this key + payloads = [None] * total_n + payloads[part_n] = payload + self._long_buffer[key] = payloads + self._long_buffer_size[key] = len(payload) + self.master.dprint( + f"Long message: Key not found and new entry created in long_message_buffer, {sum(1 for item in self._long_buffer[key] if item is not None)} out of {total_n} packages received") + + while len(self._long_buffer) > 8 or sum(self._long_buffer_size.values()) > 75000: + self.master.dprint( + f"Maximum buffer size reached: {len(self._long_buffer)}, {sum(self._long_buffer_size.values())} bytes; Reducing!") + self._long_buffer.popitem(last=False) + self._long_buffer_size.popitem(last=False) + gc.collect() + return + + # If all parts have been received, reconstruct the message + if not any(value is None for value in self._long_buffer[key]): + payload = bytearray() + for i in range(0, total_n): + payload.extend(self._long_buffer[key][i]) + del self._long_buffer[key] + del self._long_buffer_size[key] + self.master.dprint("Long message: All packages received!") + else: + self.master.dprint("Long Message: Safeguard triggered, code should not have gotten here") + gc.collect() + return + + # Handle the message based on type + if msg_type == (msg_key := msg_codes["cmd"]): # Command Message + __handle_cmd(sender_mac, subtype, send_timestamp, receive_timestamp, payload_type, + payload if payload else None, msg_key) + elif msg_type == (msg_key := msg_codes["inf"]): # Informational Message + __handle_inf(sender_mac, subtype, send_timestamp, receive_timestamp, payload_type, + payload if payload else None, msg_key) + elif msg_type == (msg_key := msg_codes["ack"]): # Acknowledgement Message + __handle_ack(sender_mac, subtype, send_timestamp, receive_timestamp, payload_type, + payload if payload else None, msg_key) + else: + self.master.iprint( + f"Unknown message type from {sender_mac} ({self.peer_name(sender_mac)}): {message}") + + def __check_authorisation(sender_mac, payload): + return sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo" + + def __send_confirmation(msg_type, recipient_mac, msg_subkey_type, payload=None, error=None): + if msg_type == "Success": + self._compose(recipient_mac, [msg_subkey_type, payload], 0x03, 0x11) + elif msg_type == "Fail": + self._compose(recipient_mac, [msg_subkey_type, error, payload], 0x03, 0x12) + else: + self._compose(recipient_mac, [msg_subkey_type, payload], 0x03, 0x13) + + def __handle_cmd(sender_mac, subtype, send_timestamp, receive_timestamp, payload_type, payload, msg_key): + self.master.dprint(f"aen.__handle_cmd") + payload = __decode_payload(payload_type, payload) + if (msg_subkey := "Reboot") and subtype == msg_subcodes[msg_key][msg_subkey]: # Reboot + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + __send_confirmation("Confirm", sender_mac, f"{msg_subkey} ({subtype})", payload) + machine.reset() + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Firmware-Update") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Firmware-Update + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + # Insert update logic here + self.master.iprint("no update logic written just yet") + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "File-Update") and subtype == msg_subcodes[msg_key][msg_subkey]: # File-Update + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + # Insert update logic here + self.master.iprint("No update logic written just yet") + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "File-Download") and subtype == msg_subcodes[msg_key][msg_subkey]: # File-Download + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + # should return a list with a link and the list of files to download + if __check_authorisation(sender_mac, payload): + try: + import mip + base = payload[0] + files_to_copy = payload[1] + if files_to_copy is None: + mip.install(base) + else: + for f in files_to_copy: + mip.install(base + f) + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Web-Repl") and subtype == msg_subcodes[msg_key][msg_subkey]: # Web-Repl + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + # should be a list with name and password + if __check_authorisation(sender_mac, payload): + try: + # add logic to connect to Wi-Fi and set up webrepl + self.master.sta.connect(payload[0], payload[1]) + link = "webrepl link" + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", link) + except Exception as e: + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "File-Run") and subtype == msg_subcodes[msg_key][msg_subkey]: # File-Run + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + self.master.iprint("Execute logic not implemented") + # insert run logic here + except Exception as e: + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Set-Admin") and subtype == msg_subcodes[msg_key][msg_subkey]: # Set Admin Bool + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + self.master.iprint(f"Received Set-Admin command: self.admin set to {payload[0]}") + try: + self.master.admin = payload[0] + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Whitelist-Add") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Whitelist-Add - Add Admin macs to _whitelist + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + self.master.iprint( + f"Received add admin macs to _whitelist command, added {payload[0]} and {payload[1]}") + try: + self._whitelist.append(payload[0]) + self._whitelist.append(payload[1]) + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Config-Change") and subtype == msg_subcodes[msg_key][msg_subkey]: # change config + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + self.master.iprint(f"Not yet implemented") + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Ping") and subtype == msg_subcodes[msg_key][msg_subkey]: # Ping + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + self.add_peer(sender_mac, payload[2], payload[0], payload[1]) + if bool(self.ifidx): + channel = self.master.ap.channel() + else: + channel = self.master.sta.channel() + response = [channel, self.ifidx, self.master.name, send_timestamp] + self._compose(sender_mac, response, 0x03, 0x10) + elif (msg_subkey := "Pair") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Pair #add something that checks that the messages are from the same mac + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if self._pairing_enabled and networking_keys["handshake_key_1"] == payload: + self._pairing = True + self.pair(sender_mac, networking_keys["handshake_key_2"]) + # some timer for if key 3 is not received to reset states + elif self._pairing_enabled and self._pairing and networking_keys["handshake_key_2"] == payload: + self._paired = True + self._paired_macs.append(sender_mac) + self.pair(sender_mac, networking_keys["handshake_key_3"]) + # some timer that sets to false if key 4 is not received + elif self._pairing_enabled and self._pairing and networking_keys["handshake_key_3"] == payload: + try: + # Insert pairing logic here do a reverse handshake + self._paired = True + self._paired_macs.append(sender_mac) + self.pair(sender_mac, networking_keys["handshake_key_4"]) + self.master.iprint("no pairing logic written just yet") + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + elif self._pairing_enabled and self._pairing and networking_keys["handshake_key_3"] == payload: + self._paired = True + # remove timer from before + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", "Pairing disabled", + payload) + elif (msg_subkey := "Set-Pair") and subtype == msg_subcodes[msg_key][msg_subkey]: # Enable pairing mode + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + self._pairing_enabled = payload[0] + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "RSSI/Status/Config-Boop") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # RSSI/Status/Config Boop + self.boops = self.boops + 1 + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)}), Received total of {self.boops} boops!") + try: + self._compose(sender_mac, + [self.master.id, self.master.name, self.master.config, self.master.version_n, + self.master.version, self.master.sta.mac, self.master.ap.mac, self.rssi()], 0x02, + 0x20) # [ID, Name, Config, Version, sta mac, ap mac, rssi] + except Exception as e: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + elif (msg_subkey := "Directory-Get") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Get List of Files + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + try: + result = [] + entries = os.listdir() + for entry in entries: + full_path = f"{path}/{entry}" + if os.stat(full_path)[0] & 0x4000: + result.extend(list_all_paths(full_path)) + else: + result.append(full_path) + self._compose(sender_mac, result, 0x02, 0x20) + except Exception as e: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + elif (msg_subkey := "Echo") and subtype == msg_subcodes[msg_key][msg_subkey]: # Echo + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)}): {__decode_payload(payload_type, payload)}") # Check i or d + self._compose(sender_mac, payload, 0x03, 0x15) + elif (msg_subkey := "Resend") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Resend lost long messages + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + self.master.iprint("Long_sent_buffer disabled due to memory constraints") + elif (msg_subkey := "WiFi-Connect") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Connect to Wi-Fi + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + self.master.sta.connect(payload[0], payload[1]) + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "WiFi-Disconnect") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Disconnect from Wi-Fi + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + self.master.sta.disconnect() + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "AP-Enable") and subtype == msg_subcodes[msg_key][msg_subkey]: # Enable AP + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + ssid = payload[0] + if ssid == "": + ssid = self.master.name + password = payload[1] + self.master.ap.setap(ssid, password) + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "AP-Disable") and subtype == msg_subcodes[msg_key][msg_subkey]: # Disable AP + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + payload = __decode_payload(payload_type, + payload) # should return a list of desired name, password and max clients + if __check_authorisation(sender_mac, payload): + try: + self.master.ap.deactivate() + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Pause") and subtype == msg_subcodes[msg_key][msg_subkey]: # Set Pause + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + self.master.iprint(f"Received pause command: {payload[0]}") + self._running = False + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + if self._pause_function: + self._pause_function() # calls the custom set pause function to display a screen + while not self._running: + time.sleep(0.5) + except Exception as e: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Resume") and subtype == msg_subcodes[msg_key][msg_subkey]: # Set Continue + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + self.master.iprint(f"Received continue command: {payload}") + self.master._running = True + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + else: + self.master.iprint( + f"Unknown command subtype from {sender_mac} ({self.peer_name(sender_mac)}): {subtype}") + + def __handle_inf(sender_mac, subtype, send_timestamp, receive_timestamp, payload_type, payload, msg_key): + self.master.dprint("aen.__handle_inf") + payload = __decode_payload(payload_type, payload) + if (msg_subkey := "RSSI") and subtype == msg_subcodes[msg_key][msg_subkey]: # RSSI + # payload["time_sent"] = send_timestamp + # payload["time_recv"] = receive_timestamp + self.master.iprint( + f"{msg_subkey} ({subtype}) data received from {sender_mac} ({self.peer_name(sender_mac)}): {payload}") + self.received_rssi_data[sender_mac] = payload + # __send_confirmation("Confirm", sender_mac, f"{msg_subkey} ({subtype})", payload) #confirm message recv + elif (msg_subkey := "Sensor") and subtype == msg_subcodes[msg_key][msg_subkey]: # Sensor Data + payload["time_sent"] = send_timestamp + payload["time_recv"] = receive_timestamp + self.master.iprint( + f"{msg_subkey} ({subtype}) data received from {sender_mac} ({self.peer_name(sender_mac)}): {payload}") + self.received_sensor_data[sender_mac] = payload + # __send_confirmation("Confirm", sender_mac, f"{msg_subkey} ({subtype})", payload) #confirm message recv + elif (msg_subkey := "Message") and subtype == msg_subcodes[msg_key][msg_subkey]: # Message / Other + self.master.iprint( + f"{msg_subkey} ({subtype}) received from {sender_mac} ({self.peer_name(sender_mac)}): {payload}") + self._received_messages.append((sender_mac, payload, receive_timestamp)) + self._received_messages_size.append(len(payload)) + while len(self._received_messages) > 2048 or sum(self._received_messages_size) > 20000: + self.master.dprint( + f"Maximum buffer size reached: {len(self._received_messages)}, {sum(self._received_messages_size)} bytes; Reducing!") + self._received_messages.pop(0) + self._received_messages_size.pop(0) + # __send_confirmation("Confirm", sender_mac, f"{msg_subkey} ({subtype})", payload) #confirm message recv + elif (msg_subkey := "Directory") and subtype == msg_subcodes[msg_key][msg_subkey]: # File Directory + self.master.iprint( + f"{msg_subkey} ({subtype}) data received from {sender_mac} ({self.peer_name(sender_mac)}): {payload}") + # __send_confirmation("Confirm", sender_mac, f"{msg_subkey} ({subtype})", payload) #confirm message recv + else: + self.master.iprint( + f"Unknown info subtype from {sender_mac} ({self.peer_name(sender_mac)}): {subtype}") + + def __handle_ack(sender_mac, subtype, send_timestamp, receive_timestamp, payload_type, payload, msg_key): + self.master.dprint("aen.__handle_ack") + payload = __decode_payload(payload_type, payload) + if (msg_subkey := "Pong") and subtype == msg_subcodes[msg_key][msg_subkey]: # Pong + self.add_peer(sender_mac, payload[2], payload[0], payload[1]) + self.master.iprint( + f"{msg_subkey} ({subtype}) received from {sender_mac} ({self.peer_name(sender_mac)}), {receive_timestamp - payload[3]}") + elif (msg_subkey := "Echo") and subtype == msg_subcodes[msg_key][msg_subkey]: # Echo + self.master.iprint( + f"{msg_subkey} ({subtype}) received from {sender_mac} ({self.peer_name(sender_mac)}), {__decode_payload(payload_type, payload)}") + elif (msg_subkey := "Success") and subtype == msg_subcodes[msg_key][msg_subkey]: # Success + # payload should return a list with a cmd type and payload + self.master.iprint( + f"Cmd {msg_subkey} ({subtype}) received from {sender_mac} ({self.peer_name(sender_mac)}) for type {payload[0]} with payload {payload[1]}") + # add to ack buffer + elif (msg_subkey := "Fail") and subtype == msg_subcodes[msg_key][msg_subkey]: # Fail + # payload should return a list with a cmd type, error and payload + self.master.iprint( + f"Cmd {msg_subkey} ({subtype}) received from {sender_mac} ({self.peer_name(sender_mac)}) for type {payload[0]} with error {payload[1]} and payload {payload[2]}") + # add to ack buffer + elif (msg_subkey := "Confirm") and subtype == msg_subcodes[msg_key][msg_subkey]: # Confirmation + # payload should return a list with message type and payload + self.master.iprint( + f"{msg_subkey} ({subtype}) received from {sender_mac} ({self.peer_name(sender_mac)}) for type {payload[0]} with payload {payload[1]}") + # add to ack buffer + else: + self.master.iprint( + f"Unknown ack subtype from {sender_mac} ({self.peer_name(sender_mac)}): {subtype}, Payload: {payload}") + # Insert more acknowledgement logic here and/or add message to acknowledgement buffer + + if self._aen.any(): + for mac, data in self._aen: + self.master.dprint(f"Received {mac, data}") + if mac is None: # mac, msg will equal (None, None) on timeout + break + if data: + if mac and data is not None: + # self._received_messages.append((sender_mac, data, receive_timestamp))#Messages will be saved here, this is only for debugging purposes + __process_message(mac, data, time.ticks_ms()) + if not self._aen.any(): # this is necessary as the for loop gets stuck and does not exit properly. + break + +# message structure (what kind of message types do I need?: Command which requires me to do something (ping, pair, change state(update, code, mesh mode, run a certain file), Informational Message (Sharing Sensor Data and RSSI Data) +# | Header (1 byte) | Type (1 byte) | Subtype (1 byte) | Timestamp(ms ticks) (4 bytes) | Payload type (1) | Payload (variable) | Checksum (1 byte) | + diff --git a/software/release/sensors.py b/software/release/sensors.py new file mode 100644 index 0000000..96cd1e8 --- /dev/null +++ b/software/release/sensors.py @@ -0,0 +1,146 @@ +from machine import Pin,I2C +from machine import Pin, SoftI2C, PWM, ADC +import adxl345 +import time + +i2c = SoftI2C(scl = Pin(7), sda = Pin(6)) + + + +class SENSORS: + def __init__(self,connection=i2c): + self.i2c=connection + self.adx = adxl345.ADXL345(self.i2c) + + self.initial = [0, 4095] + self.final = [0, 180] + self.pot = ADC(Pin(3)) + self.pot.atten(ADC.ATTN_11DB) # the pin expects a voltage range up to 3.3V + + + self.attached = False + self.selectsensor() + + time.sleep(1) + if self.attached: + self.light = ADC(Pin(5)) + self.light.atten(ADC.ATTN_11DB) # the pin expects a voltage range up to 3.3V\ + + + + self.battery = ADC(Pin(4)) + self.battery.atten(ADC.ATTN_11DB) # the pin expects a voltage range up to 3.3V + + self.x=None + self.y=None + self.z=None + self.roll=None + self.pitch=None + + def selectsensor(self): + p_digital = Pin(5, Pin.OUT) #set pin as digital + p_digital.value(0) #set pin low + p_analog = ADC(Pin(5)) # set pin 5 to analog + p_analog.atten(ADC.ATTN_11DB) # the pin expects a voltage range up to 3.3V + low = p_analog.read() # read value + + + p_digital = Pin(5, Pin.OUT) #set pin as digital + p_digital.value(1) #set pin high + p_analog = ADC(Pin(5)) #set pin to analog + p_analog.atten(ADC.ATTN_11DB) # the pin expects a voltage range up to 3.3V + high = p_analog.read() #read value + + if(low<200 and high>4000): + self.attached = False + + else: + self.attached = True + + def map_angle_to_range(self, angle): + if 90 <= angle <= 180: + # Normalize range 90 to 180 to 0 to 90 + normalized_angle = angle - 90 + elif -180 <= angle < -90: + # Normalize range -180 to -90 to 90 to 180 + normalized_angle = angle + 270 # -180 + 270 = 90, -90 + 270 = 180 + elif -90 <= angle < 0: + return 4095 + else: + return 0 + # Now normalized_angle is in the range [0, 180] + # Map this range to [0, 4095] + mapped_value = int((normalized_angle / 180.0) * 4095) + + + return mapped_value + def readlight(self): + return self.light.read() + + + def readpot(self): + return self.pot.read() + + def accel(self): + self.x =self.adx.xValue + self.y =self.adx.yValue + self.z =self.adx.zValue + + + def readaccel(self): + self.accel() + return self.x, self.y,self.z + + def readroll(self): + self.accel() + self.roll, self.pitch = self.adx.RP_calculate(self.x,self.y,self.z) + return self.roll,self.pitch + + + def readpoint(self): + l=[] + p=[] + + for i in range(100): + if self.attached: + l.append(self.readlight()) + p.append(self.readpot()) + + l.sort() + p.sort() + if self.attached: + l=l[30:60] + avlight=sum(l)/len(l) + else: + avlight = self.map_angle_to_range(self.readroll()[0]) + + p=p[30:60] + avpos=sum(p)/len(p) + + point = avlight, self.mappot(avpos) + return point + + def mappot(self, value): + return int((self.final[1]-self.final[0]) / (self.initial[1]-self.initial[0]) * (value - self.initial[0]) + self.final[0]) + + def readbattery(self): + batterylevel=self.battery.read() + + if(batterylevel>2850): #charging + return 'charging' + + elif(batterylevel>2700 and batterylevel <2875): #full charge + return 'full' + elif(batterylevel>2500 and batterylevel <2700): #medium charge + return 'half' + elif(batterylevel<2500): # low charge + return 'low' + else: + pass + + return "" + + + + + diff --git a/software/release/servo.py b/software/release/servo.py new file mode 100644 index 0000000..7237b17 --- /dev/null +++ b/software/release/servo.py @@ -0,0 +1,33 @@ +from machine import PWM +import math + +# originally by Radomir Dopieralski http://sheep.art.pl +# from https://bitbucket.org/thesheep/micropython-servo + +class Servo: + def __init__(self, pin, freq=50, min_us=600, max_us=2400, angle=180): + self.min_us = min_us + self.max_us = max_us + self.us = 0 + self.freq = freq + self.angle = angle + self.pwm = PWM(pin, freq=freq, duty=0) + + def write_us(self, us): + #"""Set the signal to be ``us`` microseconds long. Zero disables it.""" + if us == 0: + self.pwm.duty(0) + return + us = min(self.max_us, max(self.min_us, us)) + duty = us * 1024 * self.freq // 1000000 + self.pwm.duty(duty) + + def write_angle(self, degrees=None, radians=None): + #"""Move to the specified angle in ``degrees`` or ``radians``.""" + if degrees is None: + degrees = math.degrees(radians) + degrees = degrees % 360 + total_range = self.max_us - self.min_us + us = self.min_us + total_range * degrees // self.angle + self.write_us(us) + diff --git a/software/release/smartmotor.py b/software/release/smartmotor.py new file mode 100644 index 0000000..b2649a9 --- /dev/null +++ b/software/release/smartmotor.py @@ -0,0 +1,444 @@ +from machine import Pin, SoftI2C, PWM, ADC +from files import * +import time +from machine import Timer +import servo +import icons +import os +import sys +import ubinascii +import machine + + +import sensors +sens=sensors.SENSORS() + +#unique name + +ID= ubinascii.hexlify(machine.unique_id()).decode() +numberofIcons=[len(icons.iconFrames[i]) for i in range(len(icons.iconFrames))] #[homescreen, trainscreen, playscreen, playthefilesscreen, settingsscreen] +highlightedIcon=[] +for numberofIcon in numberofIcons: + highlightedIcon.append([0,numberofIcon]) + +screenID=1 +lastPressed=0 +previousIcon=0 +filenumber=0 + + +currentlocaltime=0 +oldlocaltime=0 + +points = [] + + +#Defining all flags +#flags +flags=[False,False,False,False, False] +playFlag=False +triggered=False +LOGGING=True + + +#switch flags +switch_state_up = False +switch_state_down = False +switch_state_select = False + +last_switch_state_up = False +last_switch_state_down = False +last_switch_state_select = False + +switched_up = False +switched_down = False +switched_select = False + + +#mainloop flags +clearscreen=False + + +#define buttons , sensors and motors +#servo +s = servo.Servo(Pin(2)) + +#nav switches +switch_down = Pin(8, Pin.IN) +switch_select = Pin(9, Pin.IN) +switch_up= Pin(10, Pin.IN) + +i2c = SoftI2C(scl = Pin(7), sda = Pin(6)) +display = icons.SSD1306_SMART(128, 64, i2c,switch_up) + + +#highlightedIcon=[(ICON,TotalIcons),...] +#screenID gives the SCREEN number I am at +#SCREENID +#0 - HOMESCREEN +#1 - PlaySCREEN +#2 - TrainSCREEN +#3 - Playthefiles +#4 - ConnectSCREEN + +#highligtedIcons[screenID][0] which icon is highlighted on screenID screen +#highligtedIcons[screenID][0] =1 # First Icon selected +#highligtedIcons[screenID][0] =2 #Second +#highligtedIcons[screenID][0] =3 #Third + +#interrupt functions + +def downpressed(count=-1): + global playFlag + global triggered + playFlag = False + time.sleep(0.05) + if(time.ticks_ms()-lastPressed>200): + displayselect(count) + + triggered=True #log file + + +def uppressed(count=1): + global playFlag + global triggered + playFlag = False + time.sleep(0.05) + if(time.ticks_ms()-lastPressed>200): + displayselect(count) + triggered=True #log file + + +def displayselect(selectedIcon): + global screenID + global highlightedIcon + global lastPressed + global previousIcon + + highlightedIcon[screenID][0]=(highlightedIcon[screenID][0]+selectedIcon)%highlightedIcon[screenID][1] + display.selector(screenID,highlightedIcon[screenID][0],previousIcon) #draw circle at selection position, and remove from the previousIconious position + previousIcon=highlightedIcon[screenID][0] + + lastPressed=time.ticks_ms() + +def selectpressed(): + #declare all global variables, include all flags + global flags + global triggered + time.sleep(0.05) + flags[highlightedIcon[screenID][0]]=True + triggered=True #log file + + + + +def resettohome(): + global screenID + global highlightedIcon + global previousIcon + global clearscreen + screenID=0 + previousIcon=0 + for numberofIcon in numberofIcons: + highlightedIcon.append([0,numberofIcon]) + display.selector(screenID,highlightedIcon[screenID][0],0) + #display.fill(0) # clear screen + clearscreen=True + +def check_switch(p): + global switch_state_up + global switch_state_down + global switch_state_select + + global switched_up + global switched_down + global switched_select + + global last_switch_state_up + global last_switch_state_down + global last_switch_state_select + + switch_state_up = switch_up.value() + switch_state_down = switch_down.value() + switch_state_select = switch_select.value() + + if switch_state_up != last_switch_state_up: + switched_up = True + + elif switch_state_down != last_switch_state_down: + switched_down = True + + elif switch_state_select != last_switch_state_select: + switched_select = True + + if switched_up: + if switch_state_up == 0: + uppressed() + switched_up = False + elif switched_down: + if switch_state_down == 0: + downpressed() + switched_down = False + elif switched_select: + if switch_state_select == 0: + selectpressed() + switched_select = False + + last_switch_state_up = switch_state_up + last_switch_state_down = switch_state_down + last_switch_state_select = switch_state_select + + + +def displaybatt(p): + batterycharge=sens.readbattery() + display.showbattery(batterycharge) + if LOGGING: + savetolog(time.time(),screenID,highlightedIcon, point,points) + return batterycharge + + + +def nearestNeighbor(data, point): + try: + point = point[0] + except TypeError: + pass + if len(data) == 0: + return 0 + diff = 10000 + test = None + for i in data: + if abs(i[0] - point) <= diff: + diff = abs(i[0] - point) + test = i + return (point, test[1]) + +def resetflags(): + global flags + for i in range(len(flags)): + flags[i]=False + + +def shakemotor(point): + motorpos=point[1] + for i in range(2): + s.write_angle(min(180,motorpos+5)) + time.sleep(0.1) + s.write_angle(max(0,motorpos-5)) + time.sleep(0.1) + + + + +def readdatapoints(): + datapoints=readfile() + if(datapoints): + numberofdata=len(datapoints) + return datapoints[-1] + else: + return ([]) + + +def setloggingmode(): + #sets pref to log by default + if not switch_down.value() and not switch_up.value() and not switch_select.value(): + resetlog() #delete all the previous logs + setprefs() #sets preference file to True + display.showmessage("LOG: ON") + print("resetting the log file") + + + #resets prefs to not log by default + if not switch_down.value() and not switch_up.value() and switch_select.value(): + resetprefs() #resets preference file to False + print("turn OFF the logging") + display.showmessage("LOG: OFF") + + + if switch_down.value() and switch_up.value() and switch_select.value(): + print("default: turn ON the logging") + + + import prefs + return prefs.log + + + +points=readdatapoints() +if points==[]: + highlightedIcon[1][0]=1 #go to add if there are no data saved + +#setting up Timers +tim = Timer(0) +tim.init(period=50, mode=Timer.PERIODIC, callback=check_switch) +batt = Timer(2) +batt.init(period=3000, mode=Timer.PERIODIC, callback=displaybatt) + + +display.welcomemessage() + + + + + +LOGGING=setloggingmode() + +#setup with homescreen #starts with screenID=0 +display.selector(screenID,highlightedIcon[screenID][0],-1) +oldpoint=[-1,-1] + + +while True: + #log + point = sens.readpoint() + if(triggered): + if LOGGING: + savetolog(time.time(),screenID,highlightedIcon, point,points) + triggered=False + + + #broadcast(point, screenID, highlightedIcon[screenID][0],ID) + + #Homepage + #[fb_Train,fb_Play] + + if(screenID==0): + if(flags[0]): + points=[] #empty the points arrray + screenID=1 + clearscreen=True + display.graph(oldpoint, point, points,0) #normal color + resetflags() + + elif(flags[1]): + screenID=2 + clearscreen=True + datapoints=readfile() + if (datapoints==[]): + display.showmessage("No data saved") + resettohome() + else: + display.graph(oldpoint, point, points,0) #normal color + resetflags() + + # Training Screen + # [fb_add,fb_delete,fb_smallplay,fb_home] + elif(screenID==1): + if(flags[0]): # Play button is pressed + if (points): + playFlag=True + savetofile(points) + shakemotor(point) + #screenID=2 # trigger play screen + #uppressed(count=4) + else: + cleardatafile() + display.showmessage("NO DATA") + resetflags() + + + elif(flags[1]): # add button is pressed + points.append(list(point)) + display.graph(oldpoint, point, points,0) + shakemotor(point) + resetflags() + + elif(flags[2]): #delete button is pressed + if(points): #delete only when there is something + points.pop() + display.cleargraph() + display.graph(oldpoint, point, points,0) + resetflags() + + + elif(flags[3]): # change this to settings icon save button is pressed + # This is where we can put other advanced settings, maybe call the main screen + #screenId=0 + #savetofile(points) + #resettohome() + + print("some settings stuff") + resetflags() + + elif(flags[4]): # help button is prssed + # quit to home + display.showmessage("This is for help whatever you need") + # resettohome() + resetflags() + + + if(playFlag): #only when point is different now + if(not point==oldpoint): #only when point is different now + point = nearestNeighbor(points,point) + s.write_angle(point[1]) + display.graph(oldpoint, point, points,1) #inverted color + + + + else: + if(not point==oldpoint): #only when point is different now + s.write_angle(point[1]) + display.graph(oldpoint, point, points,0) #normal color + + + + + # Load saved files screen + #[fb_next,fb_delete,fb_home,fb_toggle] + elif(screenID==2): + datapoints=readfile() + if(datapoints): + numberofdata=len(datapoints) + points=datapoints[filenumber] + + if(flags[0]): + filenumber=((filenumber+1)%numberofdata) + points=datapoints[filenumber] + display.cleargraph() + resetflags() + + elif(flags[1]): + del datapoints[filenumber] + replacefile(datapoints) + filenumber=0 + display.cleargraph() + resetflags() + + elif(flags[2]): + resettohome() + resetflags() + + if(not point==oldpoint): #only when point is different now + point = nearestNeighbor(points,point) + s.write_angle(point[1]) + display.graph(oldpoint, point, points,0) #normal color + else: + display.showmessage("No files to show") + resettohome() + + # Settings Screen + #[fb_Lead,fb_Follow, fb_BIGHome] + elif(screenID==4): + if(flags[0]): + display.showmessage(ID) + waitforconnection() + resetflags() + + elif(flags[1]): + print("I shall follow you") + resetflags() + + elif(flags[2]): + resettohome() + resetflags() + + oldpoint=point + if clearscreen: + display.fill(0) + display.selector(screenID,highlightedIcon[screenID][0],-1) + clearscreen=False + + + + diff --git a/software/release/ssd1306.py b/software/release/ssd1306.py new file mode 100644 index 0000000..d046b25 --- /dev/null +++ b/software/release/ssd1306.py @@ -0,0 +1,158 @@ +# This is an ssd1306 package we found on the internet. +# https://github.com/stlehmann/micropython-ssd1306/blob/master/ssd1306.py +# Do not edit this file. +# MicroPython SSD1306 OLED driver, I2C and SPI interfaces +from micropython import const +import framebuf + + +# register definitions +SET_CONTRAST = const(0x81) +SET_ENTIRE_ON = const(0xA4) +SET_NORM_INV = const(0xA6) +SET_DISP = const(0xAE) +SET_MEM_ADDR = const(0x20) +SET_COL_ADDR = const(0x21) +SET_PAGE_ADDR = const(0x22) +SET_DISP_START_LINE = const(0x40) +SET_SEG_REMAP = const(0xA0) +SET_MUX_RATIO = const(0xA8) +SET_COM_OUT_DIR = const(0xC0) +SET_DISP_OFFSET = const(0xD3) +SET_COM_PIN_CFG = const(0xDA) +SET_DISP_CLK_DIV = const(0xD5) +SET_PRECHARGE = const(0xD9) +SET_VCOM_DESEL = const(0xDB) +SET_CHARGE_PUMP = const(0x8D) + +# Subclassing FrameBuffer provides support for graphics primitives +# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html +class SSD1306(framebuf.FrameBuffer): + def __init__(self, width, height, external_vcc): + self.width = width + self.height = height + self.external_vcc = external_vcc + self.pages = self.height // 8 + self.buffer = bytearray(self.pages * self.width) + super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB) + self.init_display() + + def init_display(self): + for cmd in ( + SET_DISP | 0x00, # off + # address setting + SET_MEM_ADDR, + 0x00, # horizontal + # resolution and layout + SET_DISP_START_LINE | 0x00, + SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0 + SET_MUX_RATIO, + self.height - 1, + SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0 + SET_DISP_OFFSET, + 0x00, + SET_COM_PIN_CFG, + 0x02 if self.width > 2 * self.height else 0x12, + # timing and driving scheme + SET_DISP_CLK_DIV, + 0x80, + SET_PRECHARGE, + 0x22 if self.external_vcc else 0xF1, + SET_VCOM_DESEL, + 0x30, # 0.83*Vcc + # display + SET_CONTRAST, + 0xFF, # maximum + SET_ENTIRE_ON, # output follows RAM contents + SET_NORM_INV, # not inverted + # charge pump + SET_CHARGE_PUMP, + 0x10 if self.external_vcc else 0x14, + SET_DISP | 0x01, + ): # on + self.write_cmd(cmd) + self.fill(0) + self.show() + + def poweroff(self): + self.write_cmd(SET_DISP | 0x00) + + def poweron(self): + self.write_cmd(SET_DISP | 0x01) + + def contrast(self, contrast): + self.write_cmd(SET_CONTRAST) + self.write_cmd(contrast) + + def invert(self, invert): + self.write_cmd(SET_NORM_INV | (invert & 1)) + + def show(self): + x0 = 0 + x1 = self.width - 1 + if self.width == 64: + # displays with width of 64 pixels are shifted by 32 + x0 += 32 + x1 += 32 + self.write_cmd(SET_COL_ADDR) + self.write_cmd(x0) + self.write_cmd(x1) + self.write_cmd(SET_PAGE_ADDR) + self.write_cmd(0) + self.write_cmd(self.pages - 1) + self.write_data(self.buffer) + + +class SSD1306_I2C(SSD1306): + def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False): + self.i2c = i2c + self.addr = addr + self.temp = bytearray(2) + self.write_list = [b"\x40", None] # Co=0, D/C#=1 + super().__init__(width, height, external_vcc) + + def write_cmd(self, cmd): + self.temp[0] = 0x80 # Co=1, D/C#=0 + self.temp[1] = cmd + self.i2c.writeto(self.addr, self.temp) + + def write_data(self, buf): + self.write_list[1] = buf + self.i2c.writevto(self.addr, self.write_list) + + +class SSD1306_SPI(SSD1306): + def __init__(self, width, height, spi, dc, res, cs, external_vcc=False): + self.rate = 10 * 1024 * 1024 + dc.init(dc.OUT, value=0) + res.init(res.OUT, value=0) + cs.init(cs.OUT, value=1) + self.spi = spi + self.dc = dc + self.res = res + self.cs = cs + import time + + self.res(1) + time.sleep_ms(1) + self.res(0) + time.sleep_ms(10) + self.res(1) + super().__init__(width, height, external_vcc) + + def write_cmd(self, cmd): + self.spi.init(baudrate=self.rate, polarity=0, phase=0) + self.cs(1) + self.dc(0) + self.cs(0) + self.spi.write(bytearray([cmd])) + self.cs(1) + + def write_data(self, buf): + self.spi.init(baudrate=self.rate, polarity=0, phase=0) + self.cs(1) + self.dc(1) + self.cs(0) + self.spi.write(buf) + self.cs(1) + diff --git a/software/release/variableLED.py b/software/release/variableLED.py new file mode 100644 index 0000000..ca8ae29 --- /dev/null +++ b/software/release/variableLED.py @@ -0,0 +1,93 @@ +from machine import Pin +import time +#from time import sleep_us + +class VariableLED: + def __init__(self, pin_clk, pin_data, num_leds): + self.pin_clk = pin_clk + self.pin_data = pin_data + self.num_leds = num_leds + self.pin_clk.init(Pin.OUT) + self.pin_data.init(Pin.OUT) + self.reset() + + def __setitem__(self, index, val): + offset = index * 3 + for i in range(3): + self.buf[offset + i] = val[i] + + def __getitem__(self, index): + offset = index * 3 + return tuple(self.buf[offset + i] for i in range(3)) + + def fill(self, color): + for i in range(self.num_leds): + self[i] = color + + def reset(self): + self.buf = bytearray(self.num_leds * 3) + # Begin data frame 4 bytes + self._frame() + # 4 bytes for each led (checksum, blue, green, red) + for i in range(self.num_leds): + self._write_byte(0xC0) + for i in range(3): + self._write_byte(0) + # End data frame 4 bytes + self._frame() + + def write(self): + # Begin data frame 4 bytes + self._frame() + + # 4 bytes for each led (checksum, blue, green, red) + for i in range(self.num_leds): + self._write_color(self.buf[i * 3], self.buf[i * 3 + 1], self.buf[i * 3 + 2]) + + # End data frame 4 bytes + self._frame() + + def _frame(self): + # Send 32x zeros + self.pin_data(0) + for i in range(32): + self._clk() + + def _clk(self): + self.pin_clk(0) + #sleep_us(1) # works without it + self.pin_clk(1) + #sleep_us(1) # works without it + + def _write_byte(self, b): + if b == 0: + # Fast send 8x zeros + self.pin_data(0) + for i in range(8): + self._clk() + else: + # Send each bit, MSB first + for i in range(8): + if ((b & 0x80) != 0): + self.pin_data(1) + else: + self.pin_data(0) + self._clk() + + # On to the next bit + b <<= 1 + + def _write_color(self, r, g, b): + # Send a checksum byte with the format "1 1 ~B7 ~B6 ~G7 ~G6 ~R7 ~R6" + # The checksum colour bits should bitwise NOT the data colour bits + checksum = 0xC0 # 0b11000000 + checksum |= (b >> 6 & 3) << 4 + checksum |= (g >> 6 & 3) << 2 + checksum |= (r >> 6 & 3) + + self._write_byte(checksum) + + # Send the 3 colours + self._write_byte(b) + self._write_byte(g) + self._write_byte(r) \ No newline at end of file