Building an Application-Layer In-Sync Replica (ISR) Consistency Protocol on Consul and Redis Streams


We are facing a thorny architectural problem. In a core trading system built on event sourcing, plain "eventual consistency" cannot satisfy the strong-consistency requirements on critical paths. For example, a "create order" command may only return success to the user after the order service has built its view and the ledger service has debited the account. With a traditional publish-subscribe model the command returns immediately, and a later consumer failure leaves the data inconsistent, which is unacceptable in a financial setting. We need a mechanism that treats a command as successfully processed only after the events it produced have been consumed and acknowledged by a set of "critical" downstream services, which we call the In-Sync Replicas.

At its core, this requirement is a synchronization point implemented at the application layer. The initial idea is to introduce a coordinator that accepts the command, publishes the event to a message queue, and then blocks until it has received acknowledgements (ACKs) from every ISR member, or until it times out.

Technology selection is the first key decision in this design.

  1. Event log: we need a durable message queue that supports multiple consumers. Kafka is the heavyweight gold standard, but it is overkill for this scenario. Redis Streams is an excellent lightweight alternative: it provides a persistent append-only log, powerful consumer groups, and blocking reads, which covers everything we need from an event bus.
  2. ISR membership and discovery: the ISR set must be dynamic. We may need to add or remove a critical service online, or temporarily pull a service out of the ISR set for maintenance. The coordinator also needs to know which service instances are healthy. Consul is a perfect fit here: its service discovery tells us the addresses of healthy instances, and its KV store acts as a dynamic configuration source for the current ISR member list.

The overall flow is designed as follows:

sequenceDiagram
    participant Client
    participant Coordinator as ISR Coordinator
    participant Redis as Redis Streams
    participant OrderSvc as Order Service (ISR member)
    participant LedgerSvc as Ledger Service (ISR member)
    participant Consul

    Client->>+Coordinator: Issue command (e.g., CreateOrder)
    Coordinator->>+Consul: 1. Read ISR list from KV (e.g., ["order-service", "ledger-service"])
    Coordinator->>Consul: 2. Query healthy service instances
    Consul-->>-Coordinator: Healthy instance info
    Coordinator->>+Redis: 3. Generate CorrelationID, publish event to `events:stream`
    Redis-->>-Coordinator: Event ID
    Note right of Coordinator: Coordinator now waits, listening for ACKs

    OrderSvc->>+Redis: XREADGROUP ... `events:stream`
    Redis-->>-OrderSvc: Consume event
    OrderSvc->>OrderSvc: Execute business logic...
    OrderSvc->>+Redis: 4. Publish ACK (with CorrelationID) to `ack:stream`
    Redis-->>-OrderSvc: ACK Event ID

    LedgerSvc->>+Redis: XREADGROUP ... `events:stream`
    Redis-->>-LedgerSvc: Consume event
    LedgerSvc->>LedgerSvc: Execute business logic...
    LedgerSvc->>+Redis: 5. Publish ACK (with CorrelationID) to `ack:stream`
    Redis-->>-LedgerSvc: ACK Event ID

    Coordinator->>Redis: Blocking read on `ack:stream`
    Redis-->>Coordinator: ACK from OrderSvc
    Redis-->>Coordinator: ACK from LedgerSvc
    Note right of Coordinator: Coordinator has collected ACKs from every ISR member
    Coordinator-->>-Client: 6. Return success

Coordinator Core Implementation

We will implement the coordinator in Go, as a standalone service that needs to be highly available.

Project Structure and Dependencies

# a brief directory structure
isr-coordinator/
├── cmd/
│   └── main.go
├── internal/
│   ├── config/
│   │   └── config.go
│   ├── coordinator/
│   │   └── service.go
│   └── transport/
│       └── http.go
└── go.mod

We need two core libraries: go-redis and consul/api.

// go.mod
module github.com/your-org/isr-coordinator

go 1.21

require (
	github.com/go-redis/redis/v8 v8.11.5
	github.com/google/uuid v1.6.0
	github.com/hashicorp/consul/api v1.28.2
	// ... other dependencies
)

Configuration Management (config.go)

In a real project, configuration must be externalized.

// internal/config/config.go
package config

import (
	"os"
	"time"
)

type Config struct {
	RedisAddr      string
	ConsulAddr     string
	ListenAddr     string
	EventStreamKey string
	AckStreamKey   string
	ISRConfigKey   string
	RequestTimeout time.Duration
}

func Load() *Config {
	return &Config{
		RedisAddr:      getEnv("REDIS_ADDR", "localhost:6379"),
		ConsulAddr:     getEnv("CONSUL_ADDR", "localhost:8500"),
		ListenAddr:     getEnv("LISTEN_ADDR", ":8080"),
		EventStreamKey: getEnv("EVENT_STREAM_KEY", "events:stream"),
		AckStreamKey:   getEnv("ACK_STREAM_KEY", "ack:stream"),
		ISRConfigKey:   getEnv("ISR_CONFIG_KEY", "config/isr/services"),
		RequestTimeout: 5 * time.Second, // Hardcoded for simplicity, should be configurable
	}
}

func getEnv(key, fallback string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}
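
The five-second RequestTimeout is hardcoded above. A minimal sketch of an env-based override, assuming a REQUEST_TIMEOUT_MS variable that the Load() shown above does not actually read:

// Sketch: an optional env override for RequestTimeout. The REQUEST_TIMEOUT_MS
// name is an assumption; wiring it in also requires importing "strconv".
func getDurationEnv(key string, fallback time.Duration) time.Duration {
	if value, ok := os.LookupEnv(key); ok {
		if ms, err := strconv.Atoi(value); err == nil && ms > 0 {
			return time.Duration(ms) * time.Millisecond
		}
	}
	return fallback
}

// Usage inside Load():
//   RequestTimeout: getDurationEnv("REQUEST_TIMEOUT_MS", 5*time.Second),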

Coordinator Service (service.go)

This is the core logic of the whole system: the interaction with Consul and Redis, plus the concurrency control for waiting on ACKs.

// internal/coordinator/service.go
package coordinator

import (
	"context"
	"encoding/json"
	"errors"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/google/uuid"
	consul "github.com/hashicorp/consul/api"
	"github.com/your-org/isr-coordinator/internal/config"
)

var (
	ErrTimeout  = errors.New("request timed out waiting for ISR acknowledgements")
	ErrISREmpty = errors.New("ISR set is empty or not configured in Consul")
)

// ackWaiter tracks which ISR members have acknowledged a single command.
type ackWaiter struct {
	mu       sync.Mutex
	required map[string]struct{}
	received map[string]struct{}
	done     chan struct{} // closed once every required member has ACKed
}

type Service struct {
	cfg     *config.Config
	redis   *redis.Client
	consul  *consul.Client
	waiters sync.Map // map[correlationID]*ackWaiter
}

func NewService(cfg *config.Config) (*Service, error) {
	// --- Redis Client Initialization ---
	rdb := redis.NewClient(&redis.Options{
		Addr: cfg.RedisAddr,
	})
	if _, err := rdb.Ping(context.Background()).Result(); err != nil {
		return nil, err
	}

	// --- Consul Client Initialization ---
	consulConfig := consul.DefaultConfig()
	consulConfig.Address = cfg.ConsulAddr
	c, err := consul.NewClient(consulConfig)
	if err != nil {
		return nil, err
	}

	svc := &Service{
		cfg:    cfg,
		redis:  rdb,
		consul: c,
	}

	// Start a long-running goroutine to listen for all ACKs
	go svc.listenForAcks()

	return svc, nil
}

// ProcessCommand is the main entry point for handling a command.
func (s *Service) ProcessCommand(ctx context.Context, commandPayload map[string]interface{}) (string, error) {
	correlationID := uuid.New().String()
	commandPayload["correlation_id"] = correlationID

	// 1. Fetch the current ISR set from Consul KV
	isrSet, err := s.getISRSet(ctx)
	if err != nil {
		return "", err
	}
	if len(isrSet) == 0 {
		return "", ErrISREmpty
	}

	log.Printf("[CorrelationID: %s] ISR set resolved to: %v", correlationID, isrSet)

	// 2. Register a waiter before publishing, so no ACK can be missed
	waiter := s.registerAckWaiter(correlationID, isrSet)
	defer s.unregisterAckWaiter(correlationID)

	// 3. Publish the event to Redis Streams
	eventData, _ := json.Marshal(commandPayload)
	eventID, err := s.redis.XAdd(ctx, &redis.XAddArgs{
		Stream: s.cfg.EventStreamKey,
		Values: map[string]interface{}{"data": eventData},
	}).Result()

	if err != nil {
		log.Printf("[CorrelationID: %s] Failed to publish event: %v", correlationID, err)
		return "", err
	}
	log.Printf("[CorrelationID: %s] Event published with ID: %s", correlationID, eventID)

	// 4. Wait until the ACK listener closes waiter.done, or time out
	timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.RequestTimeout)
	defer cancel()

	select {
	case <-timeoutCtx.Done():
		waiter.mu.Lock()
		missing := len(waiter.required) - len(waiter.received)
		waiter.mu.Unlock()
		log.Printf("[CorrelationID: %s] Timed out waiting for ACKs, %d still missing.", correlationID, missing)
		return "", ErrTimeout
	case <-waiter.done:
		// The listener goroutine closes done only after every required ISR member has ACKed.
	}

	log.Printf("[CorrelationID: %s] Successfully processed, all ISR ACKs received.", correlationID)
	return eventID, nil
}

// getISRSet fetches the list of service names from Consul's K/V store.
func (s *Service) getISRSet(ctx context.Context) (map[string]struct{}, error) {
	kv := s.consul.KV()
	pair, _, err := kv.Get(s.cfg.ISRConfigKey, nil)
	if err != nil {
		return nil, err
	}
	if pair == nil || len(pair.Value) == 0 {
		return nil, nil // No config found is a valid case
	}

	serviceNames := strings.Split(string(pair.Value), ",")
	isrSet := make(map[string]struct{})
	for _, name := range serviceNames {
		trimmed := strings.TrimSpace(name)
		if trimmed != "" {
			// A production system should also check if the service is registered and healthy in Consul
			// Here we simplify by just trusting the KV store.
			isrSet[trimmed] = struct{}{}
		}
	}
	return isrSet, nil
}
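
// Optional hardening (a sketch, not wired into ProcessCommand above): intersect the
// configured ISR set with the services Consul currently reports as healthy, so the
// coordinator never waits on a member that has no passing instances at all.
func (s *Service) filterHealthy(isrSet map[string]struct{}) map[string]struct{} {
	healthy := make(map[string]struct{})
	for name := range isrSet {
		// Health().Service with passingOnly=true returns only instances whose checks pass.
		entries, _, err := s.consul.Health().Service(name, "", true, nil)
		if err != nil {
			log.Printf("Consul health query for %s failed: %v", name, err)
			continue
		}
		if len(entries) > 0 {
			healthy[name] = struct{}{}
		}
	}
	return healthy
}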

// listenForAcks is a background goroutine that perpetually reads from the ACK stream.
// This is more efficient than having one reader per request.
func (s *Service) listenForAcks() {
	// Ensure the ACK stream and consumer group exist before reading. A BUSYGROUP
	// error just means another instance already created the group and is safe to ignore.
	if err := s.redis.XGroupCreateMkStream(context.Background(), s.cfg.AckStreamKey, "isr_coordinator_group", "$").Err(); err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
		log.Printf("Failed to create ACK consumer group: %v", err)
	}
	log.Println("ACK listener started...")

	for {
		streams, err := s.redis.XReadGroup(context.Background(), &redis.XReadGroupArgs{
			Group:    "isr_coordinator_group",
			Consumer: "coordinator-instance-1", // Should be unique per instance
			Streams:  []string{s.cfg.AckStreamKey, ">"},
			Count:    10,
			Block:    0, // Block indefinitely
		}).Result()

		if err != nil {
			log.Printf("Error reading ACK stream: %v. Retrying in 1s...", err)
			time.Sleep(1 * time.Second)
			continue
		}

		for _, stream := range streams {
			for _, msg := range stream.Messages {
				s.dispatchAck(msg)
				// Acknowledge the message so it's not re-delivered
				s.redis.XAck(context.Background(), s.cfg.AckStreamKey, "isr_coordinator_group", msg.ID)
			}
		}
	}
}

// dispatchAck records an ACK on the matching waiter and signals completion
// once every required ISR member has acknowledged.
func (s *Service) dispatchAck(msg redis.XMessage) {
	correlationID, okID := msg.Values["correlation_id"].(string)
	serviceName, okSvc := msg.Values["service_name"].(string)

	if !okID || !okSvc {
		log.Printf("Received malformed ACK message: %v", msg.Values)
		return
	}

	val, ok := s.waiters.Load(correlationID)
	if !ok {
		// No waiter registered: the request already timed out, or this is a late/duplicate ACK.
		return
	}
	waiter := val.(*ackWaiter)

	waiter.mu.Lock()
	defer waiter.mu.Unlock()
	if _, required := waiter.required[serviceName]; !required {
		return // ACK from a service outside the current ISR set
	}
	if _, seen := waiter.received[serviceName]; seen {
		return // duplicate ACK
	}
	waiter.received[serviceName] = struct{}{}
	if len(waiter.received) == len(waiter.required) {
		close(waiter.done) // releases the ProcessCommand goroutine
	}
}

func (s *Service) registerAckWaiter(correlationID string, isrSet map[string]struct{}) *ackWaiter {
	w := &ackWaiter{
		required: isrSet,
		received: make(map[string]struct{}),
		done:     make(chan struct{}),
	}
	s.waiters.Store(correlationID, w)
	return w
}

func (s *Service) unregisterAckWaiter(correlationID string) {
	s.waiters.Delete(correlationID)
}

A key design point is the listenForAcks goroutine. It uses a single, long-running blocking XReadGroup to consume all ACK messages, which is far more efficient than starting a dedicated listener for every incoming request. When an ACK arrives, dispatchAck looks up the ackWaiter registered under the correlation_id in a sync.Map, records the acknowledging service, and closes the waiter's done channel once every required ISR member has been heard from, releasing the ProcessCommand goroutine blocked in its select. Concurrency control here needs care: each waiter guards its own state with a mutex, and registering the waiter before the event is published guarantees that no ACK can arrive unobserved.
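
The project layout above lists internal/transport/http.go and cmd/main.go without showing them. Below is a minimal sketch of how the transport could drive ProcessCommand; the single POST /commands endpoint and the free-form JSON request body are assumptions, not part of the original design.

// internal/transport/http.go (sketch)
package transport

import (
	"encoding/json"
	"net/http"

	"github.com/your-org/isr-coordinator/internal/coordinator"
)

// NewHandler exposes the coordinator over HTTP.
func NewHandler(svc *coordinator.Service) http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/commands", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var payload map[string]interface{}
		if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
			http.Error(w, "invalid JSON body", http.StatusBadRequest)
			return
		}
		eventID, err := svc.ProcessCommand(r.Context(), payload)
		if err != nil {
			// A timeout or missing ACK surfaces as 504 so the caller knows the
			// command was not confirmed by the full ISR set.
			http.Error(w, err.Error(), http.StatusGatewayTimeout)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(map[string]string{"event_id": eventID})
	})
	return mux
}

// cmd/main.go (wiring sketch)
package main

import (
	"log"
	"net/http"

	"github.com/your-org/isr-coordinator/internal/config"
	"github.com/your-org/isr-coordinator/internal/coordinator"
	"github.com/your-org/isr-coordinator/internal/transport"
)

func main() {
	cfg := config.Load()
	svc, err := coordinator.NewService(cfg)
	if err != nil {
		log.Fatalf("failed to initialize coordinator: %v", err)
	}
	log.Printf("ISR coordinator listening on %s", cfg.ListenAddr)
	log.Fatal(http.ListenAndServe(cfg.ListenAddr, transport.NewHandler(svc)))
}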

Downstream Service (ISR Member) Implementation

A downstream service needs a generic piece of logic to consume events and send ACKs. It can be packaged as a middleware or library.

// Example of a downstream service consumer (e.g., in ledger-service)
package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"time"

	"github.com/go-redis/redis/v8"
)

const (
	EventStreamKey = "events:stream"
	AckStreamKey   = "ack:stream"
	ConsumerGroup  = "ledger_service_group"
	ConsumerName   = "ledger_instance_1" // Should be unique, e.g., hostname
	ServiceName    = "ledger-service"
)

func main() {
	redisAddr := os.Getenv("REDIS_ADDR")
	if redisAddr == "" {
		redisAddr = "localhost:6379"
	}
	rdb := redis.NewClient(&redis.Options{Addr: redisAddr})

	// Ensure the stream and consumer group exist; an error here is usually just
	// BUSYGROUP (the group already exists) and can be ignored.
	rdb.XGroupCreateMkStream(context.Background(), EventStreamKey, ConsumerGroup, "$")
	log.Printf("Consumer '%s' in group '%s' starting...", ConsumerName, ConsumerGroup)

	for {
		streams, err := rdb.XReadGroup(context.Background(), &redis.XReadGroupArgs{
			Group:    ConsumerGroup,
			Consumer: ConsumerName,
			Streams:  []string{EventStreamKey, ">"}, // ">" means new messages only
			Count:    1,
			Block:    0, // Block forever
		}).Result()

		if err != nil {
			log.Printf("Error reading from event stream: %v", err)
			time.Sleep(1 * time.Second)
			continue
		}

		for _, stream := range streams {
			for _, msg := range stream.Messages {
				if err := processMessage(rdb, msg); err != nil {
					// Leave the message in the pending entries list so it can be retried/claimed.
					log.Printf("Processing failed for message %s, not acknowledging: %v", msg.ID, err)
					continue
				}
				// Important: acknowledge the event-stream message only after successful processing
				rdb.XAck(context.Background(), EventStreamKey, ConsumerGroup, msg.ID)
			}
		}
	}
}

func processMessage(rdb *redis.Client, msg redis.XMessage) error {
	log.Printf("Processing message ID: %s", msg.ID)
	data, ok := msg.Values["data"].(string)
	if !ok {
		// Malformed messages are not retryable; log them and let the caller acknowledge.
		log.Println("Message data is not a string")
		return nil
	}

	var payload map[string]interface{}
	if err := json.Unmarshal([]byte(data), &payload); err != nil {
		log.Printf("Failed to unmarshal message data: %v", err)
		return nil
	}

	correlationID, ok := payload["correlation_id"].(string)
	if !ok {
		log.Println("Message is missing 'correlation_id'")
		return nil
	}

	// --- CRITICAL BUSINESS LOGIC GOES HERE ---
	// e.g., update database, call other services, etc.
	// This part MUST be idempotent. If it fails, return the error so the message
	// is NOT acknowledged and stays pending for a retry.
	log.Printf("[CorrelationID: %s] Executing ledger logic...", correlationID)
	time.Sleep(50 * time.Millisecond) // Simulate work

	// After successful processing, send an ACK
	if err := sendAck(rdb, correlationID); err != nil {
		log.Printf("[CorrelationID: %s] Failed to send ACK: %v", correlationID, err)
		// If the ACK cannot be delivered, the coordinator will time out, which is
		// the correct (safe) failure mode; the event itself was still processed.
		return nil
	}
	log.Printf("[CorrelationID: %s] ACK sent successfully.", correlationID)
	return nil
}

func sendAck(rdb *redis.Client, correlationID string) error {
	ctx := context.Background()
	return rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: AckStreamKey,
		Values: map[string]interface{}{
			"correlation_id": correlationID,
			"service_name":   ServiceName,
			"timestamp":      time.Now().Unix(),
		},
	}).Err()
}

This consumer code shows the working pattern of an ISR member:

  1. It belongs to its own consumer group (ledger_service_group) and pulls messages from the main event stream (events:stream).
  2. After its business logic completes, it extracts the correlation_id from the message.
  3. It then publishes a message to a dedicated ACK stream (ack:stream) containing the correlation_id and its own service name.
  4. Error handling is kept to a minimum here, but the key rule holds in production: if the business logic fails, the message must not be acknowledged on the event stream, so that the consumer group's retry/claim mechanism can take effect. If sending the ACK fails, the coordinator times out and the client receives an error, which is an acceptable, safe failure state. A sketch of an idempotency guard follows below.
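
One straightforward way to satisfy the idempotency requirement is to record each processed correlation_id in Redis before executing the business logic. A sketch of a helper that could be added to the consumer above; the processed: key prefix and the 24-hour TTL are assumptions:

// alreadyProcessed is a hypothetical idempotency guard for the consumer above.
// The "processed:" key prefix and the 24h TTL are assumptions, not part of the design.
func alreadyProcessed(ctx context.Context, rdb *redis.Client, correlationID string) (bool, error) {
	// SET ... NX succeeds only the first time this correlation ID is seen.
	firstTime, err := rdb.SetNX(ctx, "processed:"+ServiceName+":"+correlationID, 1, 24*time.Hour).Result()
	if err != nil {
		return false, err
	}
	return !firstTime, nil
}

When the guard reports a duplicate, the consumer can skip the business logic but should still send the ACK, so a redelivered event never leaves the coordinator waiting.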

Configuring Consul

Finally, we need to set the ISR list in Consul's KV store, which is easy with the Consul CLI:

# Set the list of services that must acknowledge an event
consul kv put config/isr/services "order-service,ledger-service"

Now, whenever the coordinator handles a command it reads this key and requires ACKs from both order-service and ledger-service. To temporarily pull ledger-service out of the ISR set for maintenance, just update the value:

consul kv put config/isr/services "order-service"

The coordinator adapts to this change dynamically and stops waiting for an acknowledgement from ledger-service.
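
Because getISRSet reads the key on every command, no restart is needed. If per-command KV reads ever become a concern, an alternative is a background watcher built on Consul's blocking queries. A sketch that could live in the coordinator package; the five-minute wait time and the onChange callback shape are assumptions:

// watchISRKey is a sketch of a blocking-query watch on the ISR key; the wait time
// and the onChange callback are assumptions, not part of the implementation above.
func watchISRKey(client *consul.Client, key string, onChange func(raw string)) {
	var lastIndex uint64
	for {
		pair, meta, err := client.KV().Get(key, &consul.QueryOptions{
			WaitIndex: lastIndex,       // block until the key changes...
			WaitTime:  5 * time.Minute, // ...or the wait time elapses
		})
		if err != nil {
			log.Printf("ISR key watch error: %v", err)
			time.Sleep(1 * time.Second)
			continue
		}
		if meta.LastIndex != lastIndex {
			lastIndex = meta.LastIndex
			if pair != nil {
				onChange(string(pair.Value))
			}
		}
	}
}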

This design introduces a synchronous wait, which inevitably adds latency to command handling. It essentially trades latency for a stronger consistency guarantee. The pattern does not suit every scenario, but for core business flows that cannot tolerate even transient inconsistency, it offers a controllable middle ground between fully synchronous calls and purely asynchronous event notification.

The coordinator as implemented is a single point. For high availability you can deploy multiple coordinator instances, but because listenForAcks uses a Redis consumer group, each ACK message is delivered to only one coordinator instance, so an ACK can land on the wrong instance (one that never handled the original request). A practical improvement path is to have the coordinator inject a "reply stream" into each event it publishes, for example ack:stream:<coordinator-instance-id>; downstream services then send their ACKs to that specific stream rather than the shared one, so each coordinator instance listens only to its own reply stream and the ACK routing problem disappears.
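
A sketch of that improvement, on both sides of the stream; the reply_to field name and the instanceID value (a hostname or a UUID chosen at startup) are assumptions:

// Coordinator side (sketch): publish the event with a per-instance reply stream.
func publishWithReplyStream(ctx context.Context, rdb *redis.Client, eventStream, instanceID string, eventData []byte) error {
	return rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: eventStream,
		Values: map[string]interface{}{
			"data":     eventData,
			"reply_to": "ack:stream:" + instanceID, // each coordinator listens only to its own stream
		},
	}).Err()
}

// Consumer side (sketch): send the ACK to the stream named in reply_to, falling
// back to the shared ack:stream for events published by older coordinators.
func sendAckTo(ctx context.Context, rdb *redis.Client, msg redis.XMessage, correlationID, serviceName string) error {
	target := "ack:stream"
	if rt, ok := msg.Values["reply_to"].(string); ok && rt != "" {
		target = rt
	}
	return rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: target,
		Values: map[string]interface{}{
			"correlation_id": correlationID,
			"service_name":   serviceName,
			"timestamp":      time.Now().Unix(),
		},
	}).Err()
}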

