在這篇文章中,我將引導您了解如何在TypeScript中建立事件驅動的Node.js 應用程式。我們將從傳統應用程式開始,然後採取所需的步驟,透過Pub/Sub進行通信,使服務鬆散耦合。
我們將了解如何在本地執行應用程式,以及如何將事件驅動的應用程式部署到雲端。
影片版本:
https://www.youtube.com/watch?v=hmrqTf4ReLw
我們將關注的應用程式是一個正常執行時間監控系統。我們有一個要監控的網站清單和一個 CronJob,用於檢查每個網站並查看它們是否可存取,如果任何狀態發生變化,我們的應用程式將發送通知。在 CronJob 檢查網站狀態之前,新新增網站的狀態將是unknown
。這是我們在使應用程式成為事件驅動時想要改變的事情之一。
以下是已完成應用程式的完整程式碼:https://github.com/encoredev/examples/tree/main/ts/uptime
這裡我們有兩個架構圖,左邊是我們目前的系統,右邊是我們完成後希望它的外觀。
在我們目前的狀態下,您可以看到我們有四個服務: frontend
、 monitor
、 site
和slack
。填充的箭頭表示monitor
服務正在呼叫site
和slack
服務中的端點, monitor
服務對這兩個服務具有硬依賴關係。我們還可以看到monitor
和site
服務都有資料庫。 monitor
服務具有 CronJob,每小時檢查一次每個站點的狀態。
因此,我們想要引入一個site.added
主題,每當我們新增網站時,我們都可以從site
服務發佈到該主題。我們將在monitor
服務中訂閱site.added
主題,然後在新增網站時 ping 網站以檢查狀態。
我們還希望透過引入uptime-transition
主題來消除monitor
和slack
服務之間的硬依賴。
在繼續之前,我們先來談談為什麼對我們的系統進行這些更改是個好主意。
透過進行這些更改,我們使我們的服務更加鬆散耦合,這幾乎總是一件好事。 monitor
服務不再需要知道slack
服務的存在,且site
服務可以保持獨立於monitor
服務。
slack
服務現在可以離線,而不會影響monitor
服務。然後,當它重新上線時,它只會從事件佇列中讀取並從中斷的地方繼續。因此系統更加健壯。
monitor
和slack
服務現在都依賴一個抽象,而不是彼此依賴,稱為依賴倒置。這使我們能夠替換或加入其他通知管道,例如不和諧或電子郵件,而無需對monitor
服務進行更改。
但讓系統事件驅動並不總是一個好主意。以下是一些需要注意的事項:
這不是一個全有或全無的方法!只需在最適合的地方使用 Pub/Sub。對此沒有必要教條主義。請不要教條主義……哇! 😂
僅僅用事件取代傳統的 API 呼叫是不夠的。使用非同步佇列時需要理解一些基本概念,特別是最終一致性和冪等性。花時間了解這些想法。相信我,以後你會感謝我的。
事件驅動的系統更加複雜。您需要建造更多元件,並且還有其他基礎設施需求。使用正確的工具來完成工作非常重要。如果沒有合適的工具,除錯問題和管理環境將非常令人沮喪。
這就是為什麼我們將使用Encore.ts來建立事件驅動的應用程式。 Encore.ts 是一個開源框架,專門設計用於更輕鬆地使用 TypeScript 建立健全且類型安全的分散式系統,就像我們今天要建立的事件驅動後端一樣。它有很多有用的內建工具,可以讓開發體驗更加流暢,例如我們稍後會看到的本地開發儀表板。
現在,讓我們來看一些程式碼。
從程式碼角度來看,服務只是使用 Encore 時儲存庫中的另一個資料夾。在建立事件驅動的應用程式時,您很可能最終會得到很多服務,因此建立新服務必須很容易。這就是 Encore 非常適合此類應用的原因之一。
讓我們先看看site
服務。
import { api } from "encore.dev/api";
import { SQLDatabase } from "encore.dev/storage/sqldb";
import knex from "knex";
// Site describes a monitored site.
export interface Site {
id: number;
url: string;
}
export interface AddParams {
url: string;
}
// Add a new site to the list of monitored websites.
export const add = api(
{ expose: true, method: "POST", path: "/site" },
async (params: AddParams): Promise<Site> => {
const site = (await Sites().insert({ url: params.url }, "*"))[0];
return site;
},
);
// Get a site by id.
export const get = api(
{ expose: true, method: "GET", path: "/site/:id", auth: false },
async ({ id }: { id: number }): Promise<Site> => {
const site = await Sites().where("id", id).first();
return site ?? Promise.reject(new Error("site not found"));
},
);
// Delete a site by id.
export const del = api(
{ expose: true, method: "DELETE", path: "/site/:id" },
async ({ id }: { id: number }): Promise<void> => {
await Sites().where("id", id).delete();
},
);
export interface ListResponse {
sites: Site[]; // Sites is the list of monitored sites
}
// Lists the monitored websites.
export const list = api(
{ expose: true, method: "GET", path: "/site" },
async (): Promise<ListResponse> => {
const sites = await Sites().select();
return { sites };
},
);
// Define a database named 'site', using the database migrations
// in the "./migrations" folder. Encore automatically provisions,
// migrates, and connects to the database.
const SiteDB = new SQLDatabase("site", {
migrations: "./migrations",
});
const orm = knex({
client: "pg",
connection: SiteDB.connectionString,
});
const Sites = () => orm<Site>("site");
該服務有一些 CRUD 端點,例如add
、 get
、 delete
和list
。我們對add
端點感興趣,因為我們希望在新增網站時發布事件。讓我們開始透過新增site.added
主題來使我們的應用程式成為事件驅動的。我們透過呼叫Topic
類別、指定將在該主題上發布的類型(在本例中為Site
類型)並指定交付保證來實現此目的。
import { Topic } from "encore.dev/pubsub";
export const SiteAddedTopic = new Topic<Site>("site.added", {
deliveryGuarantee: "at-least-once",
});
現在,在add
端點中,我們可以呼叫SiteAddedTopic
物件上的.publish
方法。
export const add = api(
{ expose: true, method: "POST", path: "/site" },
async (params: AddParams): Promise<Site> => {
const site = (await Sites().insert({ url: params.url }, "*"))[0];
await SiteAddedTopic.publish(site);
return site;
},
);
將 Pub/Sub 與 Encore 類型安全結合使用,因此如果您使用不正確的參數發佈到主題,則會出現編譯時錯誤 🤯
我們的架構圖現在如下圖所示:
我們正在發佈到site.added
主題,但我們尚未從monitor
服務訂閱它,所以讓我們現在解決這個問題。
那麼,我們就開啟monitor
服務吧。
import { api } from "encore.dev/api";
import { SQLDatabase } from "encore.dev/storage/sqldb";
import { Site } from "../site/site";
import { ping } from "./ping";
import { site, slack } from "~encore/clients";
import { CronJob } from "encore.dev/cron"; // Check checks a single site.
// Check checks a single site.
export const check = api(
{ expose: true, method: "POST", path: "/check/:siteID" },
async (p: { siteID: number }): Promise<{ up: boolean }> => {
const s = await site.get({ id: p.siteID });
return doCheck(s);
},
);
// CheckAll checks all sites.
export const checkAll = api(
{ expose: true, method: "POST", path: "/check-all" },
async (): Promise<void> => {
const sites = await site.list();
await Promise.all(sites.sites.map(doCheck));
},
);
async function doCheck(site: Site): Promise<{ up: boolean }> {
const { up } = await ping({ url: site.url });
const wasUp = await getPreviousMeasurement(site.id);
if (up !== wasUp) {
const text = `*${site.url} is ${up ? "back up." : "down!"}*`;
await slack.notify({ text });
}
await MonitorDB.exec`
INSERT INTO checks (site_id, up, checked_at)
VALUES (${site.id}, ${up}, NOW())
`;
return { up };
}
async function getPreviousMeasurement(siteID: number): Promise<boolean> {
const row = await MonitorDB.queryRow`
SELECT up
FROM checks
WHERE site_id = ${siteID}
ORDER BY checked_at DESC
LIMIT 1
`;
return row?.up ?? true;
}
const cronJob = new CronJob("check-all", {
title: "Check all sites",
every: "1h",
endpoint: checkAll,
});
export const MonitorDB = new SQLDatabase("monitor", {
migrations: "./migrations",
});
這裡我們有一個check
端點,用於檢查和更新單一站點的狀態。在 API 處理程序中,我們呼叫site
服務中的get
端點。我們透過encore/clients
資料夾導入服務,然後我們可以像呼叫常規函數一樣呼叫端點,並且具有完整的類型安全性。但最酷的部分是,在幕後,這些函數呼叫會轉換為實際的 HTTP 呼叫,從而產生追蹤和日誌。
doCheck
函數看起來就像我們在新增網站時要呼叫的函數,因此讓我們新增 Pub/Sub 訂閱者。
import { Subscription } from "encore.dev/pubsub";
const _ = new Subscription(SiteAddedTopic, "check-site", {
handler: doCheck,
});
為此,我們呼叫Subscription
類,傳入我們想要訂閱的主題、訂閱的名稱和選項物件。在選項物件中,我們只需要指定處理程序,也就是為每個新事件呼叫的函數。 doCheck
函數接受一個Site
,因此無需執行任何其他操作。
那麼,這是如何運作的呢? Encore 配備了自動本地基礎架構。當您使用encore run
在本機啟動 Encore 應用程式時,Encore 會自動在電腦上啟動您的應用程式所需的所有基礎設施,包括資料庫和 Pub/Sub。因此,您無需編寫 YAML,或設定 Docker Compose 以及 LocalStack 等其他工具來執行您的環境。
Encore 還配備了一個內建的開發儀表板。當您啟動 Encore 應用程式時,開發儀表板可在連接埠localhost:9400上使用。從這裡您可以呼叫端點,有點像 Postman。對應用程式的每次呼叫都會產生一個跟踪,您可以檢查該跟踪以查看 API 請求、資料庫呼叫和 Pub/Sub 訊息。
https://www.youtube.com/watch?v=Da\_jHj6bLac
開箱即用的本地追蹤並能夠像這樣輕鬆除錯應用程式,這是 Encore 在建置事件驅動應用程式時成為絕佳選擇的另一個原因。
本機開發儀表板還包括具有自動 API 文件的服務目錄。哦,順便說一下。本文前面的漂亮架構圖是Encore Flow ,它也內建在開發儀表板中。它始終是您系統的最新表示,並隨著您的開發而即時變化。 😎
那麼,我們要如何部署這個應用程式呢?那麼,您可以使用encore build
來建立您的應用程式,並將其作為 Docker 映像獲得,您可以將其部署到任何您想要的地方。您需要提供執行時配置,您可以在其中指定應用程式應如何連接到基礎架構(例如 Pub/Sub 和資料庫)。如果您不想手動管理這些東西,您可以使用 Encore 的雲端平台,它會自動在AWS或GCP上的雲端帳戶中設定所需的基礎設施,並且它帶有內建 CI/CD,因此您只需推送部署。該平台還附帶監控、追蹤和自動預覽環境,因此您可以在專用的臨時環境中測試每個拉取請求。
如果您想自己使用 Uptime 應用程式,您可以透過安裝 Encore輕鬆實現,然後在終端機中執行encore app create
。選擇 TypeScript,然後在入門範本清單中選擇Uptime
應用程式。您需要安裝 Docker 桌面,因為這是本機建立資料庫所需的。簽出程式碼後,您也可以查看slack
服務所使用的uptime-transition
主題。
⭐️在 GitHub 上給予這個專案一顆星來支持 Encore。
查看 Encore 的範例儲存庫,您可以在其中找到一堆可部署的應用程式。
如果您有疑問或想要分享您的工作,請加入Discord 上 Encore 社群的開發人員聚會。
https://dev.to/encore/typescript-vs-go-choosing-your-backend-language-2bc5
原文出處:https://dev.to/encore/building-simple-event-driven-applications-with-pubsub-k57