构建具备反压与容错能力的实时机器学习推理管道


一切始于一个失控的数据流。生产环境的事件生产者开始以每秒数千次的频率向我们的系统推送特征向量,而负责处理这些数据的机器学习推理服务却频繁地内存溢出、响应超时,最终彻底崩溃。最初的实现方案简单得可笑:一个直接从消息队列拉取数据、调用模型API、然后确认消息的Node.js脚本。在低负载下它工作得很好,但面对生产高峰,它脆弱不堪。这次复盘,就是记录如何用Redis Streams、Koa和MLflow构建一个真正具备弹性和容错能力的系统。

最初的失败:一个脆弱的消费者

问题的根源在于消费者模型过于理想化。它假设网络是可靠的,模型推理是瞬时的,且消息速率是平稳的。这是一个典型的错误,在真实项目中,这些假设都会被打破。

我们最初的Node.js消费者伪代码大致如下:

// A naive consumer that is doomed to fail
async function naiveConsumer() {
  while (true) {
    const message = await messageQueue.pop(); // Blocking pop
    if (message) {
      try {
        const prediction = await callMLModel(message.body);
        await saveResult(prediction);
      } catch (error) {
        console.error('Processing failed, message lost!', error);
        // Message is lost if an error occurs here
      }
    }
  }
}

这个模型的缺陷显而易见:

  1. 单点故障: 如果消费者进程崩溃,内存中正在处理的消息将永久丢失。
  2. 无横向扩展能力: 启动多个实例会导致消息竞争和重复处理,除非消息队列本身支持消费者分组。
  3. 无反压机制 (Backpressure): 当消息涌入速度超过模型处理速度时,消费者会持续从队列拉取消息,导致内存迅速被待处理的数据填满,最终因OOM(Out of Memory)而崩溃。
  4. 脆弱的错误处理: 任何下游服务的瞬时故障(例如MLflow服务重启)都可能导致消息丢失。

我们需要一个完全不同的架构。一个能够确认消息已处理、支持多个消费者协同工作、能优雅地处理积压消息,并在消费者崩溃后保证消息不丢失的架构。

技术选型决策:为何是Redis Streams, Koa, MLflow?

在重构之前,我们重新审视了技术栈。

  • 消息队列: Redis Streams vs. Kafka/RabbitMQ
    我们已经大规模使用Redis作为缓存,引入一个独立的Kafka或RabbitMQ集群会增加运维复杂度和成本。Redis Streams提供了我们需要的所有核心功能:持久化日志、支持多个消费者组的消费、消息确认机制(ACK),以及处理失败消息的内在能力。对于这个场景,它足够轻量且强大。

  • 推理服务消费者: Koa.js
    推理任务本质上是I/O密集型的:接收数据、通过网络调用MLflow服务、将结果写入数据库。Node.js的异步非阻塞模型非常适合。我们选择Koa而非Express,是因为它基于async/await的中间件模型更现代、更简洁,能避免“回调地狱”,使错误处理逻辑更清晰。

  • 模型服务: MLflow Serving
    我们的数据科学团队已经在使用MLflow来跟踪实验和打包模型。利用MLflow Serving功能,他们可以一键将训练好的模型部署为一个标准的RESTful API端点。这完美地解耦了模型本身与我们的业务逻辑(消费者服务),使得模型更新迭代无需改动和重新部署消费者代码。

这是一个务实的选择,最大化地利用现有技术栈,同时通过组合它们来构建一个鲁棒的系统。

架构演进:从数据流到健壮的管道

新的架构围绕Redis Streams的消费者组(Consumer Group)概念构建。

graph TD
    subgraph "事件生产者 (Producers)"
        P1[Producer 1] --> RS
        P2[Producer 2] --> RS
        P3[Producer N] --> RS
    end

    subgraph "消息总线 (Message Bus)"
        RS(Redis Stream: feature_vectors)
    end

    subgraph "推理服务 (Inference Service - Koa)"
        RS -- XREADGROUP --> CG
        CG{Consumer Group: inference_group}
        CG --> C1(Koa Consumer 1)
        CG --> C2(Koa Consumer 2)
        CG --> C3(Koa Consumer N)
    end
    
    subgraph "模型服务 (Model Serving)"
        MLFLOW[MLflow Tracking & Serving]
    end

    subgraph "监控与恢复 (Monitoring & Recovery)"
        JANITOR(Janitor Process) -- XPENDING/XCLAIM --> RS
    end
    
    C1 -- HTTP POST --> MLFLOW
    C2 -- HTTP POST --> MLFLOW
    C3 -- HTTP POST --> MLFLOW

    MLFLOW -- Prediction --> C1
    MLFLOW -- Prediction --> C2
    MLFLOW -- Prediction --> C3

    C1 -- XACK --> RS
    C2 -- XACK --> RS
    C3 -- XACK --> RS

    style JANITOR fill:#f9f,stroke:#333,stroke-width:2px

这个架构的核心思想是:

  1. 生产者解耦: 生产者只管通过XADD命令将特征向量添加到名为feature_vectors的Stream中。
  2. 消费者组协同: 多个Koa消费者实例组成一个名为inference_group的消费者组。Redis会自动将Stream中的消息分发给组内的某个可用消费者。这天然地实现了负载均衡和横向扩展。
  3. 消息生命周期管理:
    • 消费者通过XREADGROUP获取消息。此时消息状态变为“已投递”,并被记录在消费者组的Pending Entries List (PEL)中。
    • 消费者成功处理完消息(即获得MLflow的预测结果)后,必须发送XACK命令,Redis才会将该消息从PEL中移除。
    • 如果消费者在XACK前崩溃,消息会一直留在PEL中。
  4. 僵尸进程处理: 一个独立的“清道夫”(Janitor)进程会定期扫描PEL,找出那些被投递了很长时间但仍未被ACK的消息(意味着处理它的消费者可能已经死亡),并通过XCLAIM命令将这些消息重新分配给组内其他健康的消费者。

生产级代码实现

下面是构成这个系统的关键代码片段,它们包含了配置、日志、错误处理和核心逻辑。

1. 环境变量与配置 (config.js)

在真实项目中,配置应与代码分离。

// src/config.js

require('dotenv').config();

const config = {
  redis: {
    host: process.env.REDIS_HOST || '127.0.0.1',
    port: parseInt(process.env.REDIS_PORT || '6379', 10),
    // Add other options like password if needed
  },
  stream: {
    key: process.env.STREAM_KEY || 'feature_vectors',
    group: process.env.STREAM_GROUP || 'inference_group',
    consumerName: `${process.env.HOSTNAME || 'consumer'}-${process.pid}`,
  },
  mlflow: {
    endpoint: process.env.MLFLOW_ENDPOINT || 'http://127.0.0.1:5001/invocations',
    timeout: parseInt(process.env.MLFLOW_TIMEOUT_MS || '2000', 10),
  },
  consumer: {
    // How many messages to fetch at once
    batchSize: parseInt(process.env.CONSUMER_BATCH_SIZE || '10', 10),
    // Block for 5 seconds if no messages are available
    blockTime: parseInt(process.env.CONSUMER_BLOCK_MS || '5000', 10),
    // Concurrency limit for processing messages
    concurrency: parseInt(process.env.CONSUMER_CONCURRENCY || '5', 10),
  },
  janitor: {
      // Check for stale messages every 30 seconds
      checkIntervalMs: 30 * 1000,
      // A message is considered stale if pending for more than 60 seconds
      staleMessageMs: 60 * 1000,
  }
};

module.exports = config;

2. 健壮的Koa消费者服务 (consumer_service.js)

这是系统的核心。它不仅消费消息,还实现了并发控制来施加反压。

// src/consumer_service.js

const Redis = require('ioredis');
const axios = require('axios');
const { RateLimiter } = require('limiter');
const pLimit = require('p-limit');
const config = require('./config');
const logger = require('./logger'); // A proper logger like Winston

const redisClient = new Redis(config.redis);

const streamKey = config.stream.key;
const groupName = config.stream.group;
const consumerName = config.stream.consumerName;

// This is our backpressure mechanism. It ensures we only process 'concurrency' messages at a time.
const limit = pLimit(config.consumer.concurrency);

/**
 * Ensures the stream and consumer group exist.
 * In a production setup, this might be handled by a separate deployment script.
 */
async function initializeStreamAndGroup() {
  try {
    // Using MKSTREAM to create the group only if the stream exists.
    // The 'mkstream' option will create the stream if it doesn't exist.
    await redisClient.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM');
    logger.info(`Consumer group '${groupName}' created or already exists for stream '${streamKey}'.`);
  } catch (error) {
    // 'BUSYGROUP' error is expected if the group already exists.
    if (error.message.includes('BUSYGROUP')) {
      logger.info(`Consumer group '${groupName}' already exists.`);
    } else {
      logger.error('Failed to create consumer group:', error);
      process.exit(1);
    }
  }
}

/**
 * Calls the MLflow model serving endpoint.
 * @param {object} featureData - The feature vector to send.
 * @returns {Promise<object>} The prediction result.
 */
async function getPrediction(featureData) {
  try {
    const response = await axios.post(config.mlflow.endpoint, {
      dataframe_split: {
        columns: Object.keys(featureData),
        data: [Object.values(featureData)],
      }
    }, { timeout: config.mlflow.timeout });
    return response.data;
  } catch (error) {
    logger.error({
      message: 'MLflow invocation failed',
      error: error.message,
      data: featureData
    });
    // Re-throw to be caught by the processing loop
    throw error;
  }
}

/**
 * Processes a single message from the stream.
 * @param {string} messageId - The ID of the message.
 * @param {object} messageData - The parsed message content.
 */
async function processMessage(messageId, messageData) {
  logger.info(`Processing message ${messageId}`);
  try {
    const prediction = await getPrediction(messageData);
    logger.info({ message: `Prediction successful for ${messageId}`, prediction });

    // Here you would save the result to a database or another stream.
    // For this example, we just log it.

    // Acknowledge the message only after successful processing.
    await redisClient.xack(streamKey, groupName, messageId);
    logger.info(`ACKed message ${messageId}`);
  } catch (error) {
    logger.error(`Failed to process message ${messageId}. It will remain pending for re-delivery.`, error);
    // DO NOT ACK the message. It will be picked up later by the janitor process.
  }
}

/**
 * The main consumer loop.
 */
async function consume() {
  logger.info(`Consumer '${consumerName}' starting to listen to group '${groupName}'...`);
  while (true) {
    try {
      // Using XREADGROUP to read from the group.
      // '>' means read new messages not yet delivered to any consumer.
      // COUNT limits the batch size, BLOCK makes it wait.
      const result = await redisClient.xreadgroup(
        'GROUP', groupName, consumerName,
        'COUNT', config.consumer.batchSize,
        'BLOCK', config.consumer.blockTime,
        'STREAMS', streamKey, '>'
      );

      if (!result) {
        // Timed out (BLOCK duration passed), loop again.
        continue;
      }

      const messages = result[0][1];
      if (messages.length === 0) {
        continue;
      }

      logger.info(`Received batch of ${messages.length} messages.`);
      
      const processingPromises = messages.map(message => {
        const [messageId, fields] = message;
        // The message format from Redis is [key1, value1, key2, value2, ...].
        // We need to parse it into an object.
        const messageData = {};
        for (let i = 0; i < fields.length; i += 2) {
          messageData[fields[i]] = JSON.parse(fields[i + 1]);
        }
        
        // Use the p-limit wrapper to enforce concurrency.
        return limit(() => processMessage(messageId, messageData));
      });

      await Promise.all(processingPromises);

    } catch (error) {
      logger.error('Consumer loop error:', error);
      // Wait a bit before retrying to avoid spamming logs on persistent errors.
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  }
}

// Graceful shutdown
function handleShutdown() {
    logger.info('Shutting down consumer...');
    redisClient.quit().then(() => {
        logger.info('Redis connection closed.');
        process.exit(0);
    }).catch(err => {
        logger.error('Error during Redis disconnection:', err);
        process.exit(1);
    });
}

process.on('SIGTERM', handleShutdown);
process.on('SIGINT', handleShutdown);


initializeStreamAndGroup().then(consume);

代码解析:

  • 并发控制 (p-limit): 这是实现反压的关键。p-limit创建了一个并发执行的“令牌池”。即使我们一次性从Redis获取了10条消息(batchSize: 10),p-limit也会确保最多只有5个processMessage函数(concurrency: 5)在同时执行。这能有效防止我们的服务因瞬间涌入大量消息而过载,将处理压力平滑地传递给上游(即消息在Redis Stream中积压),而不是让消费者自身崩溃。
  • 错误处理: processMessage函数中的try...catch块至关重要。只有当getPrediction和后续处理全部成功时,才会执行xack。如果发生任何错误(例如MLflow服务503),消息不会被确认,它会保留在PEL中,等待被Janitor进程救援。
  • Graceful Shutdown: 监听SIGTERMSIGINT信号,确保在容器或进程被停止时,能有机会完成当前正在处理的消息并优雅地关闭Redis连接。

3. 清道夫(Janitor)进程 (janitor.js)

这个进程是系统容错能力的保障。它独立运行,确保没有消息被永久遗忘。

// src/janitor.js

const Redis = require('ioredis');
const config = require('./config');
const logger = require('./logger');

const redisClient = new Redis(config.redis);

const streamKey = config.stream.key;
const groupName = config.stream.group;

async function claimStaleMessages() {
  logger.info('Janitor checking for stale pending messages...');
  try {
    // Step 1: Check the pending list for the entire group.
    // XPENDING <stream> <group>
    const pendingSummary = await redisClient.xpending(streamKey, groupName);

    if (!pendingSummary || pendingSummary[0] === 0) {
      logger.info('No pending messages found.');
      return;
    }
    
    // pendingSummary is [total_pending, first_id, last_id, [consumer_info...]]
    const totalPending = pendingSummary[0];
    logger.info(`${totalPending} pending messages in total.`);

    // Step 2: Get detailed info for pending messages.
    // We get the first 100 stale messages to avoid fetching too much data.
    const pendingMessages = await redisClient.xpending(
      streamKey,
      groupName,
      '-', // Start from the beginning
      '+', // End at the end
      100 // Limit count
    );

    if (pendingMessages.length === 0) {
      return;
    }

    const messagesToClaim = [];
    for (const msg of pendingMessages) {
      const [messageId, consumer, idleTime, deliveryCount] = msg;
      if (idleTime > config.janitor.staleMessageMs) {
        messagesToClaim.push(messageId);
        logger.warn({
          message: 'Found stale message',
          messageId,
          originalConsumer: consumer,
          idleTimeMs: idleTime,
          deliveryCount,
        });
      }
    }

    if (messagesToClaim.length === 0) {
      logger.info('No messages exceeded stale threshold.');
      return;
    }

    // Step 3: Claim the stale messages for a generic "recovery" consumer.
    // Any consumer in the group can then pick them up.
    const claimerName = `${config.stream.consumerName}-janitor`;
    const claimedIds = await redisClient.xclaim(
      streamKey,
      groupName,
      claimerName,
      config.janitor.staleMessageMs, // min-idle-time to claim
      ...messagesToClaim
    );
    
    // XCLAIM returns the messages it successfully claimed.
    if (claimedIds && claimedIds.length > 0) {
      logger.info(`Successfully claimed ${claimedIds.length} stale messages.`);
    }

  } catch (error) {
    // If the stream doesn't exist yet, XPENDING will fail. This is okay.
    if (!error.message.includes('NOGROUP')) {
        logger.error('Janitor error:', error);
    }
  }
}

function startJanitor() {
    logger.info('Janitor process started.');
    setInterval(claimStaleMessages, config.janitor.checkIntervalMs);
    claimStaleMessages(); // Run once immediately
}

startJanitor();

代码解析:

  • XPENDING: 这是Janitor的核心。它能查询PEL,返回每个待处理消息的ID、所属消费者、空闲时间(从上次投递到现在的毫秒数)和总投递次数。
  • XCLAIM: 当Janitor发现一个消息的空闲时间超过了我们设定的阈值(例如60秒),它就认为原消费者已死。XCLAIM命令可以将这些消息的所有权转移给另一个消费者(这里我们用janitor自己作为名义上的认领者),并重置其空t闲时间。一旦被认领,这些消息就可以被组内任何一个活跃的消费者通过XREADGROUP再次消费。
  • 幂等性思考: 一个重要的单元测试思路是,我们的processMessage逻辑必须是幂等的。因为XCLAIM可能导致同一条消息被处理多次(例如,原消费者处理了一半但没来得及ACK就崩溃了)。确保重复处理不会产生错误的副作用(如重复记账)是设计这类系统的关键。

方案的局限性与未来优化路径

这个架构解决了最初的稳定性和扩展性问题,但它并非完美。

  1. 毒丸消息 (Poison Pill): 如果某条消息本身格式错误,导致getPrediction函数每次调用都会失败,那么这条消息会不断地被消费、失败、再被Janitor认领,陷入死循环,阻塞整个管道。一个更完善的方案是引入一个“死信队列”(Dead Letter Queue)。在processMessagecatch块中,检查消息的投递次数(通过XPENDING可以获取),如果超过某个阈值(例如3次),就将其XACK掉,并将其内容写入另一个专门用于异常排查的Stream。

  2. 可观测性: 当前的日志是基础的。一个生产级系统需要更丰富的可观测性。我们应该暴露Prometheus指标,监控:

    • Stream的长度 (XLEN),以观察消息积压情况。
    • 消费者组的Pending消息数 (XPENDING),以判断系统处理能力。
    • 消息处理的端到端延迟(从XADDXACK的时间)。
    • MLflow服务的响应时间和错误率。
  3. 动态并发调整: 当前的并发数是静态配置的。一个更智能的系统可以根据下游MLflow服务的健康状况(例如,错误率或延迟)或上游消息积压的程度,动态调整p-limit的并发值,实现更自适应的反压控制。

  4. MLflow服务的高可用: MLflow Serving本身是单点。在生产环境中,应将其容器化,并通过Kubernetes等编排工具部署多个副本,前面再挂一个负载均衡器,以确保其高可用。消费者调用的MLFLOW_ENDPOINT应指向这个负载均衡器的地址。


  目录