diff --git a/.DS_Store b/.DS_Store index 9720e53..bd69ca8 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/.github/workflows/render-README.yml b/.github/workflows/render-README.yml index 04e9909..b79778e 100644 --- a/.github/workflows/render-README.yml +++ b/.github/workflows/render-README.yml @@ -43,5 +43,5 @@ jobs: #git checkout stage || git checkout -b stage git remote set-url origin https://x-access-token:${GITHUB_TOKEN}@github.com/${GITHUB_REPOSITORY}.git - git push origin main + diff --git a/.gitignore b/.gitignore index f5e8672..bbc19dc 100644 --- a/.gitignore +++ b/.gitignore @@ -4,5 +4,3 @@ .Ruserdata keys/ACTIONS_RENDER keys/ACTIONS_RENDER.pub -.github/.DS_Store -.DS_Store diff --git a/docs/.DS_Store b/docs/.DS_Store index 78ce0ae..e96c85d 100644 Binary files a/docs/.DS_Store and b/docs/.DS_Store differ diff --git a/software/.DS_Store b/software/.DS_Store index 6d29c9c..fc1f08d 100644 Binary files a/software/.DS_Store and b/software/.DS_Store differ diff --git a/software/applications/.DS_Store b/software/applications/.DS_Store index b5c09c0..5f658ce 100644 Binary files a/software/applications/.DS_Store and b/software/applications/.DS_Store differ diff --git a/software/libraries/README.qmd b/software/libraries/README.qmd index 498ac84..f913fe0 100644 --- a/software/libraries/README.qmd +++ b/software/libraries/README.qmd @@ -35,4 +35,11 @@ A short description of the directories can be found below. | name | description | contribution | |----------------|-------------------------------------------|--------------| +| adxl345.py | Support for the built in accelerometer https://github.com/DFRobot/micropython-dflib/blob/master/ADXL345/user_lib/ADXL345.py | Milan | +| files.py | Custom save to file library | Milan | +| icons.py | Icon support for the smart motor module | Milan | +| sensors.py | Sensor support for the smart motor module | Milan | +| servo.py | Servo support for the smart motor module | Milan | +| smartmotor.py | Smart motor module main program | Milan | +| ssd1306.py | Support for the built in OLED screen https://github.com/stlehmann/micropython-ssd1306/blob/master/ssd1306.py | Milan | | variableLED.py | Library that powers the variable LED grid | Sophie | diff --git a/software/libraries/adxl345.py b/software/libraries/adxl345.py new file mode 100644 index 0000000..034b647 --- /dev/null +++ b/software/libraries/adxl345.py @@ -0,0 +1,53 @@ +#https://github.com/DFRobot/micropython-dflib/blob/master/ADXL345/user_lib/ADXL345.py +from machine import Pin,I2C +import math +import time + +device = const(0x53) +regAddress = const(0x32) +TO_READ = 6 +buff = bytearray(6) + +class ADXL345: + def __init__(self,i2c,addr=device): + self.addr = addr + self.i2c = i2c + b = bytearray(1) + b[0] = 0 + self.i2c.writeto_mem(self.addr,0x2d,b) + b[0] = 16 + self.i2c.writeto_mem(self.addr,0x2d,b) + b[0] = 8 + self.i2c.writeto_mem(self.addr,0x2d,b) + + @property + def xValue(self): + buff = self.i2c.readfrom_mem(self.addr,regAddress,TO_READ) + x = (int(buff[1]) << 8) | buff[0] + if x > 32767: + x -= 65536 + return x + + @property + def yValue(self): + buff = self.i2c.readfrom_mem(self.addr,regAddress,TO_READ) + y = (int(buff[3]) << 8) | buff[2] + if y > 32767: + y -= 65536 + return y + + @property + def zValue(self): + buff = self.i2c.readfrom_mem(self.addr,regAddress,TO_READ) + z = (int(buff[5]) << 8) | buff[4] + if z > 32767: + z -= 65536 + return z + + def RP_calculate(self,x,y,z): + roll = math.atan2(y , z) * 57.3 + pitch = math.atan2((- x) , math.sqrt(y * y + z * z)) * 57.3 + return roll,pitch + + + diff --git a/software/libraries/files.py b/software/libraries/files.py new file mode 100644 index 0000000..38efd56 --- /dev/null +++ b/software/libraries/files.py @@ -0,0 +1,96 @@ +import sys +def savetofile(pointstosave): # the points to save should have format of [[light, pot],[light,pot]] + import os + if(os.listdir().count('data.py')): + import data + datapoints=[] + del sys.modules["data"] + import data + try: + datapoints=data.points + datapoints.append(pointstosave) + except: + datapoints.append(pointstosave) + del sys.modules["data"] + #getting ready to reimporting data file + else: + datapoints=[] + datapoints.append(pointstosave) + print("new file") + + #writing files to the data.py + f=open("data.py","w") + f.write("points="+str(datapoints)+"\r\n") + f.close() + + +def cleardatafile(): # the points to save should have format of [[light, pot],[light,pot]] + import os + f=open("data.py","w") + f.write("points=[]") + f.close() + + +def replacefile(pointstosave): + import os + if(os.listdir().count('data.py')): + f=open("data.py","w") + f.write("points="+str(pointstosave)+"\r\n") + f.close() + else: + return 0 + + + +def readfile(): + + import os + if(os.listdir().count('data.py')): + import data + if(data.points): + return(data.points) + else: + print("returning blank") + return([]) + else: + return([]) + + #also make this go home + +def resetlog(): + import os + try: + os.remove("log.py") + except: + print("no log file to remove") + +def resetprefs(): + f=open("prefs.py","w") + f.write("log=False\r\n") + f.close() + + + +def setprefs(): + f=open("prefs.py","w") + f.write("log=True\r\n") + f.close() + + +def savetolog(*logtext): # the points to save should have format of [[light, pot],[light,pot]] + import os + if(os.listdir().count('log.py')): + f=open("log.py","a") + try: + print("writing",logtext) + f.write(str(logtext)+"\r\n") + except: + print("errr") + else: + f=open("log.py","w") + f.write(str(logtext)+",") + + #writing files to the data.py + f.close() + + diff --git a/software/libraries/icons.py b/software/libraries/icons.py new file mode 100644 index 0000000..f60745b --- /dev/null +++ b/software/libraries/icons.py @@ -0,0 +1,241 @@ +import framebuf +import ssd1306 +from machine import Pin +import time + + +MAX_BATTERY=2900 +MIN_BATTERY=2600 + + +#define icons here +def createIcons(iconSize, iconFrames, offsetx=0, offsety=0, direction=0): + icons=[] + padding=2 + spacingV=int((screenHeight - iconSize*len(iconFrames))/ (len(iconFrames))-padding) + spacingH=int((screenWidth - iconSize*len(iconFrames))/ (len(iconFrames))-padding) + + for i in range(len(iconFrames)): + Iconx=offsetx + i * (spacingH + iconSize) * ((direction+1)%2) + Icony=offsety# + i * (spacingV + iconSize) * ((direction)%2) + + icons.append([Iconx,Icony,iconFrames[i]]) + return icons + + + + +#Homescreen icons +Icons=[] + +screenWidth=128 +screenHeight=120 + +#logo +fb_SMLOGO = framebuf.FrameBuffer(bytearray(b'\x00\x00\x01\xe0\x00\x00\x00\x00\x00\xe1\xe1\xc0\x00\x00\x00\x00\xff\xff\xc0\x00\x00\x00\x00\xff\xff\xc0\x00\x00\x00\x11\xff\xff\xe2\x00\x00\x00\x7f\xff\xff\xff\x80\x00\x00\x7f\xff\xff\xff\x80\x00\x00?\xff\xff\xff\x00\x00\x00\x7f\xff\xff\xff\x80\x00\x06\xff\xf0\x03\xff\xd8\x00\x07\xff\xc0\x00\xff\xf8\x00\x0f\xff\x00\x00?\xfc\x00\x07\xfe\x00\x00\x1f\xf8\x00\x07\xfc\x00\x00\x0f\xf8\x00\x07\xf8\x00\x00\x07\xf8\x00\x0f\xf0\x00\x00\x03\xfc\x00\x7f\xe0\x00\x00\x01\xff\x80\x7f\xe0\x00\x00\x01\xff\x80\x7f\xc0\x00\x00\x00\xff\x80?\xc0\x00\x00\x00\xff\x00\x1f\xc0\x00\x00\x00\xfe\x00\x1f\x80\x00\x00\x00~\x00?\x80\x00\x00\x00\x7f\x00\xff\x80\x00\x079\xbf\xc0\xff\x80\x00\x07\xbd\xbf\xc0\xff\x80\x00\x079\xbf\xc0\xff\x80\x00\x00\x00\xff\xc0?\x80\x00\x00\x00\x7f\x00\x1f\xc0\x00\x00\x00\xfe\x00\x1f\xc0\x00\x00\x00\xfe\x00?\xc0\x00\x00\x00\xff\x00\x7f\xe0\x00\x00\x01\xff\x80\x7f\xe0\x00\x00\x01\xff\x80\x7f\xf0\x00\x00\x03\xff\x80\x0f\xf0\x00\x00\x03\xfc\x00\x07\xf8\x00\x00\x07\xf8\x00\x03\xfc\x00\x00\x0f\xf0\x00\x07\xff\x00\x00?\xf8\x00\x0f\xff\x80\x00\x7f\xfc\x00\x07\xff\xe0\x01\xff\xf8\x00\x06\x7f\xfe\x1f\xff\x98\x00\x00?\xff\xff\xff\x00\x00\x00?\xff\xff\xff\x00\x00\x00\x7f\xff\xff\xff\x80\x00\x00{\xff\xff\xf7\x80\x00\x00\x10\xff\xff\xc2\x00\x00\x00\x00\xff\xff\xc0\x00\x00\x00\x00\xf1\xe3\xc0\x00\x00\x00\x00\xe1\xe1\xc0\x00\x00\x00\x00\x01\xe0\x00\x00\x00'), 50, 50, framebuf.MONO_HLSB) +#plug +fb_battcharging = framebuf.FrameBuffer(bytearray(b'?\xff\xff\xc0 \x00\x00@ \x06\x00@\xe0\x07\x80@\xe0\x7f\xc0@\xe0<\x00@\xe0\x0c\x00@ \x00\x00@ \x00\x00@?\xff\xff\xc0'), 26, 10, framebuf.MONO_HLSB) +fb_batthigh = framebuf.FrameBuffer(bytearray(b'?\xff\xff\xc0 \x00\x00@/\xef\xef@\xef\xef\xef@\xef\xef\xef@\xef\xef\xef@\xef\xef\xef@/\xef\xef@ \x00\x00@?\xff\xff\xc0'), 26, 10, framebuf.MONO_HLSB) +fb_battmid = framebuf.FrameBuffer(bytearray(b'?\xff\xff\xc0 \x00\x00@ \x0f\xef@\xe0\x0f\xef@\xe0\x0f\xef@\xe0\x0f\xef@\xe0\x0f\xef@ \x0f\xef@ \x00\x00@?\xff\xff\xc0'), 26, 10, framebuf.MONO_HLSB) +fb_battlow = framebuf.FrameBuffer(bytearray(b'?\xff\xff\xc0 \x00\x00@ \x00\x0f@\xe0\x00\x0f@\xe0\x00\x0f@\xe0\x00\x0f@\xe0\x00\x0f@ \x00\x0f@ \x00\x00@?\xff\xff\xc0'), 26, 10, framebuf.MONO_HLSB) + +#HomeScreen Icons +fb_Train = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\xff\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\xff\xff\x01\x80\x80\x01\x01\x80\x80\x01\x01\x80\xe0\x01\x01\x81\xf0\x01\x01\x81\xf8\x01\x01\x83\xf8\x01\x01\x81\xf0\x01\x01\x81\xf0A\x01\x80\xe0\x81\x01\x81\xf1\x81\x01\x83\xfb\x01\x01\x83\xfe\x01\x01\x87\xfc\x01\x01\x87\xfc\x01\x01\x87\xff\xff\x01\x87\xfc\x00\x01\x87\xfc\x00\x01\x87\xfc\x00\x01\x83\xf8\x00\x01\x83\xf8\x00\x01\x81\xf0\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\xff\xff\xff\xff'), 32, 32, framebuf.MONO_HLSB) +fb_Setting = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\xff\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x01\xc3\x01\x81\xc3\xc7\x81\x81\xe3\xc7\x81\x81\xff\xef\x01\x81\xff\xff\x01\x80\xff\xfe\x19\x80\xfc?9\xb9\xf0\x0f\xfd\xbf\xe0\x07\xfd\xbf\xc0\x03\xfd\xbf\x80\x01\xf1\x93\x80\x01\xc1\x83\x80\x01\xc1\x83\x80\x01\xd9\xbf\x80\x01\xfd\xbf\xc0\x03\xfd\xbf\xe0\x07\xfd\xbf\xf0\x0f\x85\x9c\xfc?\x01\x80\x7f\xff\x81\x80\x7f\xff\xc1\x80\xff\xef\x81\x81\xf1\xe3\x81\x81\xe3\xe1\x81\x80\xe3\xe0\x01\x80\x00\x00\x01\x80\x00\x00\x01\xff\xff\xff\xff'), 32, 32, framebuf.MONO_HLSB) +fb_Play = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\xff\x80\x00\x00\x01\x81\x00\x00\x01\x81\xc0\x00\x01\x81\xe0\x00\x01\x81\xf8\x00\x01\x81\xfe\x00\x01\x81\xff\x00\x01\x81\xff\xc0\x01\x81\xff\xe0\x01\x81\xff\xf8\x01\x81\xff\xfe\x01\x81\xff\xff\x01\x81\xff\xff\xc1\x81\xff\xff\xe1\x81\xff\xff\xf9\x81\xff\xff\xe1\x81\xff\xff\xc1\x81\xff\xff\x01\x81\xff\xfe\x01\x81\xff\xf8\x01\x81\xff\xe0\x01\x81\xff\xc0\x01\x81\xff\x00\x01\x81\xfe\x00\x01\x81\xf8\x00\x01\x81\xe0\x00\x01\x81\xc0\x00\x01\x81\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\xff\xff\xff\xff'), 32, 32, framebuf.MONO_HLSB) + +#TrainScreen Icons +fb_add = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x83\xff\xe0\x80\x83\xff\xe0\x80\x83\xff\xe0\x80\x83\xff\xe0\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x80\x1e\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_delete = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x8c\x00\x18\x80\x9e\x00<\x80\x8f\x00x\x80\x87\x80\xf0\x80\x83\xc1\xe0\x80\x81\xe3\xc0\x80\x80\xf7\x80\x80\x80\x7f\x00\x80\x80>\x00\x80\x80>\x00\x80\x80\x7f\x00\x80\x80\xff\x80\x80\x81\xe3\xc0\x80\x83\xc1\xe0\x80\x87\x80\xf0\x80\x8f\x00x\x80\x9f\x00|\x80\x8c\x00\x18\x80\x8c\x00\x18\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_smallplay = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x83\x00\x00\x80\x83\xc0\x00\x80\x83\xf0\x00\x80\x83\xf8\x00\x80\x83\xfe\x00\x80\x83\xff\x80\x80\x83\xff\xc0\x80\x83\xff\xf0\x80\x83\xff\xf0\x80\x83\xff\xc0\x80\x83\xff\x80\x80\x83\xfe\x00\x80\x83\xf8\x00\x80\x83\xf0\x00\x80\x83\xc0\x00\x80\x83\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_back = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x07\x80\x80\x80\x0f\x80\x80\x80\x1f\x80\x80\x80>\x00\x80\x80~\x00\x80\x80\xf8\x00\x80\x81\xf8\x00\x80\x83\xe0\x00\x80\x83\xe0\x00\x80\x81\xf8\x00\x80\x80\xf8\x00\x80\x80~\x00\x80\x80>\x00\x80\x80\x1f\x80\x80\x80\x0f\x80\x80\x80\x07\x80\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) + +fb_upnobox = framebuf.FrameBuffer(bytearray(b'\x04\x00\x0e\x00\x1f\x00?\x80\x7f\xc0\xff\xe0\x1f\x00\x1f\x00'), 11, 8, framebuf.MONO_HLSB) +fb_downnobox = framebuf.FrameBuffer(bytearray(b'\x1f\x00\x1f\x00\xff\xe0\x7f\xc0?\x80\x1f\x00\x0e\x00\x04\x00'), 11, 8, framebuf.MONO_HLSB) + +#PlayScreen Icons +fb_home = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80>\x00\x80\x80\x7f\x00\x80\x80\xff\x80\x80\x81\xff\xc0\x80\x83\xff\xe0\x80\x87\xff\xf0\x80\x8f\xff\xf8\x80\x9f\xff\xfc\x80\xbf\xff\xfe\x80\x87\xff\xf0\x80\x87\xff\xf0\x80\x87\xc1\xf0\x80\x87\xc1\xf0\x80\x87\xc1\xf0\x80\x87\xc1\xf0\x80\x87\xc1\xf0\x80\x87\xc1\xf0\x80\x87\xc1\xf0\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_pause = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\xc1\x80\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x81\xe3\xc0\x80\x80\xc1\x80\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_save = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x9f\xfe\x00\x80\x9f\xff\x00\x80\x9c\x03\x80\x80\x9c\x03\xc0\x80\x9c\x03\xe0\x80\x9c\x03\xf0\x80\x9c\x03\xf8\x80\x9f\xff\xf8\x80\x9f\xff\xfc\x80\x9f\xff\xfc\x80\x9f\xfc<\x80\x9f\xf8\x1c\x80\x9f\xf8\x1c\x80\x9f\xf8\x1c\x80\x9f\xf8\x1c\x80\x9f\xfc<\x80\x9f\xff\xfc\x80\x9f\xff\xfc\x80\x9f\xff\xfc\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_toggle = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x87\xf8\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x9c\x0f\xfc\x80\x9c\x0f\xfc\x80\x9c\x0f\xfc\x80\x84\x08\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x84\x08\x00\x80\x87\xf8\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_settings_small = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x87\x00\x00\x80\x9d\xff\xfc\x80\x9d\xff\xfc\x80\x9d\xff\xfc\x80\x87\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x07\x00\x80\x9f\xfd\xfc\x80\x9f\xfd\xfc\x80\x9f\xfd\xfc\x80\x80\x07\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80p\x00\x80\x9f\xdf\xfc\x80\x9f\xdf\xfc\x80\x9f\xdf\xfc\x80\x80p\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_help = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x87\xff\xf0\x80\x87\xff\xf0\x80\x87\xff\xf0\x80\x80\x00p\x80\x80\x00p\x80\x80\x00p\x80\x80\x7f\xf0\x80\x80\x7f\xf0\x80\x80\x7f\xf0\x80\x80p\x00\x80\x80p\x00\x80\x80p\x00\x80\x80p\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80p\x00\x80\x80p\x00\x80\x80p\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) + +#load saved files +fb_prev = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x07\x80\x80\x80\x0f\x80\x80\x80\x1f\x80\x80\x80>\x00\x80\x80~\x00\x80\x80\xf8\x00\x80\x81\xff\xf0\x80\x83\xff\xf0\x80\x83\xff\xf0\x80\x81\xff\xf0\x80\x80\xf8\x00\x80\x80~\x00\x80\x80>\x00\x80\x80\x1f\x80\x80\x80\x0f\x80\x80\x80\x07\x80\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_next = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\xf0\x00\x80\x80\xf8\x00\x80\x80\xfc\x00\x80\x80>\x00\x80\x80?\x00\x80\x80\x0f\x80\x80\x87\xff\xc0\x80\x87\xff\xe0\x80\x87\xff\xe0\x80\x87\xff\xc0\x80\x80\x0f\x80\x80\x80?\x00\x80\x80>\x00\x80\x80\xfc\x00\x80\x80\xf8\x00\x80\x80\xf0\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) +fb_load = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\x80\x80\x00\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x80\x7f\x00\x80\x83\xff\xe0\x80\x81\xff\xc0\x80\xb0\xff\x86\x80\xb0\xff\x86\x80\xb0\x7f\x06\x80\xb0>\x06\x80\xb0>\x06\x80\xb0\x1c\x06\x80\xb0\x18\x06\x80\xb0\x08\x06\x80\xbf\xff\xfe\x80\xbf\xff\xfe\x80\x80\x00\x00\x80\x80\x00\x00\x80\xff\xff\xff\x80'), 25, 25, framebuf.MONO_HLSB) + +#Settings +fb_BIGHome = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\xff\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x01\x80\x01\x80\x03\xc0\x01\x80\x0f\xf0\x01\x80\x1f\xf8\x01\x80?\xfc\x01\x80\x7f\xfe\x01\x81\xff\xff\x81\x83\xff\xff\xc1\x87\xff\xff\xe1\x8f\xff\xff\xf1\x83\xff\xff\xc1\x83\xff\xff\xc1\x83\xff\xff\xc1\x83\xd7\x1f\xc1\x83\xff\x1f\xc1\x83\xd7\x1f\xc1\x83\xff\x1f\xc1\x83\xff\x1f\xc1\x83\xff\x1f\xc1\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\xff\xff\xff\xff'), 32, 32, framebuf.MONO_HLSB) +fb_Lead = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\xff\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x83\xf8\x0f\xe1\x82\x00\x00!\x82\x00\x00!\x82x\x0f!\x82@\x01!\x82@\x01!\x82L\x19!\x82H\t!\x82I\xc9!\x82I\xc9!\x82I\xc9!\x82H\x89!\x82@\x81!\x82@\x81!\x82p\x87!\x82\x00\x80!\x82\x00\x80!\x83\xf8\x8f\xe1\x80\x00\x80\x01\x80\x00\x80\x01\x80\x00\x80\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\xff\xff\xff\xff'), 32, 32, framebuf.MONO_HLSB) +fb_Follow = framebuf.FrameBuffer(bytearray(b'\xff\xff\xff\xff\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x07\xe0\x01\x80\x0c0\x01\x80\x10\x08\x01\x80 \x04\x01\x80@\x02\x01\x80@\x02\x01\x80@\x02\x01\x80@\x02\x01\x80@\x02\x01\x80 \x04\x01\x80\x10\x0c\x01\x80\x0c<\x01\x80\x07\xee\x01\x80\x00\x07\x01\x80\x00\x03\x81\x80\x00\x01\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\x80\x00\x00\x01\xff\xff\xff\xff'), 32, 32, framebuf.MONO_HLSB) + +#Add the icons to the array, add iconsizes add the direction 0 - horizontal , 1 - vertical +#iconFrames=[[fb_Train,fb_Play,fb_Setting],[fb_add,fb_delete,fb_smallplay,fb_home],[fb_save,fb_pause,fb_home,fb_toggle],[fb_next,fb_delete,fb_home,fb_toggle],[fb_Lead,fb_Follow, fb_BIGHome]] +#iconFrames=[[fb_Train,fb_Play],[fb_add,fb_delete,fb_smallplay,fb_home],[fb_save,fb_home],[fb_next,fb_delete,fb_home]] +#iconFrames=[[fb_Train,fb_Play],[fb_smallplay,fb_add,fb_delete,fb_settings_small,fb_help],[fb_next,fb_delete,fb_home]] +iconFrames=[[fb_Train,fb_Play],[fb_smallplay,fb_add,fb_delete],[fb_next,fb_delete,fb_home]] + +iconSize=[32,25,25] +offsets= [(20,20),(102,29),(102,29)] #where you want to display your first icon +direction=[0,1,1] # 0 horizonal and 1 vertical arrangement + +for index,icon in enumerate(iconFrames): + icons = createIcons(iconSize[index], icon, offsetx=offsets[index][0], offsety=offsets[index][1] , direction=direction[index]) + Icons.append(icons) + +class SSD1306_SMART(ssd1306.SSD1306_I2C): + def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False, scale = 8, mode = 0, plotsize = [[3,3],[100,60]]): + self.scale = scale + self.mode = mode # 0 for learn, 1 for repeat + self.plotsize = plotsize + self.ranges = {'light': [0, 4095], 'motor': [0, 180], 'screenx': [4, 96], 'screeny': [59, 5]} # screeny is backwards because it is from top to bottom + #Battery + + super().__init__(width, height, i2c, addr = 0x3C, external_vcc = external_vcc) + self.graphx=27 + self.graphy=0 + + self.iconx=0 + self.icony=29 + + self.color=0 + self.graphwidthx=100 + self.graphwidthy=64 + self.iconwidth=25 + self.updownwidth=10 + self.updownx=self.iconx+int(self.iconwidth/2-self.updownwidth/2) + self.upy=self.icony - self.updownwidth + self.downy=self.icony + self.iconwidth + 1 + + + + + + def displayscreen(self,whereamI): + for IconX,IconY,frame in Icons[whereamI]: + self.blit(frame, IconX, IconY, 0) + self.show() + + def selector(self,whereamI,icon,previcon): + print("icon and previcon and whereamI",icon,previcon, whereamI) + if(direction[whereamI]==0): + padding=3 + width=height=iconSize[whereamI]+2*padding + self.rect(Icons[whereamI][previcon][0]-padding,Icons[whereamI][previcon][1]-padding,width,height, 0) #delete previous selector icon + self.rect(Icons[whereamI][icon][0]-padding,Icons[whereamI][icon][1]-padding,width,height, 1) #display current selector + self.displayscreen(whereamI) + else: + + + self.fill_rect(self.iconx,self.icony,self.iconwidth,self.iconwidth, 0) #delete previous selector icon + self.blit(fb_upnobox,self.updownx,self.upy,0) + self.blit(Icons[whereamI][icon][2], self.iconx, self.icony, 0) + self.blit(fb_downnobox,self.updownx,self.downy,0) + #self.blit(fb_DOWN, 102, 49, 0) + self.show() + + + def showbattery(self, batterylevel): + length=20 + width=6 + gap=2 + # if ( batterylevel>3000): #charging + if(batterylevel=="charging"): #charging + self.blit(fb_battcharging,self.iconx,0,0) + elif(batterylevel=="full"): #full charge + self.blit(fb_batthigh,self.iconx,0,0) + elif(batterylevel=="half"): #medium charge + self.blit(fb_battmid,self.iconx,0,0) + elif(batterylevel=="low"): # low charge + self.blit(fb_battlow,self.iconx,0,0) + else: + pass + self.show() + + + + def transform(self,initial, final, value): + initial = self.ranges[initial] + final = self.ranges[final] + return int((final[1]-final[0]) / (initial[1]-initial[0]) * (value - initial[0]) + final[0]) + + def graph(self, oldpoint,point, points, color): + if color==self.color: + #don't change + pass + else: + self.color=color + #clear the screen with the latest color + self.fill_rect(self.graphx,self.graphy,self.graphwidthx,self.graphwidthy,color) + + + rectsize=8 + dotsize=4 + + + ox,oy=oldpoint + x,y=point + + ox=self.transform('light', 'screenx', ox)+self.graphx + oy=self.transform('motor','screeny',oy)+self.graphy + x=self.transform('light', 'screenx', x)+self.graphx + y=self.transform('motor','screeny',y)+self.graphy + self.rect(self.graphx,self.graphy,self.graphwidthx,self.graphwidthy,(color+1)%2) + + self.line(self.graphx,oy,self.graphx+self.graphwidthx,oy,color%2) + self.line(ox,self.graphy,ox,self.graphy+self.graphwidthy,color%2) + + self.line(self.graphx,y,self.graphx+self.graphwidthx,y,(color+1)%2) + self.line(x,self.graphy,x,self.graphy+self.graphwidthy,(color+1)%2) + + self.rect(ox-int(rectsize/2),oy-int(rectsize/2),rectsize,rectsize,color%2) + self.rect(x-int(rectsize/2),y-int(rectsize/2),rectsize,rectsize,(color+1)%2) + for i in points: + x,y=i + x=self.transform('light', 'screenx', x)+self.graphx + y=self.transform('motor','screeny',y)+self.graphy + self.fill_rect(x-int(dotsize/2),y-int(dotsize/2),dotsize,dotsize,(color+1)%2) + self.show() + + def cleargraph(self): + self.fill_rect(self.graphx+1,self.graphy+1,self.graphwidthx-2,self.graphwidthy-2,0) + self.rect(self.graphx,self.graphy,self.graphwidthx,self.graphwidthy,1) + + + + def showmessage(self,msg): + + self.fill_rect(30,10,90,50,0) + self.text(msg,35,20,1) + self.show() + time.sleep(2) + self.fill_rect(30,10,90,50,0) + #self.fill(0) + self.show() + + + def welcomemessage(self): + self.fill(1) + self.show() + + for a in range(32): + i=2*a + self.fill_rect(64-(2*i),32-i,4*i,2*i,0) + self.blit(fb_SMLOGO,39,7,0) + self.show() + ''' + for i in range(80): + self.fill(0) + self.blit(fb_SMLOGO,38-i,7,0) + self.blit(fb_SMLOGO,40+i,7,0) + self.show() + ''' + self.clear() + self.show() + + def clear(self): + self.fill(0) + self.show() + diff --git a/software/libraries/sensors.py b/software/libraries/sensors.py new file mode 100644 index 0000000..96cd1e8 --- /dev/null +++ b/software/libraries/sensors.py @@ -0,0 +1,146 @@ +from machine import Pin,I2C +from machine import Pin, SoftI2C, PWM, ADC +import adxl345 +import time + +i2c = SoftI2C(scl = Pin(7), sda = Pin(6)) + + + +class SENSORS: + def __init__(self,connection=i2c): + self.i2c=connection + self.adx = adxl345.ADXL345(self.i2c) + + self.initial = [0, 4095] + self.final = [0, 180] + self.pot = ADC(Pin(3)) + self.pot.atten(ADC.ATTN_11DB) # the pin expects a voltage range up to 3.3V + + + self.attached = False + self.selectsensor() + + time.sleep(1) + if self.attached: + self.light = ADC(Pin(5)) + self.light.atten(ADC.ATTN_11DB) # the pin expects a voltage range up to 3.3V\ + + + + self.battery = ADC(Pin(4)) + self.battery.atten(ADC.ATTN_11DB) # the pin expects a voltage range up to 3.3V + + self.x=None + self.y=None + self.z=None + self.roll=None + self.pitch=None + + def selectsensor(self): + p_digital = Pin(5, Pin.OUT) #set pin as digital + p_digital.value(0) #set pin low + p_analog = ADC(Pin(5)) # set pin 5 to analog + p_analog.atten(ADC.ATTN_11DB) # the pin expects a voltage range up to 3.3V + low = p_analog.read() # read value + + + p_digital = Pin(5, Pin.OUT) #set pin as digital + p_digital.value(1) #set pin high + p_analog = ADC(Pin(5)) #set pin to analog + p_analog.atten(ADC.ATTN_11DB) # the pin expects a voltage range up to 3.3V + high = p_analog.read() #read value + + if(low<200 and high>4000): + self.attached = False + + else: + self.attached = True + + def map_angle_to_range(self, angle): + if 90 <= angle <= 180: + # Normalize range 90 to 180 to 0 to 90 + normalized_angle = angle - 90 + elif -180 <= angle < -90: + # Normalize range -180 to -90 to 90 to 180 + normalized_angle = angle + 270 # -180 + 270 = 90, -90 + 270 = 180 + elif -90 <= angle < 0: + return 4095 + else: + return 0 + # Now normalized_angle is in the range [0, 180] + # Map this range to [0, 4095] + mapped_value = int((normalized_angle / 180.0) * 4095) + + + return mapped_value + def readlight(self): + return self.light.read() + + + def readpot(self): + return self.pot.read() + + def accel(self): + self.x =self.adx.xValue + self.y =self.adx.yValue + self.z =self.adx.zValue + + + def readaccel(self): + self.accel() + return self.x, self.y,self.z + + def readroll(self): + self.accel() + self.roll, self.pitch = self.adx.RP_calculate(self.x,self.y,self.z) + return self.roll,self.pitch + + + def readpoint(self): + l=[] + p=[] + + for i in range(100): + if self.attached: + l.append(self.readlight()) + p.append(self.readpot()) + + l.sort() + p.sort() + if self.attached: + l=l[30:60] + avlight=sum(l)/len(l) + else: + avlight = self.map_angle_to_range(self.readroll()[0]) + + p=p[30:60] + avpos=sum(p)/len(p) + + point = avlight, self.mappot(avpos) + return point + + def mappot(self, value): + return int((self.final[1]-self.final[0]) / (self.initial[1]-self.initial[0]) * (value - self.initial[0]) + self.final[0]) + + def readbattery(self): + batterylevel=self.battery.read() + + if(batterylevel>2850): #charging + return 'charging' + + elif(batterylevel>2700 and batterylevel <2875): #full charge + return 'full' + elif(batterylevel>2500 and batterylevel <2700): #medium charge + return 'half' + elif(batterylevel<2500): # low charge + return 'low' + else: + pass + + return "" + + + + + diff --git a/software/libraries/servo.py b/software/libraries/servo.py new file mode 100644 index 0000000..7237b17 --- /dev/null +++ b/software/libraries/servo.py @@ -0,0 +1,33 @@ +from machine import PWM +import math + +# originally by Radomir Dopieralski http://sheep.art.pl +# from https://bitbucket.org/thesheep/micropython-servo + +class Servo: + def __init__(self, pin, freq=50, min_us=600, max_us=2400, angle=180): + self.min_us = min_us + self.max_us = max_us + self.us = 0 + self.freq = freq + self.angle = angle + self.pwm = PWM(pin, freq=freq, duty=0) + + def write_us(self, us): + #"""Set the signal to be ``us`` microseconds long. Zero disables it.""" + if us == 0: + self.pwm.duty(0) + return + us = min(self.max_us, max(self.min_us, us)) + duty = us * 1024 * self.freq // 1000000 + self.pwm.duty(duty) + + def write_angle(self, degrees=None, radians=None): + #"""Move to the specified angle in ``degrees`` or ``radians``.""" + if degrees is None: + degrees = math.degrees(radians) + degrees = degrees % 360 + total_range = self.max_us - self.min_us + us = self.min_us + total_range * degrees // self.angle + self.write_us(us) + diff --git a/software/libraries/smartmotor.py b/software/libraries/smartmotor.py new file mode 100644 index 0000000..b2649a9 --- /dev/null +++ b/software/libraries/smartmotor.py @@ -0,0 +1,444 @@ +from machine import Pin, SoftI2C, PWM, ADC +from files import * +import time +from machine import Timer +import servo +import icons +import os +import sys +import ubinascii +import machine + + +import sensors +sens=sensors.SENSORS() + +#unique name + +ID= ubinascii.hexlify(machine.unique_id()).decode() +numberofIcons=[len(icons.iconFrames[i]) for i in range(len(icons.iconFrames))] #[homescreen, trainscreen, playscreen, playthefilesscreen, settingsscreen] +highlightedIcon=[] +for numberofIcon in numberofIcons: + highlightedIcon.append([0,numberofIcon]) + +screenID=1 +lastPressed=0 +previousIcon=0 +filenumber=0 + + +currentlocaltime=0 +oldlocaltime=0 + +points = [] + + +#Defining all flags +#flags +flags=[False,False,False,False, False] +playFlag=False +triggered=False +LOGGING=True + + +#switch flags +switch_state_up = False +switch_state_down = False +switch_state_select = False + +last_switch_state_up = False +last_switch_state_down = False +last_switch_state_select = False + +switched_up = False +switched_down = False +switched_select = False + + +#mainloop flags +clearscreen=False + + +#define buttons , sensors and motors +#servo +s = servo.Servo(Pin(2)) + +#nav switches +switch_down = Pin(8, Pin.IN) +switch_select = Pin(9, Pin.IN) +switch_up= Pin(10, Pin.IN) + +i2c = SoftI2C(scl = Pin(7), sda = Pin(6)) +display = icons.SSD1306_SMART(128, 64, i2c,switch_up) + + +#highlightedIcon=[(ICON,TotalIcons),...] +#screenID gives the SCREEN number I am at +#SCREENID +#0 - HOMESCREEN +#1 - PlaySCREEN +#2 - TrainSCREEN +#3 - Playthefiles +#4 - ConnectSCREEN + +#highligtedIcons[screenID][0] which icon is highlighted on screenID screen +#highligtedIcons[screenID][0] =1 # First Icon selected +#highligtedIcons[screenID][0] =2 #Second +#highligtedIcons[screenID][0] =3 #Third + +#interrupt functions + +def downpressed(count=-1): + global playFlag + global triggered + playFlag = False + time.sleep(0.05) + if(time.ticks_ms()-lastPressed>200): + displayselect(count) + + triggered=True #log file + + +def uppressed(count=1): + global playFlag + global triggered + playFlag = False + time.sleep(0.05) + if(time.ticks_ms()-lastPressed>200): + displayselect(count) + triggered=True #log file + + +def displayselect(selectedIcon): + global screenID + global highlightedIcon + global lastPressed + global previousIcon + + highlightedIcon[screenID][0]=(highlightedIcon[screenID][0]+selectedIcon)%highlightedIcon[screenID][1] + display.selector(screenID,highlightedIcon[screenID][0],previousIcon) #draw circle at selection position, and remove from the previousIconious position + previousIcon=highlightedIcon[screenID][0] + + lastPressed=time.ticks_ms() + +def selectpressed(): + #declare all global variables, include all flags + global flags + global triggered + time.sleep(0.05) + flags[highlightedIcon[screenID][0]]=True + triggered=True #log file + + + + +def resettohome(): + global screenID + global highlightedIcon + global previousIcon + global clearscreen + screenID=0 + previousIcon=0 + for numberofIcon in numberofIcons: + highlightedIcon.append([0,numberofIcon]) + display.selector(screenID,highlightedIcon[screenID][0],0) + #display.fill(0) # clear screen + clearscreen=True + +def check_switch(p): + global switch_state_up + global switch_state_down + global switch_state_select + + global switched_up + global switched_down + global switched_select + + global last_switch_state_up + global last_switch_state_down + global last_switch_state_select + + switch_state_up = switch_up.value() + switch_state_down = switch_down.value() + switch_state_select = switch_select.value() + + if switch_state_up != last_switch_state_up: + switched_up = True + + elif switch_state_down != last_switch_state_down: + switched_down = True + + elif switch_state_select != last_switch_state_select: + switched_select = True + + if switched_up: + if switch_state_up == 0: + uppressed() + switched_up = False + elif switched_down: + if switch_state_down == 0: + downpressed() + switched_down = False + elif switched_select: + if switch_state_select == 0: + selectpressed() + switched_select = False + + last_switch_state_up = switch_state_up + last_switch_state_down = switch_state_down + last_switch_state_select = switch_state_select + + + +def displaybatt(p): + batterycharge=sens.readbattery() + display.showbattery(batterycharge) + if LOGGING: + savetolog(time.time(),screenID,highlightedIcon, point,points) + return batterycharge + + + +def nearestNeighbor(data, point): + try: + point = point[0] + except TypeError: + pass + if len(data) == 0: + return 0 + diff = 10000 + test = None + for i in data: + if abs(i[0] - point) <= diff: + diff = abs(i[0] - point) + test = i + return (point, test[1]) + +def resetflags(): + global flags + for i in range(len(flags)): + flags[i]=False + + +def shakemotor(point): + motorpos=point[1] + for i in range(2): + s.write_angle(min(180,motorpos+5)) + time.sleep(0.1) + s.write_angle(max(0,motorpos-5)) + time.sleep(0.1) + + + + +def readdatapoints(): + datapoints=readfile() + if(datapoints): + numberofdata=len(datapoints) + return datapoints[-1] + else: + return ([]) + + +def setloggingmode(): + #sets pref to log by default + if not switch_down.value() and not switch_up.value() and not switch_select.value(): + resetlog() #delete all the previous logs + setprefs() #sets preference file to True + display.showmessage("LOG: ON") + print("resetting the log file") + + + #resets prefs to not log by default + if not switch_down.value() and not switch_up.value() and switch_select.value(): + resetprefs() #resets preference file to False + print("turn OFF the logging") + display.showmessage("LOG: OFF") + + + if switch_down.value() and switch_up.value() and switch_select.value(): + print("default: turn ON the logging") + + + import prefs + return prefs.log + + + +points=readdatapoints() +if points==[]: + highlightedIcon[1][0]=1 #go to add if there are no data saved + +#setting up Timers +tim = Timer(0) +tim.init(period=50, mode=Timer.PERIODIC, callback=check_switch) +batt = Timer(2) +batt.init(period=3000, mode=Timer.PERIODIC, callback=displaybatt) + + +display.welcomemessage() + + + + + +LOGGING=setloggingmode() + +#setup with homescreen #starts with screenID=0 +display.selector(screenID,highlightedIcon[screenID][0],-1) +oldpoint=[-1,-1] + + +while True: + #log + point = sens.readpoint() + if(triggered): + if LOGGING: + savetolog(time.time(),screenID,highlightedIcon, point,points) + triggered=False + + + #broadcast(point, screenID, highlightedIcon[screenID][0],ID) + + #Homepage + #[fb_Train,fb_Play] + + if(screenID==0): + if(flags[0]): + points=[] #empty the points arrray + screenID=1 + clearscreen=True + display.graph(oldpoint, point, points,0) #normal color + resetflags() + + elif(flags[1]): + screenID=2 + clearscreen=True + datapoints=readfile() + if (datapoints==[]): + display.showmessage("No data saved") + resettohome() + else: + display.graph(oldpoint, point, points,0) #normal color + resetflags() + + # Training Screen + # [fb_add,fb_delete,fb_smallplay,fb_home] + elif(screenID==1): + if(flags[0]): # Play button is pressed + if (points): + playFlag=True + savetofile(points) + shakemotor(point) + #screenID=2 # trigger play screen + #uppressed(count=4) + else: + cleardatafile() + display.showmessage("NO DATA") + resetflags() + + + elif(flags[1]): # add button is pressed + points.append(list(point)) + display.graph(oldpoint, point, points,0) + shakemotor(point) + resetflags() + + elif(flags[2]): #delete button is pressed + if(points): #delete only when there is something + points.pop() + display.cleargraph() + display.graph(oldpoint, point, points,0) + resetflags() + + + elif(flags[3]): # change this to settings icon save button is pressed + # This is where we can put other advanced settings, maybe call the main screen + #screenId=0 + #savetofile(points) + #resettohome() + + print("some settings stuff") + resetflags() + + elif(flags[4]): # help button is prssed + # quit to home + display.showmessage("This is for help whatever you need") + # resettohome() + resetflags() + + + if(playFlag): #only when point is different now + if(not point==oldpoint): #only when point is different now + point = nearestNeighbor(points,point) + s.write_angle(point[1]) + display.graph(oldpoint, point, points,1) #inverted color + + + + else: + if(not point==oldpoint): #only when point is different now + s.write_angle(point[1]) + display.graph(oldpoint, point, points,0) #normal color + + + + + # Load saved files screen + #[fb_next,fb_delete,fb_home,fb_toggle] + elif(screenID==2): + datapoints=readfile() + if(datapoints): + numberofdata=len(datapoints) + points=datapoints[filenumber] + + if(flags[0]): + filenumber=((filenumber+1)%numberofdata) + points=datapoints[filenumber] + display.cleargraph() + resetflags() + + elif(flags[1]): + del datapoints[filenumber] + replacefile(datapoints) + filenumber=0 + display.cleargraph() + resetflags() + + elif(flags[2]): + resettohome() + resetflags() + + if(not point==oldpoint): #only when point is different now + point = nearestNeighbor(points,point) + s.write_angle(point[1]) + display.graph(oldpoint, point, points,0) #normal color + else: + display.showmessage("No files to show") + resettohome() + + # Settings Screen + #[fb_Lead,fb_Follow, fb_BIGHome] + elif(screenID==4): + if(flags[0]): + display.showmessage(ID) + waitforconnection() + resetflags() + + elif(flags[1]): + print("I shall follow you") + resetflags() + + elif(flags[2]): + resettohome() + resetflags() + + oldpoint=point + if clearscreen: + display.fill(0) + display.selector(screenID,highlightedIcon[screenID][0],-1) + clearscreen=False + + + + diff --git a/software/libraries/ssd1306.py b/software/libraries/ssd1306.py new file mode 100644 index 0000000..d046b25 --- /dev/null +++ b/software/libraries/ssd1306.py @@ -0,0 +1,158 @@ +# This is an ssd1306 package we found on the internet. +# https://github.com/stlehmann/micropython-ssd1306/blob/master/ssd1306.py +# Do not edit this file. +# MicroPython SSD1306 OLED driver, I2C and SPI interfaces +from micropython import const +import framebuf + + +# register definitions +SET_CONTRAST = const(0x81) +SET_ENTIRE_ON = const(0xA4) +SET_NORM_INV = const(0xA6) +SET_DISP = const(0xAE) +SET_MEM_ADDR = const(0x20) +SET_COL_ADDR = const(0x21) +SET_PAGE_ADDR = const(0x22) +SET_DISP_START_LINE = const(0x40) +SET_SEG_REMAP = const(0xA0) +SET_MUX_RATIO = const(0xA8) +SET_COM_OUT_DIR = const(0xC0) +SET_DISP_OFFSET = const(0xD3) +SET_COM_PIN_CFG = const(0xDA) +SET_DISP_CLK_DIV = const(0xD5) +SET_PRECHARGE = const(0xD9) +SET_VCOM_DESEL = const(0xDB) +SET_CHARGE_PUMP = const(0x8D) + +# Subclassing FrameBuffer provides support for graphics primitives +# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html +class SSD1306(framebuf.FrameBuffer): + def __init__(self, width, height, external_vcc): + self.width = width + self.height = height + self.external_vcc = external_vcc + self.pages = self.height // 8 + self.buffer = bytearray(self.pages * self.width) + super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB) + self.init_display() + + def init_display(self): + for cmd in ( + SET_DISP | 0x00, # off + # address setting + SET_MEM_ADDR, + 0x00, # horizontal + # resolution and layout + SET_DISP_START_LINE | 0x00, + SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0 + SET_MUX_RATIO, + self.height - 1, + SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0 + SET_DISP_OFFSET, + 0x00, + SET_COM_PIN_CFG, + 0x02 if self.width > 2 * self.height else 0x12, + # timing and driving scheme + SET_DISP_CLK_DIV, + 0x80, + SET_PRECHARGE, + 0x22 if self.external_vcc else 0xF1, + SET_VCOM_DESEL, + 0x30, # 0.83*Vcc + # display + SET_CONTRAST, + 0xFF, # maximum + SET_ENTIRE_ON, # output follows RAM contents + SET_NORM_INV, # not inverted + # charge pump + SET_CHARGE_PUMP, + 0x10 if self.external_vcc else 0x14, + SET_DISP | 0x01, + ): # on + self.write_cmd(cmd) + self.fill(0) + self.show() + + def poweroff(self): + self.write_cmd(SET_DISP | 0x00) + + def poweron(self): + self.write_cmd(SET_DISP | 0x01) + + def contrast(self, contrast): + self.write_cmd(SET_CONTRAST) + self.write_cmd(contrast) + + def invert(self, invert): + self.write_cmd(SET_NORM_INV | (invert & 1)) + + def show(self): + x0 = 0 + x1 = self.width - 1 + if self.width == 64: + # displays with width of 64 pixels are shifted by 32 + x0 += 32 + x1 += 32 + self.write_cmd(SET_COL_ADDR) + self.write_cmd(x0) + self.write_cmd(x1) + self.write_cmd(SET_PAGE_ADDR) + self.write_cmd(0) + self.write_cmd(self.pages - 1) + self.write_data(self.buffer) + + +class SSD1306_I2C(SSD1306): + def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False): + self.i2c = i2c + self.addr = addr + self.temp = bytearray(2) + self.write_list = [b"\x40", None] # Co=0, D/C#=1 + super().__init__(width, height, external_vcc) + + def write_cmd(self, cmd): + self.temp[0] = 0x80 # Co=1, D/C#=0 + self.temp[1] = cmd + self.i2c.writeto(self.addr, self.temp) + + def write_data(self, buf): + self.write_list[1] = buf + self.i2c.writevto(self.addr, self.write_list) + + +class SSD1306_SPI(SSD1306): + def __init__(self, width, height, spi, dc, res, cs, external_vcc=False): + self.rate = 10 * 1024 * 1024 + dc.init(dc.OUT, value=0) + res.init(res.OUT, value=0) + cs.init(cs.OUT, value=1) + self.spi = spi + self.dc = dc + self.res = res + self.cs = cs + import time + + self.res(1) + time.sleep_ms(1) + self.res(0) + time.sleep_ms(10) + self.res(1) + super().__init__(width, height, external_vcc) + + def write_cmd(self, cmd): + self.spi.init(baudrate=self.rate, polarity=0, phase=0) + self.cs(1) + self.dc(0) + self.cs(0) + self.spi.write(bytearray([cmd])) + self.cs(1) + + def write_data(self, buf): + self.spi.init(baudrate=self.rate, polarity=0, phase=0) + self.cs(1) + self.dc(1) + self.cs(0) + self.spi.write(buf) + self.cs(1) + diff --git a/software/main/.DS_Store b/software/main/.DS_Store new file mode 100644 index 0000000..5008ddf Binary files /dev/null and b/software/main/.DS_Store differ diff --git a/software/main/README.qmd b/software/main/README.qmd new file mode 100644 index 0000000..e4c428f --- /dev/null +++ b/software/main/README.qmd @@ -0,0 +1,40 @@ +--- +format: gfm +execute: + echo: false + message: false + warning: false +editor: visual +editor_options: + chunk_output_type: console +--- + +# Smart System Education Platform + +```{r setup, include=FALSE} +file_path <- fs::path_real("README.qmd") +path_components <- strsplit(file_path, "/")[[1]] +shortened_path <- fs::path_join(path_components[-c(2, 3)]) +``` + +## Location: `r shortened_path` + +### Description + +This directory hosts the main code for the smart modules. + +### Directories & Files + +The repository has the following directory tree. + +```{r} +fs::dir_tree(recurse = 1) +``` + +A short description of the directories can be found below. + +| name | description | contribution | +|--------------|-----------------------------------------|--------------| +| config.py | Smart Module Configuration File | Nick | +| main.py | Smart Module Main.py | Nick | + diff --git a/software/main/main.py b/software/main/main.py new file mode 100644 index 0000000..7b44041 --- /dev/null +++ b/software/main/main.py @@ -0,0 +1,45 @@ +from machine import Pin +import machine +import gc +gc.collect() + +import time + +print("Running pyscript networking tool") + +from networking import Networking + +#Network +networking = Networking(True, False, True) +peer_mac = b'\xff\xff\xff\xff\xff\xff' + +print(f"{(time.ticks_ms() - networking.inittime) / 1000:.3f} Name: {networking.name}, ID: {networking.id}, config: {networking.config}, Sta mac: {networking.sta.mac()}, Ap mac: {networking.ap.mac()}, Version: {networking.version_n}") + +lastPressed = 0 + +message="Boop!" + +def boop(pin): + global lastPressed + if(time.ticks_ms()-lastPressed>1000): + lastPressed = time.ticks_ms() + networking.aen.ping(peer_mac) + networking.aen.echo(peer_mac, message) + networking.aen.send(peer_mac, message) + print(f"{(time.ticks_ms() - networking.inittime) / 1000:.3f} Networking Tool: Sent {message} to {peer_mac}") + print(f"{(time.ticks_ms() - networking.inittime) / 1000:.3f} Networking Tool: RSSI table: {networking.aen.rssi()}") + +switch_select = Pin(9, Pin.IN, Pin.PULL_UP) + +#Buttons +switch_select = Pin(9, Pin.IN, Pin.PULL_UP) +switch_select.irq(trigger=Pin.IRQ_FALLING, handler=boop) + +def heartbeat(timer): + print("") + print(f"{(time.ticks_ms() - networking.inittime) / 1000:.3f} Networking Tool Heartbeat: {gc.mem_free()} bytes") + print("") + gc.collect() + +timer = machine.Timer(0) +timer.init(period=5000, mode=machine.Timer.PERIODIC, callback=heartbeat) diff --git a/software/networking/.DS_Store b/software/networking/.DS_Store index 0abd89c..370708d 100644 Binary files a/software/networking/.DS_Store and b/software/networking/.DS_Store differ diff --git a/software/networking/.idea/inspectionProfiles/profiles_settings.xml b/software/networking/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..dd4c951 --- /dev/null +++ b/software/networking/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,7 @@ + + + + \ No newline at end of file diff --git a/software/networking/.idea/libraries/MicroPython.xml b/software/networking/.idea/libraries/MicroPython.xml new file mode 100644 index 0000000..aa131a7 --- /dev/null +++ b/software/networking/.idea/libraries/MicroPython.xml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/software/networking/.idea/misc.xml b/software/networking/.idea/misc.xml new file mode 100644 index 0000000..6038861 --- /dev/null +++ b/software/networking/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/software/networking/.idea/modules.xml b/software/networking/.idea/modules.xml new file mode 100644 index 0000000..27cf2c6 --- /dev/null +++ b/software/networking/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/software/networking/.idea/networking.iml b/software/networking/.idea/networking.iml new file mode 100644 index 0000000..945fce2 --- /dev/null +++ b/software/networking/.idea/networking.iml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/software/networking/README.qmd b/software/networking/README.qmd index ae6ee9e..31fa77b 100644 --- a/software/networking/README.qmd +++ b/software/networking/README.qmd @@ -43,7 +43,7 @@ A short description of the directories can be found below. | name | description | contribution | |----------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------| -| config.py | This file hosts configurations and secrets, such as the board name, WiFi name and password, as well as handshake keys which is used by some of the legacy networking code. | Nick | +| config.py | This file hosts configurations and secrets, such as the board name, WiFi name and password, as well as handshake keys which is used by some of the legacy networking code. This file is now located in release. | Nick | | examples | This directory hosts example code | Nick | | examples/example.py | This is some basic example code on how to use my networking library | Nick | | examples/long_message_example.py | This code showcases the long message capabilities built into my code. By sending multiple messages that are then stitched back together by the recipient the max payload can be increased from 241 bytes to 256 x 238 = 60928 bytes, although in reality the ESP32 boards will start running out of memory with messages above 30 kilobytes. | Nick | diff --git a/software/networking/config.py b/software/networking/config.py deleted file mode 100644 index 26b6133..0000000 --- a/software/networking/config.py +++ /dev/null @@ -1,7 +0,0 @@ -mysecrets = {"SSID": "Tufts_Robot", "key" : ""} -codes = {"0": b'PairingCodePhrase', "1": b'PairingResponsePhrase', "2": b'PairingConfirmationPhrase'} -configname = "Nickname" -config = "AdminModuleConfig" -whitelist = [b'd\xe83\x84\xd8\x18',b'd\xe83\x84\xd8\x19',b'd\xe83\x85\xd3\xbc', b'd\xe83\x85\xd3\xbd', b'd\xe83\x84\xd8\x18', b'd\xe83\x84\xd8\x19'] #each ESP32 has two MAC addresses -i2c_dict = {"0x3C": ["pca9685", 0, "screen"], "0x53" : ["ACCEL", 1, "accelerometer"]} #key is i2c address: ["device name", Output (0) or Input (1), "Description"] -version={"adxl345":3,"files":2, "icons":2, "motor":4, "main":0, "networking":0, "prefs":2, "sensors":4, "servo":2, "ssd1306":2} #motor used to be main \ No newline at end of file diff --git a/software/networking/examples/example.py b/software/networking/examples/example.py index 437d831..7c18d72 100644 --- a/software/networking/examples/example.py +++ b/software/networking/examples/example.py @@ -1,55 +1,58 @@ from networking import Networking import time -#Initialise +# Initialise networking = Networking() ###Example code### -recipient_mac = b'\xff\xff\xff\xff\xff\xff' #This mac sends to all -message = b'Boop' +recipient_mac = b'\xff\xff\xff\xff\xff\xff' # This mac sends to all +message = b'Boop' -#Print own mac +# Print own mac print(networking.sta._sta.config('mac')) +print(networking.ap._ap.config('mac')) print() -#Ping, calculates the time until you receive a response from the peer +# Ping, calculates the time until you receive a response from the peer networking.aen.ping(recipient_mac) print() -#Echo, sends a message that will be repeated back by the recipient +# Echo, sends a message that will be repeated back by the recipient networking.aen.echo(recipient_mac, message) print() -#Message, sends the specified message to the recipient, supported formats are bytearrays, bytes, int, float, string, bool, list and dict, if above 241 bytes, it will send in multiple packages: max 60928 bytes +# Message, sends the specified message to the recipient, supported formats are bytearrays, bytes, int, float, string, bool, list and dict, if above 241 bytes, it will send in multiple packages: max 60928 bytes networking.aen.send(recipient_mac, message) print() -#Check if there are any messages in the message buffer +# Check if there are any messages in the message buffer print(networking.aen.check_messages()) print() -#Get Last Received Message -print(networking.aen.return_message()) #Returns none, none, none if there are no messages +# Get Last Received Message +print(networking.aen.return_message()) # Returns none, none, none if there are no messages print() -#Get the RSSI table +# Get the RSSI table print(networking.aen.rssi()) print() -#Get All Recieved Messages +# Get All Recieved Messages messages = networking.aen.return_messages() for mac, message, receive_time in messages: print(mac, message, receive_time) - -#Set up an interrupt which runs a function as soon as possible after receiving a new message + + +# Set up an interrupt which runs a function as soon as possible after receiving a new message def receive(): print("Receive") - for mac, message, rtime in networking.aen.return_messages(): #You can directly iterate over the function + for mac, message, rtime in networking.aen.return_messages(): # You can directly iterate over the function print(mac, message, rtime) -networking.aen.irq(receive) #interrupt handler +networking.aen.irq(receive) # interrupt handler print(networking.aen._irq_function) -time.sleep(0.05)#There is a bug in thonny with some ESP32 devices, which makes this statement necessary. I don't know why, currently discussing and debugging this with thonny devs. \ No newline at end of file +time.sleep( + 0.05) # There is a bug in thonny with some ESP32 devices, which makes this statement necessary. I don't know why, currently discussing and debugging this with thonny devs. diff --git a/software/networking/networking.py b/software/networking/networking.py index 3d7e15e..2ff74cf 100644 --- a/software/networking/networking.py +++ b/software/networking/networking.py @@ -1,6 +1,6 @@ import network import machine -from config import mysecrets, configname, config, whitelist, i2c_dict, version +from config import mysecrets, configname, config, whitelist, i2c_dict, version, msg_codes, msg_subcodes, networking_keys import time import ubinascii import urequests @@ -11,115 +11,112 @@ import json import os -inittime = time.ticks_ms() -class Networking: - def __init__(self, infmsg=False, dbgmsg=False, admin=False): +class Networking: + def __init__(self, infmsg=False, dbgmsg=False, admin=False, inittime=time.ticks_ms()): + gc.collect() self.inittime = inittime if infmsg: - print(f"{(time.ticks_ms() - inittime) / 1000:.3f} Initialising Networking") + print(f"{(time.ticks_ms() - self.inittime) / 1000:.3f} Initialising Networking") self.master = self self.infmsg = infmsg self.dbgmsg = dbgmsg - self._admin = admin - + self.admin = admin + self._staif = network.WLAN(network.STA_IF) self._apif = network.WLAN(network.AP_IF) - + self.sta = self.Sta(self, self._staif) self.ap = self.Ap(self, self._apif) self.aen = self.Aen(self) - + self.id = ubinascii.hexlify(machine.unique_id()).decode() self.name = configname - if self.name == "" or self.name == None: + if not self.name: self.name = str(self.id) self.config = config self.version = version self.version_n = ''.join(str(value) for value in self.version.values()) if infmsg: - print(f"{(time.ticks_ms() - inittime) / 1000:.3f} seconds: Networking initialised and ready") - - def _cleanup(self): - self._dprint("._cleanup") - self.aen.irq(None) - self.aen._aen.active(False) + print(f"{(time.ticks_ms() - self.inittime) / 1000:.3f} seconds: Networking initialised and ready") + + def cleanup(self): + self.dprint(".cleanup") + self.aen.cleanup() self._staif.active(False) self._apif.active(False) - - def _iprint(self, message): + + def iprint(self, message): if self.infmsg: try: - print(f"{(time.ticks_ms() - inittime) / 1000:.3f} Networking Info: {message}") + print(f"{(time.ticks_ms() - self.inittime) / 1000:.3f} Networking Info: {message}") except Exception as e: print(f"Error printing networking Info: {e}") return - - def _dprint(self, message): + + def dprint(self, message): if self.dbgmsg: try: - print(f"{(time.ticks_ms() - inittime) / 1000:.3f} Networking Debug: {message}") + print(f"{(time.ticks_ms() - self.inittime) / 1000:.3f} Networking Debug: {message}") except Exception as e: print(f"Error printing networking Debug: {e}") return - - - + class Sta: def __init__(self, master, _staif): self.master = master self._sta = _staif self._sta.active(True) - self.master._iprint("STA initialised and ready") - + self.master.iprint("STA initialised and ready") + def scan(self): - self.master._dprint("sta.scan") + self.master.dprint("sta.scan") scan_result = self._sta.scan() - if self.infmsg: + if self.master.infmsg: for ap in scan_result: - self.master._iprint(f"SSID:%s BSSID:%s Channel:%d Strength:%d RSSI:%d Auth:%d " % (ap)) + self.master.iprint(f"SSID:%s BSSID:%s Channel:%d Strength:%d RSSI:%d Auth:%d " % ap) return scan_result - + def connect(self, ssid, key="", timeout=10): - self.master._dprint("sta.connect") + self.master.dprint("sta.connect") self._sta.connect(ssid, key) stime = time.time() while time.time() - stime < timeout: if self._sta.ifconfig()[0] != '0.0.0.0': - self.master._iprint("Connected to WiFi") + self.master.iprint("Connected to WiFi") return time.sleep(0.1) - self.master._iprint(f"Failed to connect to WiFi: {self._sta.status()}") - + self.master.iprint(f"Failed to connect to WiFi: {self._sta.status()}") + def disconnect(self): - self.master._dprint("sta.disconnect") + self.master.dprint("sta.disconnect") self._sta.disconnect() - + def ip(self): - self.master._dprint("sta.ip") + self.master.dprint("sta.ip") return self._sta.ifconfig() - + def mac(self): - self.master._dprint("sta.mac") + self.master.dprint("sta.mac") return bytes(self._sta.config('mac')) - - def mac_decoded(self):#Necessary? - self.master._dprint("sta.mac_decoded") + + def mac_decoded(self): # Necessary? + self.master.dprint("sta.mac_decoded") return ubinascii.hexlify(self._sta.config('mac'), ':').decode() - + def channel(self): - self.master._dprint("sta.channel") + self.master.dprint("sta.channel") return self._sta.config('channel') - + def set_channel(self, number): - self.master._dprint("sta.set_channel") + self.master.dprint("sta.set_channel") if number > 14 or number < 0: number = 0 self._sta.config(channel=number) - self.vprint(f"STA channel set to {number}") - + self.master.dprint(f"STA channel set to {number}") + def get_joke(self): - self.master._dprint("sta.get_joke") + self.master.dprint("sta.get_joke") try: reply = urequests.get('https://v2.jokeapi.dev/joke/Programming') if reply.status_code == 200: @@ -128,63 +125,59 @@ def get_joke(self): except Exception as e: print('Error fetching joke:', str(e)) return None - - - + class Ap: def __init__(self, master, _apif): self.master = master self._ap = _apif self._ap.active(True) - self.master._iprint("AP initialised and ready") - + self.master.iprint("AP initialised and ready") + def set_ap(self, name="", password="", max_clients=10): - self.master._dprint("ap.setap") + self.master.dprint("ap.set_ap") if name == "": - name = self.name + name = self.master.name self._ap.active(True) self._ap.config(essid=name) if password: self._ap.config(authmode=network.AUTH_WPA_WPA2_PSK, password=password) self._ap.config(max_clients=max_clients) - self.master._iprint(f"Access Point {name} set with max clients {max_clients}") - + self.master.iprint(f"Access Point {name} set with max clients {max_clients}") + def deactivate(self): - self.master._dprint("ap.deactivate") + self.master.dprint("ap.deactivate") self._ap.active(False) - self.master._iprint("Access Point deactivated") - + self.master.iprint("Access Point deactivated") + def ip(self): - self.master._dprint("ap.ip") + self.master.dprint("ap.ip") return self._ap.ifconfig() - + def mac(self): - self.master._dprint("ap.mac") + self.master.dprint("ap.mac") return bytes(self._ap.config('mac')) - + def mac_decoded(self): - self.master._dprint("ap.mac_decoded") + self.master.dprint("ap.mac_decoded") return ubinascii.hexlify(self._ap.config('mac'), ':').decode() - + def channel(self): - self.master._dprint("ap.channel") + self.master.dprint("ap.channel") return self._ap.config('channel') - + def set_channel(self, number): - self.master._dprint("ap.set_channel") + self.master.dprint("ap.set_channel") if number > 14 or number < 0: number = 0 self._ap.config(channel=number) - self.vprint(f"AP channel set to {number}") - - + self.master.dprint(f"AP channel set to {number}") class Aen: def __init__(self, master): self.master = master self._aen = espnow.ESPNow() self._aen.active(True) - + self._peers = {} self._received_messages = [] self._received_messages_size = [] @@ -192,134 +185,245 @@ def __init__(self, master): self._long_buffer_size = {} self.received_sensor_data = {} self.received_rssi_data = {} - #self._long_sent_buffer = {} self._irq_function = None self._pause_function = None self.boops = 0 - self.ifidx = 0 #0 sends via sta, 1 via ap - #self._channel = 0 - + self.ifidx = 0 # 0 sends via sta, 1 via ap + # self.channel = 0 + self._whitelist = whitelist - - #Flags - self._pairing = True + + # Flags + self._pairing_enabled = True + self._pairing = False + self._paired = False + self._paired_macs = [] self._running = True - + self._aen.irq(self._irq) - - self.master._iprint("ESP-NOW initialised and ready") - + + self.master.iprint("ESP-NOW initialised and ready") + + def cleanup(self): + self.master.iprint("aen.cleanup") + self.irq(None) + self._aen.active(False) + # add delete buffers and stuff + def update_peer(self, peer_mac, name=None, channel=None, ifidx=None): - self.master._dprint("aen.update_peer") + self.master.dprint("aen.update_peer") if peer_mac in self._peers: try: - if name != None: + if name is not None: self._peers[peer_mac]['name'] = name - if channel != None: + if channel is not None: self._peers[peer_mac]['channel'] = channel - if ifidx != None: + if ifidx is not None: self._peers[peer_mac]['ifidx'] = ifidx - self.master._dprint(f"Peer {peer_mac} updated to channel {channel}, ifidx {ifidx} and name {name}") + self.master.dprint(f"Peer {peer_mac} updated to channel {channel}, ifidx {ifidx} and name {name}") except OSError as e: print(f"Error updating peer {peer_mac}: {e}") return - self.master._iprint(f"Peer {peer_mac} not found") + self.master.iprint(f"Peer {peer_mac} not found") def add_peer(self, peer_mac, name=None, channel=None, ifidx=None): - self.master._dprint("aen.add_peer") + self.master.dprint("aen.add_peer") if peer_mac not in self._peers: try: self._peers[peer_mac] = {'channel': channel, 'ifidx': ifidx, 'name': name} - self.master._dprint(f"Peer {peer_mac} added with channel {channel}, ifidx {ifidx} and name {name}") + self.master.dprint(f"Peer {peer_mac} added with channel {channel}, ifidx {ifidx} and name {name}") except OSError as e: print(f"Error adding peer {peer_mac}: {e}") else: - self.master._dprint(f"Peer {peer_mac} already exists, updating") + self.master.dprint(f"Peer {peer_mac} already exists, updating") self.update_peer(peer_mac, name, channel, ifidx) def remove_peer(self, peer_mac): - self.master._dprint("aen.remove_peers") + self.master.dprint("aen.remove_peers") if peer_mac in self._peers: try: del self._peers[peer_mac] - self.master._iprint(f"Peer {peer_mac} removed") + self.master.iprint(f"Peer {peer_mac} removed") except OSError as e: print(f"Error removing peer {peer_mac}: {e}") def peers(self): - self.master._dprint("aen.peers") + self.master.dprint("aen.peers") return self._peers - + def peer_name(self, key): - self.master._dprint("aen.name") + self.master.dprint("aen.name") if key in self._peers: return self._peers[key]['name'] else: return None - + def rssi(self): - self.master._dprint("aen.rssi") + self.master.dprint("aen.rssi") return self._aen.peers_table - + + # Send cmds + def send_command(self, msg_subkey, mac, payload=None, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.send_command") + if sudo and isinstance(payload, list): + payload.append("sudo") + elif sudo and payload is None: + payload = ["sudo"] + else: + payload = [payload, "sudo"] + if (msg_key := "cmd") and msg_subkey in msg_subcodes[msg_key]: + self.master.iprint( + f"Sending {msg_subkey} ({bytes([msg_subcodes[msg_key][msg_subkey]])}) command to {mac} ({self.peer_name(mac)})") + self._compose(mac, payload, msg_codes[msg_key], msg_subcodes[msg_key][msg_subkey], channel, ifidx) + else: + self.master.iprint(f"Command {msg_subkey} not found") + gc.collect() + + def reboot(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.reboot") + self.send_command("Reboot", mac, None, channel, ifidx, sudo) + + def firmware_update(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.firmware_update") + self.send_command("Firmware-Update", mac, None, channel, ifidx, sudo) + + def file_update(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.file_update") + self.send_command("File-Update", mac, None, channel, ifidx, sudo) + + def file_download(self, mac, link, file_list=None, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.file_download") + self.send_command("File-Download", mac, [link, file_list], channel, ifidx, sudo) + + def web_repl(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.web_repl") + self.master.ap.set_ap(ap_name := self.master.name, password := networking_keys["default_ap_key"]) + self.send_command("Web-Repl", mac, [ap_name, password], channel, ifidx, sudo) + # await success message and if success False disable AP or try again + + def file_run(self, mac, filename, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.file_run") + self.send_command("File-Run", mac, filename, channel, ifidx, sudo) + + def admin_set(self, mac, new_bool, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.admin_set") + self.send_command("Admin-Set", mac, new_bool, channel, ifidx, sudo) + + def whitelist_add(self, mac, mac_list=None, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.whitelist_add") + if mac_list is not None: + mac_list = [self.master.sta.mac, self.master.ap.mac] + self.send_command("Whitelist-Add", mac, mac_list, channel, ifidx, sudo) + + def config_change(self, mac, new_config, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.config_chain") + self.send_command("Config-Chain", mac, new_config, channel, ifidx, sudo) + def ping(self, mac, channel=None, ifidx=None): - self.master._dprint("aen.ping") + self.master.dprint("aen.ping") if bool(self.ifidx): - schannel = self.master.ap.channel() + send_channel = self.master.ap.channel() else: - schannel = self.master.sta.channel() - self._compose(mac, [schannel,self.ifidx,self.master.name], 0x01, 0x10, channel, ifidx) #sends channel, ifidx and name - self.master._iprint(f"Sent ping to {mac} ({self.peer_name(mac)})") - gc.collect() - + send_channel = self.master.sta.channel() + self.send_command("Ping", mac, [send_channel, self.ifidx, self.master.name], channel, + ifidx) # sends channel, ifidx and name + + def pair(self, mac, key=networking_keys["handshake_key1"], channel=None, ifidx=None): + self.master.dprint("aen.pair") + self.send_command("Pair", mac, key, channel, ifidx) + + def pair_enable(self, mac, pair_bool, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.pair") + self.send_command("Set-Pair", mac, pair_bool, channel, ifidx, sudo) + + def boop(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.boop") + self.send_command("RSSI/Status/Config-Boop", mac, None, channel, ifidx, sudo) + + def directory_get(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.directory_get") + self.send_command("Directory-Get", mac, None, channel, ifidx, sudo) + def echo(self, mac, message, channel=None, ifidx=None): - self.master._dprint("aen.echo") + self.master.dprint("aen.echo") try: - self.master._iprint(f"Sending echo ({message}) to {mac} ({self.peer_name(mac)})") + self.master.iprint(f"Sending echo ({message}) to {mac} ({self.peer_name(mac)})") except Exception as e: - self.master._iprint(f"Sending echo to {mac} ({self.peer_name(mac)}), but error printing message content: {e}") + self.master.iprint( + f"Sending echo to {mac} ({self.peer_name(mac)}), but error printing message content: {e}") self._compose(mac, message, 0x01, 0x15, channel, ifidx) gc.collect() - + + # resend cmd + + def wifi_connect(self, mac, name, password, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.wifi_connect") + self.send_command("Wifi-Connect", mac, [name, password], channel, ifidx, sudo) + + def wifi_disconnect(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.wifi_disconnect") + self.send_command("Wifi-Disconnect", mac, None, channel, ifidx, sudo) + + def ap_enable(self, mac, name, password, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.ap_enable") + self.send_command("AP-Enable", mac, [name, password], channel, ifidx, sudo) + + def ap_disable(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.ap_disable") + self.send_command("AP-Disable", mac, None, channel, ifidx, sudo) + + def pause(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.pause") + self.send_command("Pause", mac, None, channel, ifidx, sudo) + + def resume(self, mac, channel=None, ifidx=None, sudo=False): + self.master.dprint("aen.resume") + self.send_command("Resume", mac, None, channel, ifidx, sudo) + def send(self, mac, message, channel=None, ifidx=None): - self.master._dprint("aen.message") + self.master.dprint("aen.message") if len(str(message)) > 241: try: - self.master._iprint(f"Sending message ({str(message)[:50] + '... (truncated)'}) to {mac} ({self.peer_name(mac)})") + self.master.iprint( + f"Sending message ({str(message)[:50] + '... (truncated)'}) to {mac} ({self.peer_name(mac)})") except Exception as e: - self.master._iprint(f"Sending message to {mac} ({self.peer_name(mac)}), but error printing message content: {e}") + self.master.iprint( + f"Sending message to {mac} ({self.peer_name(mac)}), but error printing message content: {e}") gc.collect() else: - self.master._iprint(f"Sending message ({message}) to {mac} ({self.peer_name(mac)})") + self.master.iprint(f"Sending message ({message}) to {mac} ({self.peer_name(mac)})") self._compose(mac, message, 0x02, 0x22, channel, ifidx) gc.collect() - self.master._dprint(f"Free memory: {gc.mem_free()}") - + self.master.dprint(f"Free memory: {gc.mem_free()}") + def broadcast(self, message, channel=None, ifidx=None): - self.master._dprint("aen.broadcast") + self.master.dprint("aen.broadcast") mac = b'\xff\xff\xff\xff\xff\xff' self.send(mac, message, channel, ifidx) - def send_sensor(self, mac, message, channel=None, ifidx=None):#message is a dict, key is the sensor type and the value is the sensor value - self.master._dprint("aen.message") + def send_sensor(self, mac, message, channel=None, + ifidx=None): # message is a dict, key is the sensor type and the value is the sensor value + self.master.dprint("aen.message") try: - self.master._iprint(f"Sending sensor data ({message}) to {mac} ({self.peer_name(mac)})") + self.master.iprint(f"Sending sensor data ({message}) to {mac} ({self.peer_name(mac)})") except Exception as e: - self.master._iprint(f"Sending sensor data to {mac} ({self.peer_name(mac)}), but error printing message content: {e}") + self.master.iprint( + f"Sending sensor data to {mac} ({self.peer_name(mac)}), but error printing message content: {e}") self._compose(mac, message, 0x02, 0x21, channel, ifidx) def check_messages(self): - self.master._dprint("aen.check_message") + self.master.dprint("aen.check_message") return len(self._received_messages) > 0 - + def return_message(self): - self.master._dprint("aen.return_message") + self.master.dprint("aen.return_message") if self.check_messages(): self._received_messages_size.pop() return self._received_messages.pop() - return (None, None, None) - + return None, None, None + def return_messages(self): - self.master._dprint("aen.return_messages") + self.master.dprint("aen.return_messages") if self.check_messages(): messages = self._received_messages[:] self._received_messages.clear() @@ -327,10 +431,10 @@ def return_messages(self): gc.collect() return messages return [(None, None, None)] - - def _irq(self, espnow): - self.master._dprint("aen._irq") - if self.master._admin: + + def _irq(self): + self.master.dprint("aen._irq") + if self.master.admin: try: self._receive() if self._irq_function and self.check_messages() and self._running: @@ -338,119 +442,103 @@ def _irq(self, espnow): gc.collect() return except KeyboardInterrupt: - #machine.disable_irq() #throws errors - self.master._iprint("aen._irq except KeyboardInterrupt") - #self._aen.irq(None) #does not work + # machine.disable_irq() #throws errors + self.master.iprint("aen._irq except KeyboardInterrupt") + # self._aen.irq(None) #does not work self._aen.active(False) - #self.master._cleanup() - raise SystemExit("Stopping networking execution. ctrl-c or ctrl-d again to stop main code") #in thonny stops library code but main code keeps running, same in terminal - #self._running = False - #raise KeyboardInterrupt #error in thonny but then stops running, just keeps running in terminal - #sys.exit(0) #breaks thonny, keeps running and recv (although ctl-d-able and keps running main loop in terminal - #machine.reset() #nogo keeps raising errors and running in terminal - #uos.sysexit() #raises an error in thonny but keeps running in terminal (althouzgh ctrl-d able) - #raise SystemExit #stops current library script, but main script keeps running, but now it just keeps the main code running in terminal... - #os.execv(sys.argv[0], sys.argv) #error in thonny, keeps running recv in terminal - #raise Exception("An error occurred!") #error in thonny, and then stops running due to keyboard interrupt, keeps running recv and irq in terminal - #raise KeyboardInterrupt("User interrupt simulated.") #interrupts library code, but main code keeps running, recv just keeps running in terminal + # self.master.cleanup() #network cleanup + # self.cleanup() #aen cleanup + raise SystemExit( + "Stopping networking execution. ctrl-c or ctrl-d again to stop main code") # in thonny stops library code but main code keeps running, same in terminal else: self._receive() if self._irq_function and self.check_messages() and self._running: self._irq_function() gc.collect() return - + def irq(self, func): - self.master._dprint("aen.irq") + self.master.dprint("aen.irq") self._irq_function = func - + def _send(self, peers_mac, messages, channel, ifidx): - self.master._dprint("aen._send") - - def __aen_add_peer(peers_mac, channel, ifidx): #Rethink the logic here! - if isinstance(peers_mac, bytes): - peers_mac = [peers_mac] - for peer_mac in peers_mac: - try: - if channel != None and ifidx != None: - self._aen.add_peer(peer_mac, channel=channel, ifidx=ifidx) - elif channel != None: - if peer_mac in self._peers: - self._aen.add_peer(peer_mac, channel=channel, ifidx=self._peers[peer_mac]['ifidx']) - else: - self._aen.add_peer(peer_mac, channel=channel, ifidx=self.ifidx) - elif ifidx != None: - if peer_mac in self._peers: - self._aen.add_peer(peer_mac, channel=self._peers[peer_mac]['channel'], ifidx=ifidx) - else: - self._aen.add_peer(peer_mac, channel=0, ifidx=ifidx) - elif peer_mac in self._peers: - self._aen.add_peer(peer_mac, channel=self._peers[peer_mac]['channel'], ifidx=self._peers[peer_mac]['ifidx']) + self.master.dprint("aen._send") + + if isinstance(peers_mac, bytes): + peers_mac = [peers_mac] + for peer_mac in peers_mac: + try: + if channel is not None and ifidx is not None: + self._aen.add_peer(peer_mac, channel=channel, ifidx=ifidx) + elif channel is not None: + if peer_mac in self._peers: + self._aen.add_peer(peer_mac, channel=channel, ifidx=self._peers[peer_mac]['ifidx']) else: - self._aen.add_peer(peer_mac, channel=0, ifidx=self.ifidx) - except Exception as e: - print(f"Error adding {peer_mac} to buffer: {e}") - - def __aen_del_peer(peers_mac): - if isinstance(peers_mac, bytes): - peers_mac = [peers_mac] - for peer_mac in peers_mac: - try: - self._aen.del_peer(peer_mac) - except Exception as e: - print(f"Error removing {peer_mac} from buffer: {e}") - - __aen_add_peer(peers_mac, channel, ifidx) - for m in range(len(messages)): - if isinstance(peers_mac, list): - mac = None - else: - mac = peers_mac - for i in range(3): - i += i - try: - self._aen.send(mac, messages[m]) - break - except Exception as e: - print(f"Error sending to {mac}: {e}") - self.master._dprint(f"Sent {messages[m]} to {mac} ({self.peer_name(mac)})") - gc.collect() - __aen_del_peer(peers_mac) - - + self._aen.add_peer(peer_mac, channel=channel, ifidx=self.ifidx) + elif ifidx is not None: + if peer_mac in self._peers: + self._aen.add_peer(peer_mac, channel=self._peers[peer_mac]['channel'], ifidx=ifidx) + else: + self._aen.add_peer(peer_mac, channel=0, ifidx=ifidx) + elif peer_mac in self._peers: + self._aen.add_peer(peer_mac, channel=self._peers[peer_mac]['channel'], + ifidx=self._peers[peer_mac]['ifidx']) + else: + self._aen.add_peer(peer_mac, channel=0, ifidx=self.ifidx) + self.master.dprint(f"Added {peer_mac} to espnow buffer") + except Exception as e: + self.master.iprint(f"Error adding {peer_mac} to espnow buffer: {e}") + + for m in range(len(messages)): + for i in range(3): + i += i + try: + self._aen.send(peer_mac, messages[m]) + break + except Exception as e: + self.master.iprint(f"Error sending to {peer_mac}: {e}") + self.master.dprint(f"Sent {messages[m]} to {peer_mac} ({self.peer_name(peer_mac)})") + gc.collect() + + try: + self._aen.del_peer(peer_mac) + self.master.dprint(f"Removed {peer_mac} from espnow buffer") + except Exception as e: + print(f"Error removing {peer_mac} from espnow buffer: {e}") + def _compose(self, peer_mac, payload=None, msg_type=0x02, subtype=0x22, channel=None, ifidx=None): - self.master._dprint("aen._compose") - + self.master.dprint("aen._compose") + if isinstance(peer_mac, list): - for peer_macs in peer_mac: - if peer_macs not in self._peers: - self.add_peer(peer_macs, None, channel, ifidx) + for peer_macs in peer_mac: + if peer_macs not in self._peers: + self.add_peer(peer_macs, None, channel, ifidx) elif peer_mac not in self._peers: self.add_peer(peer_mac, None, channel, ifidx) - - def __encode_payload(payload): - self.master._dprint("aen.__encode_payload") - if payload is None: #No payload type + + def __encode_payload(): + self.master.dprint("aen.__encode_payload") + if payload is None: # No payload type return b'\x00', b'' - elif isinstance(payload, bytearray): #bytearray - return (b'\x01', bytes(payload)) - elif isinstance(payload, bytes): #bytes - return (b'\x01', payload) - elif isinstance(payload, bool): #bool - return (b'\x02', (b'\x01' if payload else b'\x00')) - elif isinstance(payload, int): #int - return (b'\x03', struct.pack('>i', payload)) - elif isinstance(payload, float): #float - return (b'\x04', struct.pack('>f', payload)) - elif isinstance(payload, str): #string - return (b'\x05', payload.encode('utf-8')) - elif isinstance(payload, dict) or isinstance(payload, list): #json dict or list + elif isinstance(payload, bytearray): # bytearray + return b'\x01', bytes(payload) + elif isinstance(payload, bytes): # bytes + return b'\x01', payload + elif isinstance(payload, bool): # bool + return b'\x02', (b'\x01' if payload else b'\x00') + elif isinstance(payload, int): # int + return b'\x03', struct.pack('>i', payload) + elif isinstance(payload, float): # float + return b'\x04', struct.pack('>f', payload) + elif isinstance(payload, str): # string + return b'\x05', payload.encode('utf-8') + elif isinstance(payload, dict) or isinstance(payload, list): # json dict or list json_payload = json.dumps(payload) - return (b'\x06', json_payload.encode('utf-8')) + return b'\x06', json_payload.encode('utf-8') else: raise ValueError("Unsupported payload type") - - payload_type, payload_bytes = __encode_payload(payload) + + payload_type, payload_bytes = __encode_payload() messages = [] identifier = 0x2a timestamp = time.ticks_ms() @@ -459,101 +547,95 @@ def __encode_payload(payload): header[1] = msg_type header[2] = subtype header[3:7] = timestamp.to_bytes(4, 'big') - if len(payload_bytes) < 242: #250-9=241=max_length + if len(payload_bytes) < 242: # 250-9=241=max_length header[7] = payload_type[0] total_length = 1 + 1 + 1 + 4 + 1 + len(payload_bytes) + 1 message = bytearray(total_length) message[:8] = header - message[8:-1] = payload_bytes - message[-1:] = (sum(message) % 256).to_bytes(1, 'big') #Checksum - self.master._dprint(f"Message {1}/{1}; Length: {len(message)}; Free memory: {gc.mem_free()}") + message[8:-1] = payload_bytes + message[-1:] = (sum(message) % 256).to_bytes(1, 'big') # Checksum + self.master.dprint(f"Message {1}/{1}; Length: {len(message)}; Free memory: {gc.mem_free()}") messages.append(message) - else: - self.master._dprint("Long message: Splitting!") - max_size = 238 #241-3 - total_chunk_number = (-(-len(payload_bytes)//max_size)) #Round up division + self.master.dprint("Long message: Splitting!") + max_size = 238 # 241-3 + total_chunk_number = (-(-len(payload_bytes) // max_size)) # Round up division lba = b'\x07' - header[7] = lba[0] #Long byte array + header[7] = lba[0] # Long byte array if total_chunk_number > 256: raise ValueError("More than 256 chunks, unsupported") for chunk_index in range(total_chunk_number): - message = bytearray(9 + 3 + min(max_size,len(payload_bytes)-chunk_index*max_size)) + message = bytearray(9 + 3 + min(max_size, len(payload_bytes) - chunk_index * max_size)) message[:8] = header message[8:10] = chunk_index.to_bytes(1, 'big') + total_chunk_number.to_bytes(1, 'big') message[10] = payload_type[0] - message[11:-1] = payload_bytes[chunk_index * max_size: (chunk_index + 1) * max_size] - message[-1:] = (sum(message) % 256).to_bytes(1, 'big') #Checksum - self.master._dprint(message) + message[11:-1] = payload_bytes[chunk_index * max_size: (chunk_index + 1) * max_size] + message[-1:] = (sum(message) % 256).to_bytes(1, 'big') # Checksum + self.master.dprint(message) messages.append(bytes(message)) - self.master._dprint(f"Message {chunk_index+1}/{total_chunk_number}; length: {len(message)}; Free memory: {gc.mem_free()}") + self.master.dprint( + f"Message {chunk_index + 1}/{total_chunk_number}; length: {len(message)}; Free memory: {gc.mem_free()}") gc.collect() -# key = bytearray() -# key.extend(header[1:8]) -# key.extend(total_chunk_number.to_bytes(1, 'big')) -# self._long_sent_buffer[bytes(key)] = (messages, (channel,ifidx)) - - message = bytearray() gc.collect() self._send(peer_mac, messages, channel, ifidx) + def _receive(self): # Processes all the messages in the buffer + self.master.dprint("aen._receive") - def _receive(self): #Processes all the messages in the buffer - self.master._dprint("aen._receive") - def __decode_payload(payload_type, payload_bytes): - self.master._dprint("aen.__decode_payload") - if payload_type == b'\x00': #None + self.master.dprint("aen.__decode_payload") + if payload_type == b'\x00': # None return None - elif payload_type == b'\x01': #bytearray or bytes + elif payload_type == b'\x01': # bytearray or bytes return bytes(payload_bytes) - elif payload_type == b'\x02': #bool + elif payload_type == b'\x02': # bool return payload_bytes[0:1] == b'\x01' - elif payload_type == b'\x03': #int + elif payload_type == b'\x03': # int return struct.unpack('>i', payload_bytes)[0] - elif payload_type == b'\x04': #float + elif payload_type == b'\x04': # float return struct.unpack('>f', payload_bytes)[0] - elif payload_type == b'\x05': #string + elif payload_type == b'\x05': # string return payload_bytes.decode('utf-8') - elif payload_type == b'\x06': #json dict or list - return json.loads(payload_bytes.decode('utf-8')) - elif payload_type == b'\x07': #Long byte array + elif payload_type == b'\x06': # json dict or list + return json.loads(payload_bytes.decode('utf-8')) + elif payload_type == b'\x07': # Long byte array return bytes(payload_bytes) else: raise ValueError(f"Unsupported payload type: {payload_type} Message: {payload_bytes}") - - def __process_message(sender_mac, message, rtimestamp): - self.master._dprint("aen.__process_message") - if message[0] != 0x2a: # Uniqe Message Identifier Check - self.master._dprint("Invalid message: Message ID Fail") + + def __process_message(sender_mac, message, receive_timestamp): + self.master.dprint("aen.__process_message") + if message[0] != 0x2a: # Unique Message Identifier Check + self.master.dprint("Invalid message: Message ID Fail") return None if len(message) < 9: # Min size - self.master._dprint("Invalid message: too short") + self.master.dprint("Invalid message: too short") return None msg_type = bytes(message[1:2]) subtype = bytes(message[2:3]) - stimestamp = int.from_bytes(message[3:7], 'big') + send_timestamp = int.from_bytes(message[3:7], 'big') payload_type = bytes(message[7:8]) payload = message[8:-1] checksum = message[-1] - self.master._dprint(f"{type(msg_type)}: {msg_type}, {type(subtype)}: {subtype}, {type(stimestamp)}: {stimestamp}, {type(payload_type)}: {payload_type}, {type(payload)}: {payload}, {type(checksum)}: {checksum}") - - #Checksum + self.master.dprint( + f"{type(msg_type)}: {msg_type}, {type(subtype)}: {subtype}, {type(send_timestamp)}: {send_timestamp}, {type(payload_type)}: {payload_type}, {type(payload)}: {payload}, {type(checksum)}: {checksum}") + + # Checksum if checksum != sum(message[:-1]) % 256: - self.master._dprint("Invalid message: checksum mismatch") + self.master.dprint("Invalid message: checksum mismatch") return None - + if sender_mac not in self._peers: self.add_peer(sender_mac) - + if payload_type == b'\x07': - self.master._dprint("Long message received, processing...") + self.master.dprint("Long message received, processing...") part_n = int.from_bytes(payload[0:1], 'big') total_n = int.from_bytes(payload[1:2], 'big') payload_type = bytes(payload[2:3]) payload = payload[3:] - + # Create a key as a bytearray: (msg_type, subtype, timestamp, payload_type, total_n) key = bytearray() key.extend(msg_type) @@ -562,15 +644,16 @@ def __process_message(sender_mac, message, rtimestamp): key.extend(payload_type) key.append(total_n) key = bytes(key) - self.master._dprint(f"Key: {key}") - + self.master.dprint(f"Key: {key}") + # Check if the key already exists in the long buffer if key in self._long_buffer: # If the part is None, add the payload if self._long_buffer[key][part_n] is None: self._long_buffer[key][part_n] = payload self._long_buffer_size[key] = self._long_buffer_size[key] + len(payload) - self.master._dprint(f"Long message: Key found, message added to entry in long_message_buffer, {sum(1 for item in self._long_buffer[key] if item is not None)} out of {total_n} packages received") + self.master.dprint( + f"Long message: Key found, message added to entry in long_message_buffer, {sum(1 for item in self._long_buffer[key] if item is not None)} out of {total_n} packages received") # If there are still missing parts, return if any(value is None for value in self._long_buffer[key]): gc.collect() @@ -581,28 +664,17 @@ def __process_message(sender_mac, message, rtimestamp): payloads[part_n] = payload self._long_buffer[key] = payloads self._long_buffer_size[key] = len(payload) - self.master._dprint(f"Long message: Key not found and new entry created in long_message_buffer, {sum(1 for item in self._long_buffer[key] if item is not None)} out of {total_n} packages received") - + self.master.dprint( + f"Long message: Key not found and new entry created in long_message_buffer, {sum(1 for item in self._long_buffer[key] if item is not None)} out of {total_n} packages received") + while len(self._long_buffer) > 8 or sum(self._long_buffer_size.values()) > 75000: - self.master._dprint(f"Maximum buffer size reached: {len(self._long_buffer)}, {sum(self._long_buffer_size.values())} bytes; Reducing!") + self.master.dprint( + f"Maximum buffer size reached: {len(self._long_buffer)}, {sum(self._long_buffer_size.values())} bytes; Reducing!") self._long_buffer.popitem(last=False) self._long_buffer_size.popitem(last=False) gc.collect() -# # If there are missing parts, request missing messages, due to buffer constraints this is disabled -# if any(value is None for value in self._long_buffer[key]): -# none_indexes = [index for index, value in enumerate(self._long_buffer[key]) if value is None] -# response = bytearray() -# response.extend(msg_type) -# response.extend(subtype) -# response.extend(message[3:7])#stimestamp.to_bytes(4, 'big') -# response.extend(payload_type) -# response.extend(payload[1:2])#total_n.to_bytes(1, 'big') -# for none_index in none_indexes: -# response.extend(none_index.to_bytes(1, 'big')) -# #asyncio.create_task(__request_missing_messages(sender_mac, response, key, total_n)) -# gc.collect() return - + # If all parts have been received, reconstruct the message if not any(value is None for value in self._long_buffer[key]): payload = bytearray() @@ -610,256 +682,228 @@ def __process_message(sender_mac, message, rtimestamp): payload.extend(self._long_buffer[key][i]) del self._long_buffer[key] del self._long_buffer_size[key] - self.master._dprint("Long message: All packages received!") + self.master.dprint("Long message: All packages received!") else: - self.master._dprint("Long Message: Safeguard triggered, code should not have gotten here") + self.master.dprint("Long Message: Safeguard triggered, code should not have gotten here") gc.collect() return - - #Handle the message based on type - if msg_type == b'\x01': # Command Message - __handle_cmd(sender_mac, subtype, stimestamp, rtimestamp, payload_type, payload if payload else None) - elif msg_type == b'\x02': # Informational Message - __handle_inf(sender_mac, subtype, stimestamp, rtimestamp, payload_type, payload if payload else None) - elif msg_type == b'\x03': # Acknowledgement Message - __handle_ack(sender_mac, subtype, stimestamp, rtimestamp, payload_type, payload if payload else None) + + # Handle the message based on type + if msg_type == (msg_key := msg_codes["cmd"]): # Command Message + __handle_cmd(sender_mac, subtype, send_timestamp, receive_timestamp, payload_type, + payload if payload else None, msg_key) + elif msg_type == (msg_key := msg_codes["inf"]): # Informational Message + __handle_inf(sender_mac, subtype, send_timestamp, receive_timestamp, payload_type, + payload if payload else None, msg_key) + elif msg_type == (msg_key := msg_codes["ack"]): # Acknowledgement Message + __handle_ack(sender_mac, subtype, send_timestamp, receive_timestamp, payload_type, + payload if payload else None, msg_key) else: - self.master._iprint(f"Unknown message type from {sender_mac} ({self.peer_name(sender_mac)}): {message}") + self.master.iprint( + f"Unknown message type from {sender_mac} ({self.peer_name(sender_mac)}): {message}") - async def __request_missing_messages(sender_mac, payload, key, total_n=0, retries=3, waittime=2): - self.master._dprint("aen.__request_missing_messages") - for i in range(retries): - i += i - asyncio.sleep(waittime + total_n*0.1) - if key in self._long_buffer: - self._compose(sender_mac, payload, 0x01, 0x17) - else: - return - - def __handle_cmd(sender_mac, subtype, stimestamp, rtimestamp, payload_type, payload): - self.master._dprint("aen.__handle_cmd") - if subtype == b'\x10': #Ping - self.master._iprint(f"Ping command received from {sender_mac} ({self.peer_name(sender_mac)})") - info = __decode_payload(payload_type, payload) - self.add_peer(sender_mac, info[2], info[0], info[1]) - if bool(self.ifidx): - channel = self.master.ap.channel() - else: - channel = self.master.sta.channel() - response = [channel, self.ifidx, self.master.name, stimestamp] - self._compose(sender_mac, response, 0x03, 0x10) - elif subtype == b'\x11': #Pair - self.master._iprint(f"Pairing command received from {sender_mac} ({self.peer_name(sender_mac)})") - payload = __decode_payload(payload_type, payload) - if self._pairing == True: - try: - # Insert pairing logic here - self.master._iprint("no pairing logic written just yet") - self._compose(sender_mac, ["Pair (\x11)", payload], 0x03, 0x11) - except Exception as e: - self.master._iprint(f"Error: {e} with payload: {payload}") - self._compose(sender_mac, ["Pair (\x11)", e, payload], 0x03, 0x12) - else: - self._compose(sender_mac, ["Pair (\x11)", "Pairing disabled", payload], 0x03, 0x12) - elif subtype == b'\x26': #Set Pair - self.master._iprint(f"Pairing command received from {sender_mac} ({self.peer_name(sender_mac)})") - payload = __decode_payload(payload_type, payload) #should be a list with a bool - if sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo": - try: - self._pairing = payload[0] - self._compose(sender_mac, ["Set Pair (\x11)", payload], 0x03, 0x11) - except Exception as e: - self.master._iprint(f"Error: {e} with payload: {payload}") - self._compose(sender_mac, ["Set Pair (\x11)", e, payload], 0x03, 0x12) + def __check_authorisation(sender_mac, payload): + return sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo" + + def __send_confirmation(msg_type, recipient_mac, msg_subkey_type, payload=None, error=None): + if msg_type == "Success": + self._compose(recipient_mac, [msg_subkey_type, payload], 0x03, 0x11) + elif msg_type == "Fail": + self._compose(recipient_mac, [msg_subkey_type, error, payload], 0x03, 0x12) + else: + self._compose(recipient_mac, [msg_subkey_type, payload], 0x03, 0x13) + + def __handle_cmd(sender_mac, subtype, send_timestamp, receive_timestamp, payload_type, payload, msg_key): + self.master.dprint(f"aen.__handle_cmd") + payload = __decode_payload(payload_type, payload) + if (msg_subkey := "Reboot") and subtype == msg_subcodes[msg_key][msg_subkey]: # Reboot + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + __send_confirmation("Confirm", sender_mac, f"{msg_subkey} ({subtype})", payload) + machine.reset() else: - elf._compose(sender_mac, ["Set Pair (\x11)", "Not authorised", payload], 0x03, 0x12) - elif subtype == b'\x12': #Change Mode to Firmware Update - self.master._iprint(f"Update command received from {sender_mac} ({self.peer_name(sender_mac)})") - payload = __decode_payload(payload_type, payload) - if sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo": + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Firmware-Update") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Firmware-Update + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): try: # Insert update logic here - self.master._iprint("no update logic written just yet") - self._compose(sender_mac, ["Update (\x12)", payload], 0x03, 0x11) + self.master.iprint("no update logic written just yet") + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) except Exception as e: - self.master._iprint(f"Error: {e} with payload: {payload}") - self._compose(sender_mac, ["Update (\x12)", e, payload], 0x03, 0x12) - else: - self._compose(sender_mac, ["Update (\x12)", "Not authorised", payload], 0x03, 0x12) - elif subtype == b'\x13': #RSSI/Status/Config Boop - self.boops = self.boops + 1 - self.master._iprint(f"Boop command received from {sender_mac} ({self.peer_name(sender_mac)}), Received total of {self.boops} boops!") - try: - self._compose(sender_mac, [self.master.id, self.master.name, self.master.config, self.master.version_n, self.master.version, self.master.sta.mac, self.master.ap.mac, self.rssi()], 0x02, 0x20) #[ID, Name, Config, Version, sta mac, ap mac, rssi] - except Exception as e: - self._compose(sender_mac, ["RSSI Boop (\x13)", e, payload], 0x03, 0x12) - elif subtype == b'\x14': #Reboot - self.master._iprint(f"Reboot command received from {sender_mac} ({self.peer_name(sender_mac)})") - payload = __decode_payload(payload_type, payload) - if sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo": - self._compose(sender_mac, ["Reboot (\x14)", payload], 0x03, 0x13) - machine.reset() + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) else: - self._compose(sender_mac, ["Reboot (\x14)", "Not authorised", payload], 0x03, 0x12) - elif subtype == b'\x15': #Echo - self.master._iprint(f"Echo command received from {sender_mac} ({self.peer_name(sender_mac)}): {__decode_payload(payload_type, payload)}")#Check i or d - self._compose(sender_mac, payload, 0x03, 0x15) - elif subtype == b'\x16': #Run file - filename = __decode_payload(payload_type, payload) - self.master._iprint(f"Run command received from {sender_mac} ({self.peer_name(sender_mac)}): {filename}") - #try: - # task = asyncio.create_task(run_script(filename)) - #Needs - #async def execute_script(script_path): - # Load and execute the script - #with open(script_path) as f: - # script_code = f.read() - # - #exec(script_code) # This executes the script in the current namespace - # - # Alternatively, if you want to run this in a separate function scope - #exec(script_code, {'__name__': '__main__'}) - # except Exception as e: - # print(f"Error running {filename}: {e}") - elif subtype == b'\x17': #Resend lost long messages - self.master._iprint("Received resend long message command, long_sent_buffer disabled due to memory constraints") -# payload = __decode_payload(payload_type, payload) -# self.master._iprint("Received resend long message command, checking buffer for lost message") -# key = payload[0:8] -# indexes_b = payload[8:] -# indexes = [] -# for i in range(0, len(indexes_b)): -# indexes.append(int.from_bytes(indexes_b[i], 'big')) -# if key in self._long_sent_buffer: -# channel = self._long_sent_buffer[key][1][0] -# ifidx = self._long_sent_buffer[key][1][1] -# for index in indexes: -# message.append(self._long_sent_buffer[key][0][index]) -# self._send(sender_mac, messages, channel, ifidx) -# #resend the message from the message buffer -# self.master._iprint("Resent all requested messages") -# else: -# self.master._iprint("Message not found in the buffer") - elif subtype == b'\x18': #Connect to WiFi - payload = __decode_payload(payload_type, payload) #should return a list of ssid and password - self.master._iprint("Received connect to wifi command") - if sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo": + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "File-Update") and subtype == msg_subcodes[msg_key][msg_subkey]: # File-Update + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): try: - self.connect(payload[0], payload[1]) - self._compose(sender_mac, ["WiFi connect (\x18)", payload], 0x03, 0x11) + # Insert update logic here + self.master.iprint("No update logic written just yet") + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) except Exception as e: - self.master._iprint(f"Error: {e} with payload: {payload}") - self._compose(sender_mac, ["WiFi connect (\x18)", e, payload], 0x03, 0x12) + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) else: - self._compose(sender_mac, ["WiFi connect (\x18)", "Not authorised", payload], 0x03, 0x12) - elif subtype == b'\x19': #Disconnect from WiFi - self.master._iprint("Received disconnect from wifi command") - payload = __decode_payload(payload_type, payload) - if sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo": + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "File-Download") and subtype == msg_subcodes[msg_key][msg_subkey]: # File-Download + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + # should return a list with a link and the list of files to download + if __check_authorisation(sender_mac, payload): try: - self.disconnect() - self._compose(sender_mac, ["WiFi disconnect (\x19)", payload], 0x03, 0x11) + import mip + base = payload[0] + files_to_copy = payload[1] + if files_to_copy is None: + mip.install(base) + else: + for f in files_to_copy: + mip.install(base + f) + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) except Exception as e: - self.master._iprint(f"Error: {e}") - self._compose(sender_mac, ["WiFi disconnect (\x19)", e, payload], 0x03, 0x12) + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) else: - self._compose(sender_mac, ["WiFi disconnect (\x19)", "Not authorised", payload], 0x03, 0x12) - elif subtype == b'\x20': #Enable AP - payload = __decode_payload(payload_type, payload) #should return a list of desired name, password an max clients - self.master._iprint("Received setup AP command") - if sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo": + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Web-Repl") and subtype == msg_subcodes[msg_key][msg_subkey]: # Web-Repl + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + # should be a list with name and password + if __check_authorisation(sender_mac, payload): try: - ssid = payload[0] - if ssid == "": - ssid = self.master.name - password = payload[1] - self.setap(ssid, password) - self._compose(sender_mac, ["Setup AP (\x20)", payload], 0x03, 0x11) + # add logic to connect to Wi-Fi and set up webrepl + self.master.sta.connect(payload[0], payload[1]) + link = "webrepl link" + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", link) except Exception as e: - self.master._iprint(f"Error: {e} with payload: {payload}") - self._compose(sender_mac, ["Setup AP (\x20)", e, payload], 0x03, 0x12) + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) else: - self._compose(sender_mac, ["Setup AP (\x20)", "Not authorised", payload], 0x03, 0x12) - elif subtype == b'\x21': #Disable AP - self.master._iprint("Received disable AP command") - payload = __decode_payload(payload_type, payload) #should return a list of desired name, password an max clients - if sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo": + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "File-Run") and subtype == msg_subcodes[msg_key][msg_subkey]: # File-Run + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): try: - self.master.ap.deactivate() - self._compose(sender_mac, ["Disable AP (\x21)", payload], 0x03, 0x11) + self.master.iprint("Execute logic not implemented") + # insert run logic here except Exception as e: - self.master._iprint(f"Error: {e}") - self._compose(sender_mac, ["Disable AP (\x21)", e, payload], 0x03, 0x12) + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) else: - self._compose(sender_mac, ["Disable AP (\x21)", "Not authorised", payload], 0x03, 0x12) - elif subtype == b'\x22': #Set Admin Bool - payload = __decode_payload(payload_type, payload) #should return a bool in a list - if sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo": - self.master._iprint(f"Received set admin command: self.admin set to {payload[0]}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Set-Admin") and subtype == msg_subcodes[msg_key][msg_subkey]: # Set Admin Bool + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + self.master.iprint(f"Received Set-Admin command: self.admin set to {payload[0]}") try: - self.master._admin = payload[0] - self._compose(sender_mac, ["Set Admin Bool (\x21)", "Success", payload], 0x03, 0x11) + self.master.admin = payload[0] + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) except Exception as e: - self._compose(sender_mac, ["Set Admin Bool (\x21)", e, payload], 0x03, 0x12) + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) else: - self._compose(sender_mac, ["Set Admin Bool (\x21)", "Not authorised", payload], 0x03, 0x12) - elif subtype == b'\x27': #Add Admin macs to _whitelist - payload = __decode_payload(payload_type, payload) #should return a bool in a list - if sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo": - self.master._iprint(f"Received add admin macs to _whitelist command, added {payload[0]} and {payload[1]}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Whitelist-Add") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Whitelist-Add - Add Admin macs to _whitelist + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + self.master.iprint( + f"Received add admin macs to _whitelist command, added {payload[0]} and {payload[1]}") try: self._whitelist.append(payload[0]) self._whitelist.append(payload[1]) - self._compose(sender_mac, ["Add admin macs to _whitelist (\x27)", "Success", payload], 0x03, 0x11) + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) except Exception as e: - self._compose(sender_mac, ["Add admin macs to _whitelist (\x27)", e, payload], 0x03, 0x12) + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) else: - self._compose(sender_mac, ["Add admin macs to _whitelist (\x27)", "Not authorised", payload], 0x03, 0x12) - elif subtype == b'\x23': #Set Pause - payload = __decode_payload(payload_type, payload) #should return a bool - if sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo": + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Config-Change") and subtype == msg_subcodes[msg_key][msg_subkey]: # change config + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): try: - self.master._iprint(f"Received pause command: {payload[0]}") - self._running = False - self._compose(sender_mac, ["Set Pause (\x23)", "Success", payload], 0x03, 0x11) - if self._pause_function: - self._pause_function() #calls the custom set pause function to display a screen - while not self._running: - sleep(0.5) + self.master.iprint(f"Not yet implemented") + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) except Exception as e: - self._compose(sender_mac, ["Set Pause (\x23)", e, payload], 0x03, 0x12) + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Ping") and subtype == msg_subcodes[msg_key][msg_subkey]: # Ping + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + self.add_peer(sender_mac, payload[2], payload[0], payload[1]) + if bool(self.ifidx): + channel = self.master.ap.channel() else: - self._compose(sender_mac, ["Set Pause (\x23)", "Not authorised", payload], 0x03, 0x12) - elif subtype == b'\x24': #Set Continue - payload = __decode_payload(payload_type, payload) #should return a bool - if sender_mac in self._whitelist or payload == "sudo" or payload[-1] == "sudo": + channel = self.master.sta.channel() + response = [channel, self.ifidx, self.master.name, send_timestamp] + self._compose(sender_mac, response, 0x03, 0x10) + elif (msg_subkey := "Pair") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Pair #add something that checks that the messages are from the same mac + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if self._pairing_enabled and networking_keys["handshake_key_1"] == payload: + self._pairing = True + self.pair(sender_mac, networking_keys["handshake_key_2"]) + # some timer for if key 3 is not received to reset states + elif self._pairing_enabled and self._pairing and networking_keys["handshake_key_2"] == payload: + self._paired = True + self._paired_macs.append(sender_mac) + self.pair(sender_mac, networking_keys["handshake_key_3"]) + # some timer that sets to false if key 4 is not received + elif self._pairing_enabled and self._pairing and networking_keys["handshake_key_3"] == payload: try: - self.master._iprint(f"Received continue command: {payload}") - self.master._running = True - self._compose(sender_mac, ["Set Continue (\x23)", "Success", payload], 0x03, 0x11) + # Insert pairing logic here do a reverse handshake + self._paired = True + self._paired_macs.append(sender_mac) + self.pair(sender_mac, networking_keys["handshake_key_4"]) + self.master.iprint("no pairing logic written just yet") + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) except Exception as e: - self._compose(sender_mac, ["Set Continue (\x23)", e, payload], 0x03, 0x12) + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + elif self._pairing_enabled and self._pairing and networking_keys["handshake_key_3"] == payload: + self._paired = True + # remove timer from before else: - self._compose(sender_mac, ["Set Continue (\x23)", "Not authorised", payload], 0x03, 0x12) - elif subtype == b'\x25': #Download github files - self.master._iprint(f"Received download files command") - payload = __decode_payload(payload_type, payload) #should return a list with a link and the list of files to download - if sender_mac in self._whitelist or payload[-1] == "sudo": + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", "Pairing disabled", + payload) + elif (msg_subkey := "Set-Pair") and subtype == msg_subcodes[msg_key][msg_subkey]: # Enable pairing mode + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): try: - import mip - base = payload[0] - files_to_copy = payload[1] - for f in files_to_copy: - print("Installing: ", f) - mip.install(base + f) - self._compose(sender_mac, ["Github Download (\x25)", payload], 0x03, 0x11) + self._pairing_enabled = payload[0] + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) except Exception as e: - self.master._iprint(f"Error: {e} with payload: {payload}") - self._compose(sender_mac, ["Github Download (\x25)", e, payload], 0x03, 0x12) - else: - self._compose(sender_mac, ["Github Download (\x25)", "Not authorised", payload], 0x03, 0x12) - elif subtype == b'\x28': #Get List of Files - self.master._iprint(f"Received list files command") + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "RSSI/Status/Config-Boop") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # RSSI/Status/Config Boop + self.boops = self.boops + 1 + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)}), Received total of {self.boops} boops!") + try: + self._compose(sender_mac, + [self.master.id, self.master.name, self.master.config, self.master.version_n, + self.master.version, self.master.sta.mac, self.master.ap.mac, self.rssi()], 0x02, + 0x20) # [ID, Name, Config, Version, sta mac, ap mac, rssi] + except Exception as e: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + elif (msg_subkey := "Directory-Get") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Get List of Files + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") try: result = [] entries = os.listdir() @@ -871,82 +915,182 @@ def __handle_cmd(sender_mac, subtype, stimestamp, rtimestamp, payload_type, payl result.append(full_path) self._compose(sender_mac, result, 0x02, 0x20) except Exception as e: - self._compose(sender_mac, ["List files (\x28)", e, payload], 0x03, 0x12) + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + elif (msg_subkey := "Echo") and subtype == msg_subcodes[msg_key][msg_subkey]: # Echo + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)}): {__decode_payload(payload_type, payload)}") # Check i or d + self._compose(sender_mac, payload, 0x03, 0x15) + elif (msg_subkey := "Resend") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Resend lost long messages + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + self.master.iprint("Long_sent_buffer disabled due to memory constraints") + elif (msg_subkey := "WiFi-Connect") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Connect to Wi-Fi + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + self.master.sta.connect(payload[0], payload[1]) + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "WiFi-Disconnect") and subtype == msg_subcodes[msg_key][ + msg_subkey]: # Disconnect from Wi-Fi + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + self.master.sta.disconnect() + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "AP-Enable") and subtype == msg_subcodes[msg_key][msg_subkey]: # Enable AP + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + ssid = payload[0] + if ssid == "": + ssid = self.master.name + password = payload[1] + self.master.ap.setap(ssid, password) + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e} with payload: {payload}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "AP-Disable") and subtype == msg_subcodes[msg_key][msg_subkey]: # Disable AP + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + payload = __decode_payload(payload_type, + payload) # should return a list of desired name, password and max clients + if __check_authorisation(sender_mac, payload): + try: + self.master.ap.deactivate() + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + self.master.iprint(f"Error: {e}") + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Pause") and subtype == msg_subcodes[msg_key][msg_subkey]: # Set Pause + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + self.master.iprint(f"Received pause command: {payload[0]}") + self._running = False + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + if self._pause_function: + self._pause_function() # calls the custom set pause function to display a screen + while not self._running: + time.sleep(0.5) + except Exception as e: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") + elif (msg_subkey := "Resume") and subtype == msg_subcodes[msg_key][msg_subkey]: # Set Continue + self.master.iprint( + f"{msg_subkey} ({subtype}) command received from {sender_mac} ({self.peer_name(sender_mac)})") + if __check_authorisation(sender_mac, payload): + try: + self.master.iprint(f"Received continue command: {payload}") + self.master._running = True + __send_confirmation("Success", sender_mac, f"{msg_subkey} ({subtype})", payload) + except Exception as e: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, e) + else: + __send_confirmation("Fail", sender_mac, f"{msg_subkey} ({subtype})", payload, "Not authorised") else: - self.master._iprint(f"Unknown command subtype from {sender_mac} ({self.peer_name(sender_mac)}): {subtype}") - - def __handle_inf(sender_mac, subtype, stimestamp, rtimestamp, payload_type, payload): - self.master._dprint("aen.__inf") - if subtype == b'\x20': #RSSI - payload = __decode_payload(payload_type, payload) - #payload["time_sent"] = stimestamp - #payload["time_recv"] = rtimestamp - self.master._iprint(f"RSSI data received from {sender_mac} ({self.peer_name(sender_mac)}): {payload}") + self.master.iprint( + f"Unknown command subtype from {sender_mac} ({self.peer_name(sender_mac)}): {subtype}") + + def __handle_inf(sender_mac, subtype, send_timestamp, receive_timestamp, payload_type, payload, msg_key): + self.master.dprint("aen.__handle_inf") + payload = __decode_payload(payload_type, payload) + if (msg_subkey := "RSSI") and subtype == msg_subcodes[msg_key][msg_subkey]: # RSSI + # payload["time_sent"] = send_timestamp + # payload["time_recv"] = receive_timestamp + self.master.iprint( + f"{msg_subkey} ({subtype}) data received from {sender_mac} ({self.peer_name(sender_mac)}): {payload}") self.received_rssi_data[sender_mac] = payload - #self._compose(sender_mac, ["RSSI (\x20)", payload], 0x03, 0x13) #confirm message recv - elif subtype == b'\x21': #Sensor Data - payload = __decode_payload(payload_type, payload) - payload["time_sent"] = stimestamp - payload["time_recv"] = rtimestamp - self.master._iprint(f"Sensor data received from {sender_mac} ({self.peer_name(sender_mac)}): {payload}") + # __send_confirmation("Confirm", sender_mac, f"{msg_subkey} ({subtype})", payload) #confirm message recv + elif (msg_subkey := "Sensor") and subtype == msg_subcodes[msg_key][msg_subkey]: # Sensor Data + payload["time_sent"] = send_timestamp + payload["time_recv"] = receive_timestamp + self.master.iprint( + f"{msg_subkey} ({subtype}) data received from {sender_mac} ({self.peer_name(sender_mac)}): {payload}") self.received_sensor_data[sender_mac] = payload - #self._compose(sender_mac, ["Sensor Data (\x21)", payload], 0x03, 0x13) #confirm message recv - elif subtype == b'\x22': #Message / Other - payloadlength = len(payload) - payload = __decode_payload(payload_type, payload) - self.master._iprint(f"Message received from {sender_mac} ({self.peer_name(sender_mac)}): {payload}") - self._received_messages.append((sender_mac, payload, rtimestamp)) - self._received_messages_size.append(payloadlength) + # __send_confirmation("Confirm", sender_mac, f"{msg_subkey} ({subtype})", payload) #confirm message recv + elif (msg_subkey := "Message") and subtype == msg_subcodes[msg_key][msg_subkey]: # Message / Other + self.master.iprint( + f"{msg_subkey} ({subtype}) received from {sender_mac} ({self.peer_name(sender_mac)}): {payload}") + self._received_messages.append((sender_mac, payload, receive_timestamp)) + self._received_messages_size.append(len(payload)) while len(self._received_messages) > 2048 or sum(self._received_messages_size) > 20000: - self.master._dprint(f"Maximum buffer size reached: {len(self._received_messages)}, {sum(self._received_messages_size)} bytes; Reducing!") + self.master.dprint( + f"Maximum buffer size reached: {len(self._received_messages)}, {sum(self._received_messages_size)} bytes; Reducing!") self._received_messages.pop(0) self._received_messages_size.pop(0) - #self._compose(sender_mac, ["Other (\x22)", payload], 0x03, 0x13) #confirm message recv - elif subtype == b'\x23': #File Directory - payload = __decode_payload(payload_type, payload) - self.master._iprint(f"Directory received from {sender_mac} ({self.peer_name(sender_mac)}): {payload}") - #self._compose(sender_mac, ["Other (\x23)", payload], 0x03, 0x13) #confirm message recv + # __send_confirmation("Confirm", sender_mac, f"{msg_subkey} ({subtype})", payload) #confirm message recv + elif (msg_subkey := "Directory") and subtype == msg_subcodes[msg_key][msg_subkey]: # File Directory + self.master.iprint( + f"{msg_subkey} ({subtype}) data received from {sender_mac} ({self.peer_name(sender_mac)}): {payload}") + # __send_confirmation("Confirm", sender_mac, f"{msg_subkey} ({subtype})", payload) #confirm message recv else: - payload = __decode_payload(payload_type, payload) - self.master._iprint(f"Unknown info subtype from {sender_mac} ({self.peer_name(sender_mac)}): {subtype}") - - def __handle_ack(sender_mac, subtype, stimestamp, rtimestamp, payload_type, payload): - self.master._dprint("aen.__handle_ack") - if subtype == b'\x10': #Pong - info = __decode_payload(payload_type, payload) - self.add_peer(sender_mac, info[2], info[0], info[1]) - self.master._iprint(f"Pong received from {sender_mac} ({self.peer_name(sender_mac)}), {rtimestamp-info[3]}") - elif subtype == b'\x15': #Echo - self.master._iprint(f"Echo received from {sender_mac} ({self.peer_name(sender_mac)}), {__decode_payload(payload_type, payload)}") - elif subtype == b'\x11': #Success - payload = __decode_payload(payload_type, payload) # should return a list with a cmd type and payload - self.master._iprint(f"Success received from {sender_mac} ({self.peer_name(sender_mac)}) for type {payload[0]} with payload {payload[1]}") - #add to ack buffer - elif subtype == b'\x12': #Fail - payload = __decode_payload(payload_type, payload) # should return a list with a cmd type, error and payload - self.master._iprint(f"Command fail received from {sender_mac} ({self.peer_name(sender_mac)}) for type {payload[0]} with error {payload[1]} and payload {payload[2]}") - #add to ack buffer - elif subtype == b'\x13': #Confirmation - payload = __decode_payload(payload_type,payload) # should return a list with message type and payload - self.master._iprint(f"Receive confirmation received from {sender_mac} ({self.peer_name(sender_mac)}) for type {payload[0]} with payload {payload[1]}") - #add to ack buffer + self.master.iprint( + f"Unknown info subtype from {sender_mac} ({self.peer_name(sender_mac)}): {subtype}") + + def __handle_ack(sender_mac, subtype, send_timestamp, receive_timestamp, payload_type, payload, msg_key): + self.master.dprint("aen.__handle_ack") + payload = __decode_payload(payload_type, payload) + if (msg_subkey := "Pong") and subtype == msg_subcodes[msg_key][msg_subkey]: # Pong + self.add_peer(sender_mac, payload[2], payload[0], payload[1]) + self.master.iprint( + f"{msg_subkey} ({subtype}) received from {sender_mac} ({self.peer_name(sender_mac)}), {receive_timestamp - payload[3]}") + elif (msg_subkey := "Echo") and subtype == msg_subcodes[msg_key][msg_subkey]: # Echo + self.master.iprint( + f"{msg_subkey} ({subtype}) received from {sender_mac} ({self.peer_name(sender_mac)}), {__decode_payload(payload_type, payload)}") + elif (msg_subkey := "Success") and subtype == msg_subcodes[msg_key][msg_subkey]: # Success + # payload should return a list with a cmd type and payload + self.master.iprint( + f"Cmd {msg_subkey} ({subtype}) received from {sender_mac} ({self.peer_name(sender_mac)}) for type {payload[0]} with payload {payload[1]}") + # add to ack buffer + elif (msg_subkey := "Fail") and subtype == msg_subcodes[msg_key][msg_subkey]: # Fail + # payload should return a list with a cmd type, error and payload + self.master.iprint( + f"Cmd {msg_subkey} ({subtype}) received from {sender_mac} ({self.peer_name(sender_mac)}) for type {payload[0]} with error {payload[1]} and payload {payload[2]}") + # add to ack buffer + elif (msg_subkey := "Confirm") and subtype == msg_subcodes[msg_key][msg_subkey]: # Confirmation + # payload should return a list with message type and payload + self.master.iprint( + f"{msg_subkey} ({subtype}) received from {sender_mac} ({self.peer_name(sender_mac)}) for type {payload[0]} with payload {payload[1]}") + # add to ack buffer else: - self.master._iprint(f"Unknown ack subtype from {sender_mac} ({self.peer_name(sender_mac)}): {subtype}, Payload: {payload}") + self.master.iprint( + f"Unknown ack subtype from {sender_mac} ({self.peer_name(sender_mac)}): {subtype}, Payload: {payload}") # Insert more acknowledgement logic here and/or add message to acknowledgement buffer - if self._aen.any(): - for sender_mac, data in self._aen: - self.master._dprint(f"Received {sender_mac, data}") - if sender_mac is None: # mac, msg will equal (None, None) on timeout + if self._aen.any(): + for mac, data in self._aen: + self.master.dprint(f"Received {mac, data}") + if mac is None: # mac, msg will equal (None, None) on timeout break if data: - rtimestamp = time.ticks_ms() - if sender_mac != None and data != None: - #self._received_messages.append((sender_mac, data, rtimestamp))#Messages will be saved here, this is only for debugging purposes - __process_message(sender_mac, data, rtimestamp) - if not self._aen.any():#this is necessary as the for loop gets stuck and does not exit properly. + if mac and data is not None: + # self._received_messages.append((sender_mac, data, receive_timestamp))#Messages will be saved here, this is only for debugging purposes + __process_message(mac, data, time.ticks_ms()) + if not self._aen.any(): # this is necessary as the for loop gets stuck and does not exit properly. break +# message structure (what kind of message types do I need?: Command which requires me to do something (ping, pair, change state(update, code, mesh mode, run a certain file), Informational Message (Sharing Sensor Data and RSSI Data) +# | Header (1 byte) | Type (1 byte) | Subtype (1 byte) | Timestamp(ms ticks) (4 bytes) | Payload type (1) | Payload (variable) | Checksum (1 byte) | -#message structure (what kind of message types do I need?: Command which requires me to do something (ping, pair, change state(update, code, mesh mode, run a certain file), Informational Message (Sharing Sensor Data and RSSI Data) -#| Header (1 byte) | Type (1 byte) | Subtype (1 byte) | Timestamp(ms ticks) (4 bytes) | Payload type (1) | Payload (variable) | Checksum (1 byte) | \ No newline at end of file diff --git a/software/release/README.qmd b/software/release/README.qmd new file mode 100644 index 0000000..03e21ca --- /dev/null +++ b/software/release/README.qmd @@ -0,0 +1,48 @@ +--- +format: gfm +execute: + echo: false + message: false + warning: false +editor: visual +editor_options: + chunk_output_type: console +--- + +# Smart System Education Platform + +```{r setup, include=FALSE} +file_path <- fs::path_real("README.qmd") +path_components <- strsplit(file_path, "/")[[1]] +shortened_path <- fs::path_join(path_components[-c(2, 3)]) +``` + +## Location: `r shortened_path` + +### Description + +This directory hosts release ready code for the smart modules. + +### Directories & Files + +The repository has the following directory tree. + +```{r} +fs::dir_tree(recurse = 1) +``` + +A short description of the directories can be found below. + +| name | description | contribution | +|--------------|-----------------------------------------|--------------| +| release/config.py | Smart Module Configuration File | Nick | +| main/main.py | Smart Module Main.py | Nick | +| networking/networking.py| This is the main networking code that builds on ESP-NOW. There are many prepared functionalities (and some more that I am working on), such as long message support, sending of various variable types (bytes, bytearray, dicts, lists, int, float, char, string), as well as different types of messages such as ping, echo and more. There are also various features in place to make the networking more robust. It needs config.py to function. | Nick | +| libraries/adxl345.py | Support for the built in accelerometer https://github.com/DFRobot/micropython-dflib/blob/master/ADXL345/user_lib/ADXL345.py | Milan | +| libraries/files.py | Custom save to file library | Milan | +| libraries/icons.py | Icon support for the smart motor module | Milan | +| libraries/sensors.py | Sensor support for the smart motor module | Milan | +| libraries/servo.py | Servo support for the smart motor module | Milan | +| libraries/smartmotor.py | Smart motor module main program | Milan | +| libraries/ssd1306.py | Support for the built in OLED screen https://github.com/stlehmann/micropython-ssd1306/blob/master/ssd1306.py | Milan | +| libraries/variableLED.py | Library that powers the variable LED grid | Sophie | diff --git a/software/release/config.py b/software/release/config.py new file mode 100644 index 0000000..1629372 --- /dev/null +++ b/software/release/config.py @@ -0,0 +1,59 @@ +mysecrets = {"SSID": "Tufts_Robot", "key": ""} +msg_codes = {"cmd": 0x01, "inf": 0x01, "ack": 0x01} +msg_subcodes = { + "cmd": { + "Reboot": 0x00, + "Firmware-Update": 0x01, + "File-Update": 0x02, + "File-Download": 0x03, + "File-Run": 0x05, + "Set-Admin": 0x06, + "Whitelist-Add": 0x07, + "Config-Change": 0x08, + "Ping": 0x10, + "Pair": 0x11, + "Set-Pair": 0x12, + "RSSI/Status/Config-Boop": 0x13, + "Directory-Get": 0x14, + "Echo": 0x15, + "Resend": 0x16, + "WiFi-Connect": 0x21, + "WiFi-Disconnect": 0x22, + "AP-Enable": 0x23, + "AP-Disable": 0x24, + "Pause": 0x25, + "Resume": 0x26, + }, + "inf": { + "RSSI": 0x20, + "Sensor": 0x21, + "Message": 0x22, + "Directory": 0x23, + }, + "ack": { + "Pong": 0x10, + "Success": 0x11, + "Fail": 0x12, + "Confirm": 0x13, + "Echo": 0x15, + }, +} +networking_keys = { + "default_handshake_key": "handshake", + "handshake_key1": "handshake1", + "handshake_key2": "handshake2", + "handshake_key3": "handshake3", + "handshake_key4": "handshake4", + "default_ap_key": "password", + "default_wifi_key": "password" +} +configname = "Nickname" +config = "AdminModuleConfig" +whitelist = [b'd\xe83\x84\xd8\x18', b'd\xe83\x84\xd8\x19', + b'd\xe83\x85\xd3\xbc', b'd\xe83\x85\xd3\xbd', + b'd\xe83\x84\xd8\x18', b'd\xe83\x84\xd8\x19'] #each ESP32 has two MAC addresses +i2c_dict = { + "0x3C": ["pca9685", 0, "screen"], + "0x53": ["ACCEL", 1, "accelerometer"] +} #key is i2c address: ["device name", Output (0) or Input (1), "Description"] +version = {"adxl345": 3, "files": 2, "icons": 2, "motor": 4, "main": 0, "networking": 0, "prefs": 2, "sensors": 4, "servo": 2, "ssd1306": 2} # motor used to be main