MicroPython: Двусторонняя связь ESP32 через ESP-NOW

Узнайте, как установить двустороннюю связь между двумя платами ESP32 с использованием протокола ESP-NOW. Сначала мы протестируем простой пример, демонстрирующий реализацию двусторонней связи. Затем мы создадим более сложный проект, в котором платы обмениваются показаниями датчиков и отображают результаты на OLED-дисплее.

MicroPython: Двусторонняя связь ESP32 через ESP-NOW

Используете Arduino IDE? Следуйте этому руководству: ESP-NOW: Двусторонняя связь между платами ESP32.

Предварительные требования — прошивка MicroPython

Для выполнения этого руководства вам нужна прошивка MicroPython, установленная на ваших платах ESP32 или ESP8266. Вам также понадобится IDE для написания и загрузки кода на плату. Мы рекомендуем использовать Thonny IDE:

Новичок в MicroPython? Ознакомьтесь с книгой: MicroPython Programming with ESP32 and ESP8266 eBook (2nd Edition)

Знакомство с ESP-NOW

ESP-NOW — это протокол беспроводной связи, разработанный Espressif, который позволяет нескольким платам ESP32 или ESP8266 обмениваться небольшими объёмами данных без использования Wi-Fi или Bluetooth. ESP-NOW не требует полного Wi-Fi-соединения (хотя контроллер Wi-Fi должен быть включён), что делает его идеальным для приложений с низким энергопотреблением и низкой задержкой, таких как сенсорные сети, пульты дистанционного управления или обмен данными между платами.

ESP-NOW — логотип ESP32

ESP-NOW использует модель связи без установления соединения, что означает, что устройства могут отправлять и получать данные без подключения к маршрутизатору или настройки точки доступа (в отличие от HTTP-связи между платами). Он поддерживает одноадресную (unicast — отправка данных на конкретное устройство по его MAC-адресу) и широковещательную (broadcast — отправка данных всем ближайшим устройствам с использованием широковещательного MAC-адреса) передачу сообщений.

Впервые работаете с ESP-NOW? Прочитайте наше руководство по началу работы с MicroPython и ESP-NOW на ESP32.

ESP32: Получение MAC-адреса платы

Для связи через ESP-NOW вам нужно знать MAC-адрес ваших плат. Чтобы получить MAC-адрес вашей платы, скопируйте следующий код в Thonny IDE и запустите его на вашей плате.

# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32/

import network

wlan = network.WLAN(network.STA_IF)
wlan.active(True)

# Get MAC address (returns bytes)
mac = wlan.config('mac')

# Convert to human-readable format
mac_address = ':'.join('%02x' % b for b in mac)

print("MAC Address:", mac_address)

Скачать raw-код

После запуска кода MAC-адрес платы должен отобразиться в консоли (Shell).

Thonny IDE — получение MAC-адреса платы ESP32 — MicroPython

Рекомендуем наклеить этикетку или стикер на каждую плату, чтобы можно было их легко идентифицировать.

Две платы ESP32 с наклейками с MAC-адресами

Мы будем использовать ESP32 DOIT V1 и ESP32 S3 DevKitC, но код должен работать с любой моделью ESP32.

Двусторонняя связь между двумя платами ESP32 через ESP-NOW (MicroPython)

В этом разделе мы покажем базовый пример обмена простыми сообщениями между платами ESP32 с использованием протокола ESP-NOW. Каждая плата одновременно является приёмником и передатчиком, поэтому их можно назвать трансиверами.

Существуют две библиотеки MicroPython для ESP-NOW, включённые по умолчанию в текущие версии прошивки MicroPython: espnow и aioespnow. Мы будем использовать aioespnow — асинхронную версию библиотеки espnow.

Двусторонняя связь ESP-NOW между двумя платами ESP32

Скопируйте следующий код в Thonny IDE.

# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp32-esp-now-two-way/

import network
import aioespnow
import asyncio
import time

# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1)  # Set channel explicitly if packets are not received
sta.disconnect()

# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise

# Peer MAC address (replace with the actual MAC of the other board)
peer_mac = b'\xff\xff\xff\xff\xff\xff'  # Example peer MAC for unicast

# Add peer for unicast reliability
try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise

# Stats tracking
last_stats_time = time.time()
stats_interval = 10  # Print stats every 10 seconds

# Async function to send messages
async def send_messages(e, peer):
    message_count = 0
    while True:
        try:
            message = f"Hello from ESP32 #{message_count}"
            if await e.asend(peer, message, sync=True):
                print(f"Sent message: {message}")
            else:
                print("Failed to send message")
            message_count += 1
            await asyncio.sleep(1)  # Send every 1 second
        except OSError as err:
            print("Send error:", err)
            await asyncio.sleep(5)

# Async function to receive messages
async def receive_messages(e):
    while True:
        try:
            async for mac, msg in e:
                print(f"Received from {mac.hex()}: {msg.decode()}")
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(5)

# Async function to print stats periodically
async def print_stats(e):
    global last_stats_time
    while True:
        if time.time() - last_stats_time >= stats_interval:
            stats = e.stats()
            print("\nESP-NOW Statistics:")
            print(f"  Packets Sent: {stats[0]}")
            print(f"  Packets Delivered: {stats[1]}")
            print(f"  Packets Dropped (TX): {stats[2]}")
            print(f"  Packets Received: {stats[3]}")
            print(f"  Packets Dropped (RX): {stats[4]}")
            last_stats_time = time.time()
        await asyncio.sleep(1)  # Check every second

# Main async function
async def main(e, peer):
    # Run send, receive, and stats tasks concurrently
    await asyncio.gather(send_messages(e, peer), receive_messages(e), print_stats(e))

# Run the async program
try:
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping transceiver...")
    e.active(False)
    sta.active(False)

Скачать raw-код

Этот код настраивает плату ESP32 одновременно как приёмник и передатчик ESP-NOW. В коде необходимо указать MAC-адрес платы, которой вы хотите отправлять данные.

Например, если MAC-адрес платы-приёмника 30:AE:A4:F6:7D:4C, необходимо преобразовать его в формат bytes следующим образом:

  • 30:AE:A4:F6:7D:4C -> b'\x30\xae\xa4\xf6\x7d\x4c'

# Peer MAC address
receiver_mac = b'\x30\xae\xa4\xf6\x7d\x4c'

Загрузите этот код на обе платы, но измените MAC-адрес.

Как работает код?

Для лучшего понимания этого кода рекомендуем сначала ознакомиться с асинхронным программированием MicroPython. Вы можете прочитать следующее руководство:

Импорт модулей

Сначала импортируем необходимые модули.

import network
import aioespnow
import asyncio
import time

Инициализация Wi-Fi интерфейса

Затем нужно инициализировать Wi-Fi (даже если мы его не используем) для работы ESP-NOW. Можно использовать режим станции (STA_IF) или точки доступа (AP_IF).

# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1)  # Set channel
sta.disconnect()

Инициализация ESP-NOW

Затем можно инициализировать ESP-NOW. Сначала создаём экземпляр aioespnow с именем e. Затем активируем его методом active(), передавая значение True в качестве аргумента.

# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
    print("AIOESPNow initialized")
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise

Мы активируем ESP-NOW внутри блока try/except, чтобы перехватить возможные ошибки при инициализации.

Добавление пира ESP-NOW

Укажите MAC-адрес пира (MAC-адрес платы, которой вы хотите отправлять данные).

# Receiver MAC address (the board you want to send data to)
peer_mac = b'\x68\xb6\xb3\x22\x9e\x60'

Затем добавляем MAC-адрес приёмника как пира с помощью метода add_peer().

try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise

Функция отправки сообщений ESP-NOW

Следующая функция отправляет сообщения ESP-NOW пиру. В качестве аргументов передаются экземпляр ESP-NOW e и пир. Для примера мы отправляем сообщение «Hello from ESP32» с счётчиком. Новое сообщение отправляется каждую секунду.

# Async function to send messages
async def send_messages(e, peer):
    message_count = 0
    while True:
        try:
            message = f"Hello from ESP32 #{message_count}"
            if await e.asend(peer, message, sync=True):
                print(f"Sent message: {message}")
            else:
                print("Failed to send message")
            message_count += 1
            await asyncio.sleep(1)  # Send every 1 second
        except OSError as err:
            print("Send error:", err)
            await asyncio.sleep(5)

Функция приёма сообщений ESP-NOW

Также необходимо создать функцию для приёма сообщений ESP-NOW. При поступлении нового сообщения мы выводим его в консоль MicroPython.

# Async function to receive messages
async def receive_messages(e):
    while True:
        try:
            async for mac, msg in e:
                print(f"Received from {mac.hex()}: {msg.decode()}")
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(5)

Вывод статистики ESP-NOW

Затем создаём функцию print_stats(), которая выводит текущую статистику пакетов ESP-NOW. Для получения количества отправленных/полученных и потерянных пакетов вызываем метод stats() объекта ESP-NOW e.

Метод возвращает кортеж из 5 элементов с количеством отправленных/полученных/потерянных пакетов:

(tx_pkts, tx_responses, tx_failures, rx_packets, rx_dropped_packets)

Вот полная асинхронная функция:

async def print_stats(e):
    global last_stats_time
    while True:
        if time.time() - last_stats_time >= stats_interval:
            stats = e.stats()
            print("\nESP-NOW Statistics:")
            print(f"  Packets Sent: {stats[0]}")
            print(f"  Packets Delivered: {stats[1]}")
            print(f"  Packets Dropped (TX): {stats[2]}")
            print(f"  Packets Received: {stats[3]}")
            print(f"  Packets Dropped (RX): {stats[4]}")
            last_stats_time = time.time()
        await asyncio.sleep(1)  # Check every second

Главная асинхронная функция

Следующая строка определяет асинхронную функцию main(), которая принимает объект ESP-NOW e и MAC-адрес пира peer в качестве аргументов.

async def main(e, peer):

Затем используем asyncio.gather для одновременного запуска трёх асинхронных задач:

  • send_messages(e, peer): отправляет данные пиру;

  • receive_messages(e): прослушивает входящие данные от пира;

  • print_stats(e): периодически выводит статистику ESP-NOW.

await asyncio.gather(send_messages(e, peer), receive_messages(e), print_stats(e))

Запуск асинхронной программы

Наконец, следующие строки запускают программу асинхронно.

# Run the async program
try:
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping transceiver...")
    e.active(False)
    sta.active(False)

Команда asyncio.run(main(e, peer_mac)) запускает цикл событий asyncio MicroPython, выполняя функцию main().

Код заключён в блок try/except для перехвата KeyboardInterrupt (ручная остановка кода). В этом случае отключаем ESP-NOW и выключаем Wi-Fi.

Запуск кода

После подключения платы к компьютеру и установления связи с Thonny IDE можно загрузить код как main.py на плату или запустить его кнопкой Run. Убедитесь, что вы указали MAC-адрес приёмника в коде.

Запуск кода в Thonny IDE — MicroPython

Плата начнёт выводить сообщения в консоль (Shell). Отправка будет завершаться ошибкой, пока вторая плата ещё не готова.

Откройте другой экземпляр Thonny IDE и установите связь с другой платой.

Включение двух экземпляров Thonny IDE

Перейдите в Tools > Options и снимите галочку с опции Allow only single Thonny instance.

Скопируйте тот же код, но укажите MAC-адрес другой платы. Через несколько секунд платы начнут отправлять и получать сообщения ESP-NOW.

Две платы ESP32 с наклейками с MAC-адресами

Следующие скриншоты показывают консоль MicroPython Shell для каждой из моих плат.

ESP32 ESP-NOW трансивер — двусторонняя связь — результаты в консоли MicroPython shell ESP32 ESP-NOW трансивер — двусторонняя связь — результаты в консоли MicroPython shell

Как видите, после включения обеих плат они начинают обмениваться сообщениями друг с другом.


Двусторонняя связь ESP32 через ESP-NOW — Обмен показаниями датчиков и отображение на OLED

В этом разделе мы создадим проект с реальным применением, в котором две платы ESP32 обмениваются показаниями датчиков и отображают полученные данные на OLED-дисплее.

ESP32 ESP-NOW двусторонняя связь — обмен показаниями датчиков и отображение на OLED

Обзор проекта

Следующая диаграмма показывает общий обзор проекта, который мы создадим.

ESP-NOW двусторонняя связь — отправка показаний датчиков между платами
  • В этом проекте у нас будет две платы ESP32. К каждой плате подключены OLED-дисплей и датчик BME280;

  • Каждая плата получает показания температуры, влажности и давления от своего датчика;

  • Каждая плата отправляет свои показания другой плате через ESP-NOW;

  • При получении показаний плата отображает их на OLED-дисплее;

  • После отправки показаний плата отображает на OLED, было ли сообщение успешно доставлено;

  • Каждая плата должна знать MAC-адрес другой платы для отправки сообщения.

В этом примере мы используем двустороннюю связь между двумя платами, но вы можете добавить больше плат в эту конфигурацию, обеспечив связь всех плат друг с другом.

Необходимые компоненты

Для этого руководства вам понадобятся следующие компоненты:

Схема подключения

Подключите OLED-дисплей и датчик BME280 к каждой плате ESP32. Следуйте схеме подключения ниже (при необходимости скорректируйте для другой модели ESP32 с другим расположением выводов).

Схема подключения ESP32 к датчику BME280 и OLED-дисплею

Используйте следующую таблицу в качестве справки при подключении датчика BME280.

BME280

ESP32

VIN

3.3V

GND

GND

SCL

GPIO 22

SDA

GPIO 21

Также можно использовать следующую таблицу для подключения OLED-дисплея к ESP32.

OLED Display

ESP32

GND

GND

VDD / VCC

VIN

SCK / SCL

GPIO 22

SDA

GPIO 21

Импорт библиотек

Библиотеки для работы с OLED-дисплеем и получения данных BME280 не входят в стандартный пакет MicroPython. Поэтому нам нужно импортировать эти модули на наши платы.

Модуль ssd1306.py для MicroPython

  1. Скопируйте следующий код в новый файл в Thonny IDE.

# MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit

import time
import framebuf

# register definitions
SET_CONTRAST        = const(0x81)
SET_ENTIRE_ON       = const(0xa4)
SET_NORM_INV        = const(0xa6)
SET_DISP            = const(0xae)
SET_MEM_ADDR        = const(0x20)
SET_COL_ADDR        = const(0x21)
SET_PAGE_ADDR       = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP       = const(0xa0)
SET_MUX_RATIO       = const(0xa8)
SET_COM_OUT_DIR     = const(0xc0)
SET_DISP_OFFSET     = const(0xd3)
SET_COM_PIN_CFG     = const(0xda)
SET_DISP_CLK_DIV    = const(0xd5)
SET_PRECHARGE       = const(0xd9)
SET_VCOM_DESEL      = const(0xdb)
SET_CHARGE_PUMP     = const(0x8d)

