iot-expert by personamanagmentlayer/pcl
npx skills add https://github.com/personamanagmentlayer/pcl --skill iot-expert
为物联网系统、嵌入式设备、边缘计算、传感器网络和物联网协议提供专家指导。
import paho.mqtt.client as mqtt
import json
from datetime import datetime
from typing import Callable, Dict
class MQTTClient:
    """JSON-oriented convenience wrapper around a paho-mqtt client.

    Callbacks are registered per topic filter with :meth:`subscribe`.
    Incoming payloads are decoded as JSON when possible and handed to the
    matching callback; payloads that are not valid JSON are passed through
    as decoded text.
    """

    def __init__(self, broker: str, port: int = 1883, client_id: str = "iot_device"):
        """Create the underlying client and wire up the event handlers.

        Args:
            broker: Host name or address of the MQTT broker.
            port: Broker TCP port.
            client_id: Client identifier presented to the broker.
        """
        self.broker = broker
        self.port = port
        # paho-mqtt 2.x takes the callback API version as its first
        # positional argument, so the client id is always passed by keyword.
        # VERSION1 keeps the handler signatures used by this class.
        if hasattr(mqtt, "CallbackAPIVersion"):
            self.client = mqtt.Client(
                mqtt.CallbackAPIVersion.VERSION1, client_id=client_id
            )
        else:
            self.client = mqtt.Client(client_id=client_id)
        self.subscriptions: Dict[str, Callable] = {}
        # QoS requested per topic filter, replayed when resubscribing.
        self._qos: Dict[str, int] = {}
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

    def _on_connect(self, client, userdata, flags, rc):
        """Report the connection result and restore subscriptions on success."""
        if rc != 0:
            print(f"Connection failed with code {rc}")
            return
        print(f"Connected to MQTT broker at {self.broker}:{self.port}")
        # Resubscribe to topics on reconnect, using the QoS originally
        # requested. Iterate over a snapshot because subscribe() may be
        # called from another thread while this handler runs.
        for topic in list(self.subscriptions):
            self.client.subscribe(topic, qos=self._qos.get(topic, 0))

    def _find_callback(self, topic: str):
        """Return the callback registered for *topic*, or None.

        An exact match wins; otherwise filters containing MQTT wildcards
        (``+`` / ``#``) are tried in registration order.
        """
        handler = self.subscriptions.get(topic)
        if handler is not None:
            return handler
        for pattern, candidate in list(self.subscriptions.items()):
            has_wildcard = "+" in pattern or "#" in pattern
            if has_wildcard and mqtt.topic_matches_sub(pattern, topic):
                return candidate
        return None

    def _on_message(self, client, userdata, msg):
        """Decode an incoming message and dispatch it to its callback."""
        handler = self._find_callback(msg.topic)
        if handler is None:
            return
        text = msg.payload.decode()
        # Only the parse is guarded: an exception raised by the callback
        # itself must not trigger a second invocation with the raw text.
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            data = text
        handler(data)

    def _on_disconnect(self, client, userdata, rc):
        """Log disconnects that were not requested by this client."""
        if rc != 0:
            print("Unexpected disconnect. Reconnecting...")

    def connect(self, username: str = None, password: str = None):
        """Connect to the broker and start the background network loop.

        Args:
            username: Optional user name; credentials are set whenever this
                is given, even without a password.
            password: Optional password sent together with *username*.
        """
        if username is not None:
            self.client.username_pw_set(username, password)
        self.client.connect(self.broker, self.port, 60)
        self.client.loop_start()

    def publish(self, topic: str, payload: Dict, qos: int = 1, retain: bool = False):
        """Publish message to MQTT topic.

        The payload is serialised with ``json.dumps``.

        Returns:
            True when the client accepted the message for delivery.
        """
        message = json.dumps(payload)
        result = self.client.publish(topic, message, qos=qos, retain=retain)
        return result.rc == mqtt.MQTT_ERR_SUCCESS

    def subscribe(self, topic: str, callback: Callable, qos: int = 1):
        """Subscribe to MQTT topic with callback.

        The callback and QoS are remembered so the subscription can be
        re-established after a reconnect.
        """
        self.subscriptions[topic] = callback
        self._qos[topic] = qos
        self.client.subscribe(topic, qos=qos)

    def disconnect(self):
        """Stop the background network loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()
# IoT Device Example
class TemperatureSensor:
    """Simulated temperature sensor that publishes its readings over MQTT."""

    def __init__(self, device_id: str, mqtt_client: "MQTTClient"):
        """Bind the sensor to a device id and the client used for publishing.

        Args:
            device_id: Identifier embedded in the topic and in each payload.
            mqtt_client: Object exposing ``publish(topic, payload)``.
        """
        self.device_id = device_id
        self.mqtt = mqtt_client
        self.topic = f"sensors/temperature/{device_id}"

    def read_temperature(self) -> float:
        """Return a simulated temperature between 20.0 and 30.0, rounded to 2 places."""
        # In real device, read from actual sensor
        import random

        return round(random.uniform(20.0, 30.0), 2)

    def publish_reading(self):
        """Take one reading, publish it on the sensor topic and return the payload.

        The timestamp is timezone-aware UTC in ISO-8601 form, so consumers do
        not have to guess the offset.
        """
        from datetime import timezone

        payload = {
            "device_id": self.device_id,
            "temperature": self.read_temperature(),
            "unit": "celsius",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        self.mqtt.publish(self.topic, payload)
        return payload
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
// Arduino/ESP32 Example
#include <WiFi.h>
#include <PubSubClient.h>
#include "DHT.h"
#define DHTPIN 4
#define DHTTYPE DHT22
// WiFi credentials passed to WiFi.begin() and the broker host passed to
// client.setServer(). NOTE(review): these are placeholder values compiled
// into the firmware; replace them and avoid shipping real secrets this way.
const char* ssid = "YourWiFiSSID";
const char* password = "YourPassword";
const char* mqtt_server = "broker.example.com";
// MQTT client layered on the WiFi network client.
WiFiClient espClient;
PubSubClient client(espClient);
// DHT sensor configured from the DHTPIN / DHTTYPE defines.
DHT dht(DHTPIN, DHTTYPE);
// Join the configured WiFi network, blocking until the connection is up,
// then print the assigned IP address to the serial console.
void setup_wifi() {
  delay(10);
  Serial.println("Connecting to WiFi...");
  WiFi.begin(ssid, password);

  // Poll the link state twice a second, printing a progress dot per poll.
  while (true) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }

  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}
// MQTT message handler: echoes the topic and the raw payload bytes to the
// serial console.
//   topic   - topic the message arrived on
//   payload - message bytes; printed using the supplied length
//   length  - number of bytes in payload
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  // The index is unsigned to match `length` and avoid a signed/unsigned
  // comparison.
  for (unsigned int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();
}
// Block until the MQTT client is connected, retrying every 5 seconds.
// After each successful connect the control topic is subscribed again.
void reconnect() {
  while (true) {
    if (client.connected()) {
      return;
    }

    Serial.print("Attempting MQTT connection...");
    if (!client.connect("ESP32Client")) {
      // Report the client state code and back off before the next attempt.
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
      continue;
    }

    Serial.println("connected");
    client.subscribe("device/control");
  }
}
// One-time initialisation: serial console, WiFi, MQTT server and message
// handler, then the DHT sensor.
void setup() {
// Serial console at 115200 baud for the log output below.
Serial.begin(115200);
// Blocks until the WiFi connection is established.
setup_wifi();
// Broker host on port 1883; incoming messages go to callback().
client.setServer(mqtt_server, 1883);
client.setCallback(callback);
dht.begin();
}
// Main loop: keep the MQTT session alive and, once more than 10 seconds
// have passed since the last attempt, publish a temperature/humidity sample.
void loop() {
  if (!client.connected()) {
    reconnect();
  }
  client.loop();

  // Read sensor every 10 seconds
  static unsigned long lastRead = 0;
  if (millis() - lastRead <= 10000) {
    return;
  }

  float humidity = dht.readHumidity();
  float temperature = dht.readTemperature();
  const bool sampleMissing = isnan(humidity) || isnan(temperature);
  if (!sampleMissing) {
    char json[100];
    snprintf(json, sizeof(json),
             "{\"temperature\":%.2f,\"humidity\":%.2f}",
             temperature, humidity);
    client.publish("sensors/data", json);
    Serial.println(json);
  }
  // The timer restarts even when the read failed, so a bad sample is
  // retried on the next interval rather than immediately.
  lastRead = millis();
}
import asyncio
from typing import Dict, List
import numpy as np
class EdgeProcessor:
    """Process data at edge before sending to cloud.

    Readings are buffered and reduced to a statistical summary once
    ``buffer_size`` readings have accumulated.
    """

    def __init__(self, buffer_size: int = 100):
        """Create an empty buffer that is summarised every *buffer_size* readings."""
        self.buffer: List[Dict] = []
        self.buffer_size = buffer_size

    def add_reading(self, reading: Dict):
        """Add sensor reading to buffer.

        Returns:
            The summary dict produced by :meth:`process_buffer` when this
            reading filled the buffer, otherwise ``None``.
        """
        self.buffer.append(reading)
        if len(self.buffer) >= self.buffer_size:
            # Hand the summary back instead of computing and discarding it.
            return self.process_buffer()
        return None

    def process_buffer(self) -> Dict:
        """Process buffered data at edge.

        Returns:
            ``{}`` when the buffer is empty, otherwise a dict with ``count``,
            ``mean``, ``std``, ``min``, ``max`` (plain floats) and
            ``anomalies`` (indices into the processed batch).

        Raises:
            KeyError: If a buffered reading has no ``'temperature'`` key. The
                batch is dropped in that case so later readings still work.
        """
        if not self.buffer:
            return {}
        # Detach the batch first so a malformed reading cannot leave the
        # buffer permanently full.
        batch, self.buffer = self.buffer, []
        temperatures = [reading['temperature'] for reading in batch]
        data = np.asarray(temperatures, dtype=float)
        # Plain floats keep the summary serialisable with the json module.
        return {
            "count": len(temperatures),
            "mean": float(data.mean()),
            "std": float(data.std()),
            "min": float(data.min()),
            "max": float(data.max()),
            "anomalies": self.detect_anomalies(temperatures),
        }

    def detect_anomalies(self, values: List[float]) -> List[int]:
        """Detect anomalies using simple threshold.

        Returns the indices of values lying more than 2.5 population
        standard deviations from the mean; an empty input yields ``[]``.
        """
        data = np.asarray(values, dtype=float)
        if data.size == 0:
            return []
        threshold = 2.5
        deviation = np.abs(data - data.mean())
        return np.flatnonzero(deviation > threshold * data.std()).tolist()
class IoTPipeline:
    """Complete IoT data pipeline.

    Connects registered sensors to an edge processor: sensors publish over
    MQTT, and messages arriving on their topics are buffered for edge
    processing.
    """

    def __init__(self, mqtt_client: "MQTTClient"):
        """Create a pipeline that subscribes through *mqtt_client*."""
        self.mqtt = mqtt_client
        self.edge_processor = EdgeProcessor()
        self.devices: Dict[str, "TemperatureSensor"] = {}

    def register_device(self, device: "TemperatureSensor"):
        """Register IoT device and subscribe to its temperature topic."""
        self.devices[device.device_id] = device
        # Subscribe to device topic
        topic = f"sensors/temperature/{device.device_id}"
        self.mqtt.subscribe(topic, self.handle_device_data)

    def handle_device_data(self, data: Dict):
        """Handle incoming device data.

        Payloads that are not dicts (for example raw text that failed JSON
        decoding) or that carry no ``'temperature'`` value are dropped here,
        because buffering them would make the later summary step fail.
        """
        if not isinstance(data, dict) or "temperature" not in data:
            print(f"Ignoring malformed reading: {data!r}")
            return
        # Process at edge
        self.edge_processor.add_reading(data)

    async def collect_data_loop(self, interval: int = 5):
        """Continuous data collection from devices.

        Publishes one reading per registered device, then sleeps *interval*
        seconds; runs until the task is cancelled.
        """
        while True:
            # Snapshot so a registration during the pass cannot break iteration.
            for device in list(self.devices.values()):
                reading = device.publish_reading()
                print(f"Device {device.device_id}: {reading['temperature']}°C")
            await asyncio.sleep(interval)
from datetime import datetime
from enum import Enum
class DeviceStatus(Enum):
    """Lifecycle states a managed device can be in."""

    ONLINE = "online"
    OFFLINE = "offline"
    MAINTENANCE = "maintenance"
    ERROR = "error"
class IoTDevice:
    """State record for one managed device."""

    def __init__(self, device_id: str, device_type: str):
        """Create a device that starts OFFLINE, never seen, on firmware 1.0.0."""
        self.device_id = device_id
        self.device_type = device_type
        self.status = DeviceStatus.OFFLINE
        self.last_seen = None
        self.firmware_version = "1.0.0"
        self.metadata = {}

    def update_status(self, status: "DeviceStatus"):
        """Set the status and stamp ``last_seen`` with the current UTC time."""
        self.status = status
        # Naive UTC, matching the naive ``datetime.utcnow()`` that
        # DeviceManager.get_offline_devices subtracts from.
        self.last_seen = datetime.utcnow()

    @staticmethod
    def _version_key(version: str) -> tuple:
        """Return the numeric components of a dotted version as a tuple of ints."""
        import re

        return tuple(int(part) for part in re.findall(r"\d+", version))

    def needs_firmware_update(self, latest_version: str) -> bool:
        """Return True when *latest_version* is newer than the installed firmware.

        Versions are compared numerically component by component, so
        ``"1.10.0"`` is newer than ``"1.9.0"``; missing trailing components
        count as zero (``"1.0"`` equals ``"1.0.0"``).
        """
        current = self._version_key(self.firmware_version)
        latest = self._version_key(latest_version)
        width = max(len(current), len(latest))
        current += (0,) * (width - len(current))
        latest += (0,) * (width - len(latest))
        return current < latest
class DeviceManager:
    """In-memory registry of devices with heartbeat and firmware helpers."""

    def __init__(self):
        """Start with an empty registry keyed by device id."""
        self.devices: Dict[str, "IoTDevice"] = {}

    def register_device(self, device: "IoTDevice"):
        """Register new device, replacing any entry with the same id."""
        self.devices[device.device_id] = device

    def update_device_heartbeat(self, device_id: str):
        """Update device last seen timestamp and mark it ONLINE.

        Unknown device ids are ignored.
        """
        if device_id in self.devices:
            self.devices[device_id].update_status(DeviceStatus.ONLINE)

    def get_offline_devices(self, timeout_seconds: int = 300) -> List["IoTDevice"]:
        """Get devices that haven't reported recently.

        A device counts as offline when it has never reported at all or when
        its last report is more than *timeout_seconds* old.
        """
        # Naive UTC, matching the naive timestamp stored in ``last_seen``.
        now = datetime.utcnow()
        offline = []
        for device in self.devices.values():
            if device.last_seen is None:
                # Never reported: such devices still carry their initial
                # OFFLINE status, so they belong in the result.
                offline.append(device)
                continue
            elapsed = (now - device.last_seen).total_seconds()
            if elapsed > timeout_seconds:
                offline.append(device)
        return offline

    def schedule_firmware_update(self, device_id: str, new_version: str):
        """Schedule OTA firmware update.

        Returns:
            The update command payload for a registered device, or ``None``
            when *device_id* is unknown. Sending the payload is left to the
            caller.
        """
        if device_id not in self.devices:
            return None
        from urllib.parse import quote

        # Escape the version so it always stays a single URL path segment.
        return {
            "command": "firmware_update",
            "version": new_version,
            "url": f"https://updates.example.com/{quote(new_version, safe='')}.bin",
        }
❌ 没有电源管理策略 ❌ 未加密的通信 ❌ 对网络故障没有错误处理 ❌ 将所有原始数据发送到云端 ❌ 没有设备身份验证 ❌ 固件中硬编码凭据 ❌ 没有 OTA 更新机制
每周安装次数
69
代码仓库
GitHub 星标数
12
首次出现
2026 年 1 月 24 日
安全审计
安装于
opencode: 59
codex: 58
gemini-cli: 56
cursor: 54
github-copilot: 52
kimi-cli: 49
Expert guidance for IoT systems, embedded devices, edge computing, sensor networks, and IoT protocols.
import paho.mqtt.client as mqtt
import json
from datetime import datetime
from typing import Callable, Dict
class MQTTClient:
    """JSON-oriented convenience wrapper around a paho-mqtt client.

    Callbacks are registered per topic filter with :meth:`subscribe`.
    Incoming payloads are decoded as JSON when possible and handed to the
    matching callback; payloads that are not valid JSON are passed through
    as decoded text.
    """

    def __init__(self, broker: str, port: int = 1883, client_id: str = "iot_device"):
        """Create the underlying client and wire up the event handlers.

        Args:
            broker: Host name or address of the MQTT broker.
            port: Broker TCP port.
            client_id: Client identifier presented to the broker.
        """
        self.broker = broker
        self.port = port
        # paho-mqtt 2.x takes the callback API version as its first
        # positional argument, so the client id is always passed by keyword.
        # VERSION1 keeps the handler signatures used by this class.
        if hasattr(mqtt, "CallbackAPIVersion"):
            self.client = mqtt.Client(
                mqtt.CallbackAPIVersion.VERSION1, client_id=client_id
            )
        else:
            self.client = mqtt.Client(client_id=client_id)
        self.subscriptions: Dict[str, Callable] = {}
        # QoS requested per topic filter, replayed when resubscribing.
        self._qos: Dict[str, int] = {}
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

    def _on_connect(self, client, userdata, flags, rc):
        """Report the connection result and restore subscriptions on success."""
        if rc != 0:
            print(f"Connection failed with code {rc}")
            return
        print(f"Connected to MQTT broker at {self.broker}:{self.port}")
        # Resubscribe to topics on reconnect, using the QoS originally
        # requested. Iterate over a snapshot because subscribe() may be
        # called from another thread while this handler runs.
        for topic in list(self.subscriptions):
            self.client.subscribe(topic, qos=self._qos.get(topic, 0))

    def _find_callback(self, topic: str):
        """Return the callback registered for *topic*, or None.

        An exact match wins; otherwise filters containing MQTT wildcards
        (``+`` / ``#``) are tried in registration order.
        """
        handler = self.subscriptions.get(topic)
        if handler is not None:
            return handler
        for pattern, candidate in list(self.subscriptions.items()):
            has_wildcard = "+" in pattern or "#" in pattern
            if has_wildcard and mqtt.topic_matches_sub(pattern, topic):
                return candidate
        return None

    def _on_message(self, client, userdata, msg):
        """Decode an incoming message and dispatch it to its callback."""
        handler = self._find_callback(msg.topic)
        if handler is None:
            return
        text = msg.payload.decode()
        # Only the parse is guarded: an exception raised by the callback
        # itself must not trigger a second invocation with the raw text.
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            data = text
        handler(data)

    def _on_disconnect(self, client, userdata, rc):
        """Log disconnects that were not requested by this client."""
        if rc != 0:
            print("Unexpected disconnect. Reconnecting...")

    def connect(self, username: str = None, password: str = None):
        """Connect to the broker and start the background network loop.

        Args:
            username: Optional user name; credentials are set whenever this
                is given, even without a password.
            password: Optional password sent together with *username*.
        """
        if username is not None:
            self.client.username_pw_set(username, password)
        self.client.connect(self.broker, self.port, 60)
        self.client.loop_start()

    def publish(self, topic: str, payload: Dict, qos: int = 1, retain: bool = False):
        """Publish message to MQTT topic.

        The payload is serialised with ``json.dumps``.

        Returns:
            True when the client accepted the message for delivery.
        """
        message = json.dumps(payload)
        result = self.client.publish(topic, message, qos=qos, retain=retain)
        return result.rc == mqtt.MQTT_ERR_SUCCESS

    def subscribe(self, topic: str, callback: Callable, qos: int = 1):
        """Subscribe to MQTT topic with callback.

        The callback and QoS are remembered so the subscription can be
        re-established after a reconnect.
        """
        self.subscriptions[topic] = callback
        self._qos[topic] = qos
        self.client.subscribe(topic, qos=qos)

    def disconnect(self):
        """Stop the background network loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()
# IoT Device Example
class TemperatureSensor:
    """Simulated temperature sensor that publishes its readings over MQTT."""

    def __init__(self, device_id: str, mqtt_client: "MQTTClient"):
        """Bind the sensor to a device id and the client used for publishing.

        Args:
            device_id: Identifier embedded in the topic and in each payload.
            mqtt_client: Object exposing ``publish(topic, payload)``.
        """
        self.device_id = device_id
        self.mqtt = mqtt_client
        self.topic = f"sensors/temperature/{device_id}"

    def read_temperature(self) -> float:
        """Return a simulated temperature between 20.0 and 30.0, rounded to 2 places."""
        # In real device, read from actual sensor
        import random

        return round(random.uniform(20.0, 30.0), 2)

    def publish_reading(self):
        """Take one reading, publish it on the sensor topic and return the payload.

        The timestamp is timezone-aware UTC in ISO-8601 form, so consumers do
        not have to guess the offset.
        """
        from datetime import timezone

        payload = {
            "device_id": self.device_id,
            "temperature": self.read_temperature(),
            "unit": "celsius",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        self.mqtt.publish(self.topic, payload)
        return payload
// Arduino/ESP32 Example
#include <WiFi.h>
#include <PubSubClient.h>
#include "DHT.h"
#define DHTPIN 4
#define DHTTYPE DHT22
// WiFi credentials passed to WiFi.begin() and the broker host passed to
// client.setServer(). NOTE(review): these are placeholder values compiled
// into the firmware; replace them and avoid shipping real secrets this way.
const char* ssid = "YourWiFiSSID";
const char* password = "YourPassword";
const char* mqtt_server = "broker.example.com";
// MQTT client layered on the WiFi network client.
WiFiClient espClient;
PubSubClient client(espClient);
// DHT sensor configured from the DHTPIN / DHTTYPE defines.
DHT dht(DHTPIN, DHTTYPE);
// Join the configured WiFi network, blocking until the connection is up,
// then print the assigned IP address to the serial console.
void setup_wifi() {
  delay(10);
  Serial.println("Connecting to WiFi...");
  WiFi.begin(ssid, password);

  // Poll the link state twice a second, printing a progress dot per poll.
  while (true) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }

  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}
// MQTT message handler: echoes the topic and the raw payload bytes to the
// serial console.
//   topic   - topic the message arrived on
//   payload - message bytes; printed using the supplied length
//   length  - number of bytes in payload
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  // The index is unsigned to match `length` and avoid a signed/unsigned
  // comparison.
  for (unsigned int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();
}
// Block until the MQTT client is connected, retrying every 5 seconds.
// After each successful connect the control topic is subscribed again.
void reconnect() {
  while (true) {
    if (client.connected()) {
      return;
    }

    Serial.print("Attempting MQTT connection...");
    if (!client.connect("ESP32Client")) {
      // Report the client state code and back off before the next attempt.
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
      continue;
    }

    Serial.println("connected");
    client.subscribe("device/control");
  }
}
// One-time initialisation: serial console, WiFi, MQTT server and message
// handler, then the DHT sensor.
void setup() {
// Serial console at 115200 baud for the log output below.
Serial.begin(115200);
// Blocks until the WiFi connection is established.
setup_wifi();
// Broker host on port 1883; incoming messages go to callback().
client.setServer(mqtt_server, 1883);
client.setCallback(callback);
dht.begin();
}
// Main loop: keep the MQTT session alive and, once more than 10 seconds
// have passed since the last attempt, publish a temperature/humidity sample.
void loop() {
  if (!client.connected()) {
    reconnect();
  }
  client.loop();

  // Read sensor every 10 seconds
  static unsigned long lastRead = 0;
  if (millis() - lastRead <= 10000) {
    return;
  }

  float humidity = dht.readHumidity();
  float temperature = dht.readTemperature();
  const bool sampleMissing = isnan(humidity) || isnan(temperature);
  if (!sampleMissing) {
    char json[100];
    snprintf(json, sizeof(json),
             "{\"temperature\":%.2f,\"humidity\":%.2f}",
             temperature, humidity);
    client.publish("sensors/data", json);
    Serial.println(json);
  }
  // The timer restarts even when the read failed, so a bad sample is
  // retried on the next interval rather than immediately.
  lastRead = millis();
}
import asyncio
from typing import Dict, List
import numpy as np
class EdgeProcessor:
    """Process data at edge before sending to cloud.

    Readings are buffered and reduced to a statistical summary once
    ``buffer_size`` readings have accumulated.
    """

    def __init__(self, buffer_size: int = 100):
        """Create an empty buffer that is summarised every *buffer_size* readings."""
        self.buffer: List[Dict] = []
        self.buffer_size = buffer_size

    def add_reading(self, reading: Dict):
        """Add sensor reading to buffer.

        Returns:
            The summary dict produced by :meth:`process_buffer` when this
            reading filled the buffer, otherwise ``None``.
        """
        self.buffer.append(reading)
        if len(self.buffer) >= self.buffer_size:
            # Hand the summary back instead of computing and discarding it.
            return self.process_buffer()
        return None

    def process_buffer(self) -> Dict:
        """Process buffered data at edge.

        Returns:
            ``{}`` when the buffer is empty, otherwise a dict with ``count``,
            ``mean``, ``std``, ``min``, ``max`` (plain floats) and
            ``anomalies`` (indices into the processed batch).

        Raises:
            KeyError: If a buffered reading has no ``'temperature'`` key. The
                batch is dropped in that case so later readings still work.
        """
        if not self.buffer:
            return {}
        # Detach the batch first so a malformed reading cannot leave the
        # buffer permanently full.
        batch, self.buffer = self.buffer, []
        temperatures = [reading['temperature'] for reading in batch]
        data = np.asarray(temperatures, dtype=float)
        # Plain floats keep the summary serialisable with the json module.
        return {
            "count": len(temperatures),
            "mean": float(data.mean()),
            "std": float(data.std()),
            "min": float(data.min()),
            "max": float(data.max()),
            "anomalies": self.detect_anomalies(temperatures),
        }

    def detect_anomalies(self, values: List[float]) -> List[int]:
        """Detect anomalies using simple threshold.

        Returns the indices of values lying more than 2.5 population
        standard deviations from the mean; an empty input yields ``[]``.
        """
        data = np.asarray(values, dtype=float)
        if data.size == 0:
            return []
        threshold = 2.5
        deviation = np.abs(data - data.mean())
        return np.flatnonzero(deviation > threshold * data.std()).tolist()
class IoTPipeline:
    """Complete IoT data pipeline.

    Connects registered sensors to an edge processor: sensors publish over
    MQTT, and messages arriving on their topics are buffered for edge
    processing.
    """

    def __init__(self, mqtt_client: "MQTTClient"):
        """Create a pipeline that subscribes through *mqtt_client*."""
        self.mqtt = mqtt_client
        self.edge_processor = EdgeProcessor()
        self.devices: Dict[str, "TemperatureSensor"] = {}

    def register_device(self, device: "TemperatureSensor"):
        """Register IoT device and subscribe to its temperature topic."""
        self.devices[device.device_id] = device
        # Subscribe to device topic
        topic = f"sensors/temperature/{device.device_id}"
        self.mqtt.subscribe(topic, self.handle_device_data)

    def handle_device_data(self, data: Dict):
        """Handle incoming device data.

        Payloads that are not dicts (for example raw text that failed JSON
        decoding) or that carry no ``'temperature'`` value are dropped here,
        because buffering them would make the later summary step fail.
        """
        if not isinstance(data, dict) or "temperature" not in data:
            print(f"Ignoring malformed reading: {data!r}")
            return
        # Process at edge
        self.edge_processor.add_reading(data)

    async def collect_data_loop(self, interval: int = 5):
        """Continuous data collection from devices.

        Publishes one reading per registered device, then sleeps *interval*
        seconds; runs until the task is cancelled.
        """
        while True:
            # Snapshot so a registration during the pass cannot break iteration.
            for device in list(self.devices.values()):
                reading = device.publish_reading()
                print(f"Device {device.device_id}: {reading['temperature']}°C")
            await asyncio.sleep(interval)
from datetime import datetime
from enum import Enum
class DeviceStatus(Enum):
    """Lifecycle states a managed device can be in."""

    ONLINE = "online"
    OFFLINE = "offline"
    MAINTENANCE = "maintenance"
    ERROR = "error"
class IoTDevice:
    """State record for one managed device."""

    def __init__(self, device_id: str, device_type: str):
        """Create a device that starts OFFLINE, never seen, on firmware 1.0.0."""
        self.device_id = device_id
        self.device_type = device_type
        self.status = DeviceStatus.OFFLINE
        self.last_seen = None
        self.firmware_version = "1.0.0"
        self.metadata = {}

    def update_status(self, status: "DeviceStatus"):
        """Set the status and stamp ``last_seen`` with the current UTC time."""
        self.status = status
        # Naive UTC, matching the naive ``datetime.utcnow()`` that
        # DeviceManager.get_offline_devices subtracts from.
        self.last_seen = datetime.utcnow()

    @staticmethod
    def _version_key(version: str) -> tuple:
        """Return the numeric components of a dotted version as a tuple of ints."""
        import re

        return tuple(int(part) for part in re.findall(r"\d+", version))

    def needs_firmware_update(self, latest_version: str) -> bool:
        """Return True when *latest_version* is newer than the installed firmware.

        Versions are compared numerically component by component, so
        ``"1.10.0"`` is newer than ``"1.9.0"``; missing trailing components
        count as zero (``"1.0"`` equals ``"1.0.0"``).
        """
        current = self._version_key(self.firmware_version)
        latest = self._version_key(latest_version)
        width = max(len(current), len(latest))
        current += (0,) * (width - len(current))
        latest += (0,) * (width - len(latest))
        return current < latest
class DeviceManager:
    """In-memory registry of devices with heartbeat and firmware helpers."""

    def __init__(self):
        """Start with an empty registry keyed by device id."""
        self.devices: Dict[str, "IoTDevice"] = {}

    def register_device(self, device: "IoTDevice"):
        """Register new device, replacing any entry with the same id."""
        self.devices[device.device_id] = device

    def update_device_heartbeat(self, device_id: str):
        """Update device last seen timestamp and mark it ONLINE.

        Unknown device ids are ignored.
        """
        if device_id in self.devices:
            self.devices[device_id].update_status(DeviceStatus.ONLINE)

    def get_offline_devices(self, timeout_seconds: int = 300) -> List["IoTDevice"]:
        """Get devices that haven't reported recently.

        A device counts as offline when it has never reported at all or when
        its last report is more than *timeout_seconds* old.
        """
        # Naive UTC, matching the naive timestamp stored in ``last_seen``.
        now = datetime.utcnow()
        offline = []
        for device in self.devices.values():
            if device.last_seen is None:
                # Never reported: such devices still carry their initial
                # OFFLINE status, so they belong in the result.
                offline.append(device)
                continue
            elapsed = (now - device.last_seen).total_seconds()
            if elapsed > timeout_seconds:
                offline.append(device)
        return offline

    def schedule_firmware_update(self, device_id: str, new_version: str):
        """Schedule OTA firmware update.

        Returns:
            The update command payload for a registered device, or ``None``
            when *device_id* is unknown. Sending the payload is left to the
            caller.
        """
        if device_id not in self.devices:
            return None
        from urllib.parse import quote

        # Escape the version so it always stays a single URL path segment.
        return {
            "command": "firmware_update",
            "version": new_version,
            "url": f"https://updates.example.com/{quote(new_version, safe='')}.bin",
        }
❌ No power management strategy ❌ Unencrypted communications ❌ No error handling for network failures ❌ Sending all raw data to cloud ❌ No device authentication ❌ Hard-coded credentials in firmware ❌ No OTA update mechanism
Weekly Installs
69
Repository
GitHub Stars
12
First Seen
Jan 24, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Warn
Installed on
opencode: 59
codex: 58
gemini-cli: 56
cursor: 54
github-copilot: 52
kimi-cli: 49
FastAPI官方技能:Python Web开发最佳实践与CLI工具使用指南
1,200 周安装
TorchTitan:PyTorch原生分布式大语言模型预训练平台,支持4D并行与H100 GPU加速
69 周安装
screenshot 截图技能:跨平台桌面截图工具,支持macOS/Linux权限管理与多模式捕获
69 周安装
tmux进程管理最佳实践:交互式Shell初始化、会话命名与生命周期管理
69 周安装
Git Rebase Sync:安全同步分支的Git变基工具,解决冲突与备份
69 周安装
LinkedIn自动化工具 - Claude Code专属,自然对话拓展人脉,避免垃圾信息
69 周安装
实验流水线框架:4阶段科研实验执行与消融研究方法论 | EvoScientist
69 周安装