🔧 阿川の電商水電行
Shopify 顧問、維護與客製化
💡
小任務 / 單次支援方案
單次處理 Shopify 修正/微調
⭐️
維護方案
每月 Shopify 技術支援 + 小修改 + 諮詢
🚀
專案建置
Shopify 功能導入、培訓 + 分階段交付

目次

初步

突然提到,我家的愛犬,會吃自己的便便
聽起來可能令人驚訝,但對於年輕的狗狗來說,這似乎是種常見的習性。

據說這是母犬清理窩的習性留下的習慣,或是野外為了保護自己而隱藏排泄物的行為的遺留。

到此我還能接受。
問題在於吃的地方。沒錯,就是在床上吃
而且不是狗床,而是人類睡的那張床。
因此我曾經一天洗過兩次床單。

雖然它在廁所墊上排泄,
但不知為何會把它特地搬到床上,悠然自得地享用。
(到底為什麼啊!請在墊子上吃啊)

更麻煩的是,它只在無人看見時進行此行為。
雖然有放置寵物攝影機,但也不可能一直監視,當我察覺時事件已經結束了。

因此這次我決定用手頭閒置的樹莓派和USB攝影機,
製作便便檢測警報系統

IMG_0073_2.jpg

我愛的狗。波美拉尼亞犬與斯皮茨的混血,九個月大,十分可愛。

使用的設備

・樹莓派 5
樹莓派 3 或 4 也能製作。
image.png

<br>
・USB攝影機
我使用了剛好閒置的設備。大多數USB攝影機都能正常使用。
image.png

Buffallo BSW305MBK

<br>
・Arduino UNO R4 WiFi
這次使用Arduino是為了能在離廁所墊較遠的房間聽到消息。
稍微花點錢還可以設置直接發送通知到手機的系統,但我太小氣了。
image.png

<br>

系統圖

這次製作的是――「監視狗狗便便並用光線提醒的系統」。
流程如下:

  1. 狗狗便便。
  2. 樹莓派用USB攝影機捕捉那一瞬間。
  3. 樹莓派透過Wi-Fi向Arduino發送「便便檢知標誌」。
  4. Arduino使LED閃爍,提醒主人「事件發生!」。
  5. (為了除錯)可以用手機查看實時影像。

樹莓派24小時不斷監視廁所墊。
每秒報告一次「是否有便便」的結果給Arduino。
如果Arduino聽到「有」,便會立即閃爍LED。
這樣,即使在不同的房間也能透過光線接收到“便便消息”。

image.png

本次製作系統的概要

這次因為有閒置的Arduino,所以使用了它,但也可以使用IFTTT發送通知到手機。

生活變得稍微方便的電子製作 (https://qiita.com/s_iijima/items/035ee89be0ebc9aaf7c9)

<br>

樹莓派的硬體

首先請看攝影機的安裝。

這個黑色的USB攝影機用藍色膠帶固定緊實,正監視著正下方。視線的前方,就是那個廁所墊。

右手邊的白色盒子裡是本次的指揮塔——樹莓派。裡面靜靜地佇立的是,平時作為寵物攝影機使用的eufy。這次它的角色不再是助手,而只是做為觀眾而已。
IMG_3037_2.jpg

<br>
這是本次的系統全景圖。USB攝影機和樹莓派,還有監控目標的廁所墊―― …這本來是如此,卻不知為何我的愛犬也堂而皇之地進入畫面。

這毫無疑問是在說「不如把我一起介紹吧!」的自信表情。而且廁所墊已經使用過,真是實時感。硬體、軟體,連同狗狗一起,全都在運作中。
IMG_3038_2.jpg

<br>

樹莓派的軟體

首先,我將完整的源代碼貼出來,然後再逐塊進行解釋。


import cv2
import numpy as np
import asyncio
import threading
import queue
import httpx
import time
from flask import Flask, Response

# ====== 全局(最新幀的共享)======
# 用於在執行緒間安全共享最新的相機幀的鎖和變數
latest_frame_lock = threading.Lock()
latest_frame = None

# ====== 從攝影機圖像檢測白色廁所墊的區域 ======
def detect_tray_area(frame):
    """
    從幀中檢測白色廁所墊的區域。

    Args:
        frame: 被分析的相機幀 (BGR格式)。

    Returns:
        tuple: (最大的輪廓, 廁所墊區域的遮罩, 顏色檢測遮罩)
               如果未檢測到則返回(None, None, 顏色檢測遮罩)
    """
    # 從BGR轉換到HSV顏色空間
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # 定義白色的HSV範圍
    lower_white = np.array([0, 0, 150])
    upper_white = np.array([180, 60, 255])
    # 創建一個遮罩以提取指定範圍的顏色
    mask = cv2.inRange(hsv, lower_white, upper_white)

    # 通過形態學變換去除噪音
    kernel = np.ones((5, 5), np.uint8)
    # 開運算處理:去除小的噪音(白點)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
    # 關閉運算處理:填補區域內的小孔(黑點)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)

    # 從遮罩中檢測輪廓
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return None, None, mask

    # 選擇面積最大的輪廓作為廁所墊
    largest_contour = max(contours, key=cv2.contourArea)
    # 過小的區域不被視為廁所墊 (防止噪音)
    if cv2.contourArea(largest_contour) < 5000:
        return None, None, mask

    # 從最大輪廓創建一個填充內部的遮罩
    contour_mask = np.zeros_like(mask)
    cv2.drawContours(contour_mask, [largest_contour], -1, 255, -1)
    # 為了確保輪廓內部為目標,此遮罩稍微收縮
    contour_mask = cv2.erode(contour_mask, np.ones((15, 15), np.uint8))
    return largest_contour, contour_mask, mask

# ====== OpenCV 執行緒 ======
def opencv_worker(state_queue_latest, stop_event, camera_index=0, show_window=True):
    """
    從相機獲取影像,並進行物體檢測的工作執行緒。
    將檢測狀態的變化發送到佇列,並將最新幀存儲於全局變數中。

    Args:
        state_queue_latest (queue.Queue): 用於發送檢測狀態的佇列。
        stop_event (threading.Event): 用於停止執行緒的事件。
        camera_index (int): 使用的相機索引。
        show_window (bool): 是否在窗口中顯示處理中的影像。
    """
    global latest_frame
    cap = cv2.VideoCapture(camera_index)
    if not cap.isOpened():
        print("無法打開攝影機")
        stop_event.set()
        return

    is_currently_detected = False  # 當前幀中是否檢測到物體
    while not stop_event.is_set():
        ret, frame = cap.read()
        if not ret:
            print("獲取幀失敗。重試中...")
            time.sleep(0.1)
            continue

        # 檢測白色廁所墊的區域
        tray_contour, tray_mask, _ = detect_tray_area(frame)
        if tray_contour is not None and tray_mask is not None:
            # 使用綠色繪製廁所墊的輪廓
            cv2.drawContours(frame, [tray_contour], -1, (0, 255, 0), 2)
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

            # 定義檢測目標(便便)的顏色的HSV範圍(黑色至棕色)
            lower_black = np.array([0, 0, 0])
            upper_black = np.array([180, 255, 70])
            lower_brown = np.array([5, 50, 20])
            upper_brown = np.array([35, 255, 255])

            # 根據顏色範圍創建遮罩
            mask_black = cv2.inRange(hsv, lower_black, upper_black)
            mask_brown = cv2.inRange(hsv, lower_brown, upper_brown)
            # 合成黑色與棕色的遮罩
            unko_mask = cv2.bitwise_or(mask_black, mask_brown)
            # 僅針對廁所墊區域
            unko_mask = cv2.bitwise_and(unko_mask, tray_mask)

            # 去除遮罩的噪音
            kernel = np.ones((5, 5), np.uint8)
            unko_mask = cv2.morphologyEx(unko_mask, cv2.MORPH_OPEN, kernel)
            unko_mask = cv2.morphologyEx(unko_mask, cv2.MORPH_CLOSE, kernel)
            # 從處理後的遮罩中檢測輪廓
            contours, _ = cv2.findContours(unko_mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
            detection_count = 0
            for cnt in contours:
                # 過小的輪廓被視為噪音
                if cv2.contourArea(cnt) > 50:
                    detection_count += 1
                    # 使用紅色繪製檢測到的物體的輪廓
                    cv2.drawContours(frame, [cnt], -1, (0, 0, 255), 2)
                    x, y, w, h = cv2.boundingRect(cnt)
                    text_pos = (x + w + 5, y + 20)
                    # 繪製表示檢測的文本
                    cv2.putText(frame, "!!便便檢測到!!", text_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2, cv2.LINE_AA)

            # 檢測狀態是否變化
            object_found = detection_count > 0
            if object_found != is_currently_detected:
                is_currently_detected = object_found
                # 將檢測狀態作為1(開)或0(關)發送到佇列中
                data_to_send = 0x01 if object_found else 0x00
                try:
                    # 非阻塞方式添加到佇列中
                    state_queue_latest.put(data_to_send, block=False)
                except queue.Full:
                    # 若佇列已滿,丟棄舊數據並只保留最新的數據
                    with state_queue_latest.mutex:
                        state_queue_latest.queue.clear()
                    state_queue_latest.put_nowait(data_to_send)

        # 將最新的處理後的幀存儲到全局變數中(用於Flask共享)
        with latest_frame_lock:
            latest_frame = frame

        # 在本地PC上進行除錯時顯示窗口
        if show_window:
            cv2.imshow('Camera', frame)
            # 如果按下'q'鍵則退出循環
            if cv2.waitKey(1) & 0xFF == ord('q'):
                stop_event.set()
                break

    # 結束處理
    cap.release()
    if show_window:
        cv2.destroyAllWindows()
    print("OpenCV已結束。")

# ====== Flask 伺服器(MJPEG)======
app = Flask(__name__)

def generate_mjpeg():
    """獲取最新幀的全局變數並作為MJPEG流發送的生成器。"""
    global latest_frame
    while True:
        with latest_frame_lock:
            frame = latest_frame
        if frame is None:
            # 等待OpenCV執行緒準備幀
            time.sleep(0.03)
            continue

        # 將幀編碼為JPEG格式
        ok, jpeg = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
        if not ok:
            time.sleep(0.01)
            continue
        frame_bytes = jpeg.tobytes()
        try:
            # 作為MJPEG流的幀發送
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
        except GeneratorExit:
            # 當瀏覽器關閉連接時
            break
        except Exception:
            # 其他發送錯誤(如暫時的網絡問題等)
            time.sleep(0.01)

@app.route('/')
def index():
    """返回網頁的UI。"""
    return '''<!DOCTYPE html>
<html>
<head>
    <title>便便攝影機 [實時]</title>
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <style>
        body {
            margin: 0;
            background-color: black;
            display: flex;
            flex-direction: column;
            align-items: center;
            justify-content: center;
            height: 100vh;
        }
        .video-container {
            position: relative;
            width: 100%;
            max-height: 100vh;
            display: flex;
            justify-content: center;
            align-items: center;
            background-color: black;
        }
        img {
            width: 100%;
            height: auto;
            object-fit: contain;
            display: block;
        }
        /* REC顯示重疊於影片左上角 */
        .rec {
            position: absolute;
            top: 10px;
            left: 10px;
            color: red;
            font-weight: bold;
            font-size: 1.2em;
            display: flex;
            align-items: center;
            animation: blink 1s step-start infinite;
            background-color: rgba(0,0,0,0.4);
            padding: 4px 8px;
            border-radius: 4px;
        }
        .rec-dot {
            width: 10px;
            height: 10px;
            background-color: red;
            border-radius: 50%;
            margin-right: 5px;
        }
        @keyframes blink {
            50% { opacity: 0; }
        }
        h2 {
            color: white;
            margin: 5px 0;
            font-size: 1.5em;
            text-align: center;
        }
    </style>
</head>
<body>
    <h2>便便攝影機 [實時]</h2>
    <div class="video-container">
        <div class="rec"><div class="rec-dot"></div>REC</div>
        <img src="/video_feed">
    </div>
</body>
</html>'''

@app.route('/video_feed')
def video_feed():
    """發送MJPEG流的端點。"""
    return Response(generate_mjpeg(), mimetype='multipart/x-mixed-replace; boundary=frame')

def flask_worker(stop_event, host='0.0.0.0', port=5000):
    """在不同執行緒中運行Flask網頁伺服器。
    由於是守護線程,主線程結束後會自動終止。
    """
    # Flask無法明確停止,但因為是守護線程,進程結束後會關閉
    print(f"[Flask] 實時直播: http://{host}:{port}/")
    # use_reloader=False 以防止Flask生成子進程
    app.run(host=host, port=port, debug=False, use_reloader=False, threaded=True)

# ====== HTTP通訊任務 ======
async def http_post_task(arduino_ip, state_queue_latest, stop_event):
    """從佇列中獲取檢測狀態,並異步發送HTTP POST到Arduino的任務。
    當狀態變化時,或定期重發。

    Args:
        arduino_ip (str): 目標Arduino的IP地址。
        state_queue_latest (queue.Queue): 儲存檢測狀態的佇列。
        stop_event (threading.Event): 用於停止任務的事件。
    """
    url = f"http://{arduino_ip}/unko"
    current_state = 0  # 0: 關, 1: 開
    last_sent_state = -1  # 上次發送的狀態 (-1表示尚未發送)
    async with httpx.AsyncClient(timeout=5.0) as client:
        print(f"[HTTP] 開始向Arduino ({url}) 發送。")
        while not stop_event.is_set():
            # 從佇列中獲取最新的檢測狀態
            try:
                # 跳過佇列中所有舊數據,只獲取最新值
                while True:
                    state_from_queue = state_queue_latest.get_nowait()
                    current_state = state_from_queue
            except queue.Empty:
                # 佇列為空時不進行任何操作
                pass

            data_to_send = str(current_state).encode('utf-8')
            try:
                # 只在狀態變化或上次發送失敗時發送
                if current_state != last_sent_state:
                    print(f"[HTTP] 發送狀態: {'開' if current_state == 1 else '關'}")

                response = await client.post(url, content=data_to_send)
                response.raise_for_status()

                # 發送成功
                if current_state != last_sent_state:
                    print(f"[HTTP] 發送成功。Arduino的回應: {response.text.strip()}")

                last_sent_state = current_state
            except httpx.RequestError as exc:
                # 超時或連接錯誤等
                print(f"[HTTP] 發送錯誤: {exc}")
                # 重新發送嘗試時重置 last_sent_state
                last_sent_state = -1
            except Exception as e:
                # 其他意外錯誤
                print(f"[HTTP] 意外錯誤: {e}")
                last_sent_state = -1

            # 輕微的等待時間以避免循環的超負荷
            await asyncio.sleep(0.1)
    print("HTTP任務已結束。")

# ====== 主程式 ======
async def main_async():
    """主非同步函數。
    啟動和管理各執行緒(OpenCV、Flask)及非同步任務(HTTP)。
    """
    # --- 設定值 ---
    ARDUINO_IP_ADDRESS = "192.168.0.67"  # 設定Arduino的IP地址
    CAMERA_INDEX = 0                     # 若需切換相機請修改
    SHOW_WINDOW = False                  # 若不顯示伺服器端GUI窗口則為False

    # --- 共享資源的初始化 ---
    # 用於傳遞最新檢測狀態至HTTP任務的佇列 (大小1)
    state_queue_latest = queue.Queue(maxsize=1)
    # 用於安全停止所有執行緒和任務的事件
    stop_event = threading.Event()

    # --- 啟動OpenCV執行緒 ---
    t_cv = threading.Thread(
        target=opencv_worker,
        args=(state_queue_latest, stop_event, CAMERA_INDEX, SHOW_WINDOW),
        daemon=True  # 主線程結束時自動結束
    )
    t_cv.start()

    # --- 啟動Flask執行緒 ---
    t_flask = threading.Thread(
        target=flask_worker,
        args=(stop_event, '0.0.0.0', 5000),
        daemon=True  # 主線程結束時自動結束
    )
    t_flask.start()

    # --- 啟動HTTP非同步任務 ---
    http_task = asyncio.create_task(http_post_task(
        ARDUINO_IP_ADDRESS, state_queue_latest, stop_event
    ))

    try:
        # 主循環:當 stop_event 被設置或 HTTP 任務完成時等待
        while not stop_event.is_set() and not http_task.done():
            await asyncio.sleep(0.2)
    except KeyboardInterrupt:
        # 當按下 Ctrl+C 的處理
        print("\n檢測到鍵盤中斷,開始結束處理...")
    finally:
        # --- 清理處理 ---
        print("向各任務/執行緒發送停止信號...")
        stop_event.set()

        print("取消HTTP任務...")
        http_task.cancel()
        try:
            # 等待取消完成
            await http_task
        except asyncio.CancelledError:
            # 取消是正常的結束路徑
            pass

        # 守護執行緒(t_cv, t_flask)在這裡不需要join(),
        # 結束進程時會自動結束
        print("程序已結束。")

if __name__ == '__main__':
    # 執行非同步主函數
    asyncio.run(main_async())

---

原文出處:https://qiita.com/RaspiKoubou/items/59f4ec31dbd0dd4bbfb9

精選技術文章翻譯,幫助開發者持續吸收新知。

共有 0 則留言


精選技術文章翻譯,幫助開發者持續吸收新知。
🏆 本月排行榜
🥇
站長阿川
📝10   💬10   ❤️3
446
🥈
我愛JS
📝4   💬9   ❤️10
230
🥉
AppleLily
📝1   💬4   ❤️1
63
#4
💬1  
5
#5
xxuan
💬1  
3
評分標準:發文×10 + 留言×3 + 獲讚×5 + 點讚×1 + 瀏覽數÷10
本數據每小時更新一次
🔧 阿川の電商水電行
Shopify 顧問、維護與客製化
💡
小任務 / 單次支援方案
單次處理 Shopify 修正/微調
⭐️
維護方案
每月 Shopify 技術支援 + 小修改 + 諮詢
🚀
專案建置
Shopify 功能導入、培訓 + 分階段交付