Microsoft Agent Framework 是一個由 Microsoft 提供的 OSS AI 代理及多代理工作流程的開發框架。
它整合了 Microsoft 現有的代理框架,Semantic Kernel 和 AutoGen 的優勢,並被定位為同一團隊構建的下一代代理開發基礎。

該框架主要由以下類別組成。
| 類別 | 內容 |
|---|---|
| 代理 | 負責透過 LLM 做出決策、執行工具和生成回應的個別代理。支援多個模型供應商,如 Azure 和 OpenAI。 |
| 工作流程 | 將多個代理和函數連接成圖形結構,以定義工作流程。支援類型安全路由/嵌套/Human-in-the-Loop/檢查點重啟等。 |
Agent Framework 的所有代理都必須繼承 BaseAgent(即滿足 AgentProtocol)作為共同要求。
這樣的設計使得 作為代理的行為標準化,即使是像 Azure AI Foundry 代理或 Copilot Studio 代理這樣不同實作基礎的代理,也能保證使用相同接口進行處理。
換句話說,最大的價值在於 抽象化後端差異,安全地加載多代理協作的能力。

AgentProtocol 定義了「作為代理所需的行為」。
| 種類 | 例子 |
|---|---|
| 屬性 | id, name, display_name, description |
| 方法 | run(...), run_stream(...), get_new_thread(...) |
BaseAgent 是滿足 AgentProtocol 的代理的「共同基礎」。
AgentThread 的相容實作).as_tool())雖然開發者直接接觸的機會不多,但所有代理都在這個層次上保證運作一致。
ChatAgent 是 BaseAgent 的通用實作,也是使用頻率最高的類別。
只需將準則符合 ChatClientProtocol 的客戶端(如 Azure / OpenAI)插入 chat_client 即可轉變為代理。
from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
agent = ChatAgent(
chat_client=AzureOpenAIChatClient(...),
name="assistant",
instructions="Be helpful and concise.",
tools=[...], # 可選:函數/MCPTool 等
)
如果符合 AgentProtocol,則可以直接連接為工作流程中的節點(Executor)。
因此,
儘管它們的實作來源和執行環境不同,框架的整體仍能實現統一的多代理協同工作。
Agent Framework 的代理解釋使用者意圖並利用 LLM 推論,根據需要自主呼叫工具以解決任務。
以下是簡單的實作範例。
import asyncio
import os
from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
# ---------- 定義函數 ----------
def get_weather(location: str) -> str:
"""取得某地的天氣。"""
return f"Weather in {location}: 72°F and sunny"
# ---------- 創建代理 ----------
agent = ChatAgent(
name="WeatherAgent",
chat_client=AzureOpenAIChatClient(...),
instructions="你是一個優秀的助手。",
tools=[get_weather],
)
# ---------- 執行代理 ----------
async def main():
response = await agent.run("巴黎的氣溫是多少?")
print(f"助手: {response}")
asyncio.run(main())
# ---------- 輸出範例 ----------
# 助手: 巴黎的當前氣溫約22°C(72°F),並且是晴天。
代理可使用的工具主要分為 函數型工具 和 託管型工具。
| 分類 | 說明 | 典型用途 |
|---|---|---|
| 函數型工具 | 將 Python 函數等工具化,LLM 直接呼叫的形式。 | 自訂處理/本地執行 |
| 託管型工具 | 直接使用服務方內建的工具,如 Web 搜索、代碼執行、檔案搜索、MCP 等。 | 減低運營負擔 |
使用代理客戶端的 服務方提供的內建工具。例如 Web 搜索、代碼執行、檔案搜索和 MCP 連接等。
由於認證、執行和擴展等由服務方負責,因此客戶端實作可以簡化。
from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.azure import AzureOpenAIResponsesClient
# ---------- 使用託管型代碼執行工具 ----------
agent = ChatAgent(
name="CodeInterpreterAgent",
chat_client=AzureOpenAIResponsesClient(...),
instructions="...",
tools=HostedCodeInterpreterTool(),
)
將現有代理「工具化」,使其可以從其他代理中被呼叫。
這樣,您可以將專家代理像函數工具一樣使用,並迅速構建簡單的多代理配置。
from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
# ---------- 構建子代理並工具化 ----------
specialist = ChatAgent(
name="WeatherExpert",
chat_client=AzureOpenAIChatClient(...),
instructions="只回答天氣問題的專家。",
)
weather_tool = specialist.as_tool(...)
# ---------- 構建父代理 ----------
manager = ChatAgent(
name="Manager",
chat_client=AzureOpenAIChatClient(...),
instructions="分解問題,必要時使用工具解決。",
tools=[weather_tool], # ← 指定為工具
)
在 Azure Framework 中,還支援 MCP 工具。
from agent_framework import ChatAgent, MCPStreamableHTTPTool
from agent_framework.azure import AzureOpenAIChatClient
mcp = MCPStreamableHTTPTool(
name="Microsoft Learn MCP",
url="https://learn.microsoft.com/api/mcp",
)
agent = ChatAgent(
name="DocsAgent",
chat_client=AzureOpenAIChatClient(...),
instructions="必要時通過 MCP 搜尋內部文檔回答。",
tools=[mcp],
)
中介軟體可以在代理執行的每個階段介入並執行函數。這樣可以實現不改變核心邏輯的橫向功能。
Agent Framework 提供了三種類型的中介軟體,根據處理粒度而定。
| 類型 | 介入點 | 主要用途 |
|---|---|---|
| 代理 | 從開始到結束 | 執行計測、快取回應、全域政策 |
| 聊天 | LLM 呼叫前後 | PII 隱藏、提示增強、替換 |
| 函數 | 呼叫工具前後 | 參數驗證、白名單/黑名單、批准 |
<details><summary>範例程式碼</summary>
import asyncio
import os
from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from typing import Callable, Awaitable
from agent_framework import AgentRunContext, FunctionInvocationContext, ChatContext
# ---------- 定義代理中介軟體 ----------
async def logging_agent_middleware(
context: AgentRunContext,
next: Callable[[AgentRunContext], Awaitable[None]],
) -> None:
"""在代理執行前後記錄日誌"""
print("> 在代理執行前")
await next(context)
print("> 在代理執行後")
# ---------- 定義聊天中介軟體 ----------
async def logging_chat_middleware(
context: ChatContext,
next: Callable[[ChatContext], Awaitable[None]],
) -> None:
"""在 LLM 呼叫前後記錄日誌"""
print(f" > 在 LLM 呼叫前。訊息數: {len(context.messages)}")
for m in context.messages:
print(f" - {m.role}: {m.text}")
await next(context)
print(" > 在 LLM 呼叫後")
# ---------- 定義函數中介軟體 ----------
async def logging_function_middleware(
context: FunctionInvocationContext,
next: Callable[[FunctionInvocationContext], Awaitable[None]],
) -> None:
"""在呼叫工具前後記錄日誌"""
print(f" > 在函數呼叫前: {context.function.name}")
await next(context)
print(" > 在函數呼叫後")
# ---------- 定義函數 ----------
def get_weather(location: str) -> str:
"""取得某地的天氣。"""
return f"Weather in {location}: 72°F and sunny"
# ---------- 創建代理 ----------
agent = ChatAgent(
name="WeatherAgent",
chat_client=AzureOpenAIChatClient(...),
instructions="你是一個優秀的助手。",
tools=[get_weather],
middleware=[
logging_agent_middleware,
logging_function_middleware,
logging_chat_middleware,
],
)
# ---------- 執行代理 ----------
async def main():
response = await agent.run("巴黎的氣溫是多少?")
print(f"助手: {response}")
asyncio.run(main())
</details>
Agent Framework 在代理的記憶方面,標準提供了 短期記憶(執行緒/對話歷史) 和 長期記憶(偏好/知識的延續) 兩層,這使得 對話持續性 與 個性化 可以並存。

短期記憶由 AgentThread 管理,透過持續傳遞同一執行緒,能夠根據「前次發話或工具執行的結果」做出回應。
# ---------- 創建新執行緒 ----------
thread = agent.get_new_thread()
# ---------- 第一次回合 ----------
response = await agent.run("日本的首都在哪裡?", thread=thread)
print(response) # 例: 東京。
# ---------- 第二次回合 ----------
response = await agent.run("那裡的人口是多少?", thread=thread)
print(response) # 例: 東京都的估計人口約1400萬。
長期記憶處理的知識是跨執行緒的。
由 ContextProvider 組成,儲存和管理的工作由位於 AgentThread 中的 context_provider(即 AggregateContextProvider)負責。
可以在代理執行 前後 插入方法進行讀寫。

| 時機 | 方法 | 角色 |
|---|---|---|
| 執行代理 前 | invoking() |
從記憶中組裝 instructions / messages / tools 並注入額外上下文到 LLM |
| 執行代理 後 | invoked() |
提取並保存會話中應學習的信息。 |
<details><summary>範例程式碼</summary>
<p>以下範例展示如何學習「使用者喜歡的城市」,並在下次回應中加以利用。</p>
from agent_framework import ContextProvider, Context
# ---------- 定義上下文提供者 ----------
class UserPrefMemory(ContextProvider):
def __init__(self):
self.pref_city = None # 使用者偏好儲存(例:喜歡的城市)
# 執行"前" : 將記憶注入 LLM
async def invoking(self, self, messages, **kwargs) -> Context:
if self.pref_city:
return Context(
instructions=f"使用者喜歡 {self.pref_city},在回應中優先提及。"
# instructions 不僅可以注入,messages/tools 也可以
)
return Context()
# 執行"後" : 檢測新的偏好 → 記憶
async def invoked(self, self, request_messages, response_messages=None, **kwargs) -> None:
text = request_messages[-1].text if request_messages else ""
if "我喜歡 " in text:
self.pref_city = text.split("我喜歡 ", 1)[1].split()[0]
# ---------- 初始化上下文提供者 ----------
memory = UserPrefMemory()
# ---------- 創建代理和執行緒 ----------
agent = ChatAgent(
name="MemoryAgent",
chat_client=AzureOpenAIChatClient(...),
instructions="利用上下文仔細回答。",
context_providers=[memory],
)
thread = agent.get_new_thread()
# ---------- 執行 ----------
async def main():
# 第一次:學習階段
await agent.run("我喜歡巴黎", thread=thread)
# 第二次:應用階段(巴黎將反映在回答中)
await agent.run("有什麼推薦的景點?", thread=thread)
# 確認長期記憶
user_pref_memory = thread.context_provider.providers[0]
if user_pref_memory:
print("\n=== 記憶的資訊 ===")
print(f"使用者喜歡的城市: {user_pref_memory.pref_city}")
asyncio.run(main())
# ---------- 輸出範例 ----------
# === 記憶的資訊 ===
# 使用者喜歡的城市: 巴黎
</details>
工作流程是通過組合多個 Executor 和將它們連接的 Edge,將處理的流程以圖形形式表現的機制。
在 Agent Framework 中,通過並行執行、分流與合流、狀態管理以及 Human-in-the-Loop 等功能,可以定義和執行複雜的業務過程作為工作流程。

工作流程由以下四個要素組成。
| 要素名稱 | 角色 | 說明 |
|---|---|---|
| Executor | 處理單元 | 接收輸入、進行處理並將結果發送到下一步或最終輸出 |
| Edge | 連接 | 將 Executor 之間連接起來,表達路由、條件分支和並行化 |
| Workflow | 執行器 | 由 Executor 和 Edge 構成的有向圖 |
| Event | 可觀察性 | 追踪執行過程中的狀態和響應(支援串流/非串流) |
Executor 是工作流程的最小單位。
其角色是 接收輸入 → 進行處理 → 返回輸出(訊息或最終結果)。
可以同時使用這兩種功能。
Executor 的定義方法有以下兩種模式受支持。
| 語法 | 說明 | 適合用途 |
|---|---|---|
類別繼承 + @handler |
可以對每種類型有多個處理器,控制明確 | 狀態管理/多輸入情境 |
函數基礎(@executor) |
最簡潔,一個處理完成 | 小型函數性處理 |
類別基礎範例
class UpperCase(Executor):
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(text.upper())
函數基礎範例
@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(text.upper())
在 Agent Framework 中,通過對 Executor 添加 類型註釋,可以明確宣告:
在構建工作流程時(構建時)將會檢查類型一致性,這使得
即使在大型和複雜的流程中也能安全構建,這是一個顯著特點。

輸入類型(Input Type)
表示 Executor 接收的訊息 類型。
| 樣式 | 指定輸入類型(Input Type)的位置 |
|---|---|
| 類別基礎 | @handler 方法的第二個參數(第一個是 self) |
| 函數基礎 | 函數的第一個參數 |
訊息傳遞類型(T_Out)
表示將數據傳遞到下一個 Executor 的訊息類型和傳遞方式。
WorkflowContext[T_Out, T_W_Out] 的 第一個 T_Out 中指定類型。ctx.send_message() 中指定 向下一個 Executor 傳遞的數據。class UpperCase(Executor):
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(text.upper()) # 使 T_Out = str
工作流程輸出類型(T_W_Out)
當需向工作流程全局返回數據的類型及其傳遞方式。
WorkflowContext[T_Out, T_W_Out] 的 第二個 T_W_Out 中指定類型。ctx.yield_output() 中指定 工作流程的最終輸出。@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(text[::-1]) # 使 T_W_Out = str
類型指定範例
| 範例 | 訊息傳遞 | 最終輸出 | 意義 |
|---|---|---|---|
WorkflowContext |
無 | 無 | 行為等同於 [Never, Never] |
WorkflowContext[str] |
str | 無 | 僅傳遞訊息 |
WorkflowContext[Any, Any] |
Any | Any | 兩者皆無類型限制(適合試作) |
WorkflowContext[Never, int] |
無 | int | 僅返回最終輸出 |
構建工作流程時,使用 WorkflowBuilder 類。
WorkflowBuilder# 1. 初始化 WorkflowBuilder
builder = WorkflowBuilder()
# 2. 指定起始 Executor
builder.set_start_executor(upper_case)
# 3. 通過 Edge 連接 Executors(設計有向圖)
builder.add_edge(executor1, executor2)
builder.add_fan_out_edges(executor2, [executor3, executor4])
builder.add_fan_in_edge([executor3, executor4], executor5)
...
# 4. Build 確定執行模型
workflow = builder.build()
在建構時,將進行以下 靜態驗證,以防止執行時錯誤。
| 驗證項目 | 內容 |
|---|---|
| 類型兼容性 | 確認連接的 Executor 之間的訊息類型是兼容的 |
| 圖形連接性 | 驗證所有 Executor 從 起始點 都可 到達 |
| Executor 綁定 | 確認每個 Executor 是否正確 綁定/實例化 |
| Edge 驗證 | 檢測 重複的邊 和 無效的連接 |
Agent Framework 的 Edge 提供直接可用於業務流程設計的路由手段。可以靈活表達條件分支和分流/合流等。
| 邊緣類型 | 說明 |
|---|---|
| 直接邊緣 | 一對一直接連接 |
| 條件邊緣 | 僅在條件滿足時通過 |
| 開關/情況邊緣 | 按任意條件的多選擇 |
| 分流邊緣 | 1 → 多的分支(並行執行) |
| 合流邊緣 | 多 → 1 的合併(匯總並行結果) |
將兩個 Executors 直列連接。使用 add_edge(source, target)。

<details><summary>範例程式碼</summary>
import asyncio
from agent_framework import (
WorkflowBuilder,
WorkflowContext,
executor,
)
from typing_extensions import Never
# ---------- 定義 Executor ----------
@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
"""將輸入文本轉為大寫"""
await ctx.send_message(text.upper())
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
"""反轉輸入文本"""
await ctx.yield_output(text[::-1])
# ---------- 構建工作流程 ----------
# 工作流程的構建
# 1. 初始化 WorkflowBuilder
# 2. 指定起始 Executor
# 3. 通過 Edge 連接 Executors(設計有向圖)
# 4. Build 確定執行模型
builder = WorkflowBuilder()
builder.set_start_executor(upper_case)
builder.add_edge(upper_case, reverse_text)
workflow = builder.build()
# ---------- 執行工作流程 ----------
async def main():
events = await workflow.run("hello world")
print(events.get_outputs())
if __name__ == "__main__":
asyncio.run(main())
# ---------- 輸出範例 ----------
# ['DLROW OLLEH']
</details>
僅在滿足特定條件的情況下可通過的「條件邊緣」。
透過 add_edge(..., condition=<條件函數>) 方式指定 condition 屬性,只有當該函數返回 True 時,邊緣才會啟用。

<details><summary>範例程式碼</summary>
import asyncio
from typing import Never
from agent_framework import (
WorkflowBuilder,
WorkflowContext,
executor,
)
# ---------- 定義 Executor ----------
@executor(id="start")
async def start(text: str, ctx: WorkflowContext[str]) -> None:
"""起始節點: 將輸入字串直接流到下游"""
await ctx.send_message(text)
@executor(id="handle_normal")
async def handle_normal(text: str, ctx: WorkflowContext[Never, str]) -> None:
"""正常處理: 返回最終輸出(yield_output)"""
await ctx.yield_output(f"OK: {text}")
@executor(id="handle_spam")
async def handle_spam(text: str, ctx: WorkflowContext[Never, str]) -> None:
"""垃圾郵件處理: 返回最終輸出(yield_output)"""
await ctx.yield_output("SPAM: blocked")
# ---------- 路由條件 ----------
def is_spam(msg: str) -> bool:
"""檢查消息是否含有 'spam'(簡單的條件判斷)"""
return isinstance(msg, str) and ("spam" in msg.lower())
def is_not_spam(msg: str) -> bool:
"""檢查是否不是垃圾郵件"""
return isinstance(msg, str) and ("spam" not in msg.lower())
# ---------- 構建工作流程 ----------
builder = WorkflowBuilder()
builder.set_start_executor(start)
builder.add_edge(start, handle_spam, condition=is_spam) # - 含有 "spam" → handle_spam
builder.add_edge(start, handle_normal, condition=is_not_spam) # - 不含 "spam" → handle_normal
workflow = builder.build()
# ---------- 執行示例 ----------
async def main():
events = await workflow.run("Hello team")
print("輸出(正常):", events.get_outputs()[0])
events = await workflow.run("Buy now!! SPAM offer!!")
print("輸出(垃圾郵件):", events.get_outputs()[0])
if __name__ == "__main__":
asyncio.run(main())
# ---------- 輸出範例 ----------
# 輸出(正常): OK: Hello team
# 輸出(垃圾郵件): SPAM: blocked
</details>