我们面临一个棘手的架构难题。在一个基于事件溯源(Event Sourcing)的核心交易系统中,单纯的“最终一致性”无法满足业务对关键路径的强一致性要求。例如,一个“创建订单”的命令,必须在订单服务成功创建订单视图、并且账务服务成功扣款后,才能向用户返回成功。如果采用传统的发布-订阅模式,命令发出后立即返回,后续消费者失败将导致数据不一致,这在金融场景下是不可接受的。我们需要一种机制,确保一个命令产生的事件,已经被一组“关键”的下游服务(我们称之为 In-Sync Replicas,简称 ISR)消费并确认后,才认为该命令处理成功。
这个需求本质上是在应用层实现一个同步点。初步构想是引入一个协调器(Coordinator),它负责接收命令,将事件发布到消息队列,然后阻塞等待,直到收到来自所有ISR成员的确认(ACK)信号,或者超时。
技术选型是这个构想的第一个关键决策点。
- 事件日志 (Event Log): 我们需要一个持久化的、支持多消费者的消息队列。Kafka 是重量级的黄金标准,但对于我们这个场景而言过于复杂。Redis Streams 是一个绝佳的轻量级替代品。它提供持久化的 append-only log、强大的消费者组(Consumer Groups)功能,以及阻塞式读取,完全满足我们作为事件总线的需求。
- ISR成员管理与发现: ISR集合必须是动态的。我们可能需要在线上添加或移除一个关键服务,或者临时将某个服务踢出ISR集合进行维护。此外,协调器需要知道哪些服务实例是健康的。Consul 在这里是完美的选择。它的服务发现能力可以告诉我们健康的实例地址,其KV存储则可以作为动态配置中心,用来存储当前的ISR成员列表。
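上面提到协调器既依赖 Consul 的 KV(ISR 名单),也依赖其健康检查(实例状态)。下面是一个极简草图,演示如何用 consul/api 的 Health API 过滤掉当前没有任何健康实例的 ISR 成员;函数名 filterHealthy 以及“放在协调器包里”都是为说明而做的假设,正文实现只信任 KV 列表。
// Sketch only: filter the ISR list by Consul health status.
// The helper name filterHealthy is an assumption for illustration.
package coordinator

import (
    "log"

    consul "github.com/hashicorp/consul/api"
)

// filterHealthy keeps only the services that have at least one instance
// currently passing its health checks in Consul.
func filterHealthy(client *consul.Client, isrNames []string) []string {
    healthy := make([]string, 0, len(isrNames))
    for _, name := range isrNames {
        // passingOnly=true asks Consul to return only instances whose checks pass.
        entries, _, err := client.Health().Service(name, "", true, nil)
        if err != nil {
            log.Printf("consul health query failed for %s: %v", name, err)
            continue
        }
        if len(entries) > 0 {
            healthy = append(healthy, name)
        }
    }
    return healthy
}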
整个流程的架构设计如下:
sequenceDiagram
    participant Client as 客户端
    participant Coordinator as ISR协调器
    participant Redis as Redis Streams
    participant OrderSvc as 订单服务 (ISR成员)
    participant LedgerSvc as 账务服务 (ISR成员)
    participant Consul as Consul
    Client->>+Coordinator: 发起命令 (e.g., CreateOrder)
    Coordinator->>+Consul: 1. 查询KV获取ISR列表 (e.g., ["order-service", "ledger-service"])
    Coordinator->>+Consul: 2. 查询健康的服务实例
    Consul-->>-Coordinator: 返回健康实例信息
    Coordinator->>+Redis: 3. 生成CorrelationID, 发布事件到 `events:stream`
    Redis-->>-Coordinator: Event ID
    Note right of Coordinator: Coordinator进入等待状态, 监听ACK
    OrderSvc->>+Redis: XREADGROUP ... `events:stream`
    Redis-->>-OrderSvc: 消费事件
    OrderSvc->>OrderSvc: 处理业务逻辑...
    OrderSvc->>+Redis: 4. 向 `ack:stream` 发布ACK (含CorrelationID)
    Redis-->>-OrderSvc: ACK Event ID
    LedgerSvc->>+Redis: XREADGROUP ... `events:stream`
    Redis-->>-LedgerSvc: 消费事件
    LedgerSvc->>LedgerSvc: 处理业务逻辑...
    LedgerSvc->>+Redis: 5. 向 `ack:stream` 发布ACK (含CorrelationID)
    Redis-->>-LedgerSvc: ACK Event ID
    Coordinator->>+Redis: 阻塞读取 `ack:stream`
    Redis-->>Coordinator: 收到OrderSvc的ACK
    Redis-->>Coordinator: 收到LedgerSvc的ACK
    Note right of Coordinator: Coordinator集齐所有ISR成员的ACK
    Coordinator-->>-Client: 6. 返回命令成功
协调器核心实现 (Coordinator Implementation)
我们将使用 Go 语言来实现这个协调器。它是一个独立部署的服务;如何做到高可用会在文末讨论。
项目结构与依赖
# a brief directory structure
isr-coordinator/
├── cmd/
│ └── main.go
├── internal/
│ ├── config/
│ │ └── config.go
│ ├── coordinator/
│ │ └── service.go
│ └── transport/
│ └── http.go
└── go.mod
我们需要 go-redis 和 consul/api 两个核心库。
// go.mod
module github.com/your-org/isr-coordinator
go 1.21
require (
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.6.0
github.com/hashicorp/consul/api v1.28.2
// ... other dependencies
)
配置管理 (config.go)
在真实项目中,配置必须是外部化的。
// internal/config/config.go
package config
import (
"os"
"time"
)
type Config struct {
RedisAddr string
ConsulAddr string
ListenAddr string
EventStreamKey string
AckStreamKey string
ISRConfigKey string
RequestTimeout time.Duration
}
func Load() *Config {
return &Config{
RedisAddr: getEnv("REDIS_ADDR", "localhost:6379"),
ConsulAddr: getEnv("CONSUL_ADDR", "localhost:8500"),
ListenAddr: getEnv("LISTEN_ADDR", ":8080"),
EventStreamKey: getEnv("EVENT_STREAM_KEY", "events:stream"),
AckStreamKey: getEnv("ACK_STREAM_KEY", "ack:stream"),
ISRConfigKey: getEnv("ISR_CONFIG_KEY", "config/isr/services"),
RequestTimeout: 5 * time.Second, // Hardcoded for simplicity, should be configurable
}
}
func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
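上面的 RequestTimeout 被硬编码为 5 秒。一个可行的做法是同样通过环境变量注入,下面是一个极简草图;环境变量名 REQUEST_TIMEOUT 和辅助函数 getDurationEnv 均为假设,并非正文代码的一部分。
// Sketch only: externalize RequestTimeout via an environment variable.
// The REQUEST_TIMEOUT variable name and the getDurationEnv helper are assumptions.
package config

import (
    "os"
    "time"
)

func getDurationEnv(key string, fallback time.Duration) time.Duration {
    if value, ok := os.LookupEnv(key); ok {
        if d, err := time.ParseDuration(value); err == nil {
            return d
        }
    }
    return fallback
}

// In Load(), the field could then be set as:
//   RequestTimeout: getDurationEnv("REQUEST_TIMEOUT", 5*time.Second),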
协调器服务 (service.go)
这是整个系统的核心逻辑,包含了与 Consul 和 Redis 的交互,以及等待 ACK 的并发控制。
// internal/coordinator/service.go
package coordinator
import (
"context"
"encoding/json"
"errors"
"log"
"strings"
"sync"
"time"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
consul "github.com/hashicorp/consul/api"
"github.com/your-org/isr-coordinator/internal/config"
)
var (
ErrTimeout = errors.New("request timed out waiting for ISR acknowledgements")
ErrISREmpty = errors.New("ISR set is empty or not configured in Consul")
ErrNotEnoughAcks = errors.New("did not receive acknowledgements from all ISR members")
)
type Service struct {
    cfg        *config.Config
    redis      *redis.Client
    consul     *consul.Client
    ackSubs    sync.Map   // map[correlationID]*ackWaiter
    ackSubLock sync.Mutex // guards each ackWaiter's acks map
}

// ackWaiter tracks the ACKs collected so far for one in-flight command.
type ackWaiter struct {
    acks map[string]struct{}      // service names that have ACKed
    ch   chan map[string]struct{} // receives a snapshot after every new ACK
}
func NewService(cfg *config.Config) (*Service, error) {
// --- Redis Client Initialization ---
rdb := redis.NewClient(&redis.Options{
Addr: cfg.RedisAddr,
})
if _, err := rdb.Ping(context.Background()).Result(); err != nil {
return nil, err
}
// --- Consul Client Initialization ---
consulConfig := consul.DefaultConfig()
consulConfig.Address = cfg.ConsulAddr
c, err := consul.NewClient(consulConfig)
if err != nil {
return nil, err
}
svc := &Service{
cfg: cfg,
redis: rdb,
consul: c,
}
// Start a long-running goroutine to listen for all ACKs
go svc.listenForAcks()
return svc, nil
}
// ProcessCommand is the main entry point for handling a command.
func (s *Service) ProcessCommand(ctx context.Context, commandPayload map[string]interface{}) (string, error) {
correlationID := uuid.New().String()
commandPayload["correlation_id"] = correlationID
// 1. Fetch the current ISR set from Consul KV
isrSet, err := s.getISRSet(ctx)
if err != nil {
return "", err
}
if len(isrSet) == 0 {
return "", ErrISREmpty
}
log.Printf("[CorrelationID: %s] ISR set resolved to: %v", correlationID, isrSet)
    // 2. Prepare to wait for ACKs.
    // The listener goroutine delivers a snapshot of the collected ACKs on this
    // channel each time a new ACK for this correlation ID arrives.
    ackChan := make(chan map[string]struct{}, 1)
    s.registerAckWaiter(correlationID, ackChan)
    defer s.unregisterAckWaiter(correlationID)
    // 3. Publish the event to Redis Streams
    eventData, err := json.Marshal(commandPayload)
    if err != nil {
        return "", err
    }
eventID, err := s.redis.XAdd(ctx, &redis.XAddArgs{
Stream: s.cfg.EventStreamKey,
Values: map[string]interface{}{"data": eventData},
}).Result()
if err != nil {
log.Printf("[CorrelationID: %s] Failed to publish event: %v", correlationID, err)
return "", err
}
log.Printf("[CorrelationID: %s] Event published with ID: %s", correlationID, eventID)
    // 4. Wait for ACKs or timeout. Snapshots arrive one ACK at a time, so keep
    // waiting until every ISR member is covered or the deadline passes.
    timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.RequestTimeout)
    defer cancel()
    for {
        select {
        case <-timeoutCtx.Done():
            log.Printf("[CorrelationID: %s] Timed out waiting for ACKs.", correlationID)
            return "", ErrTimeout
        case receivedAcks := <-ackChan:
            log.Printf("[CorrelationID: %s] Received ACKs from: %v", correlationID, receivedAcks)
            // Check whether all required ACKs are present; if not, keep waiting.
            missing := ""
            for member := range isrSet {
                if _, ok := receivedAcks[member]; !ok {
                    missing = member
                    break
                }
            }
            if missing != "" {
                log.Printf("[CorrelationID: %s] Still waiting for ACK from member: %s", correlationID, missing)
                continue
            }
            log.Printf("[CorrelationID: %s] Successfully processed, all ISR ACKs received.", correlationID)
            return eventID, nil
        }
    }
}
// getISRSet fetches the list of service names from Consul's K/V store.
func (s *Service) getISRSet(ctx context.Context) (map[string]struct{}, error) {
kv := s.consul.KV()
pair, _, err := kv.Get(s.cfg.ISRConfigKey, nil)
if err != nil {
return nil, err
}
if pair == nil || len(pair.Value) == 0 {
return nil, nil // No config found is a valid case
}
serviceNames := strings.Split(string(pair.Value), ",")
isrSet := make(map[string]struct{})
for _, name := range serviceNames {
trimmed := strings.TrimSpace(name)
if trimmed != "" {
// A production system should also check if the service is registered and healthy in Consul
// Here we simplify by just trusting the KV store.
isrSet[trimmed] = struct{}{}
}
}
return isrSet, nil
}
// listenForAcks is a background goroutine that perpetually reads from the ACK stream.
// This is more efficient than having one reader per request.
func (s *Service) listenForAcks() {
    // Ensure the stream and group exist, to prevent errors on first run.
    // A BUSYGROUP error on later restarts just means the group already exists.
    if err := s.redis.XGroupCreateMkStream(context.Background(), s.cfg.AckStreamKey, "isr_coordinator_group", "$").Err(); err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
        log.Printf("Failed to create ACK consumer group: %v", err)
    }
log.Println("ACK listener started...")
for {
streams, err := s.redis.XReadGroup(context.Background(), &redis.XReadGroupArgs{
Group: "isr_coordinator_group",
Consumer: "coordinator-instance-1", // Should be unique per instance
Streams: []string{s.cfg.AckStreamKey, ">"},
Count: 10,
Block: 0, // Block indefinitely
}).Result()
if err != nil {
log.Printf("Error reading ACK stream: %v. Retrying in 1s...", err)
time.Sleep(1 * time.Second)
continue
}
for _, stream := range streams {
for _, msg := range stream.Messages {
s.dispatchAck(msg)
// Acknowledge the message so it's not re-delivered
s.redis.XAck(context.Background(), s.cfg.AckStreamKey, "isr_coordinator_group", msg.ID)
}
}
}
}
// dispatchAck finds the waiter for this correlation ID and hands it the ACK.
func (s *Service) dispatchAck(msg redis.XMessage) {
    correlationID, okID := msg.Values["correlation_id"].(string)
    serviceName, okSvc := msg.Values["service_name"].(string)
    if !okID || !okSvc {
        log.Printf("Received malformed ACK message: %v", msg.Values)
        return
    }
    val, ok := s.ackSubs.Load(correlationID)
    if !ok {
        // No waiter registered (request already finished or timed out); drop it.
        return
    }
    waiter := val.(*ackWaiter)
    // Record the ACK under the lock, then take a snapshot so the waiting
    // goroutine never reads the mutable map concurrently.
    s.ackSubLock.Lock()
    waiter.acks[serviceName] = struct{}{}
    snapshot := make(map[string]struct{}, len(waiter.acks))
    for name := range waiter.acks {
        snapshot[name] = struct{}{}
    }
    s.ackSubLock.Unlock()
    // Replace any stale snapshot sitting in the buffered channel, then send the
    // latest one. The channel has capacity 1 and only this goroutine sends on it,
    // so the listener never blocks here.
    select {
    case <-waiter.ch:
    default:
    }
    waiter.ch <- snapshot
}

func (s *Service) registerAckWaiter(correlationID string, ackChan chan map[string]struct{}) {
    s.ackSubs.Store(correlationID, &ackWaiter{
        acks: make(map[string]struct{}),
        ch:   ackChan,
    })
}

func (s *Service) unregisterAckWaiter(correlationID string) {
    s.ackSubs.Delete(correlationID)
}
一个关键的设计点是 listenForAcks goroutine。它使用一个单独的、长期阻塞的 XReadGroup 循环来消费所有ACK消息,这比为每个进来的请求都启动一个监听器要高效得多。当收到ACK时,dispatchAck 函数通过 sync.Map 找到与 correlation_id 对应的等待中的 ProcessCommand goroutine,把当前已收集到的ACK集合的快照发送给它,由后者判断是否已经集齐全部ISR成员。这里的并发控制需要非常小心:代码用互斥锁保护ACK集合,并用容量为 1 的 channel 传递快照来确保线程安全。
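目录结构里的 internal/transport/http.go 和 cmd/main.go 在正文中没有展开。下面是一个极简的 HTTP 入口草图,演示客户端如何以同步方式触发 ProcessCommand;其中 /commands 路由和返回字段都是假设,并非正文的既定接口。
// Sketch only: a minimal HTTP entry point for the coordinator.
// The /commands route and the response shape are assumptions.
package transport

import (
    "encoding/json"
    "net/http"

    "github.com/your-org/isr-coordinator/internal/coordinator"
)

func NewHandler(svc *coordinator.Service) http.Handler {
    mux := http.NewServeMux()
    mux.HandleFunc("/commands", func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost {
            http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
            return
        }
        var payload map[string]interface{}
        if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        // ProcessCommand blocks until every ISR member has ACKed, or times out.
        eventID, err := svc.ProcessCommand(r.Context(), payload)
        if err != nil {
            // A timeout here means the consistency guarantee was not met in time.
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        json.NewEncoder(w).Encode(map[string]string{"event_id": eventID})
    })
    return mux
}
cmd/main.go 则只需调用 config.Load()、coordinator.NewService(cfg),再用 http.ListenAndServe(cfg.ListenAddr, transport.NewHandler(svc)) 把它们串起来即可。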
下游服务(ISR成员)的实现
下游服务需要一个通用的逻辑来消费事件并发送ACK。这可以封装成一个中间件或库。
// Example of a downstream service consumer (e.g., in ledger-service)
package main
import (
"context"
"encoding/json"
"log"
"os"
"time"
"github.com/go-redis/redis/v8"
)
const (
EventStreamKey = "events:stream"
AckStreamKey = "ack:stream"
ConsumerGroup = "ledger_service_group"
ConsumerName = "ledger_instance_1" // Should be unique, e.g., hostname
ServiceName = "ledger-service"
)
func main() {
redisAddr := os.Getenv("REDIS_ADDR")
if redisAddr == "" {
redisAddr = "localhost:6379"
}
rdb := redis.NewClient(&redis.Options{Addr: redisAddr})
// Ensure stream and group exist
rdb.XGroupCreateMkStream(context.Background(), EventStreamKey, ConsumerGroup, "$").Result()
log.Printf("Consumer '%s' in group '%s' starting...", ConsumerName, ConsumerGroup)
for {
streams, err := rdb.XReadGroup(context.Background(), &redis.XReadGroupArgs{
Group: ConsumerGroup,
Consumer: ConsumerName,
Streams: []string{EventStreamKey, ">"}, // ">" means new messages only
Count: 1,
Block: 0, // Block forever
}).Result()
if err != nil {
log.Printf("Error reading from event stream: %v", err)
time.Sleep(1 * time.Second)
continue
}
for _, stream := range streams {
for _, msg := range stream.Messages {
processMessage(rdb, msg)
// Important: Acknowledge the message in the event stream
rdb.XAck(context.Background(), EventStreamKey, ConsumerGroup, msg.ID)
}
}
}
}
func processMessage(rdb *redis.Client, msg redis.XMessage) {
log.Printf("Processing message ID: %s", msg.ID)
data, ok := msg.Values["data"].(string)
if !ok {
log.Println("Message data is not a string")
return
}
var payload map[string]interface{}
if err := json.Unmarshal([]byte(data), &payload); err != nil {
log.Printf("Failed to unmarshal message data: %v", err)
return
}
correlationID, ok := payload["correlation_id"].(string)
if !ok {
log.Println("Message is missing 'correlation_id'")
return
}
// --- CRITICAL BUSINESS LOGIC GOES HERE ---
// e.g., update database, call other services, etc.
// This part MUST be idempotent.
log.Printf("[CorrelationID: %s] Executing ledger logic...", correlationID)
time.Sleep(50 * time.Millisecond) // Simulate work
// After successful processing, send an ACK
err := sendAck(rdb, correlationID)
if err != nil {
log.Printf("[CorrelationID: %s] Failed to send ACK: %v", correlationID, err)
// Here, a real system needs a retry mechanism or dead-letter queue.
// If we fail to send ACK, the coordinator will time out, which is the correct failure mode.
} else {
log.Printf("[CorrelationID: %s] ACK sent successfully.", correlationID)
}
}
func sendAck(rdb *redis.Client, correlationID string) error {
ctx := context.Background()
return rdb.XAdd(ctx, &redis.XAddArgs{
Stream: AckStreamKey,
Values: map[string]interface{}{
"correlation_id": correlationID,
"service_name": ServiceName,
"timestamp": time.Now().Unix(),
},
}).Err()
}
这个消费者代码展示了一个ISR成员的工作模式:
- 它属于一个特定的消费者组 (ledger_service_group),从主事件流 (events:stream) 中拉取消息。
- 处理完业务逻辑后,它必须从消息中提取 correlation_id。
- 然后,它向一个专用的ACK流 (ack:stream) 发送一条消息,其中包含 correlation_id 和自己的服务名。
- 这个简单的例子省略了错误处理。在生产环境中,如果业务逻辑失败,消息不应该被ACK,以便消费者组的重试机制可以生效;如果ACK发送失败,协调器会超时,客户端会收到错误,这也是一种可接受的、安全的失败状态。
配置 Consul
最后,我们需要在 Consul 的 KV 存储中设置 ISR 列表。使用 Consul CLI 可以轻松完成:
# Set the list of services that must acknowledge an event
consul kv put config/isr/services "order-service,ledger-service"
现在,协调器在处理每个命令时都会读取这个键,并要求 order-service 和 ledger-service 两者都返回 ACK。如果想临时将 ledger-service 移出ISR集合进行维护,只需更新这个值:
consul kv put config/isr/services "order-service"
协调器在处理下一个命令时就会适应这个变化,不再等待 ledger-service 的确认。
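部署运维时也可以随时确认当前生效的ISR列表:
# 查看当前的ISR配置
consul kv get config/isr/services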
这个方案引入了同步等待,必然会增加命令处理的延迟。它本质上是用延迟换取更强的一致性保证。这种模式不适用于所有场景,但对于那些无法容忍短暂数据不一致的核心业务流程,它提供了一个介于完全同步调用和纯粹异步事件通知之间的、可控的折衷方案。
当前实现的协调器是单点的。为了实现高可用,可以部署多个协调器实例。但由于 listenForAcks 使用了 Redis 消费者组,同一条ACK消息只会被其中一个协调器实例消费,这会导致ACK被分发到错误的实例(一个没有处理原始请求的协调器)。一个可行的改进路径是:协调器在发布事件时,在事件中注入一个“回复主题”,例如 ack:stream:<coordinator-instance-id>,下游服务将ACK发送到这个特定的流,而不是一个公共流。这样每个协调器实例只监听自己的回复流,从而解决ACK路由问题。
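沿着这个思路,下面给出一个极简草图:协调器发布事件时附带自己的回复流名,消费者把 ACK 发到该流。字段名 reply_stream 以及用主机名充当实例 ID 都是假设,并非正文实现的一部分。
// Sketch only: the per-instance "reply stream" idea described above.
// The "reply_stream" field name and using the hostname as the instance ID
// are assumptions for illustration.
package replystream

import (
    "context"
    "os"

    "github.com/go-redis/redis/v8"
)

// publishWithReplyStream is the coordinator side: the event carries the name of
// this instance's private ACK stream, so ACKs route back to the right coordinator.
func publishWithReplyStream(ctx context.Context, rdb *redis.Client, eventData []byte) (string, error) {
    host, _ := os.Hostname()
    replyStream := "ack:stream:" + host
    return rdb.XAdd(ctx, &redis.XAddArgs{
        Stream: "events:stream",
        Values: map[string]interface{}{
            "data":         eventData,
            "reply_stream": replyStream,
        },
    }).Result()
}

// sendAckTo is the consumer side: it ACKs to the stream named in the event,
// falling back to the shared ack:stream when the field is absent.
func sendAckTo(ctx context.Context, rdb *redis.Client, msg redis.XMessage, serviceName, correlationID string) error {
    target, _ := msg.Values["reply_stream"].(string)
    if target == "" {
        target = "ack:stream"
    }
    return rdb.XAdd(ctx, &redis.XAddArgs{
        Stream: target,
        Values: map[string]interface{}{
            "correlation_id": correlationID,
            "service_name":   serviceName,
        },
    }).Err()
}
相应地,每个协调器实例用 XREAD(或各自独立的消费者组)只监听自己的 ack:stream:<instance-id>,ACK 就不会再被其他实例抢走。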