class SSD1306:
    def __init__(self, width, height, external_vcc):
        self.width = width
        self.height = height
        self.external_vcc = external_vcc
        self.pages = self.height // 8
        # Note the subclass must initialize self.framebuf to a framebuffer.
        # This is necessary because the underlying data buffer is different
        # between I2C and SPI implementations (I2C needs an extra byte).
        self.poweron()
        self.init_display()

    def init_display(self):
        for cmd in (
            SET_DISP | 0x00, # off
            # address setting
            SET_MEM_ADDR, 0x00, # horizontal
            # resolution and layout
            SET_DISP_START_LINE | 0x00,
            SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
            SET_MUX_RATIO, self.height - 1,
            SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
            SET_DISP_OFFSET, 0x00,
            SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,
            # timing and driving scheme
            SET_DISP_CLK_DIV, 0x80,
            SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,
            SET_VCOM_DESEL, 0x30, # 0.83*Vcc
            # display
            SET_CONTRAST, 0xff, # maximum
            SET_ENTIRE_ON, # output follows RAM contents
            SET_NORM_INV, # not inverted
            # charge pump
            SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
            SET_DISP | 0x01): # on
            self.write_cmd(cmd)
        self.fill(0)
        self.show()

    def poweroff(self):
        self.write_cmd(SET_DISP | 0x00)

    def contrast(self, contrast):
        self.write_cmd(SET_CONTRAST)
        self.write_cmd(contrast)

    def invert(self, invert):
        self.write_cmd(SET_NORM_INV | (invert & 1))

    def show(self):
        x0 = 0
        x1 = self.width - 1
        if self.width == 64:
            # displays with width of 64 pixels are shifted by 32
            x0 += 32
            x1 += 32
        self.write_cmd(SET_COL_ADDR)
        self.write_cmd(x0)
        self.write_cmd(x1)
        self.write_cmd(SET_PAGE_ADDR)
        self.write_cmd(0)
        self.write_cmd(self.pages - 1)
        self.write_framebuf()

    def fill(self, col):
        self.framebuf.fill(col)

    def pixel(self, x, y, col):
        self.framebuf.pixel(x, y, col)

    def scroll(self, dx, dy):
        self.framebuf.scroll(dx, dy)

    def text(self, string, x, y, col=1):
        self.framebuf.text(string, x, y, col)

class SSD1306_I2C(SSD1306):
    def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):
        self.i2c = i2c
        self.addr = addr
        self.temp = bytearray(2)
        # Add an extra byte to the data buffer to hold an I2C data/command byte
        # to use hardware-compatible I2C transactions.  A memoryview of the
        # buffer is used to mask this byte from the framebuffer operations
        # (without a major memory hit as memoryview doesn't copy to a separate
        # buffer).
        self.buffer = bytearray(((height // 8) * width) + 1)
        self.buffer[0] = 0x40  # Set first byte of data buffer to Co=0, D/C=1
        self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.temp[0] = 0x80 # Co=1, D/C#=0
        self.temp[1] = cmd
        self.i2c.writeto(self.addr, self.temp)

    def write_framebuf(self):
        # Blast out the frame buffer using a single I2C transaction to support
        # hardware I2C interfaces.
        self.i2c.writeto(self.addr, self.buffer)

    def poweron(self):
        pass

class SSD1306_SPI(SSD1306):
    def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
        self.rate = 10 * 1024 * 1024
        dc.init(dc.OUT, value=0)
        res.init(res.OUT, value=0)
        cs.init(cs.OUT, value=1)
        self.spi = spi
        self.dc = dc
        self.res = res
        self.cs = cs
        self.buffer = bytearray((height // 8) * width)
        self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs.high()
        self.dc.low()
        self.cs.low()
        self.spi.write(bytearray([cmd]))
        self.cs.high()

    def write_framebuf(self):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs.high()
        self.dc.high()
        self.cs.low()
        self.spi.write(self.buffer)
        self.cs.high()

    def poweron(self):
        self.res.high()
        time.sleep_ms(1)
        self.res.low()
        time.sleep_ms(10)
        self.res.high()

Скачать raw-код

  1. Перейдите в File > Save as и выберите MicroPython device.

Thonny IDE — сохранение на устройство MicroPython
  1. Назовите файл ssd1306.py и нажмите OK для сохранения файла в файловой системе ESP.

Вот и всё. Библиотека загружена на вашу плату.

Модуль BME280.py для MicroPython

  1. Скопируйте следующий код в новый файл в Thonny IDE.

from machine import I2C
import time

# BME280 default address.
BME280_I2CADDR = 0x76

# Operating Modes
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5

# BME280 Registers

BME280_REGISTER_DIG_T1 = 0x88  # Trimming parameter registers
BME280_REGISTER_DIG_T2 = 0x8A
BME280_REGISTER_DIG_T3 = 0x8C

BME280_REGISTER_DIG_P1 = 0x8E
BME280_REGISTER_DIG_P2 = 0x90
BME280_REGISTER_DIG_P3 = 0x92
BME280_REGISTER_DIG_P4 = 0x94
BME280_REGISTER_DIG_P5 = 0x96
BME280_REGISTER_DIG_P6 = 0x98
BME280_REGISTER_DIG_P7 = 0x9A
BME280_REGISTER_DIG_P8 = 0x9C
BME280_REGISTER_DIG_P9 = 0x9E

BME280_REGISTER_DIG_H1 = 0xA1
BME280_REGISTER_DIG_H2 = 0xE1
BME280_REGISTER_DIG_H3 = 0xE3
BME280_REGISTER_DIG_H4 = 0xE4
BME280_REGISTER_DIG_H5 = 0xE5
BME280_REGISTER_DIG_H6 = 0xE6
BME280_REGISTER_DIG_H7 = 0xE7

BME280_REGISTER_CHIPID = 0xD0
BME280_REGISTER_VERSION = 0xD1
BME280_REGISTER_SOFTRESET = 0xE0

BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_CONTROL = 0xF4
BME280_REGISTER_CONFIG = 0xF5
BME280_REGISTER_PRESSURE_DATA = 0xF7
BME280_REGISTER_TEMP_DATA = 0xFA
BME280_REGISTER_HUMIDITY_DATA = 0xFD

class Device:
  """Class for communicating with an I2C device.

  Allows reading and writing 8-bit, 16-bit, and byte array values to
  registers on the device."""

  def __init__(self, address, i2c):
    """Create an instance of the I2C device at the specified address using
    the specified I2C interface object."""
    self._address = address
    self._i2c = i2c

  def writeRaw8(self, value):
    """Write an 8-bit value on the bus (without register)."""
    value = value & 0xFF
    self._i2c.writeto(self._address, value)

  def write8(self, register, value):
    """Write an 8-bit value to the specified register."""
    b=bytearray(1)
    b[0]=value & 0xFF
    self._i2c.writeto_mem(self._address, register, b)

  def write16(self, register, value):
    """Write a 16-bit value to the specified register."""
    value = value & 0xFFFF
    b=bytearray(2)
    b[0]= value & 0xFF
    b[1]= (value>>8) & 0xFF
    self.i2c.writeto_mem(self._address, register, value)

  def readRaw8(self):
    """Read an 8-bit value on the bus (without register)."""
    return int.from_bytes(self._i2c.readfrom(self._address, 1),'little') & 0xFF

  def readU8(self, register):
    """Read an unsigned byte from the specified register."""
    return int.from_bytes(
        self._i2c.readfrom_mem(self._address, register, 1),'little') & 0xFF

  def readS8(self, register):
    """Read a signed byte from the specified register."""
    result = self.readU8(register)
    if result > 127:
      result -= 256
    return result

  def readU16(self, register, little_endian=True):
    """Read an unsigned 16-bit value from the specified register, with the
    specified endianness (default little endian, or least significant byte
    first)."""
    result = int.from_bytes(
        self._i2c.readfrom_mem(self._address, register, 2),'little') & 0xFFFF
    if not little_endian:
      result = ((result << 8) & 0xFF00) + (result >> 8)
    return result

  def readS16(self, register, little_endian=True):
    """Read a signed 16-bit value from the specified register, with the
    specified endianness (default little endian, or least significant byte
    first)."""
    result = self.readU16(register, little_endian)
    if result > 32767:
      result -= 65536
    return result

  def readU16LE(self, register):
    """Read an unsigned 16-bit value from the specified register, in little
    endian byte order."""
    return self.readU16(register, little_endian=True)

  def readU16BE(self, register):
    """Read an unsigned 16-bit value from the specified register, in big
    endian byte order."""
    return self.readU16(register, little_endian=False)

  def readS16LE(self, register):
    """Read a signed 16-bit value from the specified register, in little
    endian byte order."""
    return self.readS16(register, little_endian=True)

  def readS16BE(self, register):
    """Read a signed 16-bit value from the specified register, in big
    endian byte order."""
    return self.readS16(register, little_endian=False)

class BME280:
  def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,
               **kwargs):
    # Check that mode is valid.
    if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,\
                    BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
        raise ValueError(
            'Unexpected mode value {0}. Set mode to one of '
            'BME280_ULTRALOWPOWER, BME280_STANDARD, BME280_HIGHRES, or '
            'BME280_ULTRAHIGHRES'.format(mode))
    self._mode = mode
    # Create I2C device.
    if i2c is None:
      raise ValueError('An I2C object is required.')
    self._device = Device(address, i2c)
    # Load calibration values.
    self._load_calibration()
    self._device.write8(BME280_REGISTER_CONTROL, 0x3F)
    self.t_fine = 0

  def _load_calibration(self):

    self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)
    self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)
    self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)

    self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)
    self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)
    self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)
    self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)
    self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)
    self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)
    self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)
    self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)
    self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)

    self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)
    self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)
    self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)
    self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)

    h4 = self._device.readS8(BME280_REGISTER_DIG_H4)
    h4 = (h4 << 24) >> 20
    self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)

    h5 = self._device.readS8(BME280_REGISTER_DIG_H6)
    h5 = (h5 << 24) >> 20
    self.dig_H5 = h5 | (
        self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)

  def read_raw_temp(self):
    """Reads the raw (uncompensated) temperature from the sensor."""
    meas = self._mode
    self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)
    meas = self._mode << 5 | self._mode << 2 | 1
    self._device.write8(BME280_REGISTER_CONTROL, meas)
    sleep_time = 1250 + 2300 * (1 << self._mode)

    sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
    sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
    time.sleep_us(sleep_time)  # Wait the required time
    msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)
    lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)
    xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)
    raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
    return raw

  def read_raw_pressure(self):
    """Reads the raw (uncompensated) pressure level from the sensor."""
    """Assumes that the temperature has already been read """
    """i.e. that enough delay has been provided"""
    msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)
    lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)
    xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)
    raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
    return raw

  def read_raw_humidity(self):
    """Assumes that the temperature has already been read """
    """i.e. that enough delay has been provided"""
    msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)
    lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)
    raw = (msb << 8) | lsb
    return raw

  def read_temperature(self):
    """Get the compensated temperature in 0.01 of a degree celsius."""
    adc = self.read_raw_temp()
    var1 = ((adc >> 3) - (self.dig_T1 << 1)) * (self.dig_T2 >> 11)
    var2 = ((
        (((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *
        self.dig_T3) >> 14
    self.t_fine = var1 + var2
    return (self.t_fine * 5 + 128) >> 8

  def read_pressure(self):
    """Gets the compensated pressure in Pascals."""
    adc = self.read_raw_pressure()
    var1 = self.t_fine - 128000
    var2 = var1 * var1 * self.dig_P6
    var2 = var2 + ((var1 * self.dig_P5) << 17)
    var2 = var2 + (self.dig_P4 << 35)
    var1 = (((var1 * var1 * self.dig_P3) >> 8) +
            ((var1 * self.dig_P2) >> 12))
    var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
    if var1 == 0:
      return 0
    p = 1048576 - adc
    p = (((p << 31) - var2) * 3125) // var1
    var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25
    var2 = (self.dig_P8 * p) >> 19
    return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)

  def read_humidity(self):
    adc = self.read_raw_humidity()
    # print 'Raw humidity = {0:d}'.format (adc)
    h = self.t_fine - 76800
    h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +
         16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *
                          self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *
                          self.dig_H2 + 8192) >> 14))
    h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)
    h = 0 if h < 0 else h
    h = 419430400 if h > 419430400 else h
    return h >> 12

  @property
  def temperature(self):
    "Return the temperature in degrees."
    t = self.read_temperature()
    ti = t // 100
    td = t - ti * 100
    return "{}.{:02d}C".format(ti, td)

  @property
  def pressure(self):
    "Return the temperature in hPa."
    p = self.read_pressure() // 256
    pi = p // 100
    pd = p - pi * 100
    return "{}.{:02d}hPa".format(pi, pd)

  @property
  def humidity(self):
    "Return the humidity in percent."
    h = self.read_humidity()
    hi = h // 1024
    hd = h * 100 // 1024 - hi * 100
    return "{}.{:02d}%".format(hi, hd)

