🔧 阿川の電商水電行
Shopify 顧問、維護與客製化
💡
小任務 / 單次支援方案
單次處理 Shopify 修正/微調
⭐️
維護方案
每月 Shopify 技術支援 + 小修改 + 諮詢
🚀
專案建置
Shopify 功能導入、培訓 + 分階段交付

「周更第5期」實用JS庫推薦:RxJS

RxJS

引言

大家好,歡迎來到第5期的JavaScript庫推薦!本期為大家介紹的是 RxJS,一個用於響應式編程的強大JavaScript庫,它透過Observable序列來處理異步和基於事件的程序。

在日常開發中,我們經常遇到複雜的異步操作、事件處理、數據流管理等需求。傳統的解決方案往往存在回調地獄、狀態管理混亂、錯誤處理困難等問題。RxJS 正是為了解決這些痛點而生的,它以其強大的操作符、優雅的函數式編程風格和統一的異步處理模式在響應式編程領域中脫穎而出,成為了處理複雜異步邏輯的首選方案。

本文將從RxJS的核心特性、實際應用、性能表現、最佳實踐等多個維度進行深入分析,幫助你全面了解這個優秀的響應式編程庫。

庫介紹

基本信息

主要特性

  • 🚀 響應式編程:基於Observable模式,優雅處理異步數據流
  • 💡 豐富的操作符:提供100+個操作符,滿足各種數據轉換需求
  • 🔧 函數式編程:支持鏈式調用,代碼簡潔易讀
  • 📱 統一異步模型:將Promise、事件、定時器等統一為Observable
  • 🎯 強大的錯誤處理:內置完善的錯誤處理和重試機制
  • 性能優化:支持背壓控制和內存管理

兼容性

  • 瀏覽器支持:支持所有現代瀏覽器 (ES5+)
  • Node.js支持:Node.js 14.20.0+
  • 框架兼容:與React、Vue、Angular等主流框架完美集成
  • TypeScript支持:完整的TypeScript類型定義

架構原理

核心概念深入解析

Observable(可觀察對象)

// Observable的本質:一個可以發出多個值的數據流
const observable = new Observable(observer => {
  // 生產者邏輯
  observer.next('第一個值');
  observer.next('第二個值');

  // 異步發送
  setTimeout(() => {
    observer.next('延遲的值');
    observer.complete(); // 標記完成
  }, 1000);

  // 返回清理函數
  return () => {
    console.log('Observable被取消訂閱,執行清理');
  };
});

Observer(觀察者)

// Observer是一個包含三個回調函數的對象
const observer = {
  next: value => console.log('接收到值:', value),
  error: err => console.error('發生錯誤:', err),
  complete: () => console.log('流完成')
};

// 訂閱建立Observable和Observer之間的連接
const subscription = observable.subscribe(observer);

Subscription(訂閱)

// Subscription代表Observable的執行,可以用來取消執行
const subscription = observable.subscribe(observer);

// 取消訂閱,釋放資源
subscription.unsubscribe();

// 組合多個訂閱
const mainSubscription = new Subscription();
mainSubscription.add(subscription1);
mainSubscription.add(subscription2);
// 一次性取消所有訂閱
mainSubscription.unsubscribe();

數據流處理機制

冷Observable vs 熱Observable

// 冷Observable:每次訂閱都會重新執行
const coldObservable = new Observable(observer => {
  console.log('Observable執行'); // 每次訂閱都會打印
  observer.next(Math.random());
});

coldObservable.subscribe(value => console.log('訂閱者1:', value));
coldObservable.subscribe(value => console.log('訂閱者2:', value)); // 輸出:兩個不同的隨機數

// 熱Observable:多個訂閱者共享同一個執行
const subject = new Subject();
subject.subscribe(value => console.log('訂閱者1:', value));
subject.subscribe(value => console.log('訂閱者2:', value));
subject.next('共享的值'); // 兩個訂閱者都會收到相同的值

操作符的工作原理

// 操作符本質上是返回新Observable的高階函數
const customMap = (transformFn) => {
  return (source) => {
    return new Observable(observer => {
      return source.subscribe({
        next: value => observer.next(transformFn(value)),
        error: err => observer.error(err),
        complete: () => observer.complete()
      });
    });
  };
};

// 使用自定義操作符
of(1, 2, 3).pipe(
  customMap(x => x * 2)
).subscribe(console.log); // 輸出: 2, 4, 6

與其他庫的對比分析

RxJS vs Promise

特性 Promise RxJS Observable
數據量 單個值 多個值的流
取消性 不可取消 可取消
懶執行 立即執行 懶執行(訂閱時才執行)
操作符 then/catch 100+豐富操作符
錯誤處理 catch 多種錯誤處理策略
// Promise示例
fetch('/api/data')
  .then(response => response.json())
  .then(data => console.log(data))
  .catch(error => console.error(error));

// RxJS等價實現
ajax.getJSON('/api/data').pipe(
  retry(3), // Promise無法實現的重試
  timeout(5000), // 超時控制
  catchError(error => of({ error: '請求失敗' }))
).subscribe(data => console.log(data));

RxJS vs EventEmitter

// EventEmitter方式
const emitter = new EventEmitter();
emitter.on('data', data => console.log(data));
emitter.emit('data', 'hello');

// RxJS方式 - 更強大的數據處理能力
const subject = new Subject();
subject.pipe(
  filter(data => data.length > 3), // 過濾
  map(data => data.toUpperCase()), // 轉換
  debounceTime(300) // 防抖
).subscribe(data => console.log(data));

subject.next('hello');

RxJS vs Async/Await

// Async/Await處理多個異步操作
async function fetchUserData() {
  try {
    const user = await fetch('/api/user').then(r => r.json());
    const posts = await fetch(`/api/users/${user.id}/posts`).then(r => r.json());
    const comments = await fetch(`/api/posts/${posts[0].id}/comments`).then(r => r.json());
    return { user, posts, comments };
  } catch (error) {
    console.error(error);
  }
}

// RxJS處理複雜的異步流
const fetchUserDataRx = () => {
  return ajax.getJSON('/api/user').pipe(
    switchMap(user => forkJoin({
      user: of(user),
      posts: ajax.getJSON(`/api/users/${user.id}/posts`),
      comments: ajax.getJSON(`/api/posts/${user.posts[0]?.id}/comments`)
    })),
    retry(3),
    timeout(10000),
    catchError(error => of({ error: '數據加載失敗' }))
  );
};

生態系統

相關庫和工具

  • @ngrx/store:基於RxJS的Angular狀態管理
  • redux-observable:Redux的RxJS中間件
  • rxjs-hooks:React中使用RxJS的Hook庫
  • rx-angular:Angular的RxJS增強工具集
  • rxjs-spy:RxJS調試工具

學習資源

  • 官方文檔:詳細的API文檔和指南
  • RxJS Marbles:可視化學習操作符
  • RxJS DevTools:瀏覽器調試擴展
  • 社區教程:大量的博客文章和視頻教程

安裝使用

安裝方式

# npm (推薦安裝最新穩定版 7.8+)
npm install rxjs

# yarn
yarn add rxjs

# pnpm
pnpm add rxjs

版本說明: 本文基於 RxJS 7.8+ 版本編寫。RxJS 7.2+ 支持從 'rxjs' 直接導入所有操作符。

基礎使用

1. 導入庫

// RxJS 7.2+ 推薦方式:從 'rxjs' 直接導入所有內容
import { Observable, of, from, interval, map, filter, take } from 'rxjs';

// 兼容舊版本的分開導入方式
// import { Observable } from 'rxjs';
// import { map, filter, take } from 'rxjs/operators';
// import { of, from, interval } from 'rxjs';

// CommonJS 導入
const { Observable, of, map, filter } = require('rxjs');

2. 基礎示例

