espnow autodiscovery
listener
# esp32 listener
# make sure listener is running before turning on the sender
# demo of autodiscovery of espnow without hardcoded macs in the source
# queue is from uasyncio
# queue class - https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/queue.py
# espnow image - https://github.com/glenn20/micropython-espnow-images
import uasyncio
from queue import Queue
import producer
async def main():
    """Run the listener: print every received message and answer broadcasts.

    Starts ``producer.producer`` as a background task that fills a 50-slot
    queue, then polls that queue forever. Each packet is sliced as:
    byte 1 = message length, bytes 2..7 = sender MAC, message from byte 8.
    A ``b'broadcast'`` message is acknowledged by sending ``b'discovered'``
    back to the sender's MAC.
    """
    print("starting master...")
    inbox = Queue(50)
    uasyncio.create_task(producer.producer(inbox))
    while True:
        if inbox.empty():
            # Nothing queued: report it and look again in half a second.
            print("waiting for packet...")
            await uasyncio.sleep_ms(500)
            continue

        frame = await inbox.get()
        sender_mac = frame[2:8]
        message_end = 8 + frame[1]
        message = frame[8:message_end]
        print(message)
        if message == b'broadcast':
            # A sender is announcing itself; tell it that it was found.
            producer.send_message(sender_mac, b'discovered')
        # NOTE(review): the original indentation was lost; this yield is
        # assumed to run after every handled packet - confirm.
        await uasyncio.sleep_ms(0)


uasyncio.run(main())
sender
# esp32 sender
# make sure listener is on first
# before turning on this sender
# color meaning:
# red - just started, not yet registered with the listener
# yellow - broadcast sent to the listener
# green - registered with the listener, sending normal messages
# blue - empty queue message
import uasyncio
from queue import Queue
import producer
import neo
neo_pin = 14        # pin number passed to neo.Neo() for the status LED
registered = False  # set to True by main() once b'discovered' is received
async def main(discovery_timeout_ms=5000):
    """Run the sender: announce ourselves, wait for the listener, then send.

    LED colours set along the way:
      red    - started, nothing sent yet
      yellow - broadcasts sent, waiting for the listener's reply
      green  - b'discovered' received, sending b'normal' messages
      blue   - no b'discovered' reply arrived within the timeout

    Packets taken from the queue are sliced as: byte 1 = message length,
    message starting at byte 8.

    Args:
        discovery_timeout_ms: upper bound, in milliseconds of idle polling,
            on how long to wait for the b'discovered' reply. Without a bound
            the wait would never end when no reply arrives, and the
            "not registered" branch below could never run.
    """
    global registered
    led = neo.Neo(neo_pin)
    led.set('red')
    q = Queue(50)
    uasyncio.create_task(producer.producer(q))
    await uasyncio.sleep_ms(500)

    ######## start of discovery ###########
    led.set('yellow')
    for _ in range(3):
        producer.broadcast()                 # broadcast ourself 3 times
        await uasyncio.sleep_ms(300)
    await uasyncio.sleep_ms(200)             # give listener some time

    poll_ms = 50
    waited_ms = 0
    while not registered and waited_ms < discovery_timeout_ms:
        if q.empty():
            # Nothing received yet: wait a little and count it towards
            # the timeout instead of blocking forever on q.get().
            await uasyncio.sleep_ms(poll_ms)
            waited_ms += poll_ms
            continue
        packet = await q.get()
        message = packet[8:8 + packet[1]]    # extract message from packet
        if message == b'discovered':         # listener found us
            registered = True
    ########## end of discovery ###########

    if registered:
        led.set('green')
        while True:
            producer.send(b'normal')         # send sensor readings... etc
            await uasyncio.sleep_ms(200)
    else:
        led.set('blue')
        print("not registered")


