构建支持流批一体与即时图计算的混合特征存储架构决策与实现


在一个高并发的风控或推荐场景下,对特征计算和存储系统的要求是极为苛刻的。系统必须能以毫秒级延迟响应在线服务的请求,同时处理来自实时事件流的瞬时特征、来自批处理作业的深度历史特征,以及在特定请求下才能触发的复杂关系网络特征。任何单一的架构范式——纯流式、纯批处理——都无法独立应对这种混合负载。问题的核心在于如何设计一个数据底座,能够优雅地融合这几种完全不同的数据处理模式,并对外提供统一、高性能的访问接口。

定义问题:混合特征的三重挑战

我们的目标是构建一个特征平台,它需要同时满足以下三点:

  1. 实时性 (Streaming Features): 基于用户当前会话的行为(如点击流、交易事件),在秒级窗口内计算特征,例如“用户最近5分钟内的交易失败次数”。
  2. 历史性 (Batch Features): 基于海量历史数据进行深度挖掘,计算长周期、全局性的特征,例如“用户过去90天的平均客单价”或“该用户所属分群的欺诈风险指数”。
  3. 关联性 (On-Demand Graph Features): 在某些高风险请求中,需要即时计算用户在复杂关系网络中的结构性特征,例如“查询目标用户的二度关系人中是否存在已知的欺诈账户”。这类计算成本极高,无法为所有用户预先计算。

传统的架构往往将这三者割裂处理,导致数据孤岛、技术栈冗余和数据一致性难题。我们的任务,就是在一套统一的架构中解决这个问题。

方案评估:从单一范式到混合架构

一个看似直接的方案是,将所有数据都视为事件流,通过Kafka汇集,使用Apache Flink进行处理,并将结果实时写入一个低延迟的键值存储(如Redis或ScyllaDB)。

  • 优势:

    • 端到端延迟极低,非常适合计算实时特征。
    • 架构相对简单,技术栈统一。
  • 劣势:

    • 历史特征计算的困境: Flink状态后端虽然强大,但存储全量历史数据(TB甚至PB级别)是不现实的,成本和稳定性都成问题。进行跨越数月甚至数年的全局聚合计算,流式引擎并非最佳选择。
    • 复杂图计算的无力: 在流式任务中执行深度图遍历或社区发现算法,会严重阻塞主处理流程,造成巨大的反压和延迟。这对于要求毫秒级响应的服务是不可接受的。
    • 成本: 维持一个能容纳全量数据的庞大Flink状态和高性能KV存储,成本高昂。

这个方案在处理纯粹的实时场景时是有效的,但无法满足我们对历史和关联性特征的需求,在真实生产环境中存在明显短板。

方案B:传统批处理架构 (Hadoop + 数据仓库)

另一个极端是经典的数仓架构。所有数据首先落入Hadoop HDFS,通过每日或每小时的MapReduce/Spark批处理作业计算所有类型的特征,然后将结果导入到服务数据库(如MySQL或HBase)中。

  • 优势:

    • 批处理对复杂、大规模的历史数据分析能力极强。
    • 技术成熟,生态完善,适合进行深度数据挖掘。
    • 单位存储成本低。
  • 劣势:

    • 数据时效性差: T+1或T+H的延迟对于实时风控是致命的。当欺诈行为发生时,系统看到的还是几小时前的数据。
    • 无法响应实时事件: 架构本身不具备处理实时事件流的能力。

此方案完全牺牲了实时性,无法满足业务的核心需求。

最终架构选型:流批一体的混合模型

我们最终选择的架构是一个融合了流处理、批处理、即时计算和高性能存储的混合模型。它并非简单地将多个组件堆砌,而是让它们在数据流转和计算触发上形成有机协同。

graph TD
    subgraph "在线服务层"
        API[服务API / 模型推理]
    end

    subgraph "离线计算层 (Batch)"
        HDFS[Hadoop HDFS] --> Spark[Spark/MapReduce Job]
        Spark --> ScyllaDB_Batch[(ScyllaDB)]
    end

    subgraph "实时计算层 (Streaming)"
        Kafka[Kafka Event Stream] --> Flink[Apache Flink Job]
        Flink --> ScyllaDB_Stream[(ScyllaDB)]
    end

    subgraph "即时计算层 (On-Demand)"
        CeleryBroker(RabbitMQ/Redis)
        CeleryWorker[Celery Worker] --> GraphDB[NoSQL 图数据库]
        GraphDB --> CeleryWorker
        CeleryWorker --> ScyllaDB_OnDemand[(ScyllaDB)]
    end

    subgraph "统一存储层 (Unified Storage)"
        ScyllaDB_Batch -- 数据写入 --> FeatureStore[ScyllaDB 集群]
        ScyllaDB_Stream -- 数据写入 --> FeatureStore
        ScyllaDB_OnDemand -- 数据写入 --> FeatureStore
    end

    API -- "高价值请求" --> CeleryBroker -- "触发任务" --> CeleryWorker
    API -- "常规请求" --> FeatureStore

    RawLogs[原始日志] --> HDFS
    RawLogs --> Kafka

这个架构的核心决策点如下:

  • 数据湖基石 (Hadoop HDFS): 所有原始日志的唯一事实来源(Single Source of Truth)。它为批处理提供了廉价、可靠的存储。
  • 在线特征存储 (ScyllaDB): 选择ScyllaDB而非Cassandra或HBase,关键在于其对p99延迟的极致优化。其基于Seastar框架的线程-核心绑定(Thread-per-core)架构避免了传统JVM数据库在GC和上下文切换上的开销,为在线服务提供了稳定、可预测的亚毫秒级读取性能。
  • 实时心脏 (Apache Flink): Flink的强项在于其强大的状态管理和事件时间处理能力,确保了实时特征计算的准确性,即使在事件乱序或延迟的情况下。
  • 关系大脑 (图数据库): 使用一个专门的图数据库(如JanusGraph,其后端可配置为ScyllaDB,或NebulaGraph)来存储实体间的关系。这使得复杂的关联查询变得高效。
  • 异步调度中枢 (Celery): 这是整个架构的“粘合剂”。Celery在这里扮演两个角色:
    1. 批处理任务编排: 触发并监控重量级的Spark作业。
    2. 即时计算引擎: 当在线服务遇到一个需要进行昂贵图计算的请求时,它不直接查询图数据库,而是向Celery提交一个异步任务。这避免了同步阻塞API,同时将计算压力隔离到专门的Worker池中。

核心实现细节与代码

1. ScyllaDB 的表结构设计

为支持混合特征,我们需要精心设计表结构。一个关键原则是,无论是流式、批处理还是即时计算的特征,都应该能写入同一张逻辑表,通过字段来区分来源和时效性。

-- 使用CQL (Cassandra Query Language) 定义特征表
-- 在真实的生产环境中,需要更复杂的键设计来避免热点,例如增加hash前缀。
CREATE KEYSPACE feature_store WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 3};

USE feature_store;

-- 定义一个用户自定义类型来存储特征值和元数据
CREATE TYPE feature_value (
    value_double double,
    value_string text,
    value_vector list<float>,
    updated_at timestamp,
    source text -- 'stream', 'batch', 'ondemand'
);

-- 主特征表,以用户ID为分区键
CREATE TABLE user_features (
    user_id uuid,
    feature_name text,
    feature_data feature_value,
    PRIMARY KEY (user_id, feature_name)
) WITH CLUSTERING ORDER BY (feature_name ASC);

-- 示例:插入一个流式计算的特征
-- Flink任务将会执行类似这样的更新
UPDATE user_features
SET feature_data = {
    value_double: 5.0,
    updated_at: toTimestamp(now()),
    source: 'stream'
}
WHERE user_id = 123e4567-e89b-12d3-a456-426614174000 AND feature_name = 'transaction_count_5min';

-- 示例:插入一个批处理计算的特征
-- Spark作业输出会执行这类更新
UPDATE user_features
SET feature_data = {
    value_double: 785.6,
    updated_at: '2023-10-27T02:00:00Z',
    source: 'batch'
}
WHERE user_id = 123e4567-e89b-12d3-a456-426614174000 AND feature_name = 'avg_transaction_amount_90d';

设计考量:

  • user_id 作为分区键,保证同一个用户的所有特征在物理上临近,便于高效查询。
  • feature_name 作为聚类键,使得我们可以一次性查询某个用户的所有特征或一个范围的特征。
  • 使用feature_value UDT (User-Defined Type) 将特征值与元数据(如更新时间、来源)封装在一起,这对于问题排查、特征版本管理和监控至关重要。

以下是一个简化的Flink作业(使用Java),它从Kafka消费交易事件,计算5分钟滚动窗口内的交易次数,并将其写入ScyllaDB。

import com.datastax.driver.core.Cluster;
import com.datastax.driver.mapping.Mapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import java.time.Duration;
import java.util.UUID;

public class RealtimeFeatureEngine {

    // 内部类,代表从Kafka消费的原始事件
    public static class TransactionEvent {
        public UUID userId;
        public double amount;
        public long timestamp;
        // ... 其他字段
    }

    // 内部类,代表要写入ScyllaDB的特征
    public static class UserFeature {
        private UUID user_id;
        private String feature_name;
        // 在真实项目中,这里应该是一个复杂的UDT对象
        private double feature_value; 
        private String source = "stream";

        // getters and setters ...
    }


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置检查点,这是生产环境稳定性的关键
        env.enableCheckpointing(60000); // 每60秒一个checkpoint
        env.getCheckpointConfig().setCheckpointTimeout(300000); // 5分钟超时
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 两个checkpoint之间最小间隔
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // 1. 定义Kafka数据源
        KafkaSource<TransactionEvent> source = KafkaSource.<TransactionEvent>builder()
                .setBootstrapServers("kafka-broker1:9092,kafka-broker2:9092")
                .setTopics("transactions")
                .setGroupId("flink-feature-engine")
                .setStartingOffsets(OffsetsInitializer.latest())
                // 使用JSON反序列化器,这里省略了具体实现
                .setValueOnlyDeserializer(new TransactionEventDeserializer()) 
                .build();

        // 2. 创建数据流并分配时间戳和水印
        DataStream<TransactionEvent> transactions = env.fromSource(source,
                WatermarkStrategy.<TransactionEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, timestamp) -> event.timestamp),
                "Kafka Transaction Source");

        // 3. 核心业务逻辑:计算5分钟滚动窗口内的交易次数
        DataStream<UserFeature> featureStream = transactions
                .keyBy(event -> event.userId)
                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                .aggregate(new TransactionCounterAggregate()) // 自定义聚合函数,返回 (userId, count)
                .map(new MapFunction<Tuple2<UUID, Long>, UserFeature>() {
                    @Override
                    public UserFeature map(Tuple2<UUID, Long> value) throws Exception {
                        UserFeature feature = new UserFeature();
                        feature.setUserId(value.f0);
                        feature.setFeatureName("transaction_count_5min");
                        feature.setFeatureValue((double) value.f1);
                        return feature;
                    }
                });

        // 4. 定义ScyllaDB Sink
        CassandraSink.addSink(featureStream)
                .setClusterBuilder(new ClusterBuilder() {
                    @Override
                    protected Cluster buildCluster(Cluster.Builder builder) {
                        return builder
                                .addContactPoint("scylla-node1.example.com")
                                .withPort(9042)
                                .withCredentials("username", "password") // 生产环境使用凭证
                                .build();
                    }
                })
                // 非常重要:写入语句必须是幂等的,以保证Exactly-Once
                .setQuery("UPDATE feature_store.user_features SET feature_data = {value_double: ?, updated_at: toTimestamp(now()), source: ?} WHERE user_id = ? AND feature_name = ?;")
                // 配置Mapper,将UserFeature对象字段映射到查询的?占位符
                .setMapperOptions(() -> new Mapper.Option[]{
                        Mapper.Option.map(UserFeature::getFeatureValue),
                        Mapper.Option.map(UserFeature::getSource),
                        Mapper.Option.map(UserFeature::getUserId),
                        Mapper.Option.map(UserFeature::getFeatureName)
                })
                .build();
        
        env.execute("Real-time Feature Engineering Job");
    }
    // TransactionCounterAggregate 和 TransactionEventDeserializer 的实现被省略
}

代码关键点:

  • 状态与容错: 启用并配置Checkpoint是生产级Flink作业的必要条件,它保证了在发生故障时能够从上一个检查点恢复状态,实现Exactly-Once或At-Least-Once语义。
  • 事件时间: 使用WatermarkStrategy处理事件时间,可以正确处理数据延迟和乱序,保证窗口计算的准确性。
  • 幂等写入: Sink到ScyllaDB的UPDATE语句是幂等的。这意味着即使Flink因故障恢复而重放了某些数据,多次执行相同的UPDATE操作结果也是一样的。这是实现端到端Exactly-Once的关键。

3. Celery 即时图计算任务

当API服务需要一个昂贵的图特征时,它会触发一个Celery任务。

# tasks.py - Celery任务定义
import os
from celery import Celery
from celery.utils.log import get_task_logger
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from graph_client import GraphDatabaseClient # 这是一个假设的图数据库客户端

# 日志记录
logger = get_task_logger(__name__)

# Celery App配置
# 在生产环境中,这些配置应该来自配置文件或环境变量
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://guest:guest@rabbitmq:5672//')
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'redis://redis:6379/0')

app = Celery('feature_tasks', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND)

# ScyllaDB 和 图数据库的连接不是在全局创建的,而是在任务内部按需创建或使用连接池
# 这是为了避免在fork模型下出现连接共享问题
SCYLLA_HOSTS = ['scylla-node1.example.com']
SCYLLA_KEYSPACE = 'feature_store'

def get_scylla_session():
    # 生产环境中应使用更健壮的连接池管理
    cluster = Cluster(SCYLLA_HOSTS)
    session = cluster.connect(SCYLLA_KEYSPACE)
    return session

@app.task(bind=True, name='tasks.compute_graph_feature', max_retries=3, default_retry_delay=60)
def compute_graph_feature(self, user_id: str):
    """
    一个计算用户二度人脉中欺诈账户数量的示例任务
    """
    logger.info(f"Starting graph feature computation for user: {user_id}")
    scylla_session = None
    try:
        graph_db = GraphDatabaseClient() # 连接图数据库
        
        # 1. 执行昂贵的图查询
        # "MATCH (u:User {id: $userId})-[:RELATION*2]->(u2:User) WHERE u2.is_fraud = true RETURN count(u2)"
        fraud_contacts_count = graph_db.get_second_degree_fraud_contacts(user_id)
        logger.info(f"User {user_id} has {fraud_contacts_count} fraudulent second-degree contacts.")
        
        # 2. 将结果写入ScyllaDB
        scylla_session = get_scylla_session()
        cql_query = """
        UPDATE user_features 
        SET feature_data = {value_double: ?, updated_at: toTimestamp(now()), source: 'ondemand'} 
        WHERE user_id = ? AND feature_name = 'fraud_contacts_2nd_degree'
        """
        prepared = scylla_session.prepare(cql_query)
        scylla_session.execute(prepared, (float(fraud_contacts_count), UUID(user_id)))
        
        return {"status": "SUCCESS", "user_id": user_id, "feature_value": fraud_contacts_count}
        
    except Exception as exc:
        logger.error(f"Task for user {user_id} failed. Retrying...", exc_info=True)
        # Celery的自动重试机制
        raise self.retry(exc=exc)
    finally:
        if scylla_session:
            scylla_session.cluster.shutdown()

# 在API服务端的调用方式
# from tasks import compute_graph_feature
# task = compute_graph_feature.delay(user_id='123e4567-e89b-12d3-a456-426614174000')
# result = task.get(timeout=5) # 可以同步等待,但更推荐异步轮询或webhook

设计要点:

  • 异步解耦: API服务通过.delay()调用将任务推入队列后立即返回,不会被长时间的计算阻塞。
  • 隔离性: 图计算的资源消耗被隔离在Celery Worker进程中,不会影响主API服务的性能和稳定性。
  • 健壮性: 利用Celery的重试机制 (max_retries, default_retry_delay) 来处理图数据库或ScyllaDB的暂时性故障,提高了系统的韧性。
  • 缓存效应: 计算结果被写入ScyllaDB。如果短时间内有对同一用户的相同特征请求,可以直接从ScyllaDB中获取,无需重复计算。可以为这类特征设置一个较短的TTL。

架构的局限性与未来迭代

尽管该架构解决了混合特征的核心挑战,但在生产环境中仍存在一些需要注意的局限性:

  1. 运维复杂性: 同时维护Hadoop、Flink、ScyllaDB、图数据库和Celery等多个分布式系统,对团队的DevOps能力要求很高。统一的监控、告警和日志平台是必不可少的。
  2. 数据一致性: 流式路径和批处理路径对同一特征的计算结果可能存在微小差异(例如,窗口对齐问题)。这要求在业务层面能够容忍这种最终一致性,并建立数据质量监控来追踪偏差。
  3. 即时计算的延迟: 对于首次触发的即时计算请求,用户仍然需要等待Celery任务完成,这个延迟可能在秒级。虽然是异步的,但对于某些同步场景,需要产品设计上的配合(例如,前端显示“正在进行深度安全分析…”)。

未来的迭代方向可能包括:

  • 统一计算引擎: 探索使用Apache Flink的批处理API或Flink SQL来逐步替代部分Spark作业,向更统一的Lambda或Kappa架构演进,以简化技术栈。
  • 特征治理: 引入一个中心化的特征注册表(Feature Registry),用于管理特征的定义、元数据、版本和血缘关系,解决特征发现和复用的问题。
  • 智能化调度: 对于即时计算,可以基于请求的风险评分来决定是否触发昂贵的图计算,而不是对所有高价值请求都一视同仁,从而进一步优化资源使用。

  目录