若有兴趣土炮一个机房自动发报系统,可参考下方式,当机房过热/过湿,会自动发送,商家(机房)频道的LINE讯息。
简易流程:
- a.DTH22感测器
- b.Arduino nano 控制板
- c.机房内的WINDOWS电脑 (原本有的)
一、购买零件材料1.DHT22 模组 (三脚)2.杜邦线 (母-母)3. Arduino Nano 开发板 (已焊好针脚)4.Mini USB 转 USB TYPE A (此开发板的输出为mini USB)
二、连接感测器 > Arduino > 电脑1. 红线 3.3V2. 黑线接地3. 使用 Arduino Nano D2 接收数位讯号
三、安装 Arduino IDE1.需确定有驱动到连接埠 (相容nano开发板可能需安装 CH340驱动)2.选择使用的开发板跟埠3.范例电脑用NANO相容板与COM3 (依据实际变动)
四、安装 Arduino 的 Libraries1.工具 > 管理程式库2.输入找 DTH sensor library 找到后安装3.输入找 Adafruit DHT sensor library 找到后安装
五、Arduino 运行程式1.复制下方程式码 到 Arduino IDE2.检查后上传3.若上传出现错误,请改选择一下所使用的开发板晶片4.开启连接埠监测视窗
#include <DHT.h>
// 定义 DHT22 感测器型号与连接脚位
#define DHTPIN 2 // DHT22 的数据脚接 Arduino 的 D2 脚位
#define DHTTYPE DHT22 // 感测器型号为 DHT22
// 初始化 DHT22 感测器
DHT dht(DHTPIN, DHTTYPE);
void setup() {
// 设定串列埠速率为 9600
Serial.begin(9600);
// 启动 DHT22 感测器
dht.begin();
}
void loop() {
// 延迟以确保感测器稳定 (至少 2 秒)
delay(2000);
// 读取温度与湿度
float temperature = dht.readTemperature(); // 摄氏温度
float humidity = dht.readHumidity(); // 湿度百分比
// 确认读取是否成功
if (isnan(temperature) || isnan(humidity)) {
return;
}
// 只输出温度与湿度数值
Serial.print(temperature);
Serial.print(",");
Serial.println(humidity);
}
六、测试连接埠监测视窗是否正常回传1.有回传 温度 与 湿度数值2.关闭 Arduino IDE
七、Line Developers 注册后要加入成为商家 (才会有 200则/月 免费讯息)Line Developers 网址连结
建立供应商及供应商下的频道
建立频道,使用 Messaging API
1.建好频道后 在 Basic settings 分页可以看到 Your user ID: 『记下:等等PY程式要填入』2.在 Messaging API 分页可以看到 Channel access token (long-lived) 『记下:等等PY程式要填入』3.其他用QR CODE加入机房群的同仁 user ID 必须过 webhook.site 监听 LINE user ID在 Messaging API 分页可以加入你的 webhook.site 透过讯息传送记录,抓取 LINE user ID关于LINE USER ID-1关于LINE USER ID-2关于Webhook.site
八、机房电脑安装 Python (不想安装也可以透过打包成exe,在机房电脑上执行)GUI画面如下,执行若出现错误则可能需加装缺少python程式库。
- a. 要修改程式内 "输入自己申请的LINEACCESS_TOKEN" 跟 "输入传送LINE user ID1~N"
- b. 机房温度这边是温度 30度发报 湿度80%发报 发报间隔10分钟 (免费只有200则/月)
- c. 路径预设为"C:\\Temperature"
- d. 每一天温/溼度变化会存一个日期.TXT档案可以用EXCEL回看
- e. 图表内X轴 3600点,若每2秒读取一点,只会显示约2小时内的数据变化
import tkinter as tk
from tkinter import ttk, messagebox
import serial
import serial.tools.list_ports
import threading
import datetime
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import os
import requests
import time
import os
# 设置目录
base_path = "C:\\\\Temperature"
# 检查目录是否存在,如果不存在则创建
if not os.path.exists(base_path):
os.makedirs(base_path)
# 设置工作目录
os.chdir(base_path)
print("目前工作目录:", os.getcwd())
# LINE Messaging API 设定
LINE_CHANNEL_ACCESS_TOKEN = "输入自己申请的LINEACCESS_TOKEN"
user_ids = [
"输入传送LINE user ID1",
"输入传送LINE user ID2",
"输入传送LINE user ID3",
"输入传送LINE user ID4"
]
# 初始化全域变数
serial_port = None
data_lock = threading.Lock()
data_points = []
is_reading = False
last_alert_time = 0 # 追踪最后一次发送LINE通知的时间
# GUI 设定
root = tk.Tk()
root.title("串列埠监控工具")
root.option_add("*Font", "max.ttf")
# 实时数据显示
current_temp = tk.StringVar(value="N/A")
current_humid = tk.StringVar(value="N/A")
# 创建图表
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
ax1.set_xlabel("TIME ( 3600 POINT )")
ax1.set_ylabel("Temperature (°C)", color="red")
ax2.set_ylabel("Humidity (%)", color="blue")
lines_temp, = ax1.plot([], [], "r-", label="Temperature")
lines_humid, = ax2.plot([], [], "b-", label="Humidity")
# 添加画布到 GUI
canvas = FigureCanvasTkAgg(fig, master=root)
canvas_widget = canvas.get_tk_widget()
canvas_widget.grid(row=5, column=0, columnspan=3, padx=5, pady=5)
# 帮助函式
def scan_ports():
"""扫描可用的COM埠"""
ports = serial.tools.list_ports.comports()
return [port.device for port in ports]
def connect():
"""连接串列埠"""
global serial_port, is_reading
com = com_combobox.get()
baudrate = baudrate_combobox.get()
if not com or not baudrate:
messagebox.showerror("错误", "请选择COM埠与交涉率")
return
try:
serial_port = serial.Serial(com, int(baudrate), timeout=1)
connect_button.config(state=tk.DISABLED)
disconnect_button.config(state=tk.NORMAL)
status_label.config(text=f"已连接到 {com} @ {baudrate}bps", fg="green")
is_reading = True
threading.Thread(target=read_serial, daemon=True).start()
except Exception as e:
messagebox.showerror("错误", f"无法连接:{e}")
def disconnect():
"""断开串列埠"""
global serial_port, is_reading
if serial_port and serial_port.is_open:
serial_port.close()
serial_port = None
is_reading = False
connect_button.config(state=tk.NORMAL)
disconnect_button.config(state=tk.DISABLED)
status_label.config(text="已断开连接", fg="red")
def read_serial():
"""读取串列埠资料"""
global serial_port, data_points, is_reading, last_alert_time
while is_reading and serial_port and serial_port.is_open:
try:
line = serial_port.readline().decode().strip()
if line:
temp, humid = map(float, line.split(","))
current_temp.set(f"{temp:.1f} °C")
current_humid.set(f"{humid:.1f} %")
record_data(temp, humid)
with data_lock:
data_points.append((temp, humid))
if len(data_points) > 3600: # 修改为3600个点
data_points.pop(0)
update_plot()
# 检查过高温度或湿度,并间隔10分钟才发送一次LINE
current_time = time.time()
if (temp > 30 or humid > 80) and (current_time - last_alert_time > 600): # 600秒 = 10分钟
send_line_alert(temp, humid)
last_alert_time = current_time
except Exception as e:
print(f"读取错误:{e}")
break
def record_data(temp, humid):
"""纪录数据到每日档案"""
date_str = datetime.date.today().strftime("%Y-%m-%d")
filename = f"{date_str}.txt"
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
with open(filename, "a") as f:
f.write(f"{timestamp}\\t{temp}\\t{humid}\\n")
def update_plot():
"""更新图表"""
with data_lock:
temps = [point[0] for point in data_points]
humids = [point[1] for point in data_points]
times = list(range(len(data_points)))
lines_temp.set_data(times, temps)
lines_humid.set_data(times, humids)
ax1.set_xlim(max(0, len(times) - 3600), len(times)) # X轴显示最多3600个点
ax1.set_ylim(min(temps, default=0) - 5, max(temps, default=40) + 5)
ax2.set_ylim(min(humids, default=0) - 5, max(humids, default=100) + 5)
canvas.draw()
def send_line_alert(temp, humid):
"""发送LINE通知"""
message = f"警告!温度: {temp:.1f}°C, 湿度: {humid:.1f}%"
headers = {
"Authorization": f"Bearer {LINE_CHANNEL_ACCESS_TOKEN}",
"Content-Type": "application/json"
}
for user_id in user_ids:
payload = {
"to": user_id,
"messages": [{"type": "text", "text": message}]
}
try:
response = requests.post("https://api.line.me/v2/bot/message/push", headers=headers, json=payload)
if response.status_code != 200:
print(f"LINE 通知错误: {response.status_code}, {response.text}")
except Exception as e:
print(f"LINE 通知错误: {e}")
# GUI 元素
com_label = tk.Label(root, text="COM埠:")
com_label.grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)
com_combobox = ttk.Combobox(root, values=scan_ports(), state="readonly")
com_combobox.grid(row=0, column=1, padx=5, pady=5)
baudrate_label = tk.Label(root, text="交涉率:")
baudrate_label.grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
baudrate_combobox = ttk.Combobox(root, values=["9600", "115200"], state="readonly")
baudrate_combobox.grid(row=1, column=1, padx=5, pady=5)
baudrate_combobox.set("9600")
connect_button = tk.Button(root, text="连接", command=connect)
connect_button.grid(row=2, column=0, padx=5, pady=5)
disconnect_button = tk.Button(root, text="断开", command=disconnect, state=tk.DISABLED)
disconnect_button.grid(row=2, column=1, padx=5, pady=5)
status_label = tk.Label(root, text="未连接", fg="red")
status_label.grid(row=3, column=0, columnspan=2, pady=5)
refresh_button = tk.Button(root, text="刷新COM埠", command=lambda: com_combobox.config(values=scan_ports()))
refresh_button.grid(row=0, column=2, padx=5, pady=5)
real_time_label_temp = tk.Label(root, textvariable=current_temp, fg="red")
real_time_label_temp.grid(row=4, column=0, padx=5, pady=5)
real_time_label_humid = tk.Label(root, textvariable=current_humid, fg="blue")
real_time_label_humid.grid(row=4, column=1, padx=5, pady=5)
# 开始 GUI 事件循环
root.mainloop()