uasyncio.run(main())
producer
# producer.py
import network
from esp import espnow
import uasyncio
# Module-level ESP-NOW setup. These statements run once, when the module is
# imported, and create the objects (bcast, e, sr) used by the functions below.
bcast = b'\xff' * 6 # broadcast MAC address: six bytes of \xff
wifi = network.WLAN(network.STA_IF) # station-mode WLAN interface
# NOTE(review): presumably ESP-NOW needs the WLAN interface switched on
# before e.init() - confirm against the firmware documentation.
wifi.active(True)
e = espnow.ESPNow() # ESP-NOW object shared by every function in this module
e.init()
e.add_peer(bcast) # register the broadcast address as a peer; broadcast() sends to it
sr = uasyncio.StreamReader(e) # stream wrapper around e that producer() reads packets from
# try adding peer from: https://github.com/zoland/SMesh/blob/main/smesh.py
def try_adding_peer(mac):
    """Add *mac* to the ESP-NOW peer list, tolerating "already added".

    An ``OSError`` whose second argument is ``'ESP_ERR_ESPNOW_EXIST'`` is
    swallowed; any other ``OSError`` (including one with fewer than two
    arguments) is re-raised to the caller.
    """
    try:
        e.add_peer(mac)
    except OSError as err:
        already_added = (
            len(err.args) >= 2 and err.args[1] == 'ESP_ERR_ESPNOW_EXIST'
        )
        if not already_added:
            raise err
async def producer(q):
    """Read packets from the ESP-NOW stream forever and queue them.

    For every packet read from ``sr``, bytes 2..7 are taken as the sender's
    MAC and passed to ``try_adding_peer``. The whole packet is then put on
    *q* unless the queue is full, in which case it is dropped.
    """
    while True:
        packet = await sr.read(-1)
        try_adding_peer(packet[2:8])
        await uasyncio.sleep_ms(0)
        queue_has_room = not q.full()
        if queue_has_room:
            await q.put(packet)
        # NOTE(review): the original indentation was lost; this pause is
        # assumed to run on every iteration, not only after a put - confirm.
        await uasyncio.sleep_ms(10)
def broadcast():
    """Send the message ``b'broadcast'`` to the broadcast address ``bcast``."""
    # NOTE(review): the third positional argument (False) is presumably the
    # "sync" flag of ESPNow.send() - confirm against the firmware docs.
    e.send(bcast, b'broadcast', False)
def send_message(mac, message):
    """Send *message* to the single peer whose address is *mac*."""
    # NOTE(review): the third positional argument (False) is presumably the
    # "sync" flag of ESPNow.send() - confirm against the firmware docs.
    e.send(mac, message, False)
def send(message):
    """Send *message* without naming a destination MAC.

    The original comment describes this as sending to all registered peers.
    """
    e.send(message)
def peer_info():
    """Return whatever ``e.peer_count()`` reports for the ESP-NOW object."""
    return e.peer_count()
neopixel
# neo.py
from machine import Pin, bitstream
class Neo:
    """Drive one status LED by bit-banging a 3-byte colour buffer."""

    def __init__(self, pin):
        """Configure *pin* as an output and start with an all-zero buffer."""
        self.pin = Pin(pin, Pin.OUT)
        # Timing tuple handed to bitstream() unchanged on every update.
        self.timing = (400, 850, 800, 450)
        self.buf = bytearray(b'\x00\x00\x00')

    def set(self, color):
        """Select the buffer for the named *color* and send it to the LED.

        Known names are 'red', 'green', 'blue', 'yellow' and 'white'. Any
        other name leaves the buffer as it was, and that buffer is sent
        again.
        """
        # NOTE(review): the bytes are sent exactly as written; which LED
        # channel each byte drives depends on the part - confirm on hardware.
        palette = {
            'red': b'\x40\x00\x00',
            'green': b'\x00\x40\x00',
            'blue': b'\x00\x00\x40',
            'yellow': b'\x40\x40\x00',
            'white': b'\x40\x40\x40',
        }
        if color in palette:
            self.buf = palette[color]
        bitstream(self.pin, 0, self.timing, self.buf)