一套从 PWA 到数据湖的低延迟事件采集网关的 Fastify 实现


项目初期,我们依赖第三方SaaS产品进行用户行为分析。这套方案在启动阶段确实便捷,但随着PWA(Progressive Web App)业务的深入,其弊端愈发明显:阻塞主线程的JS脚本、数据所有权的缺失,以及无法将原始事件数据导入我们内部的数据湖进行更深层次的机器学习模型训练。技术债的积累迫使我们必须构建一套自主可控、高性能、高可靠的数据采集管道。

我们的目标很明确:

  1. 低延迟: 采集端点响应必须在毫秒级别,绝不能影响用户体验。
  2. 高吞吐: 能够应对突发流量,例如市场活动带来的用户量激增。
  3. 数据可靠性: 即便在用户网络不佳或短暂离线时,也要最大限度保证数据不丢失。
  4. 数据湖集成: 原始数据需要以结构化的方式高效地归档到对象存储(我们选用AWS S3作为数据湖的存储层),便于后续的ETL和分析。

初步构想是 PWA 通过 Fetch API 向一个 Node.js 服务发送埋点数据。但这过于理想化。在真实的生产环境中,我们需要解决的问题远不止于此:如何处理网络请求失败?服务器如何应对海量瞬时写入?直接写入 S3 会产生大量小文件和高昂的 API 调用成本,这在数据湖实践中是灾难性的。

技术选型决策:PWA + Fastify + S3

这套架构的核心是利用每项技术的独特优势来解决特定问题。

  • PWA (Service Worker): 它的作用远不止是缓存静态资源。Service Worker 运行在独立的线程中,是实现数据发送可靠性的关键。我们可以利用它的后台同步(Background Sync)API和IndexedDB来构建一个可靠的离线事件队列。当API请求失败时,事件被存入本地队列,待网络恢复后由Service Worker在后台自动重试。这是客户端数据可靠性的第一道防线。

  • Fastify: 作为采集网关,性能是第一考量。与Express.js相比,Fastify基于激进的路由优化和更少的抽象层,提供了近乎原生的HTTP处理性能。更重要的是,它内置了基于JSON Schema的请求体校验能力。在数据管道的入口处强制执行严格的Schema校验,是保障数据质量、防止“脏数据”污染数据湖的根本手段。这是一个常见的错误,很多团队在管道末端才进行数据清洗,成本高昂且为时已晚。

  • 数据湖 (Amazon S3): 我们的目标不是构建一个传统的OLTP数据库,而是收集海量的原始事件。直接写入数据库不仅成本高,而且Schema变更困难,不利于未来的探索性分析。将原始事件以JSON Lines或Avro/Parquet格式存储在S3中,成本极低,且能与整个大数据生态(如Athena, Spark, Redshift Spectrum)无缝集成。

架构流程设计

整个数据流被设计为一条清晰的单向管道。

sequenceDiagram
    participant PWA Client as PWA 客户端
    participant ServiceWorker as Service Worker
    participant FastifyGateway as Fastify 采集网关
    participant S3DataLake as S3 数据湖

    PWA Client->>ServiceWorker: 派发行为事件 (trackEvent)
    ServiceWorker->>FastifyGateway: 发送Beacon请求 (navigator.sendBeacon)
    alt 请求失败或离线
        ServiceWorker->>ServiceWorker: 事件存入IndexedDB队列
        Note right of ServiceWorker: 注册后台同步任务
    end
    FastifyGateway-->>ServiceWorker: 202 Accepted
    Note right of FastifyGateway: 校验Schema,压入内存批处理队列
    
    loop 周期性或定量触发
        FastifyGateway->>FastifyGateway: 聚合内存中的事件
        FastifyGateway->>S3DataLake: 压缩并上传批处理文件 (e.g., event_batch.jsonl.gz)
    end

第一站:PWA端的韧性事件发送器

客户端的实现核心在于健壮性。我们不能假设用户的网络永远在线。navigator.sendBeacon 是一个理想的选择,它允许浏览器在后台异步发送数据,并且不阻塞页面卸载流程。但它并非万无一失。

我们将结合 sendBeacon 和 Service Worker 的后台同步能力。

// src/analytics-sdk.js

// 简化的事件发送SDK
class AnalyticsSDK {
  constructor(endpoint) {
    this.endpoint = endpoint;
    this.swRegistration = null;
    this._initServiceWorker();
  }

  async _initServiceWorker() {
    if ('serviceWorker' in navigator) {
      try {
        this.swRegistration = await navigator.serviceWorker.register('/sw.js');
        console.log('Service Worker registered successfully');
      } catch (error) {
        console.error('Service Worker registration failed:', error);
      }
    }
  }

  track(eventName, payload) {
    const event = {
      id: self.crypto.randomUUID(),
      name: eventName,
      timestamp: new Date().toISOString(),
      payload: payload,
      // ... 其他元数据,如 user_id, session_id
    };

    // 优先尝试使用sendBeacon,因为它不会延迟页面卸载
    const blob = new Blob([JSON.stringify(event)], { type: 'application/json' });
    const sent = navigator.sendBeacon(this.endpoint, blob);

    // 如果sendBeacon失败(例如数据过大或浏览器不支持),则通过Service Worker消息传递
    if (!sent && this.swRegistration && this.swRegistration.active) {
      console.warn('sendBeacon failed, falling back to Service Worker postMessage.');
      this.swRegistration.active.postMessage({
        type: 'QUEUE_EVENT',
        event: event,
      });
    } else if (!sent) {
        // 在真实项目中,这里应该有一个更优雅的降级方案
        console.error('sendBeacon failed and Service Worker is not available.');
    }
  }
}

export const analytics = new AnalyticsSDK('/api/ingest');

Service Worker (sw.js) 的逻辑则更为关键,它扮演了离线队列和重试机制的角色。

// public/sw.js
import { openDB } from 'idb';

const DB_NAME = 'analytics-queue-db';
const STORE_NAME = 'event-queue';
const SYNC_TAG = 'analytics-sync';

// 初始化IndexedDB
const dbPromise = openDB(DB_NAME, 1, {
  upgrade(db) {
    if (!db.objectStoreNames.contains(STORE_NAME)) {
      db.createObjectStore(STORE_NAME, { keyPath: 'id' });
    }
  },
});

// 监听来自客户端的消息
self.addEventListener('message', (event) => {
  if (event.data && event.data.type === 'QUEUE_EVENT') {
    const { event: analyticsEvent } = event.data;
    event.waitUntil(
      (async () => {
        try {
            await queueEvent(analyticsEvent);
            // 事件入队后,立即尝试注册后台同步任务
            await self.registration.sync.register(SYNC_TAG);
        } catch (error) {
            console.error('Failed to queue event or register sync:', error);
        }
      })()
    );
  }
});

// 监听后台同步事件
self.addEventListener('sync', (event) => {
  if (event.tag === SYNC_TAG) {
    event.waitUntil(processQueue());
  }
});

async function queueEvent(event) {
  const db = await dbPromise;
  await db.put(STORE_NAME, event);
}

async function processQueue() {
  const db = await dbPromise;
  const events = await db.getAll(STORE_NAME);
  if (events.length === 0) {
    return;
  }

  console.log(`Processing ${events.length} queued events.`);

  // 这里的坑在于:如果批量发送,一个失败可能导致全部重试。
  // 在真实项目中,应该逐个发送并从队列中删除成功的事件。
  // 为简化示例,我们这里采用批量发送。
  try {
    const response = await fetch('/api/ingest/batch', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(events),
    });

    if (response.ok) {
      console.log('Successfully synced queued events.');
      // 成功后清空队列
      await db.clear(STORE_NAME);
    } else {
      console.error('Failed to sync events, server responded with:', response.status);
      // 如果服务器错误(5xx),则应该保留队列等待下次同步
      // 如果是客户端错误(4xx),可能需要丢弃这些事件
      if (response.status >= 400 && response.status < 500) {
        await db.clear(STORE_NAME);
      }
    }
  } catch (error) {
    console.error('Network error during sync, will retry later.', error);
    // 网络错误,保留队列,sync会稍后自动重试
  }
}

第二站:Fastify高性能采集网关

网关是整个系统的咽喉,其设计直接决定了系统的吞吐能力和数据质量。

1. 项目结构和基础配置

# 项目结构
.
├── src
│   ├── app.js          # Fastify应用入口
│   ├── config.js       # 应用配置
│   ├── routes
│   │   └── ingest.js   # 数据采集路由
│   └── services
│       └── batcher.js    # 批处理服务
└── package.json
// src/config.js
import pino from 'pino';

// 在真实项目中,这些配置应来自环境变量
export const config = {
  server: {
    host: '0.0.0.0',
    port: 3000,
  },
  aws: {
    s3: {
      bucketName: 'my-datalake-bucket',
      region: 'us-east-1',
      // 假设已通过IAM Role或环境变量配置了凭证
    },
  },
  batcher: {
    maxSize: 1000, // 每个批次最大事件数
    maxTime: 60 * 1000, // 最大等待时间 (ms)
  },
  logger: pino({ level: 'info' }),
};

2. 核心:事件批处理服务

直接为每个事件调用S3 PutObject API 是一场性能和成本的灾难。我们需要一个批处理服务,它在内存中聚合事件,直到达到一定数量或超时,然后将整个批次一次性写入S3。

// src/services/batcher.js
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { Readable } from 'stream';
import { createGzip } from 'zlib';
import { config } from '../config.js';

class S3Batcher {
  constructor(logger) {
    this.logger = logger;
    this.queue = [];
    this.timer = null;
    this.s3Client = new S3Client({ region: config.aws.s3.region });
    this.isFlushing = false;

    this.startTimer();
  }

  add(event) {
    this.queue.push(event);
    if (this.queue.length >= config.batcher.maxSize) {
      this.logger.info(`Batch size reached (${this.queue.length}), flushing...`);
      // 使用 setImmediate 避免在当前请求的事件循环中执行耗时操作
      setImmediate(() => this.flush());
    }
  }
  
  startTimer() {
    this.timer = setTimeout(() => {
        this.logger.info(`Batch time reached (${config.batcher.maxTime}ms), flushing...`);
        this.flush();
    }, config.batcher.maxTime);
  }

  resetTimer() {
    if (this.timer) {
      clearTimeout(this.timer);
    }
    this.startTimer();
  }

  async flush() {
    if (this.isFlushing || this.queue.length === 0) {
      // 如果正在刷写或队列为空,则重置计时器并返回
      if (!this.isFlushing) this.resetTimer();
      return;
    }

    this.isFlushing = true;
    clearTimeout(this.timer); // 停止当前计时器

    const batch = this.queue;
    this.queue = []; // 立即清空队列以接收新事件

    try {
      // 将事件对象数组转换为JSON Lines格式的字符串
      const body = batch.map(JSON.stringify).join('\n');
      
      const key = this.generateS3Key();
      this.logger.info(`Flushing batch of ${batch.length} events to s3://${config.aws.s3.bucketName}/${key}`);

      // 生产环境必须压缩,S3存储和传输成本显著降低
      const compressedBody = await this.gzip(body);

      const command = new PutObjectCommand({
        Bucket: config.aws.s3.bucketName,
        Key: key,
        Body: compressedBody,
        ContentType: 'application/json',
        ContentEncoding: 'gzip',
      });

      await this.s3Client.send(command);
      this.logger.info(`Successfully flushed batch to ${key}`);

    } catch (error) {
      this.logger.error({ error }, 'Failed to flush batch to S3. Re-queueing events.');
      // 失败重入队列是一个常见策略,但需要注意无限重试的风险
      // 在真实项目中,这里可能需要一个死信队列(DLQ)机制
      this.queue.unshift(...batch);
    } finally {
      this.isFlushing = false;
      this.resetTimer(); // 无论成功失败,都重新开始计时
    }
  }

  gzip(data) {
    return new Promise((resolve, reject) => {
      const buffer = Buffer.from(data, 'utf-8');
      const stream = Readable.from(buffer);
      const gzip = createGzip();
      const chunks = [];
      stream.pipe(gzip)
        .on('data', chunk => chunks.push(chunk))
        .on('end', () => resolve(Buffer.concat(chunks)))
        .on('error', err => reject(err));
    });
  }

  generateS3Key() {
    const now = new Date();
    const year = now.getUTCFullYear();
    const month = (now.getUTCMonth() + 1).toString().padStart(2, '0');
    const day = now.getUTCDate().toString().padStart(2, '0');
    const hour = now.getUTCHours().toString().padStart(2, '0');
    const timestamp = now.getTime();
    const randomId = self.crypto.randomUUID();
    
    // 这种分区友好的命名方式对数据湖查询至关重要
    return `raw_events/year=${year}/month=${month}/day=${day}/hour=${hour}/${timestamp}-${randomId}.jsonl.gz`;
  }
  
  // 优雅停机处理
  async gracefulShutdown() {
    this.logger.info('Graceful shutdown initiated, flushing remaining events...');
    // 在关闭前执行最后一次刷写
    await this.flush();
    this.logger.info('Shutdown flush complete.');
  }
}

export const s3Batcher = new S3Batcher(config.logger);

这里的坑在于状态管理。isFlushing 标志位防止了并发刷写。失败重入队列的策略很简单,但生产系统需要更复杂的机制,比如指数退避重试或将失败的批次写入另一个持久化队列。

3. Fastify路由与Schema校验

// src/routes/ingest.js
import { s3Batcher } from '../services/batcher.js';