// 示例1:創建簡單的Observable
const createBasicObservable = () => {
  // 步驟1:使用of創建Observable
  const source$ = of(1, 2, 3, 4, 5);

  // 步驟2:訂閱Observable
  const subscription = source$.subscribe({
    next: value => console.log('接收到值:', value),
    error: err => console.error('發生錯誤:', err),
    complete: () => console.log('流完成')
  });

  return subscription;
};

// 示例2:使用操作符轉換數據
const transformData = () => {
  const source$ = of(1, 2, 3, 4, 5);

  // 鏈式調用操作符
  const result$ = source$.pipe(
    filter(x => x % 2 === 0), // 過濾偶數
    map(x => x * 2),          // 每個值乘以2
    take(2)                   // 只取前2個值
  );

  result$.subscribe(value => console.log('轉換後的值:', value));
};

3. 配置選項

// Observable創建配置
const customObservable = new Observable(observer => {
  // 發送數據
  observer.next('Hello');
  observer.next('World');

  // 完成流
  observer.complete();

  // 清理函數(可選)
  return () => {
    console.log('Observable被取消訂閱');
  };
});

// 訂閱配置
const subscription = customObservable.subscribe({
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('完成')
});

樣實際應用

應用場景1:HTTP請求處理

在Web應用中,我們經常需要處理HTTP請求、錯誤重試、請求取消等複雜邏輯。RxJS可以優雅地解決這些問題。

// 完整的HTTP請求處理示例
// 包含錯誤重試、超時處理、請求取消

import { ajax } from 'rxjs/ajax';
import { retry, timeout, catchError, map } from 'rxjs/operators';
import { of } from 'rxjs';

const fetchUserData = (userId) => {
  return ajax.getJSON(`/api/users/${userId}`).pipe(
    // 設置5秒超時
    timeout(5000),
    // 提取需要的數據
    map(response => ({
      id: response.id,
      name: response.name,
      email: response.email
    })),
    // 失敗時重試3次
    retry(3),
    // 錯誤處理
    catchError(error => {
      console.error('獲取用戶數據失敗:', error);
      return of({ id: null, name: '未知用戶', email: '' });
    })
  );
};

// 使用示例
const userSubscription = fetchUserData(123).subscribe({
  next: userData => {
    console.log('用戶數據:', userData);
    // 更新UI
  },
  error: err => console.error('最終錯誤:', err)
});

// 可以取消請求
// userSubscription.unsubscribe();

應用場景2:實時搜索功能

// 實時搜索功能實現
// 包含防抖、去重、錯誤處理

import { fromEvent, of } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, filter, map, catchError } from 'rxjs/operators';
import { ajax } from 'rxjs/ajax';

const implementSearchFeature = () => {
  const searchInput = document.getElementById('search-input');
  const searchResults = document.getElementById('search-results');

  // 創建輸入事件流
  const searchTerm$ = fromEvent(searchInput, 'input').pipe(
    // 提取輸入值
    map(event => event.target.value.trim()),
    //過濾空值
    filter(term => term.length > 2),
    // 防抖:等待300ms無新輸入後才觸發搜索
    debounceTime(300),
    // 去重:相同搜索詞不重複請求
    distinctUntilChanged()
  );

  // 搜索邏輯
  const searchResults$ = searchTerm$.pipe(
    // switchMap會自動取消之前未完成的請求,只保留最新請求的結果
    switchMap(term =>
      ajax.getJSON(`/api/search?q=${encodeURIComponent(term)}`).pipe(
        map(response => response.results),
        catchError(error => {
          console.error('搜索失敗:', error);
          return of([]);
        })
      )
    )
  );

  // 訂閱搜索結果
  searchResults$.subscribe(results => {
    // 更新搜索結果UI
    searchResults.innerHTML = results
      .map(item => `<div class="result-item">${item.title}</div>`)
      .join('');
  });
};

應用場景3:WebSocket實時數據處理

// WebSocket實時數據流處理
// 包含連接管理、重連機制、數據過濾

import { webSocket } from 'rxjs/webSocket';
import { retryWhen, delay, take, filter, map } from 'rxjs/operators';

