Delia's Blog

菜鳥工程師小鈞的筆記

0%

.NET core集成CAP框架,實現微服務最終一致性方案

CAP定理

提到分散式系統就一定要了解CAP定理,CAP簡單來說就是在分散式系統中不可能同時滿足 一致性可用性分區容錯性,以下為wiki百科的介紹。

  • 一致性(Consistency):要求資料一致。
  • 可用性(Availability):要求即時非錯誤的回應。
  • 分區容錯性(Partition tolerance):服務之間通信出現故障,但不會影響整體系統運作。

我們可以用簡單一點的方式理解這三點。

一致性可用性 應該站在”使用者的角度”去看。

分區容忍性 則是”伺服器端”在遇到網路分區的問題時,選擇如何去影響使用者感知到的「一致性」和「可用性」。

分散式系統在斷網的情況下,2 台服務器變成了獨立網絡,彼此無法通信,這個情況就是分區。在分區的情況下,如果要保證數據一致性和可用性的話,那就滿足分區容錯性了,所以分散式系統P(分區容錯性)是一定存在的,就必須在C(一致性)及A(可用性)上做取捨。

強一致性 v.s 最終一致性

  • 強一致性(CP) : 適合處理訂單、支付、庫存、會員帳密…等,資料同步高度要求的任務。

  • 最終一致性(AP) : 適合處理文章、會員簡介、個人訂單、發送簡訊、email…等,短時間不一致不影響使用。

.Net Core CAP

優點

  • 使用Nuget包的方式提供,只需要簡單安裝不會對現有專案有任何衝突。

  • 提供多種MQ、資料庫擴充。

  • 共通的Publish及Receive接口,搭配.NET core DI容器註冊,可輕鬆發送至各種MQ,也可無痛抽換,與原代碼耦合性低。

  • 具有消息持久化的功能,與本地事務一同提交,使用最終一致性的同時也有足夠的可靠性。

  • 人性化的儀表板,方便查看事件進度,監聽事件數量,並可重新發送、消費事件。

  • 7.0版本中提供完整的效能測試結果
    https://www.cnblogs.com/savorboard/p/cap-7-0.html

  • 有詳細中文文件。

  • MIT協議開源,完全免費。

架構

  1. Service A會在資料庫建立消息表,用來記錄及跟踪消息內容及狀態。
  2. Service A 接收到請求後,處理完業務邏輯,與即將發布消息一同寫入本地資料庫中。
  3. 成功寫入後將消息發送至MQ,如果消息發送失敗可依據資料庫中的消息表進行重發。
  4. Service B 收到MQ中的消息先寫入資料庫中,當事件處理失敗時也可進行重新處理。
  5. 最終A、B事務最終達成一致。

官方網站

https://cap.dotnetcore.xyz/

Github範例

https://github.com/DeliaHung/dotnet-cap

模擬成立訂單後發送Email通知,建立兩個API專案分別為OrderService、MessageService

使用docker啟動RabbitMQ

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management

進入rabbitmq網頁
http://localhost:15672

預設帳號密碼皆為guest

下載CAP套件

主要CAP庫

PM> Install-Package DotNetCore.CAP

官方提供了非常多種Message Queue

PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus
PM> Install-Package DotNetCore.CAP.AmazonSQS
PM> Install-Package DotNetCore.CAP.NATS
PM> Install-Package DotNetCore.CAP.RedisStreams
PM> Install-Package DotNetCore.CAP.Pulsar

也提供了相關的數據持久化

PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB

註冊CAP

我們使用RabbitMQ + Sql Server來示範,在program做註冊,其他細節設定官方網站都有介紹,非常清楚。

builder.Services.AddCap(x =>
{
x.UseRabbitMQ(opt => {
opt.HostName = "localhost";
opt.Port = 15672;
opt.UserName = "guest";
opt.Password = "guest";
});

x.UseSqlServer(opt =>
{
opt.ConnectionString = "Data Source=.;Initial Catalog=CAPdemo;Integrated Security=True;TrustServerCertificate=true";
});
});

發送(Publish)

[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
[HttpPost(Name = "CreateOrder")]
public void CreateOrder()
{
using var tran = _dbContext.Database.BeginTransaction(_capPublisher);
try
{
Order order = new Order();
order.OrderNumber = "2023010100001";
//建立訂單完成
_dbContext.Orders.Add(order);
_dbContext.SaveChanges();
SendSmsDto sendSmsDto = new() { PhoneNumber = "0987887887", Content = "使用.NET core CAP 實現微服務事件發送" };
_capPublisher.Publish("SendSms", sendSmsDto);
tran.Commit();
}
catch (Exception ex)
{
tran.Rollback();
}
}
}

public record SendSmsDto
{
public string PhoneNumber { get; set; }
public string Content { get; set; }
}
  • 事件持久化

發送完成會發現CAP幫你建了兩個資料表,就照字面上的意思一樣,一個是發佈者(cap.Published)、一個是消費者(cap.Received)。

cap.Publish內會將事件訊息持久化,目的是可以保證當Message Queue傳送異常或者網絡錯誤時候事件沒有丟失。 為了保證這種機制的可靠性,CAP「建議使用和業務代碼相同的資料庫」事務來保證業務操作和CAP的消息在持久化的過程中是強一致的。也就是說與業務代碼一同包進Transastion事務。

RabbitMQ後台也能看到發送的內容

監聽(Subscribe)

  1. In Controller Action
[NonAction]
[CapSubscribe("SendSms")]
public void SendSms(SendSmsDto dto, [FromCap] CapHeader header)
{
Console.WriteLine(dto.ToString());
Console.WriteLine(JsonSerializer.Serialize(header));
return;
}
  1. In Business Logic Service
public interface ISmsService
{
void Send(SendSmsDto dto, [FromCap] CapHeader header);
}
public class SmsService : ISmsService, ICapSubscribe
{
[CapSubscribe("SendSms")]
public void Send(SendSmsDto dto, [FromCap] CapHeader header)
{
//業務代碼...
return;
}
}
builder.Services.AddTransient<ISmsService, SmsService>();
//CAP
builder.Services.AddCap(x =>
.....

Sql Server的cap.Received一樣會有消費紀錄,這邊特地用了兩個不同的DB模擬微服務環境

Dashboard

安裝

PM> Install-Package DotNetCore.Dashboard

自 5.1.0 開始,CAP Dashboard 授權預設使用 ASP.NET Core 的方式,不再提供自定義授權篩檢程式。

services.AddCap(options =>
{
//options.UseDashboard();
options.UseDashboard(x =>
{
//加上JWT就必須要有Bearer token才能進入頁面
x.UseAuth = true;
x.DefaultAuthenticationScheme = JwtBearerDefaults.AuthenticationScheme;
x.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
});
}

消息重發設定

1、 發送重試
第一次重試次數為 3,4分鐘后以後每分鐘重試一次,進行次數 +1,當總次數達到50次后,CAP將不對其進行重試。

2、 消費重試
重試策略和上面的 發送重試 是相同的。

//自訂義
builder.Services.AddCap(x =>
{
x.FailedRetryCount = 10;//重試次數(寫入MQ失敗)
x.FailedRetryInterval = 30;//重式間隔頻率
}

失敗處理

builder.Services.AddCap(x =>
{
x.FailedThresholdCallback = failed =>
{
var logger = failed.ServiceProvider.GetService<ILogger>();
logger.LogError($@"【MqType:{failed.MessageType}失敗】【重試了{option.FailedRetryCount}次】,【消息名稱:{failed.Message.GetName()}】");
};
}

數據清理

資料庫消息表中具有一個 ExpiresAt 欄位表示消息的過期時間,當消息發送成功或者消費成功后,CAP會將消息狀態為 Successed 的 ExpiresAt 設置為 1天 後過期,會將消息狀態為 Failed 的 ExpiresAt 設置為 15天 後過期。

CAP 預設情況下會每隔5分鐘將消息表的數據進行清理刪除,避免數據量過多導致性能的降低。 清理規則為 ExpiresAt 不為空並且小於當前時間的數據。 也就是說狀態為Failed的消息(正常情況他們已經被重試了 50 次),如果你15天沒有人工介入處理,同樣會被清理掉。

//自訂義
builder.Services.AddCap(x =>
{
x.FailedMessageExpiredAfter = 300;//失敗消息的过期时间(秒)
x.SucceedMessageExpiredAfter = 500;//成功消息的过期时间(秒)
x.CollectorCleaningInterval = 300; //刪除已經過期消息的時間間隔
}

命名設定

  • GroupNamePrefix : 為訂閱Group統一加上前綴,以 ‘.’ 做區隔。

  • TopicNamePrefix : 為Topic統一加上前綴,以 ‘.’ 做區隔。

  • Version : 版本,用於給消息指定版本來隔離不同版本服務的消息。

  • DefaultGroupName : 預設Group名稱。

在 RabbitMQ 中映射到 Queue Names。
在 Apache Kafka 中映射到 Consumer Group Id。
在 Azure Service Bus 中映射到 Subscription Name。
在 NATS 中映射到 Queue Group Name. 在 Redis Streams 中映射到 Consumer Group.
builder.Services.AddCap(x =>
{
x.DefaultGroupName = "dev";
x.TopicNamePrefix = "dev";
x.GroupNamePrefix = "dev";
x.Version = "v3";
}

採坑日記

如果生產者不是使用CAP發送,會沒有CAP預設的Header訊息,導致消費者(使用CAP),Header解析時發生錯誤。當初因為公司舊專案不是.net core所以只能單純發出事件,用的是kafka,也因為踩了這個坑才知道原來kafka有Header訊息。

  1. 可以在生產者發送事件時手動加上CAP要求的Header。
    官方說明 : https://cap.dotnetcore.xyz/user-guide/zh/cap/messaging/
  1. 或是在消費者專案內CustomHeaders解決。
    官方說明 : https://cap.dotnetcore.xyz/user-guide/zh/transport/rabbitmq/
builder.Services.AddCap(x =>
{
//kafka
x.UseKafka(x =>
{
x.Servers = kafkaConn;
x.CustomHeaders = kafkaResult => new List<KeyValuePair<string, string>>
{
new KeyValuePair<string, string>(Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
new KeyValuePair<string, string>(Headers.MessageName, kafkaResult.Topic),
};
});

//RabbitMQ
x.UseRabbitMQ(x =>
{
x.CustomHeaders = e => new List<KeyValuePair<string, string>>
{
new KeyValuePair<string, string>(Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
new KeyValuePair<string, string>(Headers.MessageName, e.RoutingKey),
};
});
});