MicroPython: Двусторонняя связь ESP32 через ESP-NOW
Узнайте, как установить двустороннюю связь между двумя платами ESP32 с использованием протокола ESP-NOW. Сначала мы протестируем простой пример, демонстрирующий реализацию двусторонней связи. Затем мы создадим более сложный проект, в котором платы обмениваются показаниями датчиков и отображают результаты на OLED-дисплее.
Используете Arduino IDE? Следуйте этому руководству: ESP-NOW: Двусторонняя связь между платами ESP32.
Предварительные требования — прошивка MicroPython
Для выполнения этого руководства вам нужна прошивка MicroPython, установленная на ваших платах ESP32 или ESP8266. Вам также понадобится IDE для написания и загрузки кода на плату. Мы рекомендуем использовать Thonny IDE:
Новичок в MicroPython? Ознакомьтесь с книгой: MicroPython Programming with ESP32 and ESP8266 eBook (2nd Edition)
Знакомство с ESP-NOW
ESP-NOW — это протокол беспроводной связи, разработанный Espressif, который позволяет нескольким платам ESP32 или ESP8266 обмениваться небольшими объёмами данных без использования Wi-Fi или Bluetooth. ESP-NOW не требует полного Wi-Fi-соединения (хотя контроллер Wi-Fi должен быть включён), что делает его идеальным для приложений с низким энергопотреблением и низкой задержкой, таких как сенсорные сети, пульты дистанционного управления или обмен данными между платами.
ESP-NOW использует модель связи без установления соединения, что означает, что устройства могут отправлять и получать данные без подключения к маршрутизатору или настройки точки доступа (в отличие от HTTP-связи между платами). Он поддерживает одноадресную (unicast — отправка данных на конкретное устройство по его MAC-адресу) и широковещательную (broadcast — отправка данных всем ближайшим устройствам с использованием широковещательного MAC-адреса) передачу сообщений.
Впервые работаете с ESP-NOW? Прочитайте наше руководство по началу работы с MicroPython и ESP-NOW на ESP32.
ESP32: Получение MAC-адреса платы
Для связи через ESP-NOW вам нужно знать MAC-адрес ваших плат. Чтобы получить MAC-адрес вашей платы, скопируйте следующий код в Thonny IDE и запустите его на вашей плате.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32/
import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# Get MAC address (returns bytes)
mac = wlan.config('mac')
# Convert to human-readable format
mac_address = ':'.join('%02x' % b for b in mac)
print("MAC Address:", mac_address)
После запуска кода MAC-адрес платы должен отобразиться в консоли (Shell).
Рекомендуем наклеить этикетку или стикер на каждую плату, чтобы можно было их легко идентифицировать.
Мы будем использовать ESP32 DOIT V1 и ESP32 S3 DevKitC, но код должен работать с любой моделью ESP32.
Двусторонняя связь между двумя платами ESP32 через ESP-NOW (MicroPython)
В этом разделе мы покажем базовый пример обмена простыми сообщениями между платами ESP32 с использованием протокола ESP-NOW. Каждая плата одновременно является приёмником и передатчиком, поэтому их можно назвать трансиверами.
Существуют две библиотеки MicroPython для ESP-NOW, включённые по умолчанию в текущие версии прошивки MicroPython: espnow и aioespnow. Мы будем использовать aioespnow — асинхронную версию библиотеки espnow.
Скопируйте следующий код в Thonny IDE.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp32-esp-now-two-way/
import network
import aioespnow
import asyncio
import time
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # Set channel explicitly if packets are not received
sta.disconnect()
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
# Peer MAC address (replace with the actual MAC of the other board)
peer_mac = b'\xff\xff\xff\xff\xff\xff' # Example peer MAC for unicast
# Add peer for unicast reliability
try:
e.add_peer(peer_mac)
except OSError as err:
print("Failed to add peer:", err)
raise
# Stats tracking
last_stats_time = time.time()
stats_interval = 10 # Print stats every 10 seconds
# Async function to send messages
async def send_messages(e, peer):
message_count = 0
while True:
try:
message = f"Hello from ESP32 #{message_count}"
if await e.asend(peer, message, sync=True):
print(f"Sent message: {message}")
else:
print("Failed to send message")
message_count += 1
await asyncio.sleep(1) # Send every 1 second
except OSError as err:
print("Send error:", err)
await asyncio.sleep(5)
# Async function to receive messages
async def receive_messages(e):
while True:
try:
async for mac, msg in e:
print(f"Received from {mac.hex()}: {msg.decode()}")
except OSError as err:
print("Receive error:", err)
await asyncio.sleep(5)
# Async function to print stats periodically
async def print_stats(e):
global last_stats_time
while True:
if time.time() - last_stats_time >= stats_interval:
stats = e.stats()
print("\nESP-NOW Statistics:")
print(f" Packets Sent: {stats[0]}")
print(f" Packets Delivered: {stats[1]}")
print(f" Packets Dropped (TX): {stats[2]}")
print(f" Packets Received: {stats[3]}")
print(f" Packets Dropped (RX): {stats[4]}")
last_stats_time = time.time()
await asyncio.sleep(1) # Check every second
# Main async function
async def main(e, peer):
# Run send, receive, and stats tasks concurrently
await asyncio.gather(send_messages(e, peer), receive_messages(e), print_stats(e))
# Run the async program
try:
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping transceiver...")
e.active(False)
sta.active(False)
Этот код настраивает плату ESP32 одновременно как приёмник и передатчик ESP-NOW. В коде необходимо указать MAC-адрес платы, которой вы хотите отправлять данные.
Например, если MAC-адрес платы-приёмника 30:AE:A4:F6:7D:4C, необходимо преобразовать его в формат bytes следующим образом:
30:AE:A4:F6:7D:4C ->
b'\x30\xae\xa4\xf6\x7d\x4c'
# Peer MAC address
receiver_mac = b'\x30\xae\xa4\xf6\x7d\x4c'
Загрузите этот код на обе платы, но измените MAC-адрес.
Как работает код?
Для лучшего понимания этого кода рекомендуем сначала ознакомиться с асинхронным программированием MicroPython. Вы можете прочитать следующее руководство:
Импорт модулей
Сначала импортируем необходимые модули.
import network
import aioespnow
import asyncio
import time
Инициализация Wi-Fi интерфейса
Затем нужно инициализировать Wi-Fi (даже если мы его не используем) для работы ESP-NOW. Можно использовать режим станции (STA_IF) или точки доступа (AP_IF).
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # Set channel
sta.disconnect()
Инициализация ESP-NOW
Затем можно инициализировать ESP-NOW. Сначала создаём экземпляр aioespnow с именем e. Затем активируем его методом active(), передавая значение True в качестве аргумента.
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
print("AIOESPNow initialized")
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
Мы активируем ESP-NOW внутри блока try/except, чтобы перехватить возможные ошибки при инициализации.
Добавление пира ESP-NOW
Укажите MAC-адрес пира (MAC-адрес платы, которой вы хотите отправлять данные).
# Receiver MAC address (the board you want to send data to)
peer_mac = b'\x68\xb6\xb3\x22\x9e\x60'
Затем добавляем MAC-адрес приёмника как пира с помощью метода add_peer().
try:
e.add_peer(peer_mac)
except OSError as err:
print("Failed to add peer:", err)
raise
Функция отправки сообщений ESP-NOW
Следующая функция отправляет сообщения ESP-NOW пиру. В качестве аргументов передаются экземпляр ESP-NOW e и пир. Для примера мы отправляем сообщение «Hello from ESP32» с счётчиком. Новое сообщение отправляется каждую секунду.
# Async function to send messages
async def send_messages(e, peer):
message_count = 0
while True:
try:
message = f"Hello from ESP32 #{message_count}"
if await e.asend(peer, message, sync=True):
print(f"Sent message: {message}")
else:
print("Failed to send message")
message_count += 1
await asyncio.sleep(1) # Send every 1 second
except OSError as err:
print("Send error:", err)
await asyncio.sleep(5)
Функция приёма сообщений ESP-NOW
Также необходимо создать функцию для приёма сообщений ESP-NOW. При поступлении нового сообщения мы выводим его в консоль MicroPython.
# Async function to receive messages
async def receive_messages(e):
while True:
try:
async for mac, msg in e:
print(f"Received from {mac.hex()}: {msg.decode()}")
except OSError as err:
print("Receive error:", err)
await asyncio.sleep(5)
Вывод статистики ESP-NOW
Затем создаём функцию print_stats(), которая выводит текущую статистику пакетов ESP-NOW. Для получения количества отправленных/полученных и потерянных пакетов вызываем метод stats() объекта ESP-NOW e.
Метод возвращает кортеж из 5 элементов с количеством отправленных/полученных/потерянных пакетов:
(tx_pkts, tx_responses, tx_failures, rx_packets, rx_dropped_packets)
Вот полная асинхронная функция:
async def print_stats(e):
global last_stats_time
while True:
if time.time() - last_stats_time >= stats_interval:
stats = e.stats()
print("\nESP-NOW Statistics:")
print(f" Packets Sent: {stats[0]}")
print(f" Packets Delivered: {stats[1]}")
print(f" Packets Dropped (TX): {stats[2]}")
print(f" Packets Received: {stats[3]}")
print(f" Packets Dropped (RX): {stats[4]}")
last_stats_time = time.time()
await asyncio.sleep(1) # Check every second
Главная асинхронная функция
Следующая строка определяет асинхронную функцию main(), которая принимает объект ESP-NOW e и MAC-адрес пира peer в качестве аргументов.
async def main(e, peer):
Затем используем asyncio.gather для одновременного запуска трёх асинхронных задач:
send_messages(e, peer): отправляет данные пиру;
receive_messages(e): прослушивает входящие данные от пира;
print_stats(e): периодически выводит статистику ESP-NOW.
await asyncio.gather(send_messages(e, peer), receive_messages(e), print_stats(e))
Запуск асинхронной программы
Наконец, следующие строки запускают программу асинхронно.
# Run the async program
try:
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping transceiver...")
e.active(False)
sta.active(False)
Команда asyncio.run(main(e, peer_mac)) запускает цикл событий asyncio MicroPython, выполняя функцию main().
Код заключён в блок try/except для перехвата KeyboardInterrupt (ручная остановка кода). В этом случае отключаем ESP-NOW и выключаем Wi-Fi.
Запуск кода
После подключения платы к компьютеру и установления связи с Thonny IDE можно загрузить код как main.py на плату или запустить его кнопкой Run. Убедитесь, что вы указали MAC-адрес приёмника в коде.
Плата начнёт выводить сообщения в консоль (Shell). Отправка будет завершаться ошибкой, пока вторая плата ещё не готова.
Откройте другой экземпляр Thonny IDE и установите связь с другой платой.
Включение двух экземпляров Thonny IDE
Перейдите в Tools > Options и снимите галочку с опции Allow only single Thonny instance.
Скопируйте тот же код, но укажите MAC-адрес другой платы. Через несколько секунд платы начнут отправлять и получать сообщения ESP-NOW.
Следующие скриншоты показывают консоль MicroPython Shell для каждой из моих плат.
Как видите, после включения обеих плат они начинают обмениваться сообщениями друг с другом.
Двусторонняя связь ESP32 через ESP-NOW — Обмен показаниями датчиков и отображение на OLED
В этом разделе мы создадим проект с реальным применением, в котором две платы ESP32 обмениваются показаниями датчиков и отображают полученные данные на OLED-дисплее.
Обзор проекта
Следующая диаграмма показывает общий обзор проекта, который мы создадим.
В этом проекте у нас будет две платы ESP32. К каждой плате подключены OLED-дисплей и датчик BME280;
Каждая плата получает показания температуры, влажности и давления от своего датчика;
Каждая плата отправляет свои показания другой плате через ESP-NOW;
При получении показаний плата отображает их на OLED-дисплее;
После отправки показаний плата отображает на OLED, было ли сообщение успешно доставлено;
Каждая плата должна знать MAC-адрес другой платы для отправки сообщения.
В этом примере мы используем двустороннюю связь между двумя платами, но вы можете добавить больше плат в эту конфигурацию, обеспечив связь всех плат друг с другом.
Необходимые компоненты
Для этого руководства вам понадобятся следующие компоненты:
Схема подключения
Подключите OLED-дисплей и датчик BME280 к каждой плате ESP32. Следуйте схеме подключения ниже (при необходимости скорректируйте для другой модели ESP32 с другим расположением выводов).
Используйте следующую таблицу в качестве справки при подключении датчика BME280.
BME280 |
ESP32 |
|---|---|
VIN |
3.3V |
GND |
GND |
SCL |
GPIO 22 |
SDA |
GPIO 21 |
Также можно использовать следующую таблицу для подключения OLED-дисплея к ESP32.
OLED Display |
ESP32 |
|---|---|
GND |
GND |
VDD / VCC |
VIN |
SCK / SCL |
GPIO 22 |
SDA |
GPIO 21 |
Импорт библиотек
Библиотеки для работы с OLED-дисплеем и получения данных BME280 не входят в стандартный пакет MicroPython. Поэтому нам нужно импортировать эти модули на наши платы.
Модуль ssd1306.py для MicroPython
Скопируйте следующий код в новый файл в Thonny IDE.
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit
import time
import framebuf
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xa4)
SET_NORM_INV = const(0xa6)
SET_DISP = const(0xae)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xa0)
SET_MUX_RATIO = const(0xa8)
SET_COM_OUT_DIR = const(0xc0)
SET_DISP_OFFSET = const(0xd3)
SET_COM_PIN_CFG = const(0xda)
SET_DISP_CLK_DIV = const(0xd5)
SET_PRECHARGE = const(0xd9)
SET_VCOM_DESEL = const(0xdb)
SET_CHARGE_PUMP = const(0x8d)
class SSD1306:
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
# Note the subclass must initialize self.framebuf to a framebuffer.
# This is necessary because the underlying data buffer is different
# between I2C and SPI implementations (I2C needs an extra byte).
self.poweron()
self.init_display()
def init_display(self):
for cmd in (
SET_DISP | 0x00, # off
# address setting
SET_MEM_ADDR, 0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE | 0x00,
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO, self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET, 0x00,
SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV, 0x80,
SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,
SET_VCOM_DESEL, 0x30, # 0.83*Vcc
# display
SET_CONTRAST, 0xff, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
# charge pump
SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP | 0x00)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width == 64:
# displays with width of 64 pixels are shifted by 32
x0 += 32
x1 += 32
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_framebuf()
def fill(self, col):
self.framebuf.fill(col)
def pixel(self, x, y, col):
self.framebuf.pixel(x, y, col)
def scroll(self, dx, dy):
self.framebuf.scroll(dx, dy)
def text(self, string, x, y, col=1):
self.framebuf.text(string, x, y, col)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
# Add an extra byte to the data buffer to hold an I2C data/command byte
# to use hardware-compatible I2C transactions. A memoryview of the
# buffer is used to mask this byte from the framebuffer operations
# (without a major memory hit as memoryview doesn't copy to a separate
# buffer).
self.buffer = bytearray(((height // 8) * width) + 1)
self.buffer[0] = 0x40 # Set first byte of data buffer to Co=0, D/C=1
self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_framebuf(self):
# Blast out the frame buffer using a single I2C transaction to support
# hardware I2C interfaces.
self.i2c.writeto(self.addr, self.buffer)
def poweron(self):
pass
class SSD1306_SPI(SSD1306):
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
self.rate = 10 * 1024 * 1024
dc.init(dc.OUT, value=0)
res.init(res.OUT, value=0)
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
self.buffer = bytearray((height // 8) * width)
self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs.high()
self.dc.low()
self.cs.low()
self.spi.write(bytearray([cmd]))
self.cs.high()
def write_framebuf(self):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs.high()
self.dc.high()
self.cs.low()
self.spi.write(self.buffer)
self.cs.high()
def poweron(self):
self.res.high()
time.sleep_ms(1)
self.res.low()
time.sleep_ms(10)
self.res.high()
Перейдите в File > Save as и выберите MicroPython device.
Назовите файл ssd1306.py и нажмите OK для сохранения файла в файловой системе ESP.
Вот и всё. Библиотека загружена на вашу плату.
Модуль BME280.py для MicroPython
Скопируйте следующий код в новый файл в Thonny IDE.
from machine import I2C
import time
# BME280 default address.
BME280_I2CADDR = 0x76
# Operating Modes
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5
# BME280 Registers
BME280_REGISTER_DIG_T1 = 0x88 # Trimming parameter registers
BME280_REGISTER_DIG_T2 = 0x8A
BME280_REGISTER_DIG_T3 = 0x8C
BME280_REGISTER_DIG_P1 = 0x8E
BME280_REGISTER_DIG_P2 = 0x90
BME280_REGISTER_DIG_P3 = 0x92
BME280_REGISTER_DIG_P4 = 0x94
BME280_REGISTER_DIG_P5 = 0x96
BME280_REGISTER_DIG_P6 = 0x98
BME280_REGISTER_DIG_P7 = 0x9A
BME280_REGISTER_DIG_P8 = 0x9C
BME280_REGISTER_DIG_P9 = 0x9E
BME280_REGISTER_DIG_H1 = 0xA1
BME280_REGISTER_DIG_H2 = 0xE1
BME280_REGISTER_DIG_H3 = 0xE3
BME280_REGISTER_DIG_H4 = 0xE4
BME280_REGISTER_DIG_H5 = 0xE5
BME280_REGISTER_DIG_H6 = 0xE6
BME280_REGISTER_DIG_H7 = 0xE7
BME280_REGISTER_CHIPID = 0xD0
BME280_REGISTER_VERSION = 0xD1
BME280_REGISTER_SOFTRESET = 0xE0
BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_CONTROL = 0xF4
BME280_REGISTER_CONFIG = 0xF5
BME280_REGISTER_PRESSURE_DATA = 0xF7
BME280_REGISTER_TEMP_DATA = 0xFA
BME280_REGISTER_HUMIDITY_DATA = 0xFD
class Device:
"""Class for communicating with an I2C device.
Allows reading and writing 8-bit, 16-bit, and byte array values to
registers on the device."""
def __init__(self, address, i2c):
"""Create an instance of the I2C device at the specified address using
the specified I2C interface object."""
self._address = address
self._i2c = i2c
def writeRaw8(self, value):
"""Write an 8-bit value on the bus (without register)."""
value = value & 0xFF
self._i2c.writeto(self._address, value)
def write8(self, register, value):
"""Write an 8-bit value to the specified register."""
b=bytearray(1)
b[0]=value & 0xFF
self._i2c.writeto_mem(self._address, register, b)
def write16(self, register, value):
"""Write a 16-bit value to the specified register."""
value = value & 0xFFFF
b=bytearray(2)
b[0]= value & 0xFF
b[1]= (value>>8) & 0xFF
self.i2c.writeto_mem(self._address, register, value)
def readRaw8(self):
"""Read an 8-bit value on the bus (without register)."""
return int.from_bytes(self._i2c.readfrom(self._address, 1),'little') & 0xFF
def readU8(self, register):
"""Read an unsigned byte from the specified register."""
return int.from_bytes(
self._i2c.readfrom_mem(self._address, register, 1),'little') & 0xFF
def readS8(self, register):
"""Read a signed byte from the specified register."""
result = self.readU8(register)
if result > 127:
result -= 256
return result
def readU16(self, register, little_endian=True):
"""Read an unsigned 16-bit value from the specified register, with the
specified endianness (default little endian, or least significant byte
first)."""
result = int.from_bytes(
self._i2c.readfrom_mem(self._address, register, 2),'little') & 0xFFFF
if not little_endian:
result = ((result << 8) & 0xFF00) + (result >> 8)
return result
def readS16(self, register, little_endian=True):
"""Read a signed 16-bit value from the specified register, with the
specified endianness (default little endian, or least significant byte
first)."""
result = self.readU16(register, little_endian)
if result > 32767:
result -= 65536
return result
def readU16LE(self, register):
"""Read an unsigned 16-bit value from the specified register, in little
endian byte order."""
return self.readU16(register, little_endian=True)
def readU16BE(self, register):
"""Read an unsigned 16-bit value from the specified register, in big
endian byte order."""
return self.readU16(register, little_endian=False)
def readS16LE(self, register):
"""Read a signed 16-bit value from the specified register, in little
endian byte order."""
return self.readS16(register, little_endian=True)
def readS16BE(self, register):
"""Read a signed 16-bit value from the specified register, in big
endian byte order."""
return self.readS16(register, little_endian=False)
class BME280:
def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,
**kwargs):
# Check that mode is valid.
if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,\
BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
raise ValueError(
'Unexpected mode value {0}. Set mode to one of '
'BME280_ULTRALOWPOWER, BME280_STANDARD, BME280_HIGHRES, or '
'BME280_ULTRAHIGHRES'.format(mode))
self._mode = mode
# Create I2C device.
if i2c is None:
raise ValueError('An I2C object is required.')
self._device = Device(address, i2c)
# Load calibration values.
self._load_calibration()
self._device.write8(BME280_REGISTER_CONTROL, 0x3F)
self.t_fine = 0
def _load_calibration(self):
self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)
self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)
self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)
self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)
self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)
self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)
self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)
self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)
self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)
self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)
self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)
self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)
self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)
self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)
self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)
self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)
h4 = self._device.readS8(BME280_REGISTER_DIG_H4)
h4 = (h4 << 24) >> 20
self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)
h5 = self._device.readS8(BME280_REGISTER_DIG_H6)
h5 = (h5 << 24) >> 20
self.dig_H5 = h5 | (
self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)
def read_raw_temp(self):
"""Reads the raw (uncompensated) temperature from the sensor."""
meas = self._mode
self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)
meas = self._mode << 5 | self._mode << 2 | 1
self._device.write8(BME280_REGISTER_CONTROL, meas)
sleep_time = 1250 + 2300 * (1 << self._mode)
sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
time.sleep_us(sleep_time) # Wait the required time
msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)
lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)
xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)
raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
return raw
def read_raw_pressure(self):
"""Reads the raw (uncompensated) pressure level from the sensor."""
"""Assumes that the temperature has already been read """
"""i.e. that enough delay has been provided"""
msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)
lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)
xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)
raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
return raw
def read_raw_humidity(self):
"""Assumes that the temperature has already been read """
"""i.e. that enough delay has been provided"""
msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)
lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)
raw = (msb << 8) | lsb
return raw
def read_temperature(self):
"""Get the compensated temperature in 0.01 of a degree celsius."""
adc = self.read_raw_temp()
var1 = ((adc >> 3) - (self.dig_T1 << 1)) * (self.dig_T2 >> 11)
var2 = ((
(((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *
self.dig_T3) >> 14
self.t_fine = var1 + var2
return (self.t_fine * 5 + 128) >> 8
def read_pressure(self):
"""Gets the compensated pressure in Pascals."""
adc = self.read_raw_pressure()
var1 = self.t_fine - 128000
var2 = var1 * var1 * self.dig_P6
var2 = var2 + ((var1 * self.dig_P5) << 17)
var2 = var2 + (self.dig_P4 << 35)
var1 = (((var1 * var1 * self.dig_P3) >> 8) +
((var1 * self.dig_P2) >> 12))
var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
if var1 == 0:
return 0
p = 1048576 - adc
p = (((p << 31) - var2) * 3125) // var1
var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25
var2 = (self.dig_P8 * p) >> 19
return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)
def read_humidity(self):
adc = self.read_raw_humidity()
# print 'Raw humidity = {0:d}'.format (adc)
h = self.t_fine - 76800
h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +
16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *
self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *
self.dig_H2 + 8192) >> 14))
h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)
h = 0 if h < 0 else h
h = 419430400 if h > 419430400 else h
return h >> 12
@property
def temperature(self):
"Return the temperature in degrees."
t = self.read_temperature()
ti = t // 100
td = t - ti * 100
return "{}.{:02d}C".format(ti, td)
@property
def pressure(self):
"Return the temperature in hPa."
p = self.read_pressure() // 256
pi = p // 100
pd = p - pi * 100
return "{}.{:02d}hPa".format(pi, pd)
@property
def humidity(self):
"Return the humidity in percent."
h = self.read_humidity()
hi = h // 1024
hd = h * 100 // 1024 - hi * 100
return "{}.{:02d}%".format(hi, hd)
Перейдите в File > Save as…
Выберите сохранение на «MicroPython device»:
Назовите файл BME280.py и нажмите кнопку OK:
Вот и всё. Библиотека загружена на вашу плату.
Код MicroPython — обмен показаниями датчика BME280 через ESP-NOW
Теперь, когда все необходимые модули загружены на ваши платы, можно загрузить следующий код на каждую из них.
Важно: не забудьте указать MAC-адрес приёмника в коде.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp32-esp-now-two-way/
import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
import ssd1306
# Initialize I2C for BME280 and SSD1306
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
# Initialize BME280 sensor
try:
bme = BME280.BME280(i2c=i2c, address=0x76)
print("BME280 initialized")
except Exception as err:
print("Failed to initialize BME280:", err)
raise
# Initialize SSD1306 OLED display
try:
display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
print("SSD1306 initialized")
except Exception as err:
print("Failed to initialize SSD1306:", err)
raise
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # set Wi-Fi channel for more stable communication
sta.disconnect()
print("Wi-Fi initialized")
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
print("AIOESPNow initialized")
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
# Receiver MAC address (the board you want to send data to)
peer_mac = b'\xff\xff\xff\xff\xff\xff'
# Add peer
try:
e.add_peer(peer_mac)
except OSError as err:
print("Failed to add peer:", err)
raise
# Variables to store readings and status
last_send_status = " "
incoming_readings = {'temp': 0.0, 'hum': 0.0, 'pres': 0.0}
# Function to get BME280 readings
def get_readings():
try:
temp = float(bme.temperature[:-1]) # Remove 'C'
hum = float(bme.humidity[:-1]) # Remove '%'
pres = float(bme.pressure[:-3]) # Remove 'hPa'
print("BME280 readings:", temp, hum, pres)
return temp, hum, pres
except Exception as err:
print("Error reading BME280:", err)
return 0.0, 0.0, 0.0
# Function to update OLED display
def update_display():
try:
display.fill(0)
display.text("INCOM. READINGS", 0, 0)
display.text("Temp: {:.1f} C".format(incoming_readings['temp']), 0, 15)
display.text("Hum: {:.1f} %".format(incoming_readings['hum']), 0, 25)
display.text("Pres: {:.1f} hPa".format(incoming_readings['pres']), 0, 35)
display.text(last_send_status, 0, 55)
display.show()
print("Display updated")
except Exception as err:
print("Error updating display:", err)
# Async function to send messages
async def send_messages(e, peer):
global last_send_status
while True:
try:
print("Sending data")
temp, hum, pres = get_readings()
# Create JSON string
data_dict = {"temp": temp, "hum": hum, "pres": pres}
json_str = ujson.dumps(data_dict)
data = json_str.encode('utf-8') # Convert to bytes
print("Sending JSON:", json_str)
if await e.asend(peer, data, sync=True):
print("Sent with success")
last_send_status = "Delivery Success :)"
else:
print("Send failed")
last_send_status = "Delivery Fail :("
update_display()
print("Sending task complete")
await asyncio.sleep(10) # Send every 10 seconds
except OSError as err:
print("Send error:", err)
last_send_status = "Delivery Fail :("
update_display()
await asyncio.sleep(0.1) # Shorter delay in case of error
# Async function to receive messages
async def receive_messages(e):
global incoming_readings
while True:
try:
print("Checking for messages")
async for mac, msg in e:
try:
# Decode bytes to string and parse JSON
json_str = msg.decode('utf-8')
data_dict = ujson.loads(json_str)
temp = data_dict['temp']
hum = data_dict['hum']
pres = data_dict['pres']
incoming_readings['temp'] = temp
incoming_readings['hum'] = hum
incoming_readings['pres'] = pres
print("\nINCOMING READINGS")
print("Temperature: {:.1f} C".format(temp))
print("Humidity: {:.1f} %".format(hum))
print("Pressure: {:.1f} hPa".format(pres))
update_display()
except (ValueError, KeyError) as err:
print("Error parsing JSON:", err)
await asyncio.sleep(0.01) # Yield if no received messages
except OSError as err:
print("Receive error:", err)
await asyncio.sleep(0.1) # Shorter delay in case of error
# Main async function
async def main(e, peer):
print("Starting main loop")
await asyncio.gather(send_messages(e, peer), receive_messages(e))
# Run the async program
try:
print("Starting transceiver...")
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping transceiver...")
e.active(False)
sta.active(False)
except Exception as err:
print("Main loop error:", err)
finally:
print("Cleaning up...")
e.active(False)
sta.active(False)
Как работает код?
Давайте кратко рассмотрим, как работает код. Также вы можете перейти сразу к разделу Демонстрация.
Импорт библиотек
Начнём с импорта необходимых модулей.
import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
import ssd1306
I2C связь
Инициализируем I2C-связь на GPIO 22 и GPIO 21. Скорректируйте, если используете другие выводы.
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
BME280
Инициализируем датчик BME280.
# Initialize BME280 sensor
try:
bme = BME280.BME280(i2c=i2c, address=0x76)
print("BME280 initialized")
except Exception as err:
print("Failed to initialize BME280:", err)
raise
OLED-дисплей
Инициализируем OLED-дисплей.
# Initialize SSD1306 OLED display
try:
display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
print("SSD1306 initialized")
except Exception as err:
print("Failed to initialize SSD1306:", err)
raise
Инициализация Wi-Fi интерфейса
Затем нужно инициализировать Wi-Fi (даже если мы его не используем) для работы ESP-NOW. Можно использовать режим станции (STA_IF) или точки доступа (AP_IF).
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # Set channel
sta.disconnect()
Инициализация ESP-NOW
Затем можно инициализировать ESP-NOW. Сначала создаём экземпляр aioespnow с именем e. Затем активируем его методом active(), передавая значение True в качестве аргумента.
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
print("AIOESPNow initialized")
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
Мы активируем ESP-NOW внутри блока try/except, чтобы перехватить возможные ошибки при инициализации.
Добавление пира ESP-NOW
Укажите MAC-адрес пира (MAC-адрес платы, которой вы хотите отправлять данные).
# Receiver MAC address (the board you want to send data to)
peer_mac = b'\x68\xb6\xb3\x22\x9e\x60'
Затем добавляем MAC-адрес приёмника как пира с помощью метода add_peer().
try:
e.add_peer(peer_mac)
except OSError as err:
print("Failed to add peer:", err)
raise
Переменные
Создаём переменные для хранения входящих показаний датчиков и состояния процесса отправки. Входящие показания датчиков сохраняем в словаре (dictionary).
last_send_status = " "
incoming_readings = {'temp': 0.0, 'hum': 0.0, 'pres': 0.0}
Получение показаний датчика BME280
Функция get_readings() возвращает показания датчика BME280 в следующем порядке: температура, влажность и давление. Функции библиотеки BME280 возвращают результаты с символом единицы измерения, поэтому мы удаляем символы в конце показаний.
В случае ошибки чтения датчика функция возвращает 0.0 для всех показаний.
# Function to get BME280 readings
def get_readings():
try:
temp = float(bme.temperature[:-1]) # Remove 'C'
hum = float(bme.humidity[:-1]) # Remove '%'
pres = float(bme.pressure[:-3]) # Remove 'hPa'
print("BME280 readings:", temp, hum, pres)
return temp, hum, pres
except Exception as err:
print("Error reading BME280:", err)
return 0.0, 0.0, 0.0
Обновление OLED-дисплея
Следующая функция обновляет дисплей текущими входящими показаниями датчиков. Также отображается последний статус отправки.
# Function to update OLED display
def update_display():
try:
display.fill(0)
display.text("INCOM. READINGS", 0, 0)
display.text("Temp: {:.1f} C".format(incoming_readings['temp']), 0, 15)
display.text("Hum: {:.1f} %".format(incoming_readings['hum']), 0, 25)
display.text("Pres: {:.1f} hPa".format(incoming_readings['pres']), 0, 35)
display.text(last_send_status, 0, 55)
display.show()
print("Display updated")
except Exception as err:
print("Error updating display:", err)
Подробнее об использовании OLED-дисплея с ESP32 на MicroPython читайте в нашем руководстве: MicroPython: OLED Display with ESP32 and ESP8266.
Функция send_message(e, peer) отправляет сообщение указанному пиру, используя экземпляр ESP-NOW e.
# Async function to send messages
async def send_messages(e, peer):
global last_send_status
while True:
try:
print("Sending data")
temp, hum, pres = get_readings()
# Create JSON string
data_dict = {"temp": temp, "hum": hum, "pres": pres}
json_str = ujson.dumps(data_dict)
data = json_str.encode('utf-8') # Convert to bytes
print("Sending JSON:", json_str)
if await e.asend(peer, data, sync=True):
print("Sent with success")
last_send_status = "Delivery Success :)"
else:
print("Send failed")
last_send_status = "Delivery Fail :("
update_display()
print("Sending task complete")
await asyncio.sleep(10) # Send every 10 seconds
except OSError as err:
print("Send error:", err)
last_send_status = "Delivery Fail :("
update_display()
await asyncio.sleep(0.1) # Shorter delay in case of error
В этой функции мы начинаем с вызова get_readings() для получения текущих показаний температуры, влажности и давления. Затем обновляем словарь текущими показаниями. Преобразуем его в JSON-строку, а затем в байты.
temp, hum, pres = get_readings()
# Create JSON string
data_dict = {"temp": temp, "hum": hum, "pres": pres}
json_str = ujson.dumps(data_dict)
data = json_str.encode('utf-8') # Convert to bytes
Затем используем ту же процедуру, описанную ранее, для отправки данных. В процессе мы также обновляем дисплей результатом отправки данных.
if await e.asend(peer, data, sync=True):
print("Sent with success")
last_send_status = "Delivery Success :)"
else:
print("Send failed")
last_send_status = "Delivery Fail :("
update_display()
print("Sending task complete")
await asyncio.sleep(10) # Send every 10 seconds
except OSError as err:
print("Send error:", err)
last_send_status = "Delivery Fail :("
update_display()
await asyncio.sleep(0.1) # Shorter delay in case of error
Приём входящих данных
Функция receive_messages() принимает данные от другой платы.
# Async function to receive messages
async def receive_messages(e):
global incoming_readings
while True:
try:
print("Checking for messages")
async for mac, msg in e:
try:
# Decode bytes to string and parse JSON
json_str = msg.decode('utf-8')
data_dict = ujson.loads(json_str)
temp = data_dict['temp']
hum = data_dict['hum']
pres = data_dict['pres']
incoming_readings['temp'] = temp
incoming_readings['hum'] = hum
incoming_readings['pres'] = pres
print("\nINCOMING READINGS")
print("Temperature: {:.1f} C".format(temp))
print("Humidity: {:.1f} %".format(hum))
print("Pressure: {:.1f} hPa".format(pres))
update_display()
except (ValueError, KeyError) as err:
print("Error parsing JSON:", err)
await asyncio.sleep(0.01) # Yield if no received messages
except OSError as err:
print("Receive error:", err)
await asyncio.sleep(0.1) # Shorter delay in case of error
При получении данных мы преобразуем их в JSON-строку. Затем обновляем словарь incoming_readings полученными данными.
Наконец, выводим показания в последовательный монитор и вызываем функцию update_display() для обновления экрана последними показаниями датчиков.
Создание асинхронного цикла и запуск программы
Наконец, создаём главную асинхронную функцию цикла и запускаем программу асинхронно, как в предыдущем примере.
# Run the async program
try:
print("Starting transceiver...")
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping transceiver...")
e.active(False)
sta.active(False)
except Exception as err:
print("Main loop error:", err)
finally:
print("Cleaning up...")
e.active(False)
sta.active(False)
Загрузка кода на платы
Важное замечание: простой запуск файла через Thonny не копирует его на файловую систему платы. Это означает, что если вы отключите плату от компьютера и подадите питание, ничего не произойдёт, так как на файловой системе нет сохранённого Python-файла. Функция Run в Thonny IDE полезна для тестирования кода, но если вы хотите загрузить его на плату постоянно, нужно создать и сохранить файл в файловой системе платы.
Чтобы код запускался на платах без подключения к компьютеру, загрузите его в файловую систему платы с именем main.py.
Перейдите в File > Save as…. MicroPython Device.
Назовите файл main.py и сохраните на плату.
Демонстрация
После загрузки кода на обе платы, на OLED-дисплее должны отобразиться показания датчиков с другой платы, а также сообщение «Delivery Success».
Заключение
Надеемся, что это руководство было для вас полезным. Протокол беспроводной связи ESP-NOW — один из самых простых методов удалённой связи между платами ESP32 без необходимости Wi-Fi-роутера.
В этом руководстве вы научились устанавливать двустороннюю связь между платами. Вы можете легко добавить больше плат в вашу конфигурацию, добавив дополнительных пиров для создания сети плат ESP32, обменивающихся данными друг с другом.
Если вы только начинаете работу с ESP-NOW, рекомендуем начать с нашего руководства по началу работы с ESP-NOW для ESP32 на MicroPython.
Хотите узнать больше о MicroPython с ESP32? Ознакомьтесь с нашими ресурсами: