大家在想要從某個 Lambda 函數調用其他 Lambda 函數時會怎麼做呢?本文將介紹反模式及其解決方案。
這是本文的主題。首先,我粗略準備了兩個 Lambda 函數。一個為輸出 Hello World 的 lambda_callee.py
,另一個為調用此 Lambda 函數的 lambda_caller.py
。lambda_caller.py
被賦予了調用 Lambda 函數的權限。
import json
import boto3
def lambda_handler(event, context):
# 建立 Lambda 客戶端
lambda_client = boto3.client('lambda')
# 調用 Lambda 函數 B
response = lambda_client.invoke(FunctionName='lambda_callee')
# 讀取回應
payload = json.loads(response['Payload'].read())
print(f"Lambda callee response: {payload}")
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Successfully called Lambda callee',
'lambda_callee_response': payload
})
}
import json
def lambda_handler(event, context):
message = "Hello World"
print(message)
return {
'statusCode': 200,
'body': json.dumps({
'message': message,
})
}
當測試執行 lambda_caller.py
時,lambda_callee.py
也能順利執行並回傳回應。若是只要能運行,則可能不會有問題。
可以確認被調用的 Lambda 也被執行了。
然而,這種執行方式會有以下問題。
AWS 帳戶的配額之一是 Lambda 的同時執行數。預設情況下,此數量為 1000。雖然此同時執行數可以提升,但如同前面範例所示,若一個請求需執行兩個 Lambda 函數時,將會額外消耗這個同時執行數。若有 500 個請求來到 lambda_caller
,總共將執行 1000 個函數,更後面的請求會發生節流。
此外,此配額是 AWS 帳戶層級的限制,因此若有其他 Lambda 函數存在,也可能會影響其運行。雖然可以持續提高 AWS 帳戶的同時執行數,但若能避免一次請求造成即時兩個同時執行的配置,則為上策。
在範例函數中,若處理相對簡單且執行時間不長的情況下,這可能不會造成問題。然而,若需要等待生成 AI 的回應,則可能會造成函數長達數十秒的執行。此時,呼叫來源的 Lambda 函數只是等待被呼叫的 Lambda 函數完成。這樣的狀態會引發成本的問題。
如前所述,依據 Lambda 函數的內容,可能會發生等待時間。在等待的那段時間,即使沒有進行任何處理,Lambda 仍被視為執行中,將會因此產生計費。若僅計算處理時間倒無妨,但對於等待時間也需計費的設計應盡量避免。
Lambda 函數可以設定 Timeout 時間(預設為 3 秒)。一般來說,Timeout 值應該隨著距離客戶端的增加而縮短。
此處出於方便使用「函數」這種表達,實際上可替換為「系統」或「(內部/外部的)API」。在系統間進行聯結時,也需要多加思考 Timeout 問題。
下圖顯示自客戶端越近的 Timeout 值 A > B > C。
然而,如下圖所示,若 Timeout 值 A > C > B 能會發生什麼問題呢?
雖然函數 C 本身能正常處理,但函數 B 卻因 Timeout 而回傳錯誤。結果函數 A 也會對客戶端回傳錯誤回應。雖然客戶端可能會有重試機制,但對於函數 C 的處理請求卻會又飛來已處理過的請求,這樣會變得相當複雜(這一點也相關到冪等性的問題)。
因此,對於 Timeout 值也需要盡可能仔細考慮。
提及 Timeout 值的問題是因為這與可維護性有關。因為函數之間是直接連接的,所以在修改函數代碼時,必須時刻考慮到其他函數。考慮重試機制時也同樣如此。以上問題皆可視為因為過於緊密耦合而發生的問題。
當 Lambda 函數調用時,可以設置 InvocationType
這個參數。預設為 RequestResponse
,這是同步處理,即需要等待被呼叫端的處理完成。
另一方面,若將 InvocationType
設為 Event
,則會變成非同步處理,可能可以解決上述某些問題。然而,實際上仍然存在一些未處理的問題,如錯誤處理無法實現,無法確認呼叫的函數是否正確處理等,因此並不能算根本的解決方法。
接下來是當想從 Lambda 函數調用其他 Lambda 函數時的解決方法。
個人認為這是最簡單的解決方案。
事先建立好 SQS。
調用來源的 Lambda 修改為發送消息到 SQS。調用來源的 Lambda 需要有發送消息到 SQS 的權限。
import json
import boto3
QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/(AWS帳戶ID)/lambda_queue'
def lambda_handler(event, context):
# 建立 SQS 客戶端
sqs_client = boto3.client('sqs')
message_body = {
'message': 'Hello from Lambda caller!'
}
# 發送消息到 SQS 隊列
response = sqs_client.send_message(
QueueUrl=QUEUE_URL,
MessageBody=json.dumps(message_body),
)
# 讀取回應
payload = response['MessageId']
print(f"SQS response: {payload}")
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Successfully SQS Request',
'sqs_response': payload
})
}
接下來也修改調用的 Lambda。
import json
def lambda_handler(event, context):
# 處理來自 SQS 的消息
for record in event['Records']:
# 獲取消息內容
message_body = json.loads(record['body'])
print(f"Processing message: {message_body['message']}")
return {
'statusCode': 200,
}
為調用的 Lambda 設置觸發器,以便能從 SQS 獲取消息。在這個時候,需要賦予 Lambda 獲取 SQS 消息的權限。
在這個狀態下測試 caller 端的 Lambda 時,成功執行得到了確認。
也可以確認調用的 Lambda 被執行。
利用 SQS 的 DLQ(死信隊列)能實現錯誤處理。
通過引入 SQS,使得 Lambda 之間的關聯變得鬆散。在成本方面,雖然需要支付 SQS 的使用費,但在每月的前100萬請求是免費的,所以依規模而定,這個成本應該好在接受範圍內。
在 Step Functions 的方式中,簡單的準備了一個返回狀態碼 200 的調用來源 Lambda 和一個調用的 Lambda。
import json
def lambda_handler(event, context):
print('Caller Lambda')
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Successfully SF Request',
})
}
import json
def lambda_handler(event, context):
print('Callee Lambda')
return {
'statusCode': 200,
}
然後,從 Step Functions 的 GUI 選擇 Lambda Invoke 並進行配置。
在創建工作流程時,所需的角色會自動給予。
接著執行工作流程,確認能夠運行兩個 Lambda 函數。
也可以從日誌檢查到各自的執行。
使用 Step Functions 可以簡單地在 GUI 中設定,且不需要編寫調用其他 Lambda 的代碼,提高了可讀性,這一點也是其優點。
在這裡雖然未進行測試,但也可以考慮使用 EventBridge 的方法。
Lambda 函數的 invoke 使用十分便利,但實際使用時仍需考慮錯誤處理等方面。無論是 Lambda 還是系統(服務)之間,都需要保持鬆散耦合的觀念。
此次輕鬆地使用了 Step Functions,構建起來比預想中簡單。在需要更複雜的系統間聯結時,可以考慮使用。
<iframe id="qiita-embed-content__e27d37f57166f1e8b040f52aff5230a9"></iframe>