const eventSchema = {
  type: 'object',
  properties: {
    id: { type: 'string', format: 'uuid' },
    name: { type: 'string', minLength: 1, maxLength: 100 },
    timestamp: { type: 'string', format: 'date-time' },
    payload: { type: 'object' },
  },
  required: ['id', 'name', 'timestamp'],
};

const batchSchema = {
    type: 'array',
    items: eventSchema
};

export default async function ingestRoutes(fastify, options) {

  // 单事件采集端点,用于 navigator.sendBeacon
  fastify.post('/ingest', {
    schema: {
      body: eventSchema,
    },
  }, async (request, reply) => {
    // 这里的校验是Fastify自动完成的,如果失败会直接返回400
    s3Batcher.add(request.body);
    // 立即返回202 Accepted,表示请求已被接受但尚未处理完成
    // 这是异步网关的标准实践,可以最大化吞吐量
    reply.code(202).send({ status: 'queued' });
  });

  // 批处理端点,用于Service Worker后台同步
  fastify.post('/ingest/batch', {
    schema: {
      body: batchSchema
    }
  }, async (request, reply) => {
    const events = request.body;
    for(const event of events) {
      s3Batcher.add(event);
    }
    reply.code(202).send({ status: 'queued', received: events.length });
  });
}

4. 应用入口与优雅停机

// src/app.js
import Fastify from 'fastify';
import { config } from './config.js';
import ingestRoutes from './routes/ingest.js';
import { s3Batcher } from './services/batcher.js';

const fastify = Fastify({
  logger: config.logger,
});

fastify.register(ingestRoutes, { prefix: '/api' });

const closeListeners = [];
const gracefulShutdown = async () => {
  fastify.log.warn('Server closing...');
  // 先关闭服务器,不再接受新请求
  await fastify.close();
  // 执行自定义的关闭逻辑,比如刷写缓冲区
  await s3Batcher.gracefulShutdown();
  process.exit(0);
};

// 监听系统信号
process.on('SIGINT', gracefulShutdown);
process.on('SIGTERM', gracefulShutdown);

const start = async () => {
  try {
    await fastify.listen({ port: config.server.port, host: config.server.host });
  } catch (err) {
    fastify.log.error(err);
    process.exit(1);
  }
};

start();

优雅停机是生产环境服务的必备能力。在收到SIGTERM信号(例如在K8s中Pod被终止时)时,我们需要确保内存中的事件缓冲区被完全刷写到S3,否则这部分数据就会永久丢失。

单元测试思路

对于这样一个系统,测试至关重要。

  • Batcher Service:
    • 模拟add方法,验证达到maxSize时是否触发flush
    • 使用sinonjestfake timers来测试超时maxTime是否触发flush
    • Mock AWS S3 SDK,验证flush方法生成的S3 Key格式是否正确,上传内容是否经过了压缩。
    • 测试flush失败时,事件是否被正确地重新加入队列。
  • Fastify Route:
    • 使用fastify.inject方法进行集成测试。
    • 测试无效的请求体是否返回400错误。
    • 测试有效的请求是否返回202,并验证s3Batcher.add方法是否被正确调用。

局限性与未来迭代路径

当前这套实现方案作为一个V1版本,在很多中小型项目中已经足够健壮。但在一个严肃的、大规模的生产环境中,它仍有其局限性。

最核心的局限在于采集网关的状态性。内存中的queue使得每个Fastify实例都是有状态的。这给水平扩展带来了挑战。如果我们在负载均衡器后面运行多个实例,必须使用粘性会话(Sticky Sessions)来确保来自同一用户的连续事件被路由到同一个实例,但这会破坏负载均衡的均匀性。一旦某个实例崩溃,其内存中尚未刷写的事件将全部丢失。

因此,下一步的架构演进方向非常明确:将状态外部化
我们可以引入一个轻量级的消息队列,如Redis Streams或更专业的Kafka/Pulsar,来替代内存批处理队列。

演进后的流程会是:

  1. Fastify网关接收到事件后,不再将其放入内存队列,而是直接推送到Kafka的某个Topic中。此时,网关本身变为完全无状态的,可以无限水平扩展。
  2. 一个独立的消费者服务(可以是Node.js、Java或Go编写的)从Kafka中消费事件,并执行与当前S3Batcher类似的批处理和写入S3的逻辑。

这种架构的解耦带来了巨大的好处: ingestion(采集)和processing(处理)分离,各自可以独立扩展和失败。采集层可以承受极高的瞬时流量,而处理层可以根据背压和资源情况按自己的节奏消费数据。这才是构建真正高可用、高可扩展数据管道的必经之路。


  目录