const createWebSocketConnection = (url) => {
  const subject = webSocket({
    url: url,
    openObserver: {
      next: () => console.log('WebSocket連接已建立')
    },
    closeObserver: {
      next: () => console.log('WebSocket連接已關閉')
    }
  });

  // 添加重連機制
  const messages$ = subject.pipe(
    retryWhen(errors =>
      errors.pipe(
        delay(1000), // 1秒後重試
        take(5)      // 最多重試5次
      )
    )
  );

  return {
    // 發送消息
    send: message => subject.next(message),
    // 接收特定類型的消息
    getMessages: messageType => messages$.pipe(
      filter(msg => msg.type === messageType),
      map(msg => msg.data)
    ),
    // 關閉連接
    close: () => subject.complete()
  };
};

// 使用示例
const wsConnection = createWebSocketConnection('ws://localhost:8080');

// 監聽股票價格更新
wsConnection.getMessages('stock-price').subscribe(priceData => {
  console.log('股票價格更新:', priceData);
  // 更新價格顯示
});

// 發送訂閱消息
wsConnection.send({
  type: 'subscribe',
  symbol: 'AAPL'
});

應用場景4:複雜狀態管理

// 使用RxJS實現複雜的應用狀態管理
// 類似Redux但更靈活的狀態管理方案

import { BehaviorSubject, combineLatest } from 'rxjs';
import { map, scan, shareReplay } from 'rxjs/operators';

class StateManager {
  constructor() {
    // 私有狀態主題
    this.userSubject = new BehaviorSubject(null);
    this.todosSubject = new BehaviorSubject([]);
    this.loadingSubject = new BehaviorSubject(false);
    this.errorSubject = new BehaviorSubject(null);
  }

  // 公共狀態流
  user$ = this.userSubject.asObservable();
  todos$ = this.todosSubject.asObservable();
  loading$ = this.loadingSubject.asObservable();
  error$ = this.errorSubject.asObservable();

  // 組合狀態
  state$ = combineLatest([this.user$, this.todos$, this.loading$, this.error$]).pipe(
    map(([user, todos, loading, error]) => ({
      user,
      todos,
      loading,
      error
    })),
    shareReplay(1) // 緩存最新狀態
  );

  // 狀態更新方法
  setUser(user) {
    this.userSubject.next(user);
  }

  addTodo(todo) {
    const currentTodos = this.todosSubject.value;
    this.todosSubject.next([...currentTodos, todo]);
  }

  setLoading(loading) {
    this.loadingSubject.next(loading);
  }

  setError(error) {
    this.errorSubject.next(error);
  }
}

// 使用狀態管理器
const stateManager = new StateManager();

// 監聽狀態變化
stateManager.state$.subscribe(state => {
  console.log('應用狀態更新:', state);
  // 更新UI
});

// 更新狀態
stateManager.setUser({ id: 1, name: 'John' });
stateManager.addTodo({ id: 1, text: '學習RxJS', completed: false });

應用場景5:複雜表單驗證

// 實時表單驗證系統
// 支持異步驗證、防抖、依賴驗證等

import { fromEvent, combineLatest, of } from 'rxjs';
import {
  debounceTime,
  distinctUntilChanged,
  switchMap,
  map,
  startWith,
  catchError
} from 'rxjs/operators';
import { ajax } from 'rxjs/ajax';

class FormValidator {
  constructor() {
    this.setupValidation();
  }

  setupValidation() {
    // 獲取表單元素
    const emailInput = document.getElementById('email');
    const passwordInput = document.getElementById('password');
    const confirmPasswordInput = document.getElementById('confirmPassword');

    // 創建輸入流
    const email$ = fromEvent(emailInput, 'input').pipe(
      map(e => e.target.value),
      debounceTime(300),
      distinctUntilChanged()
    );

    const password$ = fromEvent(passwordInput, 'input').pipe(
      map(e => e.target.value),
      debounceTime(300),
      distinctUntilChanged()
    );

    const confirmPassword$ = fromEvent(confirmPasswordInput, 'input').pipe(
      map(e => e.target.value),
      debounceTime(300),
      distinctUntilChanged()
    );

    // 郵箱驗證(包含異步驗證)
    const emailValidation$ = email$.pipe(
      switchMap(email => {
        if (!email) {
          return of({ valid: false, error: '郵箱不能為空' });
        }
        if (!this.isValidEmail(email)) {
          return of({ valid: false, error: '郵箱格式不正確' });
        }
        // 異步驗證郵箱是否已存在
        return this.checkEmailExists(email).pipe(
          map(exists => (exists ? { valid: false, error: '郵箱已存在' } : { valid: true, error: null })),
          catchError(() => of({ valid: false, error: '驗證失敗,請重試' }))
        );
      }),
      startWith({ valid: false, error: null })
    );

    // 密碼驗證
    const passwordValidation$ = password$.pipe(
      map(password => {
        if (!password) {
          return { valid: false, error: '密碼不能為空' };
        }
        if (password.length < 8) {
          return { valid: false, error: '密碼至少8位' };
        }
        if (!/(?=.*[a-z])(?=.*[A-Z])(?=.*\d)/.test(password)) {
          return { valid: false, error: '密碼必須包含大小寫字母和數字' };
        }
        return { valid: true, error: null };
      }),
      startWith({ valid: false, error: null })
    );

    // 確認密碼驗證(依賴密碼字段)
    const confirmPasswordValidation$ = combineLatest([password$, confirmPassword$]).pipe(
      map(([password, confirmPassword]) => {
        if (!confirmPassword) {
          return { valid: false, error: '請確認密碼' };
        }
        if (password !== confirmPassword) {
          return { valid: false, error: '兩次密碼不一致' };
        }
        return { valid: true, error: null };
      }),
      startWith({ valid: false, error: null })
    );

    // 整體表單驗證狀態
    const formValidation$ = combineLatest([emailValidation$, passwordValidation$, confirmPasswordValidation$]).pipe(
      map(([email, password, confirmPassword]) => ({
        email,
        password,
        confirmPassword,
        isValid: email.valid && password.valid && confirmPassword.valid
      }))
    );

    // 訂閱驗證結果,更新UI
    formValidation$.subscribe(validation => {
      this.updateValidationUI(validation);
      this.toggleSubmitButton(validation.isValid);
    });
  }

  isValidEmail(email) {
    return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
  }

  checkEmailExists(email) {
    return ajax.getJSON(`/api/check-email?email=${email}`).pipe(
      map(response => response.exists)
    );
  }

  updateValidationUI(validation) {
    // 更新各字段的驗證狀態顯示
    Object.keys(validation).forEach(field => {
      if (field !== 'isValid') {
        const fieldValidation = validation[field];
        const errorElement = document.getElementById(`${field}-error`);
        const inputElement = document.getElementById(field);
        if (fieldValidation.error) {
          errorElement.textContent = fieldValidation.error;
          inputElement.classList.add('error');
        } else {
          errorElement.textContent = '';
          inputElement.classList.remove('error');
        }
      }
    });
  }

  toggleSubmitButton(isValid) {
    const submitButton = document.getElementById('submit');
    submitButton.disabled = !isValid;
  }
}

應用場景6:數據流處理管道

// 複雜數據處理管道
// 處理大量數據的轉換、過濾、聚合等操作

import { from, interval, merge, of } from 'rxjs';
import {
  map,
  filter,
  groupBy,
  mergeMap,
  reduce,
  bufferTime,
  scan,
  share
} from 'rxjs/operators';

// 模擬數據源
const generateSensorData = () => {
  return interval(100).pipe(
    map(i => ({
      id: `sensor_${Math.floor(Math.random() * 5) + 1}`,
      timestamp: Date.now(),
      temperature: 20 + Math.random() * 15,
      humidity: 40 + Math.random() * 30,
      pressure: 1000 + Math.random() * 50
    }))
  );
};

