本文档详细讲解如何利用树莓派采集 LD2460 毫米波雷达数据,并通过 MQTT 协议实时推送到 ThingsPanel 物联网平台。
最终效果


人体距离传感器的距离测量
1. 整体架构


graph LR
A[HLK-LD2460雷达] -->|串口 UART| B(树莓派 4B)
B -->|MQTT协议| C[ThingsPanel 平台]
subgraph 树莓派处理逻辑
B1[读取串口十六进制数据] --> B2[解析目标坐标与人数]
B2 --> B3[封装JSON数据]
B3 --> B4[MQTT发布]
end
整个系统不仅是简单的透传,还在树莓派边缘端做了智能预处理:仅在数据变化或定时周期到达时上报,有效节省流量和服务器资源。
2. 准备工作
硬件连接
- 将 LD2460 模块通过 USB 线(或 TTL转USB)插入树莓派 USB 口。
- 树莓派会自动识别设备为
/dev/ttyUSB0 或 /dev/ttyACM0。
ThingsPanel 平台配置
在将代码运行之前,你需要在 ThingsPanel 上创建一个设备来接收数据。
登录 ThingsPanel
进入设备管理页面。
创建设备
- 点击“新建设备”。
- 选择或创建一个产品(例如命名为“毫米波雷达”)。
- 输入设备名称(例如
RaspberryPi_Radar)。
获取连接信息
创建成功后,进入设备详情页,找到 MQTT 连接信息(或连接配置)。通常包含:
- 服务器地址 (Broker): 例如
47.115.210.16
- 端口: 默认
1883
- Client ID: 设备唯一标识
- 用户名 (User Name): 认证用户
- 密码 (Password): 认证密钥
💡 提示:这些信息将直接填入我们的 Python 代码中。
3. 核心代码逻辑解析
我们的核心程序 ld2460_pi.py 扮演了“网关”的角色。这里深入讲解它是如何工作的。
第一步:串口数据采集与解析
LD2460 雷达输出的是十六进制字节流。代码通过 pyserial 库读取数据,并按照协议手册进行解码。
- 帧头识别: 程序不断扫描
F4 F3 F2 F1 帧头。
- 载荷解析: 提取 0x13 类型的数据包,其中包含:
- 目标 X/Y 坐标: 原始单位是 dp(分米),代码转换为 cm(厘米)。
- 速度: 目标移动速度。
# 代码片段示意
x_cm = x_dm * 10
y_cm = y_dm * 10
dist = (x_cm**2 + y_cm**2) ** 0.5 # 计算距离
第二步:智能上报策略
为了防止海量雷达数据(10Hz+)淹没服务器,我们在代码中实现了边缘侧过滤:
| 场景 | 行为 | 目的 |
|---|---|---|
| 没人 ↔ 有人 | 立即上报 | 确保告警实时性 |
| 人数变化 | 立即上报 | 实时更新占用情况 |
| 人员持续移动 | 1Hz 频率上报 | 既保证轨迹流畅,又节省90%流量 |
| 无人状态 | 不持续上报 | 极致静默,零流量消耗 |
第三步:MQTT 数据推送
处理好的数据被封装成 ThingsPanel 友好的 JSON 改式,发送到 attributes 或 telemetry 主题(根据平台要求,本例使用 devices/telemetry)。
上报数据格式示例:
{
"people_count": 1,
"min_distance_cm": 120.5,
"targets": [
{
"id": 0,
"x_cm": -30,
"y_cm": 120,
"speed_cm_s": 5
}
]
}
4. 如何部署与运行
配置文件
打开 ld2460_pi.py,找到开头的配置区域,填入在 ThingsPanel 获取的信息:
# MQTT Configuration
MQTT_BROKER = "47.115.210.16" # 你的平台地址
MQTT_PORT = 1883
MQTT_CLIENT_ID = "你的Client_ID"
MQTT_USERNAME = "你的用户名"
MQTT_PASSWORD = "你的密码"
运行服务
我们在树莓派上配置了开机自启服务。
- 启动:
sudo systemctl start ld2460
- 查看日志:
journalctl -u ld2460 -f
当日志显示 [MQTT OK] 时,说明数据已成功推送到云端。
5. 在 ThingsPanel 上看数据
- 实时数据: 进入设备详情的“最新数据”或“属性”面板,可以看到
people_count 和 min_distance_cm 在实时跳动。
- 历史趋势: 点击属性旁边的“历史数据”,可以查看人员活动的时间分布。
- 仪表盘 (Dashboard):
- 拖拽一个 数字组件 绑定
people_count,实时显示当前人数。
- 拖拽一个 曲线图 绑定
min_distance_cm,观察人员进出距离变化。
6. 完整代码
import serial
import time
import binascii
import struct
import json
import logging
import glob
import os
import paho.mqtt.client as mqtt
from logging.handlers import RotatingFileHandler
# --- Configuration ---
# 自动检测串口 (将在 run() 中处理)
BAUD_RATE = 115200
# MQTT Configuration
MQTT_BROKER = "47.115.210.16"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "pi_ld2460_sender" # 更改为唯一ID
MQTT_USERNAME = "a6349640-6b46-1046-f7a"
MQTT_PASSWORD = "bc895fa"
MQTT_TOPIC = "devices/telemetry"
# Protocol Constants
HEADER = b'\xF4\xF3\xF2\xF1'
FOOTER = b'\xF8\xF7\xF6\xF5'
# Setup Logging
LOG_FILE = "ld2460.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
RotatingFileHandler(LOG_FILE, maxBytes=10*1024*1024, backupCount=3),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class LD2460Reader:
def __init__(self):
self.ser = None
self.mqtt_client = None
self.running = True
# 上报频率控制
self.last_presence = None # 上次 presence 状态 (True=有人, False=无人)
self.last_people_count = None # 上次人数
self.last_periodic_report = 0 # 上次定时上报时间
self.REPORT_INTERVAL = 1.0 # 定时上报间隔 (秒), 更改为 1Hz 以降低功耗
# 缓存数据
self.mqtt_connected = False
def find_serial_port(self):
"""尝试自动查找雷达串口"""
# 常见的 USB 串口模式
patterns = ['/dev/ttyUSB*', '/dev/ttyACM*']
for pattern in patterns:
ports = glob.glob(pattern)
if ports:
logger.info(f"Found serial ports: {ports}")
# 简单起见,返回第一个找到的
return ports[0]
return None
def setup_mqtt(self):
try:
logger.info(f"Connecting to MQTT Broker {MQTT_BROKER}...")
self.mqtt_client = mqtt.Client(client_id=MQTT_CLIENT_ID)
self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
self.mqtt_client.on_connect = self.on_mqtt_connect
self.mqtt_client.on_disconnect = self.on_mqtt_disconnect
# 设置遗嘱消息 (Last Will and Testament)
self.mqtt_client.will_set(MQTT_TOPIC, json.dumps({"status": "offline", "reason": "unexpected"}), retain=True)
self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
self.mqtt_client.loop_start()
except Exception as e:
logger.error(f"MQTT Connection Failed: {e}")
self.mqtt_client = None
def on_mqtt_connect(self, client, userdata, flags, rc):
if rc == 0:
logger.info("MQTT Connected Successfully")
self.mqtt_connected = True
# 发送上线消息
self.publish_data({"status": "online", "device": "rpi4_ld2460"})
else:
logger.error(f"MQTT Connect Failed with code {rc}")
self.mqtt_connected = False
def on_mqtt_disconnect(self, client, userdata, rc):
logger.warning(f"MQTT Disconnected (rc={rc})")
self.mqtt_connected = False
def publish_data(self, data):
if self.mqtt_client and self.mqtt_connected:
try:
payload = json.dumps(data)
self.mqtt_client.publish(MQTT_TOPIC, payload)
return True
except Exception as e:
logger.error(f"Publish Failed: {e}")
return False
def should_report(self, json_data):
current_count = json_data['people_count']
current_presence = current_count > 0
now = time.time()
immediate_report = False
report_reason = ""
# 1. presence 变化检测
if self.last_presence is not None and current_presence != self.last_presence:
immediate_report = True
report_reason = "presence_change"
# 2. people_count 变化检测
if self.last_people_count is not None and current_count != self.last_people_count:
immediate_report = True
report_reason = "count_change"
# 更新状态
self.last_presence = current_presence
self.last_people_count = current_count
if immediate_report:
self.last_periodic_report = now
return True, report_reason
# 3. 定时上报 (1Hz)
# 只有当有人时才定时上报详细轨迹,为了省电和省流量
if current_presence:
if now - self.last_periodic_report >= self.REPORT_INTERVAL:
self.last_periodic_report = now
return True, "periodic"
return False, ""
def parse_targets(self, payload):
count = 0
targets = []
min_distance_cm = 9999
try:
# Check for Empty / Status Packet
if len(payload) == 4:
return {
"people_count": 0,
"min_distance_cm": 0,
"targets": []
}
header_size = 2
target_size = 6
if len(payload) >= header_size + target_size:
data_bytes = payload[header_size:]
num_targets = len(data_bytes) // target_size
for i in range(num_targets):
base = i * target_size
x_dm = struct.unpack('<h', data_bytes[base:base+2])[0]
y_dm = struct.unpack('<h', data_bytes[base+2:base+4])[0]
speed = struct.unpack('<h', data_bytes[base+4:base+6])[0]
x_cm = x_dm * 10
y_cm = y_dm * 10
targets.append({
"id": i,
"x_cm": x_cm,
"y_cm": y_cm,
"speed_cm_s": speed
})
dist = (x_cm**2 + y_cm**2) ** 0.5
if dist < min_distance_cm:
min_distance_cm = dist
count = len(targets)
except Exception as e:
logger.error(f"Parsing error: {e}")
if min_distance_cm == 9999:
min_distance_cm = 0
data = {
"people_count": count,
"min_distance_cm": round(min_distance_cm, 1) if count > 0 else 0,
"targets": targets
}
return data
def enable_engineering_mode(self):
try:
cmd = b'\xFD\xFC\xFB\xFA\x02\x00\x62\x00\x04\x03\x02\x01'
logger.info("Sending Enable Engineering Mode Command...")
self.ser.write(cmd)
time.sleep(0.5)
except Exception as e:
logger.error(f"Failed to send command: {e}")
def run(self):
self.setup_mqtt()
logger.info("-" * 60)
reconnect_count = 0
while self.running:
try:
# 自动查找串口
port = self.find_serial_port()
while port is None:
logger.warning("No serial port found matching ttyUSB* or ttyACM*. Retrying in 5s...")
time.sleep(5)
port = self.find_serial_port()
logger.info(f"Connecting Serial Port: {port}")
self.ser = serial.Serial(
port=port,
baudrate=BAUD_RATE,
timeout=0.5,
rtscts=False,
dsrdtr=False,
exclusive=True
)
self.ser.rts = False
self.ser.dtr = False
logger.info("Serial Connected. Waiting for stability...")
time.sleep(2.0)
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
self.enable_engineering_mode()
logger.info("Listening...")
buffer = b""
last_data_time = time.time()
WATCHDOG_TIMEOUT = 10
while self.running:
try:
chunk = self.ser.read(256)
if chunk:
buffer += chunk
last_data_time = time.time()
reconnect_count = 0
else:
if time.time() - last_data_time > WATCHDOG_TIMEOUT:
logger.warning(f"No data for {WATCHDOG_TIMEOUT}s. Re-sending enable command...")
self.enable_engineering_mode()
last_data_time = time.time()
except OSError:
raise
# Parser Logic
while len(buffer) > 8:
try:
start_idx = buffer.index(HEADER)
except ValueError:
buffer = buffer[-3:]
break
if start_idx > 0:
buffer = buffer[start_idx:]
if len(buffer) < 8:
break
len_l = buffer[4]
len_h = buffer[5]
if len_h in [0x0F, 0x13] and len_l < 50:
payload_len = len_l
payload_start = 5
else:
payload_len = len_l + (len_h << 8)
if len(buffer) < 12 + payload_len:
payload_len = len_l
payload_start = 5
else:
payload_start = 6
try:
scan_limit = min(len(buffer), 256)
try:
next_footer = buffer.index(FOOTER, payload_start, scan_limit)
except ValueError:
if len(buffer) > 256:
buffer = buffer[1:]
break
packet_end = next_footer + 4
packet = buffer[:packet_end]
buffer = buffer[packet_end:]
if next_footer - 2 > payload_start:
payload = packet[payload_start : next_footer - 2]
else:
payload = packet[payload_start : next_footer]
if len(payload) > 0:
cmd = payload[0]
if cmd == 0x13:
json_data = self.parse_targets(payload)
need_report, reason = self.should_report(json_data)
if need_report:
self.publish_data(json_data)
reason_cn = {"presence_change": "状态变化", "count_change": "人数变化", "periodic": "定时"}.get(reason, reason)
if json_data['people_count'] > 0:
logger.info(f"[{reason_cn}] Count: {json_data['people_count']} | MinDist: {json_data['min_distance_cm']}cm")
else:
logger.info(f"[{reason_cn}] 无人")
elif cmd == 0x0F:
pass
except ValueError:
break
except OSError as e:
reconnect_count += 1
logger.error(f"System/Serial Error #{reconnect_count}: {e}")
if self.ser and self.ser.is_open:
try:
self.ser.close()
except:
pass
time.sleep(2)
except Exception as e:
reconnect_count += 1
logger.error(f"Unexpected Error #{reconnect_count}: {e}")
time.sleep(2)
if __name__ == "__main__":
# 确保日志目录存在
# 如果作为服务运行,当前目录可能是 /,所以最好使用绝对路径或依赖 WorkingDirectory
reader = LD2460Reader()
try:
reader.run()
except KeyboardInterrupt:
logger.info("Stopping...")