MicroPython: ESP-NOW с ESP32 — Приём данных от нескольких плат (многие-к-одному)
В этом руководстве по MicroPython мы покажем вам, как настроить ESP32 для приёма и отображения данных от нескольких плат ESP32 с использованием протокола связи ESP-NOW (конфигурация «многие-к-одному»). Мы создадим пример проекта, в котором платы-отправители передают данные датчиков в формате JSON, а одна плата-приёмник собирает данные и отображает их на OLED-дисплее. В руководстве используются две платы-отправителя в качестве примера, но установку можно легко масштабировать для включения большего количества плат.
Используете Arduino IDE? Следуйте этому руководству: ESP-NOW с ESP32: Приём данных от нескольких плат (многие-к-одному).
Предварительные требования — прошивка MicroPython
Для выполнения этого руководства вам нужна прошивка MicroPython, установленная на ваших платах ESP32 или ESP8266. Вам также понадобится IDE для написания и загрузки кода на плату. Мы рекомендуем использовать Thonny IDE:
Новичок в MicroPython? Ознакомьтесь с книгой: MicroPython Programming with ESP32 and ESP8266 eBook (2nd Edition)
Знакомство с ESP-NOW
ESP-NOW — это протокол беспроводной связи, разработанный Espressif, который позволяет нескольким платам ESP32 или ESP8266 обмениваться небольшими объёмами данных без использования Wi-Fi или Bluetooth. ESP-NOW не требует полного Wi-Fi-соединения (хотя контроллер Wi-Fi должен быть включён), что делает его идеальным для приложений с низким энергопотреблением и низкой задержкой, таких как сенсорные сети, пульты дистанционного управления или обмен данными между платами.
ESP-NOW использует модель связи без установления соединения, что означает, что устройства могут отправлять и получать данные без подключения к маршрутизатору или настройки точки доступа (в отличие от HTTP-связи между платами). Он поддерживает одноадресную (unicast — отправка данных на конкретное устройство по его MAC-адресу) и широковещательную (broadcast — отправка данных всем ближайшим устройствам с использованием широковещательного MAC-адреса) передачу сообщений.
Новичок в ESP-NOW? Прочитайте наше руководство по началу работы: MicroPython: ESP-NOW с ESP32 (Начало работы).
Обзор проекта
В этом руководстве показано, как настроить плату ESP32 для приёма данных от нескольких плат ESP32 через протокол связи ESP-NOW (конфигурация «многие-к-одному»), как показано на следующем рисунке.
Одна плата ESP32 выступает в роли приёмника;
Несколько плат ESP32 выступают в роли отправителей. В данном примере мы используем две платы-отправителя ESP32. При необходимости вы можете добавить в свою установку больше плат.
Плата-приёмник ESP32 получает сообщения от всех отправителей и определяет, какая плата отправила сообщение.
В качестве примера мы будем обмениваться значениями датчика BME280 между платами. Вы можете модифицировать этот проект для использования любого другого датчика или обмена любыми другими данными между платами.
Необходимые компоненты
Для выполнения проекта в этом руководстве вам понадобятся следующие компоненты:
3 (или более) платы ESP32
ESP32: Получение MAC-адреса платы
Для связи через ESP-NOW вам нужно знать MAC-адрес ваших плат. Чтобы получить MAC-адрес вашей платы, вы можете скопировать следующий код в Thonny IDE и запустить его на вашей плате.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32/
import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# Get MAC address (returns bytes)
mac = wlan.config('mac')
# Convert to human-readable format
mac_address = ':'.join('%02x' % b for b in mac)
print("MAC Address:", mac_address)
После запуска кода он должен вывести MAC-адрес платы в консоль.
Получите MAC-адрес для всех ваших плат.
Например, в моём случае я получил:
Плата-отправитель 1: 24:0A:C4:31:40:50
Плата-отправитель 2: 30:AE:A4:F6:7D:4C
Плата-приёмник: 30:AE:A4:07:0D:64
Подготовка плат-отправителей
В этом руководстве мы будем отправлять данные на плату-приёмник ESP32 (через ESP-NOW) от двух разных плат. Вы можете модифицировать этот проект для отправки данных с большего количества плат.
Каждая плата будет идентифицироваться по ID (числу, которое мы присвоим каждой плате):
ID=1 для board1
ID=2 для board2
Подключение датчика BME280
Каждая плата-отправитель будет отправлять данные об окружающей среде с датчика BME280. Подключите датчик BME280 к каждой из ваших плат. Мы будем использовать стандартные пины I2C для ESP32.
Узнайте больше об I2C с ESP32: ESP32 I2C Communication: Set Pins, Multiple Bus Interfaces and Peripherals (Arduino IDE).
Не знакомы с BME280 и ESP32? Прочитайте это руководство: MicroPython: BME280 с ESP32 (давление, температура, влажность).
Если вы используете другую модель платы, стандартные пины I2C могут отличаться.
Импорт библиотеки BME280
Библиотека для работы с датчиком BME280 не входит в стандартную библиотеку MicroPython. Поэтому вам нужно загрузить файл библиотеки на каждую из ваших плат-отправителей.
Модуль BME280.py для MicroPython
Скопируйте следующий код в новый файл в Thonny IDE.
from machine import I2C
import time
# BME280 default address.
BME280_I2CADDR = 0x76
# Operating Modes
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5
# BME280 Registers
BME280_REGISTER_DIG_T1 = 0x88 # Trimming parameter registers
BME280_REGISTER_DIG_T2 = 0x8A
BME280_REGISTER_DIG_T3 = 0x8C
BME280_REGISTER_DIG_P1 = 0x8E
BME280_REGISTER_DIG_P2 = 0x90
BME280_REGISTER_DIG_P3 = 0x92
BME280_REGISTER_DIG_P4 = 0x94
BME280_REGISTER_DIG_P5 = 0x96
BME280_REGISTER_DIG_P6 = 0x98
BME280_REGISTER_DIG_P7 = 0x9A
BME280_REGISTER_DIG_P8 = 0x9C
BME280_REGISTER_DIG_P9 = 0x9E
BME280_REGISTER_DIG_H1 = 0xA1
BME280_REGISTER_DIG_H2 = 0xE1
BME280_REGISTER_DIG_H3 = 0xE3
BME280_REGISTER_DIG_H4 = 0xE4
BME280_REGISTER_DIG_H5 = 0xE5
BME280_REGISTER_DIG_H6 = 0xE6
BME280_REGISTER_DIG_H7 = 0xE7
BME280_REGISTER_CHIPID = 0xD0
BME280_REGISTER_VERSION = 0xD1
BME280_REGISTER_SOFTRESET = 0xE0
BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_CONTROL = 0xF4
BME280_REGISTER_CONFIG = 0xF5
BME280_REGISTER_PRESSURE_DATA = 0xF7
BME280_REGISTER_TEMP_DATA = 0xFA
BME280_REGISTER_HUMIDITY_DATA = 0xFD
class Device:
"""Class for communicating with an I2C device.
Allows reading and writing 8-bit, 16-bit, and byte array values to
registers on the device."""
def __init__(self, address, i2c):
"""Create an instance of the I2C device at the specified address using
the specified I2C interface object."""
self._address = address
self._i2c = i2c
def writeRaw8(self, value):
"""Write an 8-bit value on the bus (without register)."""
value = value & 0xFF
self._i2c.writeto(self._address, value)
def write8(self, register, value):
"""Write an 8-bit value to the specified register."""
b=bytearray(1)
b[0]=value & 0xFF
self._i2c.writeto_mem(self._address, register, b)
def write16(self, register, value):
"""Write a 16-bit value to the specified register."""
value = value & 0xFFFF
b=bytearray(2)
b[0]= value & 0xFF
b[1]= (value>>8) & 0xFF
self.i2c.writeto_mem(self._address, register, value)
def readRaw8(self):
"""Read an 8-bit value on the bus (without register)."""
return int.from_bytes(self._i2c.readfrom(self._address, 1),'little') & 0xFF
def readU8(self, register):
"""Read an unsigned byte from the specified register."""
return int.from_bytes(
self._i2c.readfrom_mem(self._address, register, 1),'little') & 0xFF
def readS8(self, register):
"""Read a signed byte from the specified register."""
result = self.readU8(register)
if result > 127:
result -= 256
return result
def readU16(self, register, little_endian=True):
"""Read an unsigned 16-bit value from the specified register, with the
specified endianness (default little endian, or least significant byte
first)."""
result = int.from_bytes(
self._i2c.readfrom_mem(self._address, register, 2),'little') & 0xFFFF
if not little_endian:
result = ((result << 8) & 0xFF00) + (result >> 8)
return result
def readS16(self, register, little_endian=True):
"""Read a signed 16-bit value from the specified register, with the
specified endianness (default little endian, or least significant byte
first)."""
result = self.readU16(register, little_endian)
if result > 32767:
result -= 65536
return result
def readU16LE(self, register):
"""Read an unsigned 16-bit value from the specified register, in little
endian byte order."""
return self.readU16(register, little_endian=True)
def readU16BE(self, register):
"""Read an unsigned 16-bit value from the specified register, in big
endian byte order."""
return self.readU16(register, little_endian=False)
def readS16LE(self, register):
"""Read a signed 16-bit value from the specified register, in little
endian byte order."""
return self.readS16(register, little_endian=True)
def readS16BE(self, register):
"""Read a signed 16-bit value from the specified register, in big
endian byte order."""
return self.readS16(register, little_endian=False)
class BME280:
def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,
**kwargs):
# Check that mode is valid.
if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
raise ValueError(
'Unexpected mode value {0}. Set mode to one of '
'BME280_ULTRALOWPOWER, BME280_STANDARD, BME280_HIGHRES, or '
'BME280_ULTRAHIGHRES'.format(mode))
self._mode = mode
# Create I2C device.
if i2c is None:
raise ValueError('An I2C object is required.')
self._device = Device(address, i2c)
# Load calibration values.
self._load_calibration()
self._device.write8(BME280_REGISTER_CONTROL, 0x3F)
self.t_fine = 0
def _load_calibration(self):
self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)
self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)
self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)
self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)
self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)
self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)
self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)
self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)
self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)
self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)
self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)
self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)
self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)
self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)
self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)
self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)
h4 = self._device.readS8(BME280_REGISTER_DIG_H4)
h4 = (h4 << 24) >> 20
self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)
h5 = self._device.readS8(BME280_REGISTER_DIG_H6)
h5 = (h5 << 24) >> 20
self.dig_H5 = h5 | (
self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)
def read_raw_temp(self):
"""Reads the raw (uncompensated) temperature from the sensor."""
meas = self._mode
self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)
meas = self._mode << 5 | self._mode << 2 | 1
self._device.write8(BME280_REGISTER_CONTROL, meas)
sleep_time = 1250 + 2300 * (1 << self._mode)
sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
time.sleep_us(sleep_time) # Wait the required time
msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)
lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)
xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)
raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
return raw
def read_raw_pressure(self):
"""Reads the raw (uncompensated) pressure level from the sensor."""
"""Assumes that the temperature has already been read """
"""i.e. that enough delay has been provided"""
msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)
lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)
xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)
raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
return raw
def read_raw_humidity(self):
"""Assumes that the temperature has already been read """
"""i.e. that enough delay has been provided"""
msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)
lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)
raw = (msb << 8) | lsb
return raw
def read_temperature(self):
"""Get the compensated temperature in 0.01 of a degree celsius."""
adc = self.read_raw_temp()
var1 = ((adc >> 3) - (self.dig_T1 << 1)) * (self.dig_T2 >> 11)
var2 = ((
(((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *
self.dig_T3) >> 14
self.t_fine = var1 + var2
return (self.t_fine * 5 + 128) >> 8
def read_pressure(self):
"""Gets the compensated pressure in Pascals."""
adc = self.read_raw_pressure()
var1 = self.t_fine - 128000
var2 = var1 * var1 * self.dig_P6
var2 = var2 + ((var1 * self.dig_P5) << 17)
var2 = var2 + (self.dig_P4 << 35)
var1 = (((var1 * var1 * self.dig_P3) >> 8) +
((var1 * self.dig_P2) >> 12))
var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
if var1 == 0:
return 0
p = 1048576 - adc
p = (((p << 31) - var2) * 3125) // var1
var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25
var2 = (self.dig_P8 * p) >> 19
return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)
def read_humidity(self):
adc = self.read_raw_humidity()
# print 'Raw humidity = {0:d}'.format (adc)
h = self.t_fine - 76800
h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +
16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *
self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *
self.dig_H2 + 8192) >> 14))
h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)
h = 0 if h < 0 else h
h = 419430400 if h > 419430400 else h
return h >> 12
@property
def temperature(self):
"Return the temperature in degrees."
t = self.read_temperature()
ti = t // 100
td = t - ti * 100
return "{}.{:02d}C".format(ti, td)
@property
def pressure(self):
"Return the temperature in hPa."
p = self.read_pressure() // 256
pi = p // 100
pd = p - pi * 100
return "{}.{:02d}hPa".format(pi, pd)
@property
def humidity(self):
"Return the humidity in percent."
h = self.read_humidity()
hi = h // 1024
hd = h * 100 // 1024 - hi * 100
return "{}.{:02d}%".format(hi, hd)
Перейдите в File > Save as…
Выберите сохранение на «MicroPython device»:
Назовите ваш файл BME280.py и нажмите кнопку OK:
Вот и всё. Библиотека загружена на вашу плату.
Повторите этот процесс для всех ваших плат-отправителей.
Скрипт ESP-NOW отправителя ESP32 — MicroPython
Следующий код считывает данные с датчика BME280 и отправляет их через ESP-NOW в формате JSON на плату-приёмник ESP32. Не забудьте изменить код, указав MAC-адрес вашей платы-приёмника. Кроме того, не забудьте изменить ID платы для каждой из ваших плат.
В этом коде мы используем модуль aioespnow, который позволяет нам использовать ESP-NOW асинхронно. Чтобы узнать больше об асинхронном программировании с MicroPython, прочитайте следующее руководство: MicroPython: ESP32/ESP8266 Asynchronous Programming – Run Multiple Tasks.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32-many-to-one/
import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
# Board ID
BOARD_ID = 1
# Receiver's MAC address
peer_mac = b'\xff\xff\xff\xff\xff\xff'
# Interval for sending data (in seconds)
send_interval = 10
# Initialize I2C and BME280
try:
i2c = I2C(0, scl=Pin(22), sda=Pin(21)) # Adjust pins as needed
bme = BME280.BME280(i2c=i2c, address=0x76)
except OSError as err:
print("Failed to initialize BME280:", err)
raise
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
try:
sta.active(True)
sta.config(channel=1) # Set channel explicitly
sta.disconnect()
except OSError as err:
print("Failed to initialize Wi-Fi:", err)
raise
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
# Add peer
try:
e.add_peer(peer_mac)
except OSError as err:
print("Failed to add peer:", err)
raise
# Counter for reading ID
reading_id = 0
def read_temperature():
try:
return float(bme.temperature[:-1]) # Remove 'C' from string
except Exception as err:
print("Error reading temperature:", err)
return 0.0
def read_humidity():
try:
return float(bme.humidity[:-1]) # Remove '%' from string
except Exception as err:
print("Error reading humidity:", err)
return 0.0
def prepare_sensor_data():
global reading_id
data = {
'id': BOARD_ID,
'temp': read_temperature(),
'hum': read_humidity(),
'readingId': reading_id
}
reading_id += 1
# Serialize to JSON and encode to bytes
return ujson.dumps(data).encode('utf-8')
async def send_messages(e, peer):
while True:
try:
# Prepare and serialize sensor data
message = prepare_sensor_data()
# Send JSON bytes
if await e.asend(peer, message, sync=True):
print(f"Sent data: {message.decode('utf-8')}")
else:
print("Failed to send data")
except OSError as err:
print("Send error:", err)
await asyncio.sleep(5)
await asyncio.sleep(send_interval) # Wait before next send
async def main(e, peer):
try:
await send_messages(e, peer)
except Exception as err:
print(f"Error in main: {err}")
await asyncio.sleep(5)
raise
# Run the async program
try:
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping sender...")
e.active(False)
sta.active(False)
Как работает код?
Давайте рассмотрим, как работает код платы-отправителя. Если хотите, можете перейти к следующему разделу.
Импорт библиотек
Начните с импорта необходимых библиотек для использования ESP-NOW, датчика BME280 и обработки JSON-переменных.
import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
Установка ID платы
Присвойте уникальный ID вашей плате, чтобы приёмник мог легко её идентифицировать. Здесь мы просто нумеруем платы, но вы можете использовать другой метод, например, дать им имя или просто идентифицировать их по MAC-адресу (в этом случае этот параметр не нужен).
# Board ID
BOARD_ID = 2
MAC-адрес приёмника
Вставьте MAC-адрес приёмника. Например, в моём случае MAC-адрес платы-приёмника — 30:AE:A4:07:0D:64. Поэтому мне нужно добавить его в код в байтовом формате:
# Receiver's MAC address
peer_mac = b'\x30\xae\xa4\x07\x0d\x64'
Интервал отправки
Мы будем отправлять новые показания через ESP-NOW каждые 10 секунд. Вы можете настроить этот период в переменной send_interval.
# Interval for sending data (in seconds)
send_interval = 10
Инициализация датчика BME280
Инициализируйте I2C-коммуникацию на GPIO 22 и 21. Измените, если вы используете другие пины. Мы также инициализируем датчик BME280.
# Initialize I2C and BME280
try:
i2c = I2C(0, scl=Pin(22), sda=Pin(21)) # Adjust pins as needed
bme = BME280.BME280(i2c=i2c, address=0x76)
except OSError as err:
print("Failed to initialize BME280:", err)
raise
Инициализация Wi-Fi интерфейса и ESP-NOW
Для использования ESP-NOW нам нужно активировать Wi-Fi-интерфейс. В следующих строках мы инициализируем Wi-Fi и протокол связи ESP-NOW с помощью модуля aioespnow.
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
try:
sta.active(True)
sta.config(channel=1) # Set channel explicitly
sta.disconnect()
except OSError as err:
print("Failed to initialize Wi-Fi:", err)
raise
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
Добавление пиров (peers)
В следующей строке мы добавляем плату-приёмник как пир. Функция e.add_peer(peer_mac) необходима для надёжной одноадресной связи в aioespnow. Она регистрирует MAC-адрес приёмника, чтобы гарантировать, что отправитель может отправлять сообщения на эту конкретную плату.
# Add peer
try:
e.add_peer(peer_mac)
except OSError as err:
print("Failed to add peer:", err)
raise
Получение данных датчика BME280
Следующие функции возвращают температуру и влажность с датчика BME280.
def read_temperature():
try:
return float(bme.temperature[:-1]) # Remove 'C' from string
except Exception as err:
print("Error reading temperature:", err)
return 0.0
def read_humidity():
try:
return float(bme.humidity[:-1]) # Remove '%' from string
except Exception as err:
print("Error reading humidity:", err)
return 0.0
Функции из библиотеки BME280 возвращают результаты с символом единицы измерения, поэтому мы удаляем символы в конце показаний.
Узнайте больше о работе с BME280 и ESP32 с использованием MicroPython: MicroPython: BME280 с ESP32 (давление, температура, влажность).
Добавление данных в JSON-переменную
Мы создаём функцию prepare_sensor_data(), которая возвращает JSON-переменную, содержащую данные, которые мы хотим отправить. В данном случае мы отправляем ID платы, температуру, влажность и ID чтения (просто число для отслеживания количества снятых показаний с момента запуска программы).
def prepare_sensor_data():
global reading_id
data = {
'id': BOARD_ID,
'temp': read_temperature(),
'hum': read_humidity(),
'readingId': reading_id
}
reading_id += 1
# Serialize to JSON and encode to bytes
return ujson.dumps(data).encode('utf-8')
Отправка сообщений через ESP-NOW
Затем мы создаём асинхронную функцию для отправки сообщений ESP-NOW асинхронным способом.
async def send_messages(e, peer):
while True:
try:
# Prepare and serialize sensor data
message = prepare_sensor_data()
# Send JSON bytes
if await e.asend(peer, message, sync=True):
print(f"Sent data: {message.decode('utf-8')}")
else:
print("Failed to send data")
except OSError as err:
print("Send error:", err)
await asyncio.sleep(5)
await asyncio.sleep(send_interval) # Wait before next send
Запуск асинхронных задач
Следующая функция запускает задачу send_messages для отправки данных через ESP-NOW.
async def main(e, peer):
try:
await send_messages(e, peer)
except Exception as err:
print(f"Error in main: {err}")
await asyncio.sleep(5)
raise
Наконец, мы запускаем асинхронную программу.
# Run the async program
try:
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping sender...")
e.active(False)
sta.active(False)
Запуск и загрузка кода на платы
После копирования кода в Thonny IDE и внесения необходимых изменений, протестируйте код, запустив его в Thonny IDE.
После установления соединения с вашей платой вы можете нажать на зелёную кнопку запуска.
Вы должны получить что-то похожее, как показано на картинке ниже, в вашей консоли MicroPython.
На данный момент доставка завершится ошибкой, потому что мы ещё не настроили плату-приёмник.
Загрузка кода на платы
Важное примечание: простой запуск файла через Thonny не копирует его на файловую систему платы постоянно. Это означает, что если вы отключите её от компьютера и подадите питание на плату, ничего не произойдёт, потому что на её файловой системе нет сохранённого Python-файла. Функция Run в Thonny IDE полезна для тестирования кода, но если вы хотите загрузить его на плату постоянно, вам нужно создать и сохранить файл в файловую систему платы.
Чтобы запустить код на ваших платах без подключения к компьютеру, вы должны загрузить его в файловую систему платы с именем main.py.
Перейдите в File > Save as…. MicroPython Device.
Назовите этот файл main.py и сохраните его на плате.
Теперь, если вы перезагрузите плату, она начнёт выполнять код. После этого у вас может не быть доступа к консоли MicroPython Shell.
Подготовка платы-приёмника
Плата-приёмник ESP32 будет получать данные от различных плат-отправителей и отображать их на OLED-дисплее. В этом примере мы получаем данные от двух разных плат.
Подключение OLED-дисплея
Подключите OLED-дисплей к вашей плате ESP32. Мы используем стандартные пины I2C — GPIO 22 (SCL) и GPIO 21 (SDA). Измените, если вы используете модель ESP32 с другой распиновкой.
Импорт библиотеки SSD1306
Библиотека для работы с OLED-дисплеем не входит в стандартную библиотеку MicroPython. Поэтому вам нужно загрузить файл библиотеки на вашу плату-приёмник.
Модуль SSD1306.py для MicroPython
Скопируйте следующий код в новый файл в Thonny IDE.
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit
import time
import framebuf
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xa4)
SET_NORM_INV = const(0xa6)
SET_DISP = const(0xae)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xa0)
SET_MUX_RATIO = const(0xa8)
SET_COM_OUT_DIR = const(0xc0)
SET_DISP_OFFSET = const(0xd3)
SET_COM_PIN_CFG = const(0xda)
SET_DISP_CLK_DIV = const(0xd5)
SET_PRECHARGE = const(0xd9)
SET_VCOM_DESEL = const(0xdb)
SET_CHARGE_PUMP = const(0x8d)
class SSD1306:
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
# Note the subclass must initialize self.framebuf to a framebuffer.
# This is necessary because the underlying data buffer is different
# between I2C and SPI implementations (I2C needs an extra byte).
self.poweron()
self.init_display()
def init_display(self):
for cmd in (
SET_DISP | 0x00, # off
# address setting
SET_MEM_ADDR, 0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE | 0x00,
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO, self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET, 0x00,
SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV, 0x80,
SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,
SET_VCOM_DESEL, 0x30, # 0.83*Vcc
# display
SET_CONTRAST, 0xff, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
# charge pump
SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP | 0x00)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width == 64:
# displays with width of 64 pixels are shifted by 32
x0 += 32
x1 += 32
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_framebuf()
def fill(self, col):
self.framebuf.fill(col)
def pixel(self, x, y, col):
self.framebuf.pixel(x, y, col)
def scroll(self, dx, dy):
self.framebuf.scroll(dx, dy)
def text(self, string, x, y, col=1):
self.framebuf.text(string, x, y, col)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
# Add an extra byte to the data buffer to hold an I2C data/command byte
# to use hardware-compatible I2C transactions. A memoryview of the
# buffer is used to mask this byte from the framebuffer operations
# (without a major memory hit as memoryview doesn't copy to a separate
# buffer).
self.buffer = bytearray(((height // 8) * width) + 1)
self.buffer[0] = 0x40 # Set first byte of data buffer to Co=0, D/C=1
self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_framebuf(self):
# Blast out the frame buffer using a single I2C transaction to support
# hardware I2C interfaces.
self.i2c.writeto(self.addr, self.buffer)
def poweron(self):
pass
class SSD1306_SPI(SSD1306):
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
self.rate = 10 * 1024 * 1024
dc.init(dc.OUT, value=0)
res.init(res.OUT, value=0)
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
self.buffer = bytearray((height // 8) * width)
self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs.high()
self.dc.low()
self.cs.low()
self.spi.write(bytearray([cmd]))
self.cs.high()
def write_framebuf(self):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs.high()
self.dc.high()
self.cs.low()
self.spi.write(self.buffer)
self.cs.high()
def poweron(self):
self.res.high()
time.sleep_ms(1)
self.res.low()
time.sleep_ms(10)
self.res.high()
Перейдите в File > Save as и выберите MicroPython device.
Назовите файл ssd1306.py и нажмите OK, чтобы сохранить файл в файловую систему ESP.
Вот и всё. Библиотека загружена на вашу плату. Теперь вы можете использовать функции библиотеки в своём коде, импортировав библиотеку.
Скрипт приёмника ESP-NOW — MicroPython
Следующий код настроит вашу плату ESP32 как приёмник ESP-NOW для получения данных от двух других плат-отправителей. После получения данных они будут отображены на OLED-экране.
Не забудьте изменить код, добавив MAC-адреса ваших плат-отправителей.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32-many-to-one/
import network
import aioespnow
import asyncio
import ujson
from machine import Pin, I2C
import ssd1306
# Initialize I2C for OLED
try:
i2c = I2C(0, scl=Pin(22), sda=Pin(21)) # Adjust pins as needed
display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
print("SSD1306 initialized")
except Exception as err:
print("Failed to initialize SSD1306:", err)
raise
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
try:
sta.active(True)
sta.config(channel=1) # Set channel explicitly
sta.disconnect()
except OSError as err:
print("Failed to initialize Wi-Fi:", err)
raise
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
# Sender's MAC addresses (replace with actual sender MACs)
sender_mac_1 = b'\x24\x0a\xc4\x31\x40\x50' # First sender's MAC (Board ID=1)
sender_mac_2 = b'\x30\xae\xa4\xf6\x7d\x4c' # Second sender's MAC (Board ID=2)
# Add peers
try:
e.add_peer(sender_mac_1)
except OSError as err:
print(f"Failed to add peer {sender_mac_1.hex()}:", err)
raise
try:
e.add_peer(sender_mac_2)
except OSError as err:
print(f"Failed to add peer {sender_mac_2.hex()}:", err)
raise
# Dictionary to store latest readings for each board
board_readings = {
1: {'temp': 0.0, 'hum': 0.0, 'readingId': 0},
2: {'temp': 0.0, 'hum': 0.0, 'readingId': 0}
}
# Update OLED display with temperature and humidity for both boards on separate lines.
def update_display():
try:
display.fill(0)
# Board 1 data
display.text("Board 1:", 0, 0)
display.text(f"Temp: {board_readings[1]['temp']:.1f} C", 0, 10)
display.text(f"Hum: {board_readings[1]['hum']:.1f} %", 0, 20)
# Board 2 data
display.text("Board 2:", 0, 32)
display.text(f"Temp: {board_readings[2]['temp']:.1f} C", 0, 42)
display.text(f"Hum: {board_readings[2]['hum']:.1f} %", 0, 52)
display.show()
print("Display updated")
except Exception as err:
print("Error updating display:", err)
# Async function to receive and process messages.
async def receive_messages(e):
print("Listening for ESP-NOW messages...")
while True:
try:
async for mac, msg in e:
try:
# Decode and parse JSON message
json_str = msg.decode('utf-8')
data = ujson.loads(json_str)
# Extract parameters
board_id = data['id']
temperature = data['temp']
humidity = data['hum']
reading_id = data['readingId']
# Store in board_readings dictionary
if board_id in (1, 2):
board_readings[board_id] = {
'temp': temperature,
'hum': humidity,
'readingId': reading_id
}
# Update OLED display
update_display()
# Display on MicroPython terminal
print(f"\nReceived from {mac.hex()}:")
print(f" Board ID: {board_id}")
print(f" Temperature: {temperature} C")
print(f" Humidity: {humidity} %")
print(f" Reading ID: {reading_id}")
except (ValueError, KeyError) as err:
print(f"Error parsing JSON: {err}")
except OSError as err:
print("Receive error:", err)
await asyncio.sleep(5)
async def main(e):
await receive_messages(e)
# Run the async program
try:
asyncio.run(main(e))
except KeyboardInterrupt:
print("Stopping receiver...")
e.active(False)
sta.active(False)
Как работает код?
Давайте рассмотрим, как работает код платы-приёмника. Если хотите, можете перейти к следующему разделу.
Импорт библиотек
Начните с импорта необходимых библиотек (убедитесь, что вы предварительно загрузили файл ssd1306.py для поддержки работы с дисплеем).
import network
import aioespnow
import asyncio
import ujson
from machine import Pin, I2C
import ssd1306
Инициализация OLED-дисплея
Инициализируйте I2C-коммуникацию и OLED-дисплей следующими строками.
# Initialize I2C for OLED
try:
i2c = I2C(0, scl=Pin(22), sda=Pin(21)) # Adjust pins as needed
display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
print("SSD1306 initialized")
except Exception as err:
print("Failed to initialize SSD1306:", err)
raise
Инициализация WiFi-интерфейса и ESP-NOW
Инициализируйте WiFi-интерфейс и ESP-NOW.
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
try:
sta.active(True)
sta.config(channel=1) # Set channel explicitly
sta.disconnect()
except OSError as err:
print("Failed to initialize Wi-Fi:", err)
raise
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
Добавление плат-отправителей как пиров
Добавьте платы-отправители как пиры (добавлять платы-отправители как пиры на стороне приёмника не обязательно, но это гарантирует большую надёжность связи).
Вставьте MAC-адреса плат-отправителей в следующие переменные (убедитесь, что вы вставляете MAC-адрес в байтовом формате).
# Sender's MAC addresses (replace with actual sender MACs)
sender_mac_1 = b'\x24\x0a\xc4\x31\x40\x50' # First sender's MAC (Board ID=1)
sender_mac_2 = b'\x30\xae\xa4\xf6\x7d\x4c' # Second sender's MAC (Board ID=2)
Затем добавьте их как ESP-NOW пиры:
# Add peers
try:
e.add_peer(sender_mac_1)
except OSError as err:
print(f"Failed to add peer {sender_mac_1.hex()}:", err)
raise
try:
e.add_peer(sender_mac_2)
except OSError as err:
print(f"Failed to add peer {sender_mac_2.hex()}:", err)
raise
Словарь для сохранения показаний
Создайте словарь для хранения последних полученных показаний датчиков от каждой платы. Сохранение этой информации в словаре — отличный способ хранить все полученные данные в одной переменной.
board_readings = {
1: {'temp': 0.0, 'hum': 0.0, 'readingId': 0},
2: {'temp': 0.0, 'hum': 0.0, 'readingId': 0}
}
Обновление дисплея
Функция update_display() получает данные, полученные от каждой платы, которые уже хранятся в переменной board_readings, и отображает их на OLED-экране.
def update_display():
try:
display.fill(0)
# Board 1 data
display.text("Board 1:", 0, 0)
display.text(f"Temp: {board_readings[1]['temp']:.1f} C", 0, 10)
display.text(f"Hum: {board_readings[1]['hum']:.1f} %", 0, 20)
# Board 2 data
display.text("Board 2:", 0, 32)
display.text(f"Temp: {board_readings[2]['temp']:.1f} C", 0, 42)
display.text(f"Hum: {board_readings[2]['hum']:.1f} %", 0, 52)
display.show()
print("Display updated")
except Exception as err:
print("Error updating display:", err)
Чтобы узнать больше о работе с OLED и ESP32 с использованием MicroPython, вы можете ознакомиться с нашим руководством: MicroPython: OLED Display with ESP32 and ESP8266.
Приём сообщений ESP-NOW
Мы создаём асинхронную функцию для получения и обработки сообщений, полученных через ESP-NOW.
async def receive_messages(e):
print("Listening for ESP-NOW messages...")
while True:
try:
async for mac, msg in e:
Данные приходят в формате JSON. Сначала мы декодируем и парсим JSON-сообщение.
try:
# Decode and parse JSON message
json_str = msg.decode('utf-8')
data = ujson.loads(json_str)
Мы извлекаем каждый из параметров в отдельные переменные.
# Extract parameters into individual variables
board_id = data['id']
temperature = data['temp']
humidity = data['hum']
reading_id = data['readingId']
Наконец, мы сохраняем данные в правильное место в нашем словаре board_readings в соответствии с ID платы.
# Store in board_readings dictionary
if board_id in (1, 2):
board_readings[board_id] = {
'temp': temperature,
'hum': humidity,
'readingId': reading_id
}
После этого переменная board_readings обновлена, и теперь мы можем вызвать функцию update_display() для обновления дисплея с текущей информацией.
update_display()
Наконец, мы выводим полученные данные в консоль MicroPython.
# Display on serial monitor
print(f"\nReceived from {mac.hex()}:")
print(f" Board ID: {board_id}")
print(f" Temperature: {temperature} C")
print(f" Humidity: {humidity} %")
print(f" Reading ID: {reading_id}")
Запуск асинхронных задач
Следующая функция запускает задачу receive_messages для получения данных через ESP-NOW.
async def main(e):
await receive_messages(e)
Наконец, мы запускаем асинхронную программу.
# Run the async program
try:
asyncio.run(main(e))
except KeyboardInterrupt:
print("Stopping receiver...")
e.active(False)
sta.active(False)
Демонстрация
Загрузите и/или запустите предыдущий код на вашей плате-приёмнике. Если вы подключены к терминалу Thonny IDE, он будет выводить полученные данные.
Одновременно он будет отображать показания, полученные от каждой из плат-отправителей, на OLED-экране.
Заключение
В этом руководстве вы узнали, как отправлять данные от нескольких отправителей ESP32 на одну плату-приёмник ESP32 через ESP-NOW с использованием MicroPython. Мы также показали вам, как отправлять несколько переменных данных в формате JSON и как парсить и обрабатывать их для получения отдельных фрагментов нужной информации.
Проект в этом руководстве можно легко адаптировать для добавления большего количества плат-отправителей и отправки любой другой информации, которая вам нужна.
Надеемся, это руководство оказалось для вас полезным. У нас есть ещё руководства по ESP-NOW с MicroPython, которые могут быть вам полезны:
Если вы хотите узнать больше о MicroPython, не забудьте ознакомиться с нашими ресурсами: