鮮為人知的架構:我是如何建立真正可擴展的系統(以及為什麼大多數系統無法做到這一點)
讓我來告訴你我職業生涯中最糟糕的製作事故。
那是星期二凌晨2點47分。我的手機突然響起各種警報。我們的主API回傳了503錯誤。資料庫連線數已達上限。錯誤率在不到三分鐘的時間內從0.01%飆升至47%。我們每分鐘處理的請求量從5萬個驟降至勉強5千個。
我翻身下床,摸索著找到筆記型電腦,透過 SSH 連接到我們的監視控制面板。我的手在發抖——不是因為冷,而是因為意識到自己對發生的一切一無所知。我們有負載平衡器、自動伸縮組、Redis 快取、資料庫唯讀副本,所有必要的安全措施都已到位。我們「遵循了最佳實踐」。我們當初也是為擴展性而建構的。
我當時是這麼想的。
那天晚上,以及第二天殘酷的事故分析中,我學到的東西徹底改變了我對軟體開發的看法。問題不在於我們的程式碼,也不在於我們的基礎設施,而是一些更根本的原因:我們建構了一個看起來可擴展的系統,但實際上卻像紙牌屋一樣不堪一擊。
那次事件讓我們損失了34萬美元的收入,失去了三家重要的企業客戶,幾乎摧毀了我們工程團隊的士氣。但它教會了我很多關於實際架構的知識,比任何書籍、課程或會議演講都多。
這篇文章是關於我從中學到的東西。不僅是從那次失敗中學到的,更是從七年來建構、破壞和重建分散式系統,最終使其在壓力下穩定運作的經驗中學到的。這不是理論,而是傷痕累累的歲月沉澱出的寶貴知識。
我花了多年時間才接受這個令人不舒服的事實:大多數開發者,包括我自己在內,很長一段時間以來,其實並不理解可擴展性的含義。
我們認為它的意思是「處理更多流量」。我們認為它的意思是「增加伺服器,速度更快」。我們認為它的意思是橫向擴展、微服務、Kubernetes、事件驅動架構——所有這些時髦的詞彙都能讓履歷看起來很棒。
但可擴展性並非在於處理更多流量,而是優雅地應對混亂局面。
讓我用一個故事來解釋一下我的意思。
在那次災難性的宕機事件發生六個月後,我們徹底重寫了核心 API。並非因為舊程式碼「不好」——實際上它相當簡潔,經過充分測試,並遵循了 SOLID 原則。我們重寫它的原因是,我們從根本上誤解了我們正在解決的問題。
舊版 API 的工作方式如下:當收到請求時,我們會:
檢查 Redis 中是否有快取資料
如果快取未命中,則查詢資料庫
如果找到資料,則使用其他兩個服務的資料對其進行豐富。
將一切轉化為回應
快取結果
返回客戶
教科書式的程式碼。高效。快速。分層結構合理。那種在程式碼審查中會受到表揚的程式碼。
我們沒有看到的是:我們創造了 47 種不同的故障模式,但我們只知道如何處理其中的三種。
當 Redis 執行緩慢但未宕機時會發生什麼?當資料庫容量達到 95%,每次查詢耗時 4 秒而不是 40 毫秒時會發生什麼?當某個資料增強服務開始間歇性地回傳 500 錯誤時會發生什麼事?當它們開始返回 200 錯誤但資料損壞時會發生什麼?
我們的系統無法回答這些問題。因此,當週二早上流量激增 40% 時——這完全是正常的業務波動——一切都崩潰了。響應緩慢導致連接池耗盡。重試加劇了負載。超時問題不斷累積。最終,整個系統不堪負荷而崩潰。
六個月後我們開發的版本,每台伺服器處理的流量更少,平均速度更慢,而且元件更多。
而且它的韌性是原來的100倍。
為什麼?因為我們不再追求成功,而是開始為失敗做準備。
在深入探討程式碼和架構之前,我需要先分享一下我建構系統方式的思維模型。一旦你了解它,你對軟體的看法將會徹底改變。
把你的系統想像成一個活的有機體,而不是一台機器。
機器的運作是可預測的。你拉動控制桿,齒輪轉動,然後輸出。機器的設計旨在實現最佳運作。一旦機器發生故障,它們就會完全停止運作。
生物體各不相同。它們生活在充滿敵意的環境中,面臨不確定性、資源匱乏、攻擊和持續變化。它們追求的並非最佳性能,而是生存。當生物體受傷時,它們會適應環境、自我修復並繼續運作。
你的生產系統就是一個有機體。
它生活在這樣的環境中:
網路呼叫隨機失敗
依賴項在沒有任何警告的情況下變得不可用
交通模式變化難以預測
資料損壞
硬體故障
人為錯誤是難免的(而且肯定會發生——我曾經不小心刪除了生產資料庫,在周五晚上部署了有問題的程式碼,也曾經因為輸錯 AWS CLI 命令而導致整個區域癱瘓)。
如果你像設計機器一樣設計系統——只追求完美執行,假定係統可靠,把故障視為例外——那麼它就會很脆弱,非常不堪一擊。它會在生產環境中以你在開發過程中從未想像過的方式崩潰。
如果你像設計有機體一樣設計你的系統——預先考慮故障、建造冗餘、優雅地降級、適應環境變化——它就會具有韌性,甚至具有反脆弱性。它能夠經得起生產的混亂局面。
這不僅僅是理念問題,它會改變你寫程式的方式。
讓我來演示一下這在實踐中是如何實現的。我們將從基本原理入手,逐步建立一個可用於生產環境的模式,這個模式已經無數次救過我的命了。
讓我們先從最糟糕的版本說起——我以前寫過的程式碼,也是我在大多數程式碼庫中看到的程式碼:
def get_user_profile(user_id):
# Get user from database
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
# Get their posts
posts = posts_service.get_user_posts(user_id)
# Get their friend count
friend_count = social_service.get_friend_count(user_id)
# Combine and return
return {
"user": user,
"posts": posts,
"friend_count": friend_count
}
這段程式碼看起來似乎沒問題。它簡潔易讀,功能也符合預期。但它其實暗藏隱患。
讓我細數一下這會在生產環境中為你帶來哪些災難:
沒有逾時:如果資料庫掛起,此函數將永遠掛起,佔用一個執行緒/進程。
沒有備用方案:如果posts_service宕機,即使我們有用戶資料,整個請求也會失敗。
沒有重試邏輯:如果出現短暫的網路故障,我們會立即失敗,而不是再次嘗試。
沒有熔斷機制:如果social_service運作不暢,我們只會不斷打擊它,使情況變得更糟。
同步級聯:所有這些呼叫都按順序發生,因此延遲會累積。
絕不降低標準:我們採取要麼全有要麼全無的原則——要麼你得到一切,要麼你出錯。
讓我們一步一步解決這個問題,我會解釋每個決定背後的原因。
from contextlib import contextmanager
import signal
@contextmanager
def timeout(seconds):
def timeout_handler(signum, frame):
raise TimeoutError()
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
def get_user_profile(user_id):
try:
with timeout(2): # Max 2 seconds for DB query
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
except TimeoutError:
raise ServiceError("Database timeout")
try:
with timeout(3):
posts = posts_service.get_user_posts(user_id)
except TimeoutError:
posts = [] # Degrade gracefully
try:
with timeout(1):
friend_count = social_service.get_friend_count(user_id)
except TimeoutError:
friend_count = None
return {
"user": user,
"posts": posts,
"friend_count": friend_count
}
好多了。現在不會一直卡住了。但請注意其他變化:我們引入了降級機制。如果 POST 服務逾時,我們會傳回空白 POST 請求,而不是讓整個請求失敗。
這一點至關重要。在生物體模型中,如果你的手臂受傷,你的身體不會停止運作──它會繼續運作,只是那隻手臂的功能無法完全發揮。這裡也是同樣的道理。
但我們仍然忽略了一個重要問題:如果服務沒有超時,只是速度非常慢呢?如果服務有回應,但每次都需要 2.9 秒,而我們將超時時間設定為 3 秒怎麼辦?
大多數開發者對彈性的理解就止步於此。他們加入超時機制,或許再加一些重試,然後就草草了事。但最強大的模式卻幾乎無人問津:熔斷器。
斷路器模式直接借鏡自電機工程。在家用電器中,如果某個設備電流過大,斷路器就會跳閘,切斷電源以防止火災。在軟體開發中,如果某個依賴項發生故障,斷路器也會“跳閘”,我們會暫停呼叫該依賴項一段時間,讓它有時間恢復。
以下是一個基本實作:
from datetime import datetime, timedelta
from enum import Enum
import threading
class CircuitState(Enum):
CLOSED = "closed" # Everything working, requests go through
OPEN = "open" # Too many failures, blocking requests
HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout_duration=60, success_threshold=2):
self.failure_threshold = failure_threshold
self.timeout_duration = timeout_duration
self.success_threshold = success_threshold
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
self.lock = threading.Lock()
def call(self, func, *args, **kwargs):
with self.lock:
if self.state == CircuitState.OPEN:
if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout_duration):
# Try transitioning to half-open
self.state = CircuitState.HALF_OPEN
self.success_count = 0
else:
# Still open, fail fast
raise CircuitBreakerOpen("Service unavailable")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise e
def _on_success(self):
with self.lock:
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
def _on_failure(self):
with self.lock:
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# Usage
posts_circuit = CircuitBreaker(failure_threshold=5, timeout_duration=30)
def get_user_posts_with_cb(user_id):
try:
return posts_circuit.call(posts_service.get_user_posts, user_id)
except CircuitBreakerOpen:
return [] # Fail fast, return empty
這設計巧妙而精妙。現在,如果貼文服務開始反覆出現故障,我們會完全停止對其請求 30 秒。這樣做有三個目的:
保護下游服務:我們給下游服務喘息的空間來恢復,而不是用請求轟炸它。
保護我們的服務:我們快速失敗而不是等待超時,從而保持較低的回應時間。
保護我們的用戶:他們可以獲得更快的錯誤回應(即時快速失敗),而不是等待緩慢的超時。
但真正讓這項技術威力無窮的是:斷路器賦予系統反脆弱性。當一個部件發生故障時,系統的其他部分反而會更加穩定,而不是更加不穩定。這就像是發炎隔離體內的感染一樣——雖然會帶來疼痛,但它能阻止感染擴散。
現在讓我來展示完整的模式——它將我們學到的所有內容整合到一個可用於生產環境的方法中。這是我現在建立的每個關鍵服務都使用的架構模式。
from typing import Optional, Callable, Any
from dataclasses import dataclass
from functools import wraps
import time
import logging
@dataclass
class CallOptions:
timeout: float
retries: int = 3
retry_delay: float = 0.5
circuit_breaker: Optional[CircuitBreaker] = None
fallback: Optional[Callable] = None
cache_key: Optional[str] = None
cache_ttl: int = 300
class ResilientCaller:
def __init__(self, cache, metrics):
self.cache = cache
self.metrics = metrics
self.logger = logging.getLogger(__name__)
def call(self, func: Callable, options: CallOptions, *args, **kwargs) -> Any:
# Try cache first
if options.cache_key:
cached = self.cache.get(options.cache_key)
if cached is not None:
self.metrics.increment("cache.hit")
return cached
self.metrics.increment("cache.miss")
# Track timing
start_time = time.time()
try:
result = self._call_with_resilience(func, options, *args, **kwargs)
# Cache successful result
if options.cache_key and result is not None:
self.cache.set(options.cache_key, result, ttl=options.cache_ttl)
# Record metrics
duration = time.time() - start_time
self.metrics.histogram("call.duration", duration)
self.metrics.increment("call.success")
return result
except Exception as e:
duration = time.time() - start_time
self.metrics.histogram("call.duration", duration)
self.metrics.increment("call.failure")
# Try fallback
if options.fallback:
self.logger.warning(f"Call failed, using fallback: {e}")
return options.fallback(*args, **kwargs)
raise
def _call_with_resilience(self, func, options, *args, **kwargs):
last_exception = None
for attempt in range(options.retries):
try:
# Apply circuit breaker if provided
if options.circuit_breaker:
return options.circuit_breaker.call(
self._call_with_timeout,
func,
options.timeout,
*args,
**kwargs
)
else:
return self._call_with_timeout(func, options.timeout, *args, **kwargs)
except CircuitBreakerOpen:
# Circuit is open, don't retry
raise
except Exception as e:
last_exception = e
self.logger.warning(f"Attempt {attempt + 1} failed: {e}")
if attempt < options.retries - 1:
# Exponential backoff
sleep_time = options.retry_delay * (2 ** attempt)
time.sleep(sleep_time)
raise last_exception
def _call_with_timeout(self, func, timeout_seconds, *args, **kwargs):
# Implementation depends on whether you're using threading, asyncio, etc.
# This is a simplified version
with timeout(timeout_seconds):
return func(*args, **kwargs)
# Now let's use this to build our user profile endpoint properly
class UserProfileService:
def __init__(self, db, posts_service, social_service, cache, metrics):
self.db = db
self.posts_service = posts_service
self.social_service = social_service
self.caller = ResilientCaller(cache, metrics)
# Set up circuit breakers
self.posts_cb = CircuitBreaker(failure_threshold=5, timeout_duration=30)
self.social_cb = CircuitBreaker(failure_threshold=5, timeout_duration=30)
def get_user_profile(self, user_id):
# Get user from database - critical, no fallback
user = self.caller.call(
self._get_user_from_db,
CallOptions(
timeout=2.0,
retries=3,
cache_key=f"user:{user_id}",
cache_ttl=300
),
user_id
)
# Get posts - non-critical, can degrade
posts = self.caller.call(
self.posts_service.get_user_posts,
CallOptions(
timeout=3.0,
retries=2,
circuit_breaker=self.posts_cb,
fallback=lambda uid: [], # Empty list if fails
cache_key=f"posts:{user_id}",
cache_ttl=60
),
user_id
)
# Get friend count - non-critical, can degrade
friend_count = self.caller.call(
self.social_service.get_friend_count,
CallOptions(
timeout=1.0,
retries=1,
circuit_breaker=self.social_cb,
fallback=lambda uid: None, # Null if fails
cache_key=f"friends:{user_id}",
cache_ttl=300
),
user_id
)
return {
"user": user,
"posts": posts,
"friend_count": friend_count,
"degraded": friend_count is None or len(posts) == 0
}
def _get_user_from_db(self, user_id):
return self.db.query("SELECT * FROM users WHERE id = ?", user_id)
看看我們在這裡建造的是什麼。這不僅僅是「帶有錯誤處理的程式碼」。這是一個具有以下功能的彈性系統:
積極使用快取來降低依賴項的負載
依嚴重程度適當超時
採用指數退避策略進行智慧重試
為保護陷入困境的服務而採取的「熔斷」措施
當非關鍵元件發生故障時,能夠優雅地降級。
衡量一切可觀測性
記錄有意義的日誌以進行除錯
更令人驚訝的是:當我們在所有服務中部署這種模式後,即使增加了更多步驟,P99 延遲也降低了60%。為什麼?因為我們不再陷入緩慢的惡性循環。一旦出現問題,我們就能快速排除;盡可能使用快取;從而確保系統持續運作。
除非你為此吃過虧,否則沒人會告訴你:你的應用程式程式碼很少是瓶頸,資料庫才是。
多年來,我審查過數百個生產架構,據我估計,80% 的效能問題和 90% 的服務中斷都源自於資料庫問題。這並非因為資料庫本身不好,而是因為開發人員,包括經驗豐富的開發人員,常常誤解如何在規模化環境中使用資料庫。
讓我來告訴你我遇到的最陰險的資料庫問題:看起來像 1+1 查詢的 N+1 查詢。
我們有一個用於顯示用戶動態的介面。很簡單:獲取用戶,獲取他們的帖子,返回 JSON 資料。在開發階段,我們用 10 個測試用戶和 50 個貼文進行了測試,速度非常快。我們對自己的程式碼很滿意。
在實際生產環境中,使用真實資料時,它使我們的資料庫徹底崩潰。
以下是程式碼內容:
def get_user_feed(user_id):
user = User.query.get(user_id)
posts = Post.query.filter_by(user_id=user_id).limit(20).all()
feed_items = []
for post in posts:
# Seems innocent: just getting the author for each post
author = User.query.get(post.author_id)
feed_items.append({
"post": post.to_dict(),
"author": author.to_dict()
})
return feed_items
我們當時一共執行了 21 次查詢:一次查詢初始帖子,然後每次查詢帖子的作者。典型的 N+1 查詢。 “等等,”我記得當時我想,“這些帖子都屬於同一個用戶,所以我們只是在重複查詢同一個用戶。資料庫應該會緩存這些查詢結果吧?”
錯了。大錯特錯。
即使查詢的是同一個用戶,每個查詢也都要經過完整的請求棧:連接池檢出、查詢解析、查詢計劃、執行、結果序列化、連接返回。資料庫的查詢快取有幫助,但還不夠。大規模應用程式時,僅資料庫往返就會導致每個請求額外消耗約 40 毫秒。
我們一看就知道解決方法很明顯:
def get_user_feed(user_id):
user = User.query.get(user_id)
posts = Post.query.filter_by(user_id=user_id).limit(20).all()
# Get all unique author IDs
author_ids = list(set(post.author_id for post in posts))
# Single query to fetch all authors
authors = User.query.filter(User.id.in_(author_ids)).all()
authors_by_id = {author.id: author for author in authors}
feed_items = []
for post in posts:
feed_items.append({
"post": post.to_dict(),
"author": authors_by_id[post.author_id].to_dict()
})
return feed_items
總共執行了三個查詢。反應時間從 40 毫秒降至 8 毫秒。資料庫 CPU 使用率下降了 35%。
但真正的教訓並非關於 N+1 查詢——每個開發人員都知道要警惕這類查詢。真正的教訓是:在生產環境中,看似微小的效率低下會累積成嚴重的問題。
讓我們來談談一件看似平凡卻在我職業生涯中造成生產中斷次數最多的事情:連接池耗盡。
您的資料庫有其最大連線數限制,假設是 100。您的應用程式有一個連線池,可能會指派 20 個連線。如果您有 5 台應用程式伺服器,那麼總共就有 100 個連線——完美,剛好達到資料庫的上限。
現在想像一下這種情況:你部署了一個新功能,它會讓查詢速度稍慢一些——查詢並沒有崩潰,只是需要 200 毫秒而不是 50 毫秒。會發生什麼事?
請求耗時開始延長(200毫秒對比50毫秒)
在先前的請求仍保持連線的情況下,又有新的請求湧入。
連接池開始耗盡可用連線數
新請求會等待連線可用。
等待請求超時或速度變慢
用戶瀏覽器/應用程式重試失敗的請求
還需要更多聯繫
整個系統徹底癱瘓
這被稱為線程/連接池耗盡,它是一個隱形的殺手。
真正可怕的是:它會形成惡性循環。系統運作速度越慢,需要的連線就越多;所需的連線越多,系統運作速度就越慢。這是一個正向回饋迴路——從數學角度來說是正回饋,從實際角度來看卻是災難性的。
我學會了用四管齊下的方法來預防這種情況:
# Database configuration
DATABASE_CONFIG = {
'pool_size': 20,
'max_overflow': 5,
'pool_timeout': 10, # Max seconds to wait for connection
'pool_recycle': 3600, # Recycle connections after 1 hour
'pool_pre_ping': True, # Test connections before using
'connect_args': {
'connect_timeout': 5, # Max seconds to establish connection
'command_timeout': 10, # Max seconds for query execution
}
}
class ConnectionPoolMonitor:
def __init__(self, engine):
self.engine = engine
def get_stats(self):
pool = self.engine.pool
return {
'size': pool.size(),
'checked_in': pool.checkedin(),
'checked_out': pool.checkedout(),
'overflow': pool.overflow(),
'utilization': pool.checkedout() / (pool.size() + pool.overflow()) * 100
}
def check_health(self):
stats = self.get_stats()
# Alert if utilization is high
if stats['utilization'] > 80:
logger.warning(f"Connection pool utilization high: {stats['utilization']}%")
metrics.gauge('db.pool.utilization', stats['utilization'])
# Alert if we're using overflow connections
if stats['overflow'] > 0:
logger.warning(f"Using {stats['overflow']} overflow connections")
metrics.gauge('db.pool.overflow', stats['overflow'])
from contextlib import contextmanager
@contextmanager
def query_timeout(session, seconds):
"""Set a timeout for a specific query."""
connection = session.connection()
cursor = connection.connection.cursor()
# PostgreSQL-specific, adjust for your database
cursor.execute(f"SET statement_timeout = {seconds * 1000}")
try:
yield
finally:
cursor.execute("SET statement_timeout = 0")
# Usage
with query_timeout(db.session, 5):
results = db.session.query(User).filter_by(email=email).all()
這是最後的手段,但有時也是必要的:
class DatabaseCircuitBreaker:
def __init__(self, engine, threshold=0.8):
self.engine = engine
self.threshold = threshold
self.monitor = ConnectionPoolMonitor(engine)
def should_allow_query(self):
stats = self.monitor.get_stats()
utilization = stats['utilization']
if utilization > self.threshold * 100:
# Pool is near exhaustion, start rejecting non-critical queries
return False
return True
def execute_if_allowed(self, query_func, is_critical=False):
if is_critical or self.should_allow_query():
return query_func()
else:
raise DatabaseOverloadError("Database pool near exhaustion, rejecting query")
# Usage
db_breaker = DatabaseCircuitBreaker(engine)
try:
result = db_breaker.execute_if_allowed(
lambda: db.session.query(Post).all(),
is_critical=False
)
except DatabaseOverloadError:
# Serve from cache or return degraded response
result = cache.get('all_posts_fallback')
快取技術人人都知道。 Redis、Memcached、記憶體快取——這些都是標準技術。但大多數生產環境中的快取策略都過於簡單,甚至有害。
我的意思是:大多數開發者都會快取成功的回應。但這只是成功的一半。
讓我來給你展示一下智慧型快取是什麼樣子的:
def get_user_by_email(email):
cache_key = f"user:email:{email}"
# Check cache
cached = cache.get(cache_key)
if cached is not None:
if cached == "NOT_FOUND":
return None # Cached negative result
return cached
# Query database
user = db.query("SELECT * FROM users WHERE email = ?", email)
if user:
cache.set(cache_key, user, ttl=300)
return user
else:
# Cache the fact that this user doesn't exist
cache.set(cache_key, "NOT_FOUND", ttl=60)
return None
為什麼這很重要?因為攻擊者喜歡查詢不存在的資料。如果您不快取否定結果,每次嘗試使用不存在的電子郵件地址登入都會存取您的資料庫。大規模攻擊時,這會演變成DDoS攻擊漏洞。
def get_enriched_user_profile(user_id):
cache_key = f"profile:{user_id}"
cached = cache.get(cache_key)
if cached:
return cached
profile = {"user_id": user_id}
# Try to get user data
try:
profile["user"] = user_service.get_user(user_id)
except Exception:
profile["user"] = None
# Try to get posts
try:
profile["posts"] = posts_service.get_posts(user_id)
except Exception:
profile["posts"] = []
# Cache even if partially failed
# Use shorter TTL for degraded responses
ttl = 300 if profile["user"] else 30
cache.set(cache_key, profile, ttl=ttl)
return profile
這樣可以確保即使依賴項發生故障,也不會重複呼叫它們。您只需提供效能降低但已快取的回應即可。
class CacheWarmer:
def __init__(self, cache, db):
self.cache = cache
self.db = db
def warm_popular_items(self):
"""Pre-populate cache with frequently accessed items."""
# Get most active users from last 24 hours
popular_users = self.db.query("""
SELECT user_id, COUNT(*) as activity
FROM user_events
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY user_id
ORDER BY activity DESC
LIMIT 1000
""")
for user in popular_users:
try:
# Fetch and cache their profile
profile = self.get_user_profile(user.user_id)
cache_key = f"profile:{user.user_id}"
self.cache.set(cache_key, profile, ttl=3600)
except Exception as e:
logger.warning(f"Failed to warm cache for user {user.user_id}: {e}")
def schedule_warming(self):
"""Run cache warming every hour."""
schedule.every(1).hours.do(self.warm_popular_items)
快取預熱可以防止快取崩潰-當熱門快取專案過期時,突然有數百個請求同時衝擊資料庫,試圖重新產生該快取專案。
雖然這有點難,但卻是我最喜歡的圖案之一:
import random
import time
def get_with_probabilistic_refresh(key, fetch_func, ttl):
"""
Fetch from cache, but probabilistically refresh before expiration.
This prevents cache stampedes on popular keys.
"""
cached = cache.get_with_ttl(key) # Returns (value, remaining_ttl)
if cached is None:
# Cache miss, fetch and store
value = fetch_func()
cache.set(key, value, ttl=ttl)
return value
value, remaining_ttl = cached
# Calculate probability of early refresh
# As remaining_ttl decreases, probability increases
beta = 1.0 # Adjust this to tune early refresh behavior
delta = remaining_ttl / ttl
probability = beta * math.log(random.random()) * delta
if probability < 0:
# Refresh early
try:
new_value = fetch_func()
cache.set(key, new_value, ttl=ttl)
return new_value
except Exception:
# If refresh fails, return old value
return value
return value
這種模式意味著,隨著快取專案接近過期,每個請求主動刷新它的機率都會增加。這樣可以分散負載,避免快取過期時出現請求激增的情況。
在凌晨2點47分那場災難性的事件之後,我開始沉迷於可觀測性。不是監控-而是可觀測性。這兩者之間有著至關重要的差異。
監控告訴你出了問題。可觀測性告訴你為什麼出了問題。
以下是我希望從一開始就建構的可觀測性技術堆疊:
大多數團隊都會實施指標管理,有些團隊會實施日誌記錄,但幾乎沒有人真正有效地實施追蹤。正因如此,他們才會花數小時來除錯原本只需幾分鐘就能解決的生產環境故障。
讓我用一個實際的例子來說明我的意思。
我們有一個端點偶爾會很慢——慢得驚人。 P50 請求需要 100 毫秒,P95 請求需要 200 毫秒,而 P99 請求卻要 8 秒。這些 P99 請求嚴重影響了使用者體驗,但我們卻不知道是什麼原因造成的。
我們的指標顯示端點速度很慢。謝謝你啊,指標,真是太有用了。
我們的日誌顯示了請求的傳入和傳出。很好,但這並不能告訴我們時間都去哪了。
然後我們實現了分散式追踪,突然間我們就能看到發生了什麼:
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer(__name__)
def get_user_profile(user_id):
with tracer.start_as_current_span("get_user_profile") as span:
span.set_attribute("user.id", user_id)
# Get user from database
with tracer.start_as_current_span("database.get_user") as db_span:
db_span.set_attribute("db.system", "postgresql")
db_span.set_attribute("db.operation", "SELECT")
start = time.time()
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
db_span.set_attribute("db.duration_ms", (time.time() - start) * 1000)
# Get posts
with tracer.start_as_current_span("posts_service.get_posts") as posts_span:
posts_span.set_attribute("service.name", "posts")
try:
posts = posts_service.get_user_posts(user_id)
posts_span.set_attribute("posts.count", len(posts))
posts_span.set_status(Status(StatusCode.OK))
except Exception as e:
posts_span.set_status(Status(StatusCode.ERROR))
posts_span.record_exception(e)
posts = []
# Get friend count
with tracer.start_as_current_span("social_service.get_friend_count") as social_span:
social_span.set_attribute("service.name", "social")
try:
friend_count = social_service.get_friend_count(user_id)
social_span.set_attribute("friends.count", friend_count)
except Exception as e:
social_span.record_exception(e)
friend_count = None
span.set_attribute("response.degraded", friend_count is None)
return {
"user": user,
"posts": posts,
"friend_count": friend_count
}
啟用追蹤功能後,我們查看了一個速度較慢的 P99 請求,並立即發現了問題所在:貼文服務耗時 7.8 秒。我們深入分析了該服務的追蹤訊息,發現它執行了一個未建立索引的資料庫查詢,掃描了 200 萬行資料。
修改一個索引後,問題解決。尋找和修復總共耗時:15分鐘。
如果沒有追踪,我們可能需要花費數天時間加入日誌語句、部署、等待問題重現、檢查日誌,然後重複這些操作,直到縮小問題範圍。
但僅僅追蹤是不夠的。你需要真正有用的日誌。以下是日誌記錄從糟糕到優秀的演變過程:
壞的:
print("Getting user profile")
# ... do stuff ...
print("Done getting user profile")
更好的:
logger.info(f"Getting user profile for user {user_id}")
# ... do stuff ...
logger.info(f"Successfully retrieved profile for user {user_id}")
好的:
logger.info("Retrieving user profile", extra={
"user_id": user_id,
"operation": "get_user_profile",
"trace_id": trace.get_current_span().get_span_context().trace_id
})
# ... do stuff ...
logger.info("User profile retrieved", extra={
"user_id": user_id,
"operation": "get_user_profile",
"duration_ms": duration,
"had_posts": len(posts) > 0,
"had_friend_count": friend_count is not None,
"trace_id": trace.get_current_span().get_span_context().trace_id
})
主要差異在於:結構化日誌可查詢。您可以搜尋「duration_ms > 5000 的所有請求」或「had_friend_count = false 的所有請求」。您可以使用 trace_id 將日誌與追蹤關聯起來。您也可以進行聚合和分析。
我現在會在建置的每個服務中加入以下指標,它已經幫我省去了無數次麻煩:
class LatencyTracker:
def __init__(self, metrics_client):
self.metrics = metrics_client
def track_operation(self, operation_name, tags=None):
"""Context manager to track operation latency and success."""
start = time.time()
success = False
try:
yield
success = True
finally:
duration = time.time() - start
final_tags = tags or {}
final_tags['operation'] = operation_name
final_tags['success'] = success
# Record latency histogram
self.metrics.histogram('operation.duration', duration, tags=final_tags)
# Record success/failure counter
self.metrics.increment('operation.count', tags=final_tags)
# Record the actual latency bucket for easier alerting
if duration < 0.1:
bucket = 'fast'
elif duration < 0.5:
bucket = 'medium'
elif duration < 2.0:
bucket = 'slow'
else:
bucket = 'very_slow'
final_tags['bucket'] = bucket
self.metrics.increment('operation.bucket', tags=final_tags)
# Usage
tracker = LatencyTracker(metrics)
def get_user_profile(user_id):
with tracker.track_operation('get_user_profile', {'user_id': user_id}):
# ... your code ...
pass
延遲分組至關重要。它們使您能夠建立簡單的警報,例如“如果非常慢的分組請求佔比超過 5%,則發出警報”,而無需進行複雜的百分位數計算。
大多數儀錶板都沒什麼用,要么顯示太多訊息,要么顯示太少訊息。以下是我在主服務儀錶板上顯示的內容:
請求速率(每秒請求數)
錯誤率(每秒錯誤次數及百分比)
潛伏期百分位數(P50、P95、P99)
延遲等級(% 快速、中、慢速、非常慢)
依賴項健康狀況(每個依賴項的斷路器狀態)
資源利用率(CPU、記憶體、連線池)
降級指標(服務降級的請求百分比)
最後一點至關重要。大多數儀錶板無法區分「完全成功」和「部分成功」。但在一個旨在提高系統韌性的系統中,這種區分至關重要。
def record_response_metrics(response_data):
"""Record metrics about the response we're sending."""
# Count the response
metrics.increment('response.count')
# Check if response is degraded
is_degraded = (
response_data.get('friend_count') is None or
len(response_data.get('posts', [])) == 0 or
response_data.get('degraded', False)
)
if is_degraded:
metrics.increment('response.degraded')
# Tag which parts are degraded
if response_data.get('friend_count') is None:
metrics.increment('response.degraded.missing_friends')
if len(response_data.get('posts', [])) == 0:
metrics.increment('response.degraded.missing_posts')
else:
metrics.increment('response.complete')
現在您可以建立警報:「如果回應速度下降超過 20%,請通知相關人員。」這樣您就可以在問題演變成服務中斷之前將其解決。
我們來談談部署。大多數團隊都有某種形式的持續整合/持續交付(CI/CD)。許多團隊使用藍綠部署或滾動更新。但很少有團隊能真正實現具有自動回滾功能的漸進式發布。
以下幾點改變了我的部署策略:
class FeatureFlag:
def __init__(self, name, redis_client):
self.name = name
self.redis = redis_client
def is_enabled_for_user(self, user_id):
"""Check if feature is enabled for a specific user."""
# Check if feature is globally enabled/disabled
global_state = self.redis.get(f"feature:{self.name}:global")
if global_state == "disabled":
return False
if global_state == "enabled":
return True
# Check rollout percentage
rollout_pct = float(self.redis.get(f"feature:{self.name}:rollout_pct") or 0)
# Use consistent hashing to determine if user is in rollout
user_hash = int(hashlib.md5(f"{self.name}:{user_id}".encode()).hexdigest(), 16)
user_pct = (user_hash % 100)
return user_pct < rollout_pct
def set_rollout_percentage(self, percentage):
"""Set the rollout percentage (0-100)."""
self.redis.set(f"feature:{self.name}:rollout_pct", percentage)
def enable_globally(self):
"""Enable feature for everyone."""
self.redis.set(f"feature:{self.name}:global", "enabled")
def disable_globally(self):
"""Disable feature for everyone."""
self.redis.set(f"feature:{self.name}:global", "disabled")
# Usage
new_profile_rendering = FeatureFlag("new_profile_rendering", redis)
def get_user_profile(user_id):
if new_profile_rendering.is_enabled_for_user(user_id):
return get_user_profile_v2(user_id)
else:
return get_user_profile_v1(user_id)
現在,當您部署新功能時:
將該功能的程式碼部署到帶有標誌的後台(0% 推出率)
逐步擴大推廣範圍:1% → 5% → 10% → 25% → 50% → 100%
監測每個階段的指標
如果錯誤率飆升或延遲增加,立即將部署頻率設為 0%。
這在我們部署了一項實際上使情況變得更糟的「效能改進」功能時幫了我們大忙。我們將其推廣到 5% 的用戶,發現 P99 延遲從 200 毫秒飆升至 1.2 秒,於是在 30 秒內就取消了該功能。只有 5% 的用戶遇到了效能下降,而且只持續了 30 秒。
如果沒有逐步推出,在我們部署回溯之前,100% 的用戶都會受到影響——這至少需要 10-15 分鐘。
您也可以透過自動回滾功能進一步擴展此功能:
class DeploymentMonitor:
def __init__(self, metrics_client, feature_flag):
self.metrics = metrics_client
self.flag = feature_flag
self.baseline_metrics = None
def set_baseline(self):
"""Capture baseline metrics before rollout."""
self.baseline_metrics = {
'error_rate': self.metrics.get_rate('errors.count'),
'p99_latency': self.metrics.get_percentile('request.duration', 99),
'p95_latency': self.metrics.get_percentile('request.duration', 95),
}
def check_health(self):
"""Check if current metrics are healthy compared to baseline."""
if not self.baseline_metrics:
return True, "No baseline set"
current_metrics = {
'error_rate': self.metrics.get_rate('errors.count'),
'p99_latency': self.metrics.get_percentile('request.duration', 99),
'p95_latency': self.metrics.get_percentile('request.duration', 95),
}
# Check error rate increase
error_increase = (
(current_metrics['error_rate'] - self.baseline_metrics['error_rate']) /
max(self.baseline_metrics['error_rate'], 0.0001) # Avoid division by zero
)
if error_increase > 0.5: # 50% increase in errors
return False, f"Error rate increased by {error_increase*100:.1f}%"
# Check latency degradation
p99_increase = (
(current_metrics['p99_latency'] - self.baseline_metrics['p99_latency']) /
self.baseline_metrics['p99_latency']
)
if p99_increase > 0.3: # 30% increase in P99 latency
return False, f"P99 latency increased by {p99_increase*100:.1f}%"
return True, "Metrics healthy"
def progressive_rollout(self, stages=[1, 5, 10, 25, 50, 100]):
"""Progressively roll out feature with health checks."""
self.set_baseline()
for stage in stages:
logger.info(f"Rolling out to {stage}% of users")
self.flag.set_rollout_percentage(stage)
# Wait for metrics to stabilize
time.sleep(60)
# Check health
healthy, reason = self.check_health()
if not healthy:
logger.error(f"Health check failed at {stage}%: {reason}")
logger.error("Rolling back to 0%")
self.flag.set_rollout_percentage(0)
# Alert the team
self.send_alert(f"Automatic rollback triggered: {reason}")
return False
logger.info(f"Health check passed at {stage}%")
logger.info("Rollout complete!")
return True
這種自動化方式讓您充滿信心地進行部署。您無需祈禱部署順利進行—因為您擁有一個能夠主動監控和保護生產環境的系統。
現在讓我分享我學到的最重要的架構模式:用於零停機時間遷移的絞殺者無花果模式。
這種模式以纏繞在宿主樹周圍並最終取代宿主樹的絞殺榕樹命名,它使你能夠從舊系統遷移到新系統,而無需進行大爆炸式重寫。
設想一下:你有一個單體服務,它運作緩慢、難以維護,而且需要替換。最簡單的做法是建立一個新服務並一次全部切換過去。但這非常冒險,而且通常都會出錯。
絞殺榕方法:
class UserServiceRouter:
"""Routes requests between old and new user service implementations."""
def __init__(self, old_service, new_service, feature_flag, metrics):
self.old_service = old_service
self.new_service = new_service
self.flag = feature_flag
self.metrics = metrics
def get_user(self, user_id):
"""Route to new or old service based on feature flag."""
use_new_service = self.flag.is_enabled_for_user(user_id)
if use_new_service:
try:
# Try new service
result = self.new_service.get_user(user_id)
self.metrics.increment('user_service.new.success')
# Shadow call to old service for comparison
self._shadow_call_old_service(user_id, result)
return result
except Exception as e:
# If new service fails, fall back to old
self.metrics.increment('user_service.new.failure')
logger.error(f"New service failed, falling back to old: {e}")
return self.old_service.get_user(user_id)
else:
# Use old service
self.metrics.increment('user_service.old.used')
return self.old_service.get_user(user_id)
def _shadow_call_old_service(self, user_id, new_result):
"""
Make a shadow call to old service to compare results.
This runs async so it doesn't slow down the response.
"""
def compare():
try:
old_result = self.old_service.get_user(user_id)
# Compare results
if self._results_match(old_result, new_result):
self.metrics.increment('shadow.match')
else:
self.metrics.increment('shadow.mismatch')
logger.warning(
f"Results mismatch for user {user_id}",
extra={
'old': old_result,
'new': new_result
}
)
except Exception as e:
logger.error(f"Shadow call failed: {e}")
# Run in background thread
threading.Thread(target=compare).start()
def _results_match(self, old_result, new_result):
"""Compare old and new results for consistency."""
# Implement your comparison logic
# This might ignore certain fields, timestamps, etc.
return old_result['id'] == new_result['id'] and \
old_result['email'] == new_result['email']
這種模式非常強大,因為:
您可以部署這項新服務,而無需任何人使用(0% 部署率)。
您可以逐步調整交通流量(1% → 5% → 10% → ...)
如果新服務失敗,您將有自動回退機制。
您可以比較新舊服務的結果,以驗證其正確性。
如果出現問題,您可以立即回滾。
我們利用這項技術遷移了一項每秒處理 5 萬次請求的關鍵服務。遷移過程歷時 6 週,但使用者毫無察覺。沒有停機,沒有事故發生,一切都在逐步且受監控的過渡過程中完成。
讓我們來談談一個很少被提及但影響巨大的效能優化:請求合併。
問題是這樣的:想像一下,在幾毫秒內,有 100 個請求同時到達,目標資料相同。如果不進行合併,就需要執行 100 次完全相同的資料庫查詢或 API 呼叫。而使用合併後,只需要執行一次。
import asyncio
from collections import defaultdict
from typing import Any, Callable
class RequestCoalescer:
"""Coalesce multiple identical requests into a single operation."""
def __init__(self):
self.pending_requests = defaultdict(list)
self.locks = defaultdict(asyncio.Lock)
async def coalesce(self, key: str, fetch_func: Callable) -> Any:
"""
Coalesce requests with the same key.
Only the first request executes fetch_func, others wait for the result.
"""
# Check if there's already a request in flight for this key
lock = self.locks[key]
async with lock:
# Check if we're the first request for this key
if key not in self.pending_requests or not self.pending_requests[key]:
# We're first! Execute the actual fetch
future = asyncio.Future()
self.pending_requests[key] = [future]
try:
result = await fetch_func()
future.set_result(result)
# Notify all waiting requests
for waiting_future in self.pending_requests[key][1:]:
waiting_future.set_result(result)
return result
except Exception as e:
future.set_exception(e)
# Propagate exception to all waiting requests
for waiting_future in self.pending_requests[key][1:]:
waiting_future.set_exception(e)
raise
finally:
# Clean up
del self.pending_requests[key]
del self.locks[key]
else:
# Another request is already fetching, wait for it
future = asyncio.Future()
self.pending_requests[key].append(future)
# Wait for the first request to complete
return await future
# Usage
coalescer = RequestCoalescer()
async def get_user_cached(user_id):
"""Get user with request coalescing."""
async def fetch():
# This only gets called once even if 100 requests arrive simultaneously
return await db.query("SELECT * FROM users WHERE id = ?", user_id)
return await coalescer.coalesce(f"user:{user_id}", fetch)
我在一個流量高峰期經常遭受重複請求攻擊的服務中實施了這項措施。效果非常顯著:
資料庫負載下降了60%
反應時間縮短了40%。
我們用同樣的基建可以處理三倍的流量。
關鍵洞察:在高流量系統中,請求模式具有局部性。當一個使用者發出請求時,很可能許多其他使用者也會在同一時間發出相同的請求。合併請求正是利用了這種模式。
殘酷的現實是:單元測試無法捕捉導致生產環境崩潰的 bug。整合測試有幫助,但還遠遠不夠。你需要的是混沌工程和基於屬性的測試。
你不需要Netflix完整的Chaos Monkey設定。你可以在開發環境中從簡單的Chaos Monkey開始:
class ChaoticDependency:
"""Wraps a dependency to inject random failures."""
def __init__(self, real_dependency, failure_rate=0.1, slow_rate=0.2):
self.real = real_dependency
self.failure_rate = failure_rate
self.slow_rate = slow_rate
def __getattr__(self, name):
"""Wrap all method calls with chaos."""
real_method = getattr(self.real, name)
def chaotic_method(*args, **kwargs):
# Random failures
if random.random() < self.failure_rate:
raise ConnectionError("Chaotic failure injected")
# Random slowness
if random.random() < self.slow_rate:
time.sleep(random.uniform(2, 5))
return real_method(*args, **kwargs)
return chaotic_method
# In development/staging
if settings.CHAOS_ENABLED:
posts_service = ChaoticDependency(posts_service, failure_rate=0.1, slow_rate=0.2)
social_service = ChaoticDependency(social_service, failure_rate=0.15, slow_rate=0.15)
啟用此功能後執行整合測試。如果測試通過,隨機失敗率在 10% 以內,反應速度在 20% 以內,則表示系統具有良好的彈性。如果測試失敗,則表示您在生產環境出現問題之前就發現了真正的問題。
不要測試特定的輸入,而是測試那些應該始終為真的屬性:
from hypothesis import given, strategies as st
class TestUserProfile:
@given(st.integers(min_value=1, max_value=1000000))
def test_get_profile_always_returns_user_id(self, user_id):
"""Property: response should always include the requested user_id."""
profile = get_user_profile(user_id)
assert profile['user']['id'] == user_id
@given(st.integers(min_value=1, max_value=1000000))
def test_get_profile_never_returns_other_users_data(self, user_id):
"""Property: should never return data for a different user."""
profile = get_user_profile(user_id)
# Check all posts belong to this user
for post in profile.get('posts', []):
assert post['author_id'] == user_id
@given(st.integers(min_value=1, max_value=1000000))
def test_get_profile_is_idempotent(self, user_id):
"""Property: calling twice should return same result."""
profile1 = get_user_profile(user_id)
profile2 = get_user_profile(user_id)
assert profile1['user'] == profile2['user']
@given(st.lists(st.integers(min_value=1, max_value=1000), min_size=10, max_size=100))
def test_batch_get_profile_performance(self, user_ids):
"""Property: batch fetching should be more efficient than individual fetches."""
start = time.time()
for user_id in user_ids:
get_user_profile(user_id)
individual_time = time.time() - start
start = time.time()
batch_get_user_profiles(user_ids)
batch_time = time.time() - start
# Batch should be at least 2x faster
assert batch_time < individual_time / 2
基於屬性的測試發現了我程式碼中一些用基於範例的測試永遠無法發現的錯誤。它產生數百個隨機輸入,並檢查你的不變數是否始終成立。
資料庫遷移令人擔憂,因為它們通常需要停機。以下是如何在不停機的情況下完成資料庫遷移的方法:
第一階段:新增列(可為空)
ALTER TABLE users ADD COLUMN email_normalized VARCHAR(255) NULL;
CREATE INDEX CONCURRENTLY idx_users_email_normalized ON users(email_normalized);
部署此功能。該列已存在但尚未使用。無重大變更。
第二階段:雙寫
def create_user(email, name):
normalized_email = email.lower().strip()
return db.execute("""
INSERT INTO users (email, name, email_normalized)
VALUES (?, ?, ?)
""", email, name, normalized_email)
現在新記錄會同時填入這兩個欄位。舊記錄的 email_normalized 欄位仍然為 NULL。
第三階段:回填
def backfill_normalized_emails(batch_size=1000):
"""Backfill email_normalized for existing records."""
while True:
# Get batch of records without normalized email
users = db.execute("""
SELECT id, email
FROM users
WHERE email_normalized IS NULL
LIMIT ?
""", batch_size)
if not users:
break
# Update in batch
for user in users:
normalized = user['email'].lower().strip()
db.execute("""
UPDATE users
SET email_normalized = ?
WHERE id = ?
""", normalized, user['id'])
# Sleep to avoid overloading database
time.sleep(0.1)
logger.info(f"Backfilled {len(users)} users")
將此任務作為背景作業執行。它會在不鎖定表的情況下逐步遷移舊資料。
第四階段:開關讀取
def find_user_by_email(email):
normalized_email = email.lower().strip()
# Now use the new column
return db.query("""
SELECT * FROM users
WHERE email_normalized = ?
""", normalized_email)
啟用此功能。您現在正在閱讀新列的內容。
第五階段:移除舊柱
ALTER TABLE users DROP COLUMN email;
ALTER TABLE users RENAME COLUMN email_normalized TO email;
ALTER TABLE users ALTER COLUMN email SET NOT NULL;
只有在確認遷移成功後,才能刪除舊列。
這比簡單的遷移耗時更長,但可以實現零停機時間。使用者不會察覺到任何變化。
讓我分享一下我所知的最強大的除錯技術:鑑別診斷。
當生產環境中發生故障且無法找出原因時,請使用以下步驟:
第一步:準確定義症狀
缺點:“API 速度很慢”
良好:「/api/users/{id} 端點的 P99 延遲為 8 秒,但僅適用於使用者 ID 大於 1,000,000 的使用者」。
步驟二:確定發生了哪些變化
# Build a timeline of changes
changes = [
"2024-11-15 14:30 - Deployed v2.3.1",
"2024-11-15 15:00 - First slow request observed",
"2024-11-15 15:15 - Database backup completed",
"2024-11-15 15:30 - Traffic increased 40%",
]
通常情況下,問題與某個特定變化密切相關。
步驟三:形成假設
Hypothesis 1: New deployment introduced slow query
Hypothesis 2: Database backup caused resource contention
Hypothesis 3: Traffic spike exposed scalability issue
Hypothesis 4: New user IDs trigger different code path
第四步:系統性地檢驗假設
# Hypothesis 1: Roll back deployment in staging
if test_in_staging_with_old_code():
print("Problem persists, hypothesis 1 false")
# Hypothesis 2: Check database metrics during backup window
if database_metrics['cpu'] < 50 during backup:
print("Database not constrained, hypothesis 2 false")
# Hypothesis 3: Simulate traffic spike in load test
if problem_reproduces_under_load():
print("Hypothesis 3 likely true - investigate further")
步驟 5:在隔離環境中重複實驗
一旦有了可靠的假設,就要在盡可能簡單的環境中重現問題。這對於確認根本原因至關重要。
# If you think it's a data-specific issue:
def minimal_reproduction():
# Use production data snapshot
user_id = 1_500_000 # Known slow ID
with profiler.profile():
result = get_user_profile(user_id)
profiler.print_stats()
我用這種方法只花了 10 分鐘就找到了一些 bug,而如果用隨機除錯的話,可能需要好幾天才能找到。
安全不是事後增加的功能,而是貫穿所有決策的概念。以下是我建構的每個系統中行之有效的安全實踐:
永遠不要依賴單一的安全機制。要採用多層安全機制:
class SecureAPI:
def get_user_data(self, request):
# Layer 1: Authentication
user = self.authenticate(request)
if not user:
raise AuthenticationError("Invalid credentials")
# Layer 2: Authorization
requested_user_id = request.params['user_id']
if not self.authorize(user, 'read:user', requested_user_id):
raise AuthorizationError("Insufficient permissions")
# Layer 3: Rate limiting
if not self.check_rate_limit(user.id):
raise RateLimitError("Too many requests")
# Layer 4: Input validation
if not self.validate_user_id(requested_user_id):
raise ValidationError("Invalid user ID format")
# Layer 5: SQL injection prevention (parameterized queries)
user_data = db.query(
"SELECT * FROM users WHERE id = ?", # Parameterized
requested_user_id
)
# Layer 6: Output sanitization
return self.sanitize_output(user_data)
即使一層防護失效,其他層也能保護你。
# Bad: Service account with admin privileges
DATABASE_USER = "admin"
DATABASE_PASSWORD = "..."
# Good: Service account with only needed privileges
DATABASE_USER = "api_readonly" # Can only SELECT from specific tables
# In database:
# CREATE USER api_readonly;
# GRANT SELECT ON users, posts TO api_readonly;
# GRANT INSERT, UPDATE ON api_logs TO api_readonly;
如果您的服務遭到入侵,攻擊者只能執行您的服務帳戶被允許執行的操作,而不是所有操作。
預設安全
即使開發人員犯了錯誤,您的系統也應該是安全的。
# Bad: Explicitly allowing everything
CORS_CONFIG = {
'origins': '*',
'methods': '*',
'headers': '*'
}
# Good: Deny by default, allow explicitly
CORS_CONFIG = {
'origins': ['https://myapp.com', 'https://staging.myapp.com'],
'methods': ['GET', 'POST'],
'headers': ['Content-Type', 'Authorization']
}
# Even better: Environment-specific defaults
CORS_CONFIG = {
'origins': os.getenv('ALLOWED_ORIGINS', 'https://myapp.com').split(','),
'methods': ['GET', 'POST'] if os.getenv('ENVIRONMENT') == 'production' else ['*'],
}
同樣的原則也適用於資料序列化-除非明確允許,否則永遠不要暴露內部欄位。
class UserSerializer:
# Explicitly define what can be exposed
EXPOSED_FIELDS = {'id', 'email', 'name', 'created_at'}
def serialize(self, user):
return {
field: getattr(user, field)
for field in self.EXPOSED_FIELDS
if hasattr(user, field)
}
# Never do this:
# def serialize(self, user):
# return user.__dict__ # Exposes everything, including password hashes!
可隨系統擴展的團隊實踐
技術架構只是成功的一半。另一半是人為架構——你的團隊如何建構和營運系統。
不會導致人員過度疲勞的輪班值守制度
我吃過虧才明白這個道理:如果你的值班輪