// 數據處理管道
class DataProcessingPipeline {
  constructor() {
    this.setupPipeline();
  }

  setupPipeline() {
    const sensorData$ = generateSensorData().pipe(share());

    // 1. 實時數據監控
    const realtimeMonitoring$ = sensorData$.pipe(
      filter(data => data.temperature > 30), // 過濾高溫數據
      map(data => ({
        ...data,
        alert: 'HIGH_TEMPERATURE',
        severity: data.temperature > 35 ? 'CRITICAL' : 'WARNING'
      }))
    );

    // 2. 按傳感器分組統計
    const sensorStats$ = sensorData$.pipe(
      groupBy(data => data.id),
      mergeMap(group =>
        group.pipe(
          bufferTime(5000), // 每5秒統計一次
          filter(buffer => buffer.length > 0),
          map(buffer => ({
            sensorId: group.key,
            count: buffer.length,
            avgTemperature: buffer.reduce((sum, item) => sum + item.temperature, 0) / buffer.length,
            avgHumidity: buffer.reduce((sum, item) => sum + item.humidity, 0) / buffer.length,
            maxTemperature: Math.max(...buffer.map(item => item.temperature)),
            minTemperature: Math.min(...buffer.map(item => item.temperature))
          }))
        )
      )
    );

    // 3. 累積統計
    const cumulativeStats$ = sensorData$.pipe(
      scan((acc, current) => {
        acc.totalReadings++;
        acc.totalTemperature += current.temperature;
        acc.avgTemperature = acc.totalTemperature / acc.totalReadings;

        if (current.temperature > acc.maxTemperature) {
          acc.maxTemperature = current.temperature;
        }
        if (current.temperature < acc.minTemperature) {
          acc.minTemperature = current.temperature;
        }

        return acc;
      }, {
        totalReadings: 0,
        totalTemperature: 0,
        avgTemperature: 0,
        maxTemperature: -Infinity,
        minTemperature: Infinity
      })
    );

    // 4. 異常檢測
    const anomalyDetection$ = sensorData$.pipe(
      bufferTime(2000),
      filter(buffer => buffer.length > 0),
      map(buffer => {
        const avgTemp = buffer.reduce((sum, item) => sum + item.temperature, 0) / buffer.length;
        const anomalies = buffer.filter(item => Math.abs(item.temperature - avgTemp) > 5); // 偏差超過5度認為異常

        return {
          timestamp: Date.now(),
          periodAvg: avgTemp,
          anomalies,
          anomalyCount: anomalies.length
        };
      }),
      filter(result => result.anomalyCount > 0)
    );

    // 訂閱各種數據流
    realtimeMonitoring$.subscribe(alert => {
      console.log('🚨 溫度警報:', alert);
      this.sendAlert(alert);
    });

    sensorStats$.subscribe(stats => {
      console.log('📊 傳感器統計:', stats);
      this.updateDashboard(stats);
    });

    cumulativeStats$.subscribe(stats => {
      console.log('📈 累積統計:', stats);
      this.updateOverallStats(stats);
    });

    anomalyDetection$.subscribe(anomaly => {
      console.log('⚠️ 異常檢測:', anomaly);
      this.handleAnomaly(anomaly);
    });
  }

  sendAlert(alert) {
    // 發送警報邏輯
  }

  updateDashboard(stats) {
    // 更新儀表板
  }

  updateOverallStats(stats) {
    // 更新總體統計
  }

  handleAnomaly(anomaly) {
    // 處理異常
  }
}

優缺點分析

優點 ✅

  • 統一的異步模型:將Promise、事件、定時器等統一為Observable,簡化異步編程
  • 強大的操作符生態:100+個操作符覆蓋各種數據轉換和處理需求
  • 優雅的錯誤處理:內置完善的錯誤處理和恢復機制
  • 內存管理:自動管理訂閱和取消訂閱,避免內存洩漏
  • 函數式編程:支持鏈式調用,代碼簡潔且易於測試
  • 框架無關:可以與任何JavaScript框架或庫集成

缺點 ❌

  • 學習曲線陡峭:概念較多,需要時間理解Observable、操作符等概念
  • 包體積較大:完整版本約300KB,需要按需引入來優化體積
  • 調試困難:異步流的調試比傳統代碼更複雜,需要專門的調試工具
  • 過度工程化風險:簡單場景使用RxJS可能會增加不必要的複雜性

最佳實踐

開發建議

1. 性能優化技巧

// 最佳實踐示例1:按需引入操作符
// 推薦做法 - 只引入需要的操作符
import { map, filter, take } from 'rxjs/operators';
import { of } from 'rxjs';

const optimizedUsage = () => {
  return of(1, 2, 3, 4, 5).pipe(
    filter(x => x > 2),
    map(x => x * 2),
    take(2)
  );
};

// 避免的做法 - 引入整個rxjs庫
// import * as Rx from 'rxjs'; // ❌ 不推薦

2. 內存管理策略

// 完善的訂閱管理示例
class ComponentWithRxJS {
  constructor() {
    this.subscriptions = [];
  }

  init() {
    // 收集所有訂閱
    const subscription1 = interval(1000).subscribe(/* ... */);
    const subscription2 = fromEvent(window, 'resize').subscribe(/* ... */);

    this.subscriptions.push(subscription1, subscription2);
  }

  destroy() {
    // 統一取消所有訂閱,防止內存洩漏
    this.subscriptions.forEach(sub => sub.unsubscribe());
  }
}

3. 錯誤處理策略

// 完善的錯誤處理示例
const robustDataFetching = (url) => {
  return ajax.getJSON(url).pipe(
    // 重試機制
    retry({ count: 3, delay: (error, retryCount) => timer(retryCount * 1000) }),
    // 超時處理
    timeout(10000),
    // 錯誤恢復
    catchError(error => {
      if (error.status === 404) {
        return of({ data: [], message: '數據不存在' });
      }
      throw error;
    })
  );
};

性能優化專題

性能分析與監控

內存使用分析

// 監控Observable的內存使用情況
import { Observable } from 'rxjs';

class PerformanceMonitor {
  constructor() {
    this.subscriptions = new Map();
    this.metrics = {
      activeSubscriptions: 0,
      totalSubscriptions: 0,
      memoryLeaks: 0
    };
  }

  monitor(observable$, name) {
    return new Observable(observer => {
      const startTime = performance.now();
      const startMemory = performance.memory?.usedJSHeapSize || 0;

      this.metrics.activeSubscriptions++;
      this.metrics.totalSubscriptions++;

      console.log(`[${name}] 訂閱開始 - 活躍訂閱數: ${this.metrics.activeSubscriptions}`);

      const subscription = observable$.subscribe({
        next: value => observer.next(value),
        error: error => {
          this.logError(name, error);
          observer.error(error);
        },
        complete: () => {
          this.logCompletion(name, startTime, startMemory);
          observer.complete();
        }
      });

      return () => {
        this.metrics.activeSubscriptions--;
        console.log(`[${name}] 訂閱結束 - 活躍訂閱數: ${this.metrics.activeSubscriptions}`);
        subscription.unsubscribe();
      };
    });
  }

  logError(name, error) {
    console.error(`❌ [${name}] 發生錯誤:`, error);
  }

  logCompletion(name, startTime, startMemory) {
    const endTime = performance.now();
    const endMemory = performance.memory?.usedJSHeapSize || 0;
    const duration = endTime - startTime;
    const memoryDiff = endMemory - startMemory;

    console.log(`[${name}] 性能統計:`, {
      duration: `${duration.toFixed(2)}ms`,
      memoryChange: `${(memoryDiff / 1024 / 1024).toFixed(2)}MB`
    });
  }

  detectMemoryLeaks() {
    if (this.metrics.activeSubscriptions > 100) {
      console.warn(`⚠️ 檢測到大量活躍訂閱 (${this.metrics.activeSubscriptions}),可能存在內存洩漏`);
      this.metrics.memoryLeaks++;
    }
  }

