在這篇文章中,我將引導您了解如何在TypeScript中建立事件驅動的Node.js 應用程式。我們將從傳統應用程式開始,然後採取所需的步驟,透過Pub/Sub進行通信,使服務鬆散耦合。

我們將了解如何在本地執行應用程式,以及如何將事件驅動的應用程式部署到雲端

影片版本:

https://www.youtube.com/watch?v=hmrqTf4ReLw

應用

我們將關注的應用程式是一個正常執行時間監控系統。我們有一個要監控的網站清單和一個 CronJob,用於檢查每個網站並查看它們是否可存取,如果任何狀態發生變化,我們的應用程式將發送通知。在 CronJob 檢查網站狀態之前,新新增網站的狀態將是unknown 。這是我們在使應用程式成為事件驅動時想要改變的事情之一。

正常執行時間應用程式

以下是已完成應用程式的完整程式碼:https://github.com/encoredev/examples/tree/main/ts/uptime

建築學

這裡我們有兩個架構圖,左邊是我們目前的系統,右邊是我們完成後希望它的外觀。

建築圖

在我們目前的狀態下,您可以看到我們有四個服務: frontendmonitorsiteslack 。填充的箭頭表示monitor服務正在呼叫siteslack服務中的端點, monitor服務對這兩個服務具有硬依賴關係。我們還可以看到monitorsite服務都有資料庫。 monitor服務具有 CronJob,每小時檢查一次每個站點的狀態。

因此,我們想要引入一個site.added主題,每當我們新增網站時,我們都可以從site服務發佈到該主題。我們將在monitor服務中訂閱site.added主題,然後在新增網站時 ping 網站以檢查狀態。

我們還希望透過引入uptime-transition主題來消除monitorslack服務之間的硬依賴

好處

在繼續之前,我們先來談談為什麼對我們的系統進行這些更改是個好主意。

  • 透過進行這些更改,我們使我們的服務更加鬆散耦合,這幾乎總是一件好事。 monitor服務不再需要知道slack服務的存在,且site服務可以保持獨立於monitor服務。

  • slack服務現在可以離線,而不會影響monitor服務。然後,當它重新上線時,它只會從事件佇列中讀取並從中斷的地方繼續。因此系統更加健壯

  • monitorslack服務現在都依賴一個抽象,而不是彼此依賴,稱為依賴倒置。這使我們能夠替換或加入其他通知管道,例如不和諧或電子郵件,而無需對monitor服務進行更改。

陷阱

但讓系統事件驅動並不總是一個好主意。以下是一些需要注意的事項:

  • 這不是一個全有或全無的方法!只需在最適合的地方使用 Pub/Sub。對此沒有必要教條主義。請不要教條主義……哇! 😂

  • 僅僅用事件取代傳統的 API 呼叫是不夠的。使用非同步佇列時需要理解一些基本概念,特別是最終一致性冪等性。花時間了解這些想法。相信我,以後你會感謝我的。

  • 事件驅動的系統更加複雜。您需要建造更多元件,並且還有其他基礎設施需求。使用正確的工具來完成工作非常重要。如果沒有合適的工具,除錯問題和管理環境將非常令人沮喪。

這就是為什麼我們將使用Encore.ts來建立事件驅動的應用程式。 Encore.ts 是一個開源框架,專門設計用於更輕鬆地使用 TypeScript 建立健全且類型安全的分散式系統,就像我們今天要建立的事件驅動後端一樣。它有很多有用的內建工具,可以讓開發體驗更加流暢,例如我們稍後會看到的本地開發儀表板。

現在,讓我們來看一些程式碼。

加入我們的 Pub/Sub 主題

從程式碼角度來看,服務只是使用 Encore 時儲存庫中的另一個資料夾。在建立事件驅動的應用程式時,您很可能最終會得到很多服務,因此建立新服務必須很容易。這就是 Encore 非常適合此類應用的原因之一。

讓我們先看看site服務。

import { api } from "encore.dev/api";
import { SQLDatabase } from "encore.dev/storage/sqldb";
import knex from "knex";

// Site describes a monitored site.
export interface Site {
  id: number;
  url: string;
}

export interface AddParams {
  url: string;
}

// Add a new site to the list of monitored websites.
export const add = api(
  { expose: true, method: "POST", path: "/site" },
  async (params: AddParams): Promise<Site> => {
    const site = (await Sites().insert({ url: params.url }, "*"))[0];
    return site;
  },
);

// Get a site by id.
export const get = api(
  { expose: true, method: "GET", path: "/site/:id", auth: false },
  async ({ id }: { id: number }): Promise<Site> => {
    const site = await Sites().where("id", id).first();
    return site ?? Promise.reject(new Error("site not found"));
  },
);

// Delete a site by id.
export const del = api(
  { expose: true, method: "DELETE", path: "/site/:id" },
  async ({ id }: { id: number }): Promise<void> => {
    await Sites().where("id", id).delete();
  },
);

export interface ListResponse {
  sites: Site[]; // Sites is the list of monitored sites
}

// Lists the monitored websites.
export const list = api(
  { expose: true, method: "GET", path: "/site" },
  async (): Promise<ListResponse> => {
    const sites = await Sites().select();
    return { sites };
  },
);

// Define a database named 'site', using the database migrations
// in the "./migrations" folder. Encore automatically provisions,
// migrates, and connects to the database.
const SiteDB = new SQLDatabase("site", {
  migrations: "./migrations",
});

const orm = knex({
  client: "pg",
  connection: SiteDB.connectionString,
});

const Sites = () => orm<Site>("site");

該服務有一些 CRUD 端點,例如addgetdeletelist 。我們對add端點感興趣,因為我們希望在新增網站時發布事件。讓我們開始透過新增site.added主題來使我們的應用程式成為事件驅動的。我們透過呼叫Topic類別、指定將在該主題上發布的類型(在本例中為Site類型)並指定交付保證來實現此目的。

import { Topic } from "encore.dev/pubsub";

export const SiteAddedTopic = new Topic<Site>("site.added", {
  deliveryGuarantee: "at-least-once",
});

現在,在add端點中,我們可以呼叫SiteAddedTopic物件上的.publish方法。

export const add = api(
  { expose: true, method: "POST", path: "/site" },
  async (params: AddParams): Promise<Site> => {
    const site = (await Sites().insert({ url: params.url }, "*"))[0];
    await SiteAddedTopic.publish(site);
    return site;
  },
);

將 Pub/Sub 與 Encore 類型安全結合使用,因此如果您使用不正確的參數發佈到主題,則會出現編譯時錯誤 🤯

我們的架構圖現在如下圖所示:

建築圖

我們正在發佈到site.added主題,但我們尚未從monitor服務訂閱它,所以讓我們現在解決這個問題。

新增訂閱者

那麼,我們就開啟monitor服務吧。

import { api } from "encore.dev/api";
import { SQLDatabase } from "encore.dev/storage/sqldb";
import { Site } from "../site/site";
import { ping } from "./ping";
import { site, slack } from "~encore/clients";
import { CronJob } from "encore.dev/cron"; // Check checks a single site.

// Check checks a single site.
export const check = api(
  { expose: true, method: "POST", path: "/check/:siteID" },
  async (p: { siteID: number }): Promise<{ up: boolean }> => {
    const s = await site.get({ id: p.siteID });
    return doCheck(s);
  },
);

// CheckAll checks all sites.
export const checkAll = api(
  { expose: true, method: "POST", path: "/check-all" },
  async (): Promise<void> => {
    const sites = await site.list();
    await Promise.all(sites.sites.map(doCheck));
  },
);

async function doCheck(site: Site): Promise<{ up: boolean }> {
  const { up } = await ping({ url: site.url });

  const wasUp = await getPreviousMeasurement(site.id);
  if (up !== wasUp) {
    const text = `*${site.url} is ${up ? "back up." : "down!"}*`;
    await slack.notify({ text });
  }

  await MonitorDB.exec`
      INSERT INTO checks (site_id, up, checked_at)
      VALUES (${site.id}, ${up}, NOW())
  `;
  return { up };
}

async function getPreviousMeasurement(siteID: number): Promise<boolean> {
  const row = await MonitorDB.queryRow`
      SELECT up
      FROM checks
      WHERE site_id = ${siteID}
      ORDER BY checked_at DESC
      LIMIT 1
  `;
  return row?.up ?? true;
}

const cronJob = new CronJob("check-all", {
  title: "Check all sites",
  every: "1h",
  endpoint: checkAll,
});

export const MonitorDB = new SQLDatabase("monitor", {
  migrations: "./migrations",
});

這裡我們有一個check端點,用於檢查和更新單一站點的狀態。在 API 處理程序中,我們呼叫site服務中的get端點。我們透過encore/clients資料夾導入服務,然後我們可以像呼叫常規函數一樣呼叫端點,並且具有完整的類型安全性。但最酷的部分是,在幕後,這些函數呼叫會轉換為實際的 HTTP 呼叫,從而產生追蹤和日誌。

doCheck函數看起來就像我們在新增網站時要呼叫的函數,因此讓我們新增 Pub/Sub 訂閱者。

import { Subscription } from "encore.dev/pubsub";

const _ = new Subscription(SiteAddedTopic, "check-site", {
  handler: doCheck,
});

為此,我們呼叫Subscription類,傳入我們想要訂閱的主題、訂閱的名稱和選項物件。在選項物件中,我們只需要指定處理程序,也就是為每個新事件呼叫的函數。 doCheck函數接受一個Site ,因此無需執行任何其他操作。

當地基礎設施

那麼,這是如何運作的呢? Encore 配備了自動本地基礎架構。當您使用encore run在本機啟動 Encore 應用程式時,Encore 會自動在電腦上啟動您的應用程式所需的所有基礎設施,包括資料庫和 Pub/Sub。因此,您無需編寫 YAML,或設定 Docker Compose 以及 LocalStack 等其他工具來執行您的環境。

本地發展儀表板

Encore 還配備了一個內建的開發儀表板。當您啟動 Encore 應用程式時,開發儀表板可在連接埠localhost:9400上使用。從這裡您可以呼叫端點,有點像 Postman。對應用程式的每次呼叫都會產生一個跟踪,您可以檢查該跟踪以查看 API 請求、資料庫呼叫和 Pub/Sub 訊息。

https://www.youtube.com/watch?v=Da\_jHj6bLac

開箱即用的本地追蹤並能夠像這樣輕鬆除錯應用程式,這是 Encore 在建置事件驅動應用程式時成為絕佳選擇的另一個原因。

本機開發儀表板還包括具有自動 API 文件的服務目錄。哦,順便說一下。本文前面的漂亮架構圖是Encore Flow ,它也內建在開發儀表板中。它始終是您系統的最新表示,並隨著您的開發而即時變化。 😎

部署

那麼,我們要如何部署這個應用程式呢?那麼,您可以使用encore build來建立您的應用程式,並將其作為 Docker 映像獲得,您可以將其部署到任何您想要的地方。您需要提供執行時配置,您可以在其中指定應用程式應如何連接到基礎架構(例如 Pub/Sub 和資料庫)。如果您不想手動管理這些東西,您可以使用 Encore 的雲端平台,它會自動在AWSGCP上的雲端帳戶中設定所需的基礎設施,並且它帶有內建 CI/CD,因此您只需推送部署。該平台還附帶監控、追蹤和自動預覽環境,因此您可以在專用的臨時環境中測試每個拉取請求。

自己執行 Uptime 應用程式

如果您想自己使用 Uptime 應用程式,您可以透過安裝 Encore輕鬆實現,然後在終端機中執行encore app create 。選擇 TypeScript,然後在入門範本清單中選擇Uptime應用程式。您需要安裝 Docker 桌面,因為這是本機建立資料庫所需的。簽出程式碼後,您也可以查看slack服務所使用的uptime-transition主題。

總結

其他相關貼文

https://dev.to/encore/nodejs-frameworks-roundup-2024-elysia-hono-nest-encore-which-should-you-pick-19oj

https://dev.to/encore/how-to-deploy-a-backend-application-to-digitalocean-using-docker-and-encore-1eh0

https://dev.to/encore/typescript-vs-go-choosing-your-backend-language-2bc5


原文出處:https://dev.to/encore/building-simple-event-driven-applications-with-pubsub-k57


共有 0 則留言