突然提到,我家的愛犬,會吃自己的便便。
聽起來可能令人驚訝,但對於年輕的狗狗來說,這似乎是種常見的習性。
據說這是母犬清理窩的習性留下的習慣,或是野外為了保護自己而隱藏排泄物的行為的遺留。
到此我還能接受。
問題在於吃的地方。沒錯,就是在床上吃。
而且不是狗床,而是人類睡的那張床。
因此我曾經一天洗過兩次床單。
雖然它在廁所墊上排泄,
但不知為何會把它特地搬到床上,悠然自得地享用。
(到底為什麼啊!請在墊子上吃啊)
更麻煩的是,它只在無人看見時進行此行為。
雖然有放置寵物攝影機,但也不可能一直監視,當我察覺時事件已經結束了。
因此這次我決定用手頭閒置的樹莓派和USB攝影機,
製作便便檢測警報系統。
我愛的狗。波美拉尼亞犬與斯皮茨的混血,九個月大,十分可愛。
・樹莓派 5
樹莓派 3 或 4 也能製作。
<br>
・USB攝影機
我使用了剛好閒置的設備。大多數USB攝影機都能正常使用。
Buffallo BSW305MBK
<br>
・Arduino UNO R4 WiFi
這次使用Arduino是為了能在離廁所墊較遠的房間聽到消息。
稍微花點錢還可以設置直接發送通知到手機的系統,但我太小氣了。
<br>
這次製作的是――「監視狗狗便便並用光線提醒的系統」。
流程如下:
樹莓派24小時不斷監視廁所墊。
每秒報告一次「是否有便便」的結果給Arduino。
如果Arduino聽到「有」,便會立即閃爍LED。
這樣,即使在不同的房間也能透過光線接收到“便便消息”。
本次製作系統的概要
這次因為有閒置的Arduino,所以使用了它,但也可以使用IFTTT發送通知到手機。
生活變得稍微方便的電子製作 (https://qiita.com/s_iijima/items/035ee89be0ebc9aaf7c9)
<br>
首先請看攝影機的安裝。
這個黑色的USB攝影機用藍色膠帶固定緊實,正監視著正下方。視線的前方,就是那個廁所墊。
右手邊的白色盒子裡是本次的指揮塔——樹莓派。裡面靜靜地佇立的是,平時作為寵物攝影機使用的eufy。這次它的角色不再是助手,而只是做為觀眾而已。
<br>
這是本次的系統全景圖。USB攝影機和樹莓派,還有監控目標的廁所墊―― …這本來是如此,卻不知為何我的愛犬也堂而皇之地進入畫面。
這毫無疑問是在說「不如把我一起介紹吧!」的自信表情。而且廁所墊已經使用過,真是實時感。硬體、軟體,連同狗狗一起,全都在運作中。
<br>
首先,我將完整的源代碼貼出來,然後再逐塊進行解釋。
import cv2
import numpy as np
import asyncio
import threading
import queue
import httpx
import time
from flask import Flask, Response
# ====== 全局(最新幀的共享)======
# 用於在執行緒間安全共享最新的相機幀的鎖和變數
latest_frame_lock = threading.Lock()
latest_frame = None
# ====== 從攝影機圖像檢測白色廁所墊的區域 ======
def detect_tray_area(frame):
"""
從幀中檢測白色廁所墊的區域。
Args:
frame: 被分析的相機幀 (BGR格式)。
Returns:
tuple: (最大的輪廓, 廁所墊區域的遮罩, 顏色檢測遮罩)
如果未檢測到則返回(None, None, 顏色檢測遮罩)
"""
# 從BGR轉換到HSV顏色空間
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# 定義白色的HSV範圍
lower_white = np.array([0, 0, 150])
upper_white = np.array([180, 60, 255])
# 創建一個遮罩以提取指定範圍的顏色
mask = cv2.inRange(hsv, lower_white, upper_white)
# 通過形態學變換去除噪音
kernel = np.ones((5, 5), np.uint8)
# 開運算處理:去除小的噪音(白點)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
# 關閉運算處理:填補區域內的小孔(黑點)
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
# 從遮罩中檢測輪廓
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return None, None, mask
# 選擇面積最大的輪廓作為廁所墊
largest_contour = max(contours, key=cv2.contourArea)
# 過小的區域不被視為廁所墊 (防止噪音)
if cv2.contourArea(largest_contour) < 5000:
return None, None, mask
# 從最大輪廓創建一個填充內部的遮罩
contour_mask = np.zeros_like(mask)
cv2.drawContours(contour_mask, [largest_contour], -1, 255, -1)
# 為了確保輪廓內部為目標,此遮罩稍微收縮
contour_mask = cv2.erode(contour_mask, np.ones((15, 15), np.uint8))
return largest_contour, contour_mask, mask
# ====== OpenCV 執行緒 ======
def opencv_worker(state_queue_latest, stop_event, camera_index=0, show_window=True):
"""
從相機獲取影像,並進行物體檢測的工作執行緒。
將檢測狀態的變化發送到佇列,並將最新幀存儲於全局變數中。
Args:
state_queue_latest (queue.Queue): 用於發送檢測狀態的佇列。
stop_event (threading.Event): 用於停止執行緒的事件。
camera_index (int): 使用的相機索引。
show_window (bool): 是否在窗口中顯示處理中的影像。
"""
global latest_frame
cap = cv2.VideoCapture(camera_index)
if not cap.isOpened():
print("無法打開攝影機")
stop_event.set()
return
is_currently_detected = False # 當前幀中是否檢測到物體
while not stop_event.is_set():
ret, frame = cap.read()
if not ret:
print("獲取幀失敗。重試中...")
time.sleep(0.1)
continue
# 檢測白色廁所墊的區域
tray_contour, tray_mask, _ = detect_tray_area(frame)
if tray_contour is not None and tray_mask is not None:
# 使用綠色繪製廁所墊的輪廓
cv2.drawContours(frame, [tray_contour], -1, (0, 255, 0), 2)
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# 定義檢測目標(便便)的顏色的HSV範圍(黑色至棕色)
lower_black = np.array([0, 0, 0])
upper_black = np.array([180, 255, 70])
lower_brown = np.array([5, 50, 20])
upper_brown = np.array([35, 255, 255])
# 根據顏色範圍創建遮罩
mask_black = cv2.inRange(hsv, lower_black, upper_black)
mask_brown = cv2.inRange(hsv, lower_brown, upper_brown)
# 合成黑色與棕色的遮罩
unko_mask = cv2.bitwise_or(mask_black, mask_brown)
# 僅針對廁所墊區域
unko_mask = cv2.bitwise_and(unko_mask, tray_mask)
# 去除遮罩的噪音
kernel = np.ones((5, 5), np.uint8)
unko_mask = cv2.morphologyEx(unko_mask, cv2.MORPH_OPEN, kernel)
unko_mask = cv2.morphologyEx(unko_mask, cv2.MORPH_CLOSE, kernel)
# 從處理後的遮罩中檢測輪廓
contours, _ = cv2.findContours(unko_mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
detection_count = 0
for cnt in contours:
# 過小的輪廓被視為噪音
if cv2.contourArea(cnt) > 50:
detection_count += 1
# 使用紅色繪製檢測到的物體的輪廓
cv2.drawContours(frame, [cnt], -1, (0, 0, 255), 2)
x, y, w, h = cv2.boundingRect(cnt)
text_pos = (x + w + 5, y + 20)
# 繪製表示檢測的文本
cv2.putText(frame, "!!便便檢測到!!", text_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2, cv2.LINE_AA)
# 檢測狀態是否變化
object_found = detection_count > 0
if object_found != is_currently_detected:
is_currently_detected = object_found
# 將檢測狀態作為1(開)或0(關)發送到佇列中
data_to_send = 0x01 if object_found else 0x00
try:
# 非阻塞方式添加到佇列中
state_queue_latest.put(data_to_send, block=False)
except queue.Full:
# 若佇列已滿,丟棄舊數據並只保留最新的數據
with state_queue_latest.mutex:
state_queue_latest.queue.clear()
state_queue_latest.put_nowait(data_to_send)
# 將最新的處理後的幀存儲到全局變數中(用於Flask共享)
with latest_frame_lock:
latest_frame = frame
# 在本地PC上進行除錯時顯示窗口
if show_window:
cv2.imshow('Camera', frame)
# 如果按下'q'鍵則退出循環
if cv2.waitKey(1) & 0xFF == ord('q'):
stop_event.set()
break
# 結束處理
cap.release()
if show_window:
cv2.destroyAllWindows()
print("OpenCV已結束。")
# ====== Flask 伺服器(MJPEG)======
app = Flask(__name__)
def generate_mjpeg():
"""獲取最新幀的全局變數並作為MJPEG流發送的生成器。"""
global latest_frame
while True:
with latest_frame_lock:
frame = latest_frame
if frame is None:
# 等待OpenCV執行緒準備幀
time.sleep(0.03)
continue
# 將幀編碼為JPEG格式
ok, jpeg = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
if not ok:
time.sleep(0.01)
continue
frame_bytes = jpeg.tobytes()
try:
# 作為MJPEG流的幀發送
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
except GeneratorExit:
# 當瀏覽器關閉連接時
break
except Exception:
# 其他發送錯誤(如暫時的網絡問題等)
time.sleep(0.01)
@app.route('/')
def index():
"""返回網頁的UI。"""
return '''<!DOCTYPE html>
<html>
<head>
<title>便便攝影機 [實時]</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body {
margin: 0;
background-color: black;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
height: 100vh;
}
.video-container {
position: relative;
width: 100%;
max-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
background-color: black;
}
img {
width: 100%;
height: auto;
object-fit: contain;
display: block;
}
/* REC顯示重疊於影片左上角 */
.rec {
position: absolute;
top: 10px;
left: 10px;
color: red;
font-weight: bold;
font-size: 1.2em;
display: flex;
align-items: center;
animation: blink 1s step-start infinite;
background-color: rgba(0,0,0,0.4);
padding: 4px 8px;
border-radius: 4px;
}
.rec-dot {
width: 10px;
height: 10px;
background-color: red;
border-radius: 50%;
margin-right: 5px;
}
@keyframes blink {
50% { opacity: 0; }
}
h2 {
color: white;
margin: 5px 0;
font-size: 1.5em;
text-align: center;
}
</style>
</head>
<body>
<h2>便便攝影機 [實時]</h2>
<div class="video-container">
<div class="rec"><div class="rec-dot"></div>REC</div>
<img src="/video_feed">
</div>
</body>
</html>'''
@app.route('/video_feed')
def video_feed():
"""發送MJPEG流的端點。"""
return Response(generate_mjpeg(), mimetype='multipart/x-mixed-replace; boundary=frame')
def flask_worker(stop_event, host='0.0.0.0', port=5000):
"""在不同執行緒中運行Flask網頁伺服器。
由於是守護線程,主線程結束後會自動終止。
"""
# Flask無法明確停止,但因為是守護線程,進程結束後會關閉
print(f"[Flask] 實時直播: http://{host}:{port}/")
# use_reloader=False 以防止Flask生成子進程
app.run(host=host, port=port, debug=False, use_reloader=False, threaded=True)
# ====== HTTP通訊任務 ======
async def http_post_task(arduino_ip, state_queue_latest, stop_event):
"""從佇列中獲取檢測狀態,並異步發送HTTP POST到Arduino的任務。
當狀態變化時,或定期重發。
Args:
arduino_ip (str): 目標Arduino的IP地址。
state_queue_latest (queue.Queue): 儲存檢測狀態的佇列。
stop_event (threading.Event): 用於停止任務的事件。
"""
url = f"http://{arduino_ip}/unko"
current_state = 0 # 0: 關, 1: 開
last_sent_state = -1 # 上次發送的狀態 (-1表示尚未發送)
async with httpx.AsyncClient(timeout=5.0) as client:
print(f"[HTTP] 開始向Arduino ({url}) 發送。")
while not stop_event.is_set():
# 從佇列中獲取最新的檢測狀態
try:
# 跳過佇列中所有舊數據,只獲取最新值
while True:
state_from_queue = state_queue_latest.get_nowait()
current_state = state_from_queue
except queue.Empty:
# 佇列為空時不進行任何操作
pass
data_to_send = str(current_state).encode('utf-8')
try:
# 只在狀態變化或上次發送失敗時發送
if current_state != last_sent_state:
print(f"[HTTP] 發送狀態: {'開' if current_state == 1 else '關'}")
response = await client.post(url, content=data_to_send)
response.raise_for_status()
# 發送成功
if current_state != last_sent_state:
print(f"[HTTP] 發送成功。Arduino的回應: {response.text.strip()}")
last_sent_state = current_state
except httpx.RequestError as exc:
# 超時或連接錯誤等
print(f"[HTTP] 發送錯誤: {exc}")
# 重新發送嘗試時重置 last_sent_state
last_sent_state = -1
except Exception as e:
# 其他意外錯誤
print(f"[HTTP] 意外錯誤: {e}")
last_sent_state = -1
# 輕微的等待時間以避免循環的超負荷
await asyncio.sleep(0.1)
print("HTTP任務已結束。")
# ====== 主程式 ======
async def main_async():
"""主非同步函數。
啟動和管理各執行緒(OpenCV、Flask)及非同步任務(HTTP)。
"""
# --- 設定值 ---
ARDUINO_IP_ADDRESS = "192.168.0.67" # 設定Arduino的IP地址
CAMERA_INDEX = 0 # 若需切換相機請修改
SHOW_WINDOW = False # 若不顯示伺服器端GUI窗口則為False
# --- 共享資源的初始化 ---
# 用於傳遞最新檢測狀態至HTTP任務的佇列 (大小1)
state_queue_latest = queue.Queue(maxsize=1)
# 用於安全停止所有執行緒和任務的事件
stop_event = threading.Event()
# --- 啟動OpenCV執行緒 ---
t_cv = threading.Thread(
target=opencv_worker,
args=(state_queue_latest, stop_event, CAMERA_INDEX, SHOW_WINDOW),
daemon=True # 主線程結束時自動結束
)
t_cv.start()
# --- 啟動Flask執行緒 ---
t_flask = threading.Thread(
target=flask_worker,
args=(stop_event, '0.0.0.0', 5000),
daemon=True # 主線程結束時自動結束
)
t_flask.start()
# --- 啟動HTTP非同步任務 ---
http_task = asyncio.create_task(http_post_task(
ARDUINO_IP_ADDRESS, state_queue_latest, stop_event
))
try:
# 主循環:當 stop_event 被設置或 HTTP 任務完成時等待
while not stop_event.is_set() and not http_task.done():
await asyncio.sleep(0.2)
except KeyboardInterrupt:
# 當按下 Ctrl+C 的處理
print("\n檢測到鍵盤中斷,開始結束處理...")
finally:
# --- 清理處理 ---
print("向各任務/執行緒發送停止信號...")
stop_event.set()
print("取消HTTP任務...")
http_task.cancel()
try:
# 等待取消完成
await http_task
except asyncio.CancelledError:
# 取消是正常的結束路徑
pass
# 守護執行緒(t_cv, t_flask)在這裡不需要join(),
# 結束進程時會自動結束
print("程序已結束。")
if __name__ == '__main__':
# 執行非同步主函數
asyncio.run(main_async())
---
原文出處:https://qiita.com/RaspiKoubou/items/59f4ec31dbd0dd4bbfb9