  getMetrics() {
    return this.metrics;
  }
}

// 使用性能監控
const monitor = new PerformanceMonitor();
const monitoredStream$ = monitor.monitor(
  interval(1000).pipe(take(10)),
  'IntervalStream'
);

monitoredStream$.subscribe();

操作符性能對比

// 性能測試工具
class OperatorBenchmark {
  static async compareOperators(testCases) {
    const results = [];

    for (const testCase of testCases) {
      const { name, observable$ } = testCase;
      const startTime = performance.now();

      await new Promise(resolve => {
        observable$.subscribe({
          complete: () => {
            const endTime = performance.now();
            results.push({
              name,
              duration: endTime - startTime
            });
            resolve();
          }
        });
      });
    }

    return results.sort((a, b) => a.duration - b.duration);
  }
}

// 對比不同操作符的性能
import { range, EMPTY } from 'rxjs';
import { concatMap, toArray } from 'rxjs/operators';

const testData = range(1, 10000);
const testCases = [
  {
    name: 'map + filter',
    observable$: testData.pipe(
      map(x => x * 2),
      filter(x => x % 4 === 0),
      toArray()
    )
  },
  {
    name: 'filter + map',
    observable$: testData.pipe(
      filter(x => x % 2 === 0),
      map(x => x * 2),
      toArray()
    )
  },
  {
    name: 'single operator',
    observable$: testData.pipe(
      concatMap(x => x % 2 === 0 ? of(x * 2) : EMPTY),
      toArray()
    )
  }
];

OperatorBenchmark.compareOperators(testCases).then(results => {
  console.log('性能對比結果:', results);
});

內存使用監控

// 內存監控工具
class MemoryMonitor {
  constructor() {
    this.measurements = [];
    this.isMonitoring = false;
  }

  start(interval = 1000) {
    if (this.isMonitoring) return;

    this.isMonitoring = true;
    this.intervalId = setInterval(() => {
      if (performance.memory) {
        const memory = {
          timestamp: Date.now(),
          used: performance.memory.usedJSHeapSize,
          total: performance.memory.totalJSHeapSize,
          limit: performance.memory.jsHeapSizeLimit
        };
        this.measurements.push(memory);
        // 保留最近100個測量值
        if (this.measurements.length > 100) {
          this.measurements.shift();
        }
      }
    }, interval);
  }

  stop() {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.isMonitoring = false;
    }
  }

  getReport() {
    if (this.measurements.length === 0) return null;
    const latest = this.measurements[this.measurements.length - 1];
    const first = this.measurements[0];

    return {
      current: {
        used: `${(latest.used / 1024 / 1024).toFixed(2)} MB`,
        total: `${(latest.total / 1024 / 1024).toFixed(2)} MB`,
        usage: `${((latest.used / latest.total) * 100).toFixed(1)}%`
      },
      growth: {
        used: `${((latest.used - first.used) / 1024 / 1024).toFixed(2)} MB`,
        total: `${((latest.total - first.total) / 1024 / 1024).toFixed(2)} MB`
      }
    };
  }
}

原文出處:https://juejin.cn/post/7554961105325424683


精選技術文章翻譯,幫助開發者持續吸收新知。

共有 0 則留言


精選技術文章翻譯,幫助開發者持續吸收新知。
🏆 本月排行榜
🥇
站長阿川
📝19   💬9   ❤️4
713
🥈
我愛JS
📝4   💬13   ❤️8
242
🥉
AppleLily
📝1   💬3   ❤️1
69
#4
御魂
💬1  
3
評分標準:發文×10 + 留言×3 + 獲讚×5 + 點讚×1 + 瀏覽數÷10
本數據每小時更新一次
🔧 阿川の電商水電行
Shopify 顧問、維護與客製化
💡
小任務 / 單次支援方案
單次處理 Shopify 修正/微調
⭐️
維護方案
每月 Shopify 技術支援 + 小修改 + 諮詢
🚀
專案建置
Shopify 功能導入、培訓 + 分階段交付