Скачать raw-код

  1. Перейдите в File > Save as…

Thonny IDE ESP32 ESP8266 MicroPython — сохранить файл библиотеки — save as
  1. Выберите сохранение на «MicroPython device»:

Thonny IDE ESP32 ESP8266 MicroPython — сохранить файл библиотеки на устройство
  1. Назовите файл BME280.py и нажмите кнопку OK:

Библиотека BME280 — новый файл MicroPython в Thonny IDE

Вот и всё. Библиотека загружена на вашу плату.

Код MicroPython — обмен показаниями датчика BME280 через ESP-NOW

Теперь, когда все необходимые модули загружены на ваши платы, можно загрузить следующий код на каждую из них.

Важно: не забудьте указать MAC-адрес приёмника в коде.

# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp32-esp-now-two-way/

import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
import ssd1306

# Initialize I2C for BME280 and SSD1306
i2c = I2C(0, scl=Pin(22), sda=Pin(21))

# Initialize BME280 sensor
try:
    bme = BME280.BME280(i2c=i2c, address=0x76)
    print("BME280 initialized")
except Exception as err:
    print("Failed to initialize BME280:", err)
    raise

# Initialize SSD1306 OLED display
try:
    display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
    print("SSD1306 initialized")
except Exception as err:
    print("Failed to initialize SSD1306:", err)
    raise

# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # set Wi-Fi channel for more stable communication
sta.disconnect()
print("Wi-Fi initialized")

# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
    print("AIOESPNow initialized")
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise

# Receiver MAC address (the board you want to send data to)
peer_mac = b'\xff\xff\xff\xff\xff\xff'

# Add peer
try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise

# Variables to store readings and status
last_send_status = " "
incoming_readings = {'temp': 0.0, 'hum': 0.0, 'pres': 0.0}

# Function to get BME280 readings
def get_readings():
    try:
        temp = float(bme.temperature[:-1]) # Remove 'C'
        hum = float(bme.humidity[:-1])     # Remove '%'
        pres = float(bme.pressure[:-3])    # Remove 'hPa'
        print("BME280 readings:", temp, hum, pres)
        return temp, hum, pres
    except Exception as err:
        print("Error reading BME280:", err)
        return 0.0, 0.0, 0.0

# Function to update OLED display
def update_display():
    try:
        display.fill(0)
        display.text("INCOM. READINGS", 0, 0)
        display.text("Temp: {:.1f} C".format(incoming_readings['temp']), 0, 15)
        display.text("Hum: {:.1f} %".format(incoming_readings['hum']), 0, 25)
        display.text("Pres: {:.1f} hPa".format(incoming_readings['pres']), 0, 35)
        display.text(last_send_status, 0, 55)
        display.show()
        print("Display updated")
    except Exception as err:
        print("Error updating display:", err)

# Async function to send messages
async def send_messages(e, peer):
    global last_send_status
    while True:
        try:
            print("Sending data")
            temp, hum, pres = get_readings()
            # Create JSON string
            data_dict = {"temp": temp, "hum": hum, "pres": pres}
            json_str = ujson.dumps(data_dict)
            data = json_str.encode('utf-8')  # Convert to bytes
            print("Sending JSON:", json_str)
            if await e.asend(peer, data, sync=True):
                print("Sent with success")
                last_send_status = "Delivery Success :)"
            else:
                print("Send failed")
                last_send_status = "Delivery Fail :("
            update_display()
            print("Sending task complete")
            await asyncio.sleep(10)  # Send every 10 seconds
        except OSError as err:
            print("Send error:", err)
            last_send_status = "Delivery Fail :("
            update_display()
            await asyncio.sleep(0.1)  # Shorter delay in case of error

