# esp32 listener
# make sure listener is running before turning on the sender
# demo of autodiscovery of espnow without hardcoded macs in the source
# queue is from uasyncio
# queue class - https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/queue.py
# espnow image - https://github.com/glenn20/micropython-espnow-images

import uasyncio
from queue import Queue
import producer


async def main():
    """Run the listener: drain received packets and acknowledge broadcasts.

    A background task (producer.producer) fills a 50-slot queue with raw
    packets.  This loop polls the queue; when it is empty it reports that
    and sleeps 500 ms, otherwise it takes one packet, prints its payload
    and, if the payload is b'broadcast', replies b'discovered' to the
    sender's MAC.

    Packet layout as used here: byte 1 is the payload length, bytes 2..7
    are the sender MAC, and the payload starts at byte 8.
    """
    print("starting master...")
    inbox = Queue(50)
    uasyncio.create_task(producer.producer(inbox))

    while True:
        # Nothing queued yet: report it and back off before polling again.
        if inbox.empty():
            print("waiting for packet...")
            await uasyncio.sleep_ms(500)
            continue

        frame = await inbox.get()               # take exactly one packet
        sender = frame[2:8]                     # MAC of the device that sent it
        payload_end = 8 + frame[1]              # byte 1 holds the payload length
        payload = frame[8:payload_end]
        print(payload)

        if payload == b'broadcast':
            # A sender is announcing itself; acknowledge so it can register.
            producer.send_message(sender, b'discovered')

        # Yield to the scheduler so the producer task can run between packets.
        await uasyncio.sleep_ms(0)


uasyncio.run(main())
# esp32 sender
# make sure listener is on first
# before turning on this sender
# color meaning:
# red - just starting, not registered from the listener
# yellow - sent broadcast to listener
# green - registered from the listener, sending normal messages
# blue - empty queue message (no 'discovered' reply arrived in time)

import uasyncio
from queue import Queue
import producer
import neo

registered = False
neo_pin = 14
# Upper bound on how long to wait for the listener's b'discovered' reply.
discovery_timeout_ms = 3000


async def _wait_for_discovered(q):
    """Consume packets from q until one carries the b'discovered' payload.

    Packets with any other payload are discarded.  This coroutine never
    returns on its own if no such packet arrives, so the caller must bound
    it with a timeout.
    """
    while True:
        packet = await q.get()
        message = packet[8:8 + packet[1]]   # byte 1 = payload length, payload at 8
        if message == b'discovered':
            return


async def main():
    """Announce this device, wait to be discovered, then send readings.

    Sequence: LED red -> start the packet producer -> LED yellow and three
    broadcasts -> wait (bounded by discovery_timeout_ms) for b'discovered'.
    On success the LED turns green and b'normal' is sent every 200 ms
    forever.  On timeout the LED turns blue, "not registered" is printed
    and the coroutine returns.

    The original loop awaited q.get() without a timeout, so a missing
    listener left the device stuck on yellow and the "not registered"
    branch could never run.
    """
    global registered
    led = neo.Neo(neo_pin)
    led.set('red')
    q = Queue(50)
    uasyncio.create_task(producer.producer(q))
    await uasyncio.sleep_ms(500)

    ######## start of discovery ###########
    led.set('yellow')
    for _ in range(3):
        producer.broadcast()                # broadcast ourself 3 times
        await uasyncio.sleep_ms(300)
    await uasyncio.sleep_ms(200)            # give listener some time

    try:
        # wait_for_ms cancels the inner coroutine when the timeout expires.
        await uasyncio.wait_for_ms(_wait_for_discovered(q), discovery_timeout_ms)
    except uasyncio.TimeoutError:
        registered = False                  # listener never answered
    else:
        registered = True                   # our mac is on the listener peer list
    ########## end of discovery ###########

    if registered:
        led.set('green')
        while True:
            producer.send(b'normal')        # send sensor readings... etc
            await uasyncio.sleep_ms(200)
    else:
        led.set('blue')
        print("not registered")


uasyncio.run(main())
# producer.py
import network
from esp import espnow
import uasyncio

bcast = b'\xff' * 6                     # broadcast mac: six bytes of \xff

# espnow initialization: station interface up, then ESP-NOW with the
# broadcast address registered as a peer, then a stream reader over it.
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
e = espnow.ESPNow()
e.init()
e.add_peer(bcast)
sr = uasyncio.StreamReader(e)


# try adding peer from: https://github.com/zoland/SMesh/blob/main/smesh.py
def try_adding_peer(mac):
    """Register mac as a peer, ignoring the "already exists" error.

    Any OSError whose second argument is not 'ESP_ERR_ESPNOW_EXIST'
    (including one with fewer than two arguments) is re-raised.
    """
    try:
        e.add_peer(mac)
    except OSError as err:
        already_registered = (
            len(err.args) >= 2 and err.args[1] == 'ESP_ERR_ESPNOW_EXIST'
        )
        if not already_registered:
            raise err


async def producer(q):
    """Read packets forever, register each sender as a peer, and queue them.

    The sender MAC is taken from bytes 2..7 of every packet.  A packet that
    arrives while q is full is dropped rather than awaited on.
    """
    while True:
        packet = await sr.read(-1)          # wait for the next packet
        try_adding_peer(packet[2:8])        # sender mac lives at bytes 2..7
        await uasyncio.sleep_ms(0)          # let other tasks run
        if not q.full():
            await q.put(packet)
        await uasyncio.sleep_ms(10)


def broadcast():
    """Send b'broadcast' to the broadcast address."""
    # NOTE(review): the third positional argument is presumably the `sync`
    # flag (False = do not wait for a delivery response) — confirm against
    # the espnow build in use.
    e.send(bcast, b'broadcast', False)


def send_message(mac, message):
    """Send message to one particular mac."""
    e.send(mac, message, False)


def send(message):
    """Send message to all registered peers."""
    e.send(message)


def peer_info():
    """Return whatever e.peer_count() reports."""
    return e.peer_count()
# neo.py
from machine import Pin, bitstream


class Neo:
    """Drive a single addressable LED on one output pin using named colors."""

    # Known color names and the three bytes sent for each, checked in order.
    # NOTE(review): bytes are laid out as if the LED expects R, G, B order;
    # many WS2812-type parts expect G, R, B — confirm against the hardware.
    _PALETTE = (
        ('red', b'\x40\x00\x00'),
        ('green', b'\x00\x40\x00'),
        ('blue', b'\x00\x00\x40'),
        ('yellow', b'\x40\x40\x00'),
        ('white', b'\x40\x40\x40'),
    )

    def __init__(self, pin):
        """Configure `pin` as an output and start with an all-zero buffer."""
        self.pin = Pin(pin, Pin.OUT)
        # Four timing values handed to machine.bitstream on every write.
        self.timing = (400, 850, 800, 450)
        self.buf = bytearray(b'\x00\x00\x00')

    def set(self, color):
        """Show `color` on the LED.

        If `color` is not one of the known names the buffer is left as it
        was, and the previous color is transmitted again.
        """
        for name, data in self._PALETTE:
            if color == name:
                self.buf = data
                break
        bitstream(self.pin, 0, self.timing, self.buf)