# Async function to receive messages
async def receive_messages(e):
    global incoming_readings
    while True:
        try:
            print("Checking for messages")
            async for mac, msg in e:
                try:
                    # Decode bytes to string and parse JSON
                    json_str = msg.decode('utf-8')
                    data_dict = ujson.loads(json_str)
                    temp = data_dict['temp']
                    hum = data_dict['hum']
                    pres = data_dict['pres']
                    incoming_readings['temp'] = temp
                    incoming_readings['hum'] = hum
                    incoming_readings['pres'] = pres
                    print("\nINCOMING READINGS")
                    print("Temperature: {:.1f} C".format(temp))
                    print("Humidity: {:.1f} %".format(hum))
                    print("Pressure: {:.1f} hPa".format(pres))
                    update_display()
                except (ValueError, KeyError) as err:
                    print("Error parsing JSON:", err)
            await asyncio.sleep(0.01)  # Yield if no received messages
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(0.1)  # Shorter delay in case of error

# Main async function
async def main(e, peer):
    print("Starting main loop")
    await asyncio.gather(send_messages(e, peer), receive_messages(e))

# Run the async program
try:
    print("Starting transceiver...")
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping transceiver...")
    e.active(False)
    sta.active(False)
except Exception as err:
    print("Main loop error:", err)
finally:
    print("Cleaning up...")
    e.active(False)
    sta.active(False)

Скачать raw-код

Как работает код?

Давайте кратко рассмотрим, как работает код. Также вы можете перейти сразу к разделу Демонстрация.

Импорт библиотек

Начнём с импорта необходимых модулей.

import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
import ssd1306
I2C связь

Инициализируем I2C-связь на GPIO 22 и GPIO 21. Скорректируйте, если используете другие выводы.

i2c = I2C(0, scl=Pin(22), sda=Pin(21))
BME280

Инициализируем датчик BME280.

# Initialize BME280 sensor
try:
    bme = BME280.BME280(i2c=i2c, address=0x76)
    print("BME280 initialized")
except Exception as err:
    print("Failed to initialize BME280:", err)
    raise
OLED-дисплей

Инициализируем OLED-дисплей.

# Initialize SSD1306 OLED display
try:
    display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
    print("SSD1306 initialized")
except Exception as err:
    print("Failed to initialize SSD1306:", err)
    raise
Инициализация Wi-Fi интерфейса

Затем нужно инициализировать Wi-Fi (даже если мы его не используем) для работы ESP-NOW. Можно использовать режим станции (STA_IF) или точки доступа (AP_IF).

# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1)  # Set channel
sta.disconnect()
Инициализация ESP-NOW

Затем можно инициализировать ESP-NOW. Сначала создаём экземпляр aioespnow с именем e. Затем активируем его методом active(), передавая значение True в качестве аргумента.

# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
    print("AIOESPNow initialized")
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise

Мы активируем ESP-NOW внутри блока try/except, чтобы перехватить возможные ошибки при инициализации.

Добавление пира ESP-NOW

Укажите MAC-адрес пира (MAC-адрес платы, которой вы хотите отправлять данные).

# Receiver MAC address (the board you want to send data to)
peer_mac = b'\x68\xb6\xb3\x22\x9e\x60'

Затем добавляем MAC-адрес приёмника как пира с помощью метода add_peer().

try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise
Переменные

Создаём переменные для хранения входящих показаний датчиков и состояния процесса отправки. Входящие показания датчиков сохраняем в словаре (dictionary).

last_send_status = " "
incoming_readings = {'temp': 0.0, 'hum': 0.0, 'pres': 0.0}
Получение показаний датчика BME280

Функция get_readings() возвращает показания датчика BME280 в следующем порядке: температура, влажность и давление. Функции библиотеки BME280 возвращают результаты с символом единицы измерения, поэтому мы удаляем символы в конце показаний.

В случае ошибки чтения датчика функция возвращает 0.0 для всех показаний.

# Function to get BME280 readings
def get_readings():
    try:
        temp = float(bme.temperature[:-1]) # Remove 'C'
        hum = float(bme.humidity[:-1])     # Remove '%'
        pres = float(bme.pressure[:-3])    # Remove 'hPa'
        print("BME280 readings:", temp, hum, pres)
        return temp, hum, pres
    except Exception as err:
        print("Error reading BME280:", err)
        return 0.0, 0.0, 0.0
Обновление OLED-дисплея

Следующая функция обновляет дисплей текущими входящими показаниями датчиков. Также отображается последний статус отправки.

# Function to update OLED display
def update_display():
    try:
        display.fill(0)
        display.text("INCOM. READINGS", 0, 0)
        display.text("Temp: {:.1f} C".format(incoming_readings['temp']), 0, 15)
        display.text("Hum: {:.1f} %".format(incoming_readings['hum']), 0, 25)
        display.text("Pres: {:.1f} hPa".format(incoming_readings['pres']), 0, 35)
        display.text(last_send_status, 0, 55)
        display.show()
        print("Display updated")
    except Exception as err:
        print("Error updating display:", err)

Подробнее об использовании OLED-дисплея с ESP32 на MicroPython читайте в нашем руководстве: MicroPython: OLED Display with ESP32 and ESP8266.

Функция send_message(e, peer) отправляет сообщение указанному пиру, используя экземпляр ESP-NOW e.

# Async function to send messages
async def send_messages(e, peer):
    global last_send_status
    while True:
        try:
            print("Sending data")
            temp, hum, pres = get_readings()
            # Create JSON string
            data_dict = {"temp": temp, "hum": hum, "pres": pres}
            json_str = ujson.dumps(data_dict)
            data = json_str.encode('utf-8')  # Convert to bytes
            print("Sending JSON:", json_str)
            if await e.asend(peer, data, sync=True):
                print("Sent with success")
                last_send_status = "Delivery Success :)"
            else:
                print("Send failed")
                last_send_status = "Delivery Fail :("
            update_display()
            print("Sending task complete")
            await asyncio.sleep(10)  # Send every 10 seconds
        except OSError as err:
            print("Send error:", err)
            last_send_status = "Delivery Fail :("
            update_display()
            await asyncio.sleep(0.1)  # Shorter delay in case of error

В этой функции мы начинаем с вызова get_readings() для получения текущих показаний температуры, влажности и давления. Затем обновляем словарь текущими показаниями. Преобразуем его в JSON-строку, а затем в байты.

temp, hum, pres = get_readings()
# Create JSON string
data_dict = {"temp": temp, "hum": hum, "pres": pres}
json_str = ujson.dumps(data_dict)
data = json_str.encode('utf-8')  # Convert to bytes

Затем используем ту же процедуру, описанную ранее, для отправки данных. В процессе мы также обновляем дисплей результатом отправки данных.

if await e.asend(peer, data, sync=True):
    print("Sent with success")
    last_send_status = "Delivery Success :)"
else:
    print("Send failed")
    last_send_status = "Delivery Fail :("
    update_display()
    print("Sending task complete")
    await asyncio.sleep(10)  # Send every 10 seconds
except OSError as err:
    print("Send error:", err)
    last_send_status = "Delivery Fail :("
    update_display()
    await asyncio.sleep(0.1)  # Shorter delay in case of error
Приём входящих данных

Функция receive_messages() принимает данные от другой платы.

# Async function to receive messages
async def receive_messages(e):
    global incoming_readings
    while True:
        try:
            print("Checking for messages")
            async for mac, msg in e:
                try:
                    # Decode bytes to string and parse JSON
                    json_str = msg.decode('utf-8')
                    data_dict = ujson.loads(json_str)
                    temp = data_dict['temp']
                    hum = data_dict['hum']
                    pres = data_dict['pres']
                    incoming_readings['temp'] = temp
                    incoming_readings['hum'] = hum
                    incoming_readings['pres'] = pres
                    print("\nINCOMING READINGS")
                    print("Temperature: {:.1f} C".format(temp))
                    print("Humidity: {:.1f} %".format(hum))
                    print("Pressure: {:.1f} hPa".format(pres))
                    update_display()
                except (ValueError, KeyError) as err:
                    print("Error parsing JSON:", err)
            await asyncio.sleep(0.01)  # Yield if no received messages
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(0.1)  # Shorter delay in case of error

При получении данных мы преобразуем их в JSON-строку. Затем обновляем словарь incoming_readings полученными данными.

Наконец, выводим показания в последовательный монитор и вызываем функцию update_display() для обновления экрана последними показаниями датчиков.

Создание асинхронного цикла и запуск программы

Наконец, создаём главную асинхронную функцию цикла и запускаем программу асинхронно, как в предыдущем примере.

# Run the async program
try:
    print("Starting transceiver...")
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping transceiver...")
    e.active(False)
    sta.active(False)
except Exception as err:
    print("Main loop error:", err)
finally:
    print("Cleaning up...")
    e.active(False)
    sta.active(False)

Загрузка кода на платы

Важное замечание: простой запуск файла через Thonny не копирует его на файловую систему платы. Это означает, что если вы отключите плату от компьютера и подадите питание, ничего не произойдёт, так как на файловой системе нет сохранённого Python-файла. Функция Run в Thonny IDE полезна для тестирования кода, но если вы хотите загрузить его на плату постоянно, нужно создать и сохранить файл в файловой системе платы.

Чтобы код запускался на платах без подключения к компьютеру, загрузите его в файловую систему платы с именем main.py.

Перейдите в File > Save as…. MicroPython Device.

Thonny IDE ESP32 ESP8266 MicroPython — сохранить файл на устройство

Назовите файл main.py и сохраните на плату.

Сохранение main.py на устройство MicroPython — Thonny IDE

Демонстрация

После загрузки кода на обе платы, на OLED-дисплее должны отобразиться показания датчиков с другой платы, а также сообщение «Delivery Success».

Обмен показаниями датчика BME280 между ESP32 через ESP-NOW

Заключение

Надеемся, что это руководство было для вас полезным. Протокол беспроводной связи ESP-NOW — один из самых простых методов удалённой связи между платами ESP32 без необходимости Wi-Fi-роутера.

В этом руководстве вы научились устанавливать двустороннюю связь между платами. Вы можете легко добавить больше плат в вашу конфигурацию, добавив дополнительных пиров для создания сети плат ESP32, обменивающихся данными друг с другом.

Если вы только начинаете работу с ESP-NOW, рекомендуем начать с нашего руководства по началу работы с ESP-NOW для ESP32 на MicroPython.

Хотите узнать больше о MicroPython с ESP32? Ознакомьтесь с нашими ресурсами: