构建面向ClickHouse的高吞吐量Swift客户端可观测性Kit


项目对客户端行为的洞察需求越来越复杂,单纯依赖第三方的分析SDK已经无法满足。这些SDK通常是黑盒,数据模型僵化,查询能力有限,并且成本高昂。当我们需要将客户端的UI卡顿、网络请求时延与后端的全链路追踪数据进行关联分析时,现有工具链的无力感便凸显出来。问题的核心在于,我们需要一个能将客户端原始、高维度的可观测性数据,以低延迟、高吞吐的方式直接灌入我们自己的数据分析引擎的管道。

初步构想是开发一个内部的Swift Kit,它必须满足几个严苛的生产环境要求:

  1. 高性能与低侵入:采集和上报行为绝不能阻塞主线程,对App的性能影响必须降到最低。
  2. 数据可靠性:在网络不佳或应用崩溃的情况下,尽可能保证数据不丢失。
  3. 高吞吐量:能够应对用户高频交互、网络监控等场景下产生的大量事件。
  4. 灵活性:数据结构应能轻松扩展,以适应不同业务的分析需求。

技术选型上,服务端的数据存储与分析引擎,我们放弃了传统的日志系统如ELK栈。虽然ELK功能强大,但其倒排索引机制在处理海量、结构化的时序事件时,无论是写入性能还是存储成本,都并非最优解。最终,我们将目光投向了ClickHouse。

选择ClickHouse的理由很明确:

  • 极致的写入性能:其列式存储和MergeTree引擎族为大规模数据的实时写入而生,非常适合我们的事件流场景。
  • 惊人的查询速度:对于聚合、统计类查询,ClickHouse的速度远超多数通用数据库。这正是可观测性数据分析的核心诉求。
  • 优异的压缩比:列式存储带来的高压缩比能显著降低存储成本。
  • 简单的HTTP接口:无需引入庞大的客户端SDK,一个简单的HTTP POST请求就能完成数据写入,这对保持客户端Kit的轻量化至关重要。

我们的目标是构建一个名为 CKObservabilityKit 的库。它的核心架构设计如下:

graph TD
    subgraph App Main Thread
        A[Track Event API] --> B{Thread-Safe Event Buffer};
    end

    subgraph Background Processing Queue
        B -- Events --> C[Batch Processor];
        C -- Timer / Size Trigger --> D[Data Serialization];
        D -- Serialized Batch --> E[Network Uploader];
    end

    subgraph Network Layer
        E -- HTTP POST (GZIP) --> F((ClickHouse));
        E -- onFail --> G{Retry Logic};
        G -- Exponential Backoff --> E;
    end

    style A fill:#cde4ff
    style F fill:#ffb3ba

这个架构的核心思想是解耦:将事件的产生(主线程)与事件的处理和上报(后台线程)彻底分离。通过一个内存中的线程安全缓冲区作为中介,利用批处理机制来摊销网络和服务器的开销。

第一步: 定义数据模型与ClickHouse表结构

首先,我们需要一个统一的事件模型。在Swift中,我们使用 Codable 协议来定义,使其易于序列化。

// CKEvent.swift

import Foundation

/// 表示一个可观测性事件的结构体
/// 使用 AnyCodable 来处理异构的属性字典
public struct CKEvent: Codable {
    /// 事件唯一ID
    let eventId: String
    /// 事件发生的时间戳 (毫秒)
    let timestamp: UInt64
    /// 事件名称,用于分类和查询
    let eventName: String
    /// 会话ID,用于串联用户一次会话中的所有行为
    let sessionId: String
    /// 用户唯一标识
    let userId: String?
    /// 包含事件具体上下文信息的字典
    let properties: [String: AnyCodable]

    init(name: String, sessionId: String, userId: String?, properties: [String: Any] = [:]) {
        self.eventId = UUID().uuidString
        self.timestamp = UInt64(Date().timeIntervalSince1970 * 1000)
        self.eventName = name
        self.sessionId = sessionId
        self.userId = userId
        self.properties = AnyCodable.from(properties)
    }
}

// 一个简单的类型擦除包装器,让 [String: Any] 可以被 Codable 编码
public struct AnyCodable: Codable {
    private let value: Any

    public func encode(to encoder: Encoder) throws {
        // ... 具体实现 ...
    }
    public init(from decoder: Decoder) throws {
        // ... 具体实现 ...
    }
    
    static func from(_ dictionary: [String: Any]) -> [String: AnyCodable] {
        // ... 将 [String: Any] 转换为 [String: AnyCodable] ...
    }
}

这里的坑在于:直接使用 [String: Any] 是无法被 JSONEncoder 编码的。你需要一个类型擦除的包装器(如社区实现的 AnyCodable)来处理异构字典。

对应的ClickHouse表结构设计如下,这里的选型至关重要:

CREATE TABLE default.observability_events (
    `eventId` String,
    `timestamp` DateTime64(3, 'Asia/Shanghai'),
    `eventName` LowCardinality(String),
    `sessionId` String,
    `userId` Nullable(String),
    `properties` String, -- 使用 JSON 字符串存储,查询时用函数解析
    `eventDate` Date DEFAULT toDate(timestamp)
)
ENGINE = MergeTree()
PARTITION BY toDate(timestamp)
ORDER BY (eventName, timestamp)
SETTINGS index_granularity = 8192;
  • 引擎选择MergeTree 是ClickHouse的王牌引擎,提供数据分区、稀疏索引和数据复制能力。
  • 分区键:使用 toDate(timestamp) 按天分区,这是时序数据最常见的优化手段,可以极大加速按时间范围的查询,并便于数据生命周期管理。
  • 排序键ORDER BY (eventName, timestamp) 是核心优化。将低基数的 eventName 放在前面,可以有效压缩数据,并加速对特定事件类型的查询。
  • **LowCardinality(String)**:对于像 eventName 这样取值范围有限的字符串,使用 LowCardinality 类型可以将其转换为字典编码,大幅减少存储空间并提升查询性能。
  • properties 字段:我们选择将其作为原始JSON字符串存储。虽然ClickHouse支持 JSON 对象类型,但在写入密集型场景下,直接存为字符串,在查询时使用 JSONExtract* 系列函数进行解析,通常写入性能更好。这是一个典型的写时优化与读时计算的权衡。

第二步: 构建核心的异步处理管道

管道是整个Kit的灵魂,它必须是线程安全的、高效的。

// CKEventBuffer.swift

import Foundation

/// 线程安全的内存事件缓冲区
final class CKEventBuffer {
    // 使用读写锁来允许多个读取者同时访问,但写入是互斥的。
    // 在这个场景下,一个简单的NSLock或者DispatchQueue(label: "...", attributes: .concurrent) 也可以。
    // 这里用 actor 展示现代并发模型。
    private actor BufferActor {
        private var events: [CKEvent] = []

        var count: Int {
            events.count
        }

        func enqueue(_ event: CKEvent) {
            events.append(event)
        }

        func dequeue(count: Int) -> [CKEvent] {
            let limit = min(count, events.count)
            let batch = Array(events.prefix(limit))
            events.removeFirst(limit)
            return batch
        }
        
        func clear() {
            events.removeAll()
        }
    }

    private let actor = BufferActor()

    func add(event: CKEvent) async {
        await actor.enqueue(event)
    }
    
    func getBatch(size: Int) async -> [CKEvent] {
        await actor.dequeue(count: size)
    }

    func count() async -> Int {
        await actor.count
    }
}

使用actor可以非常简洁地保证对events数组的互斥访问,避免了手动管理锁带来的复杂性和潜在错误。

接下来是批处理器 CKBatchProcessor,它负责在后台定时或定量地从缓冲区拉取数据并触发上报。

// CKBatchProcessor.swift

import Foundation
import Combine

final class CKBatchProcessor {
    private let buffer: CKEventBuffer
    private let uploader: CKUploader
    private let config: CKConfig
    
    private var timer: AnyCancellable?
    private let backgroundQueue = DispatchQueue(label: "com.ckkit.batchprocessor", qos: .background)

    init(buffer: CKEventBuffer, uploader: CKUploader, config: CKConfig) {
        self.buffer = buffer
        self.uploader = uploader
        self.config = config
    }

    func start() {
        // 在真实项目中,应确保 start() 不被重复调用
        timer = Timer.publish(every: config.flushInterval, on: .main, in: .common)
            .autoconnect()
            .sink { [weak self] _ in
                self?.flush(reason: .timer)
            }
    }

    func stop() {
        timer?.cancel()
        timer = nil
    }

    func flush(reason: FlushReason) {
        backgroundQueue.async {
            Task { [weak self] in
                guard let self = self else { return }
                
                let currentCount = await self.buffer.count()
                // 如果是定时器触发,但缓冲区为空,则无需操作
                if reason == .timer && currentCount == 0 {
                    return
                }

                // 这里的坑在于:即使触发了flush,也应该拉取不超过最大批次大小的事件。
                // 避免一次性上传过多数据导致请求超时或内存峰值。
                let batch = await self.buffer.getBatch(size: self.config.batchSize)
                
                if !batch.isEmpty {
                    // print("Attempting to upload batch of \(batch.count) events due to \(reason).")
                    await self.uploader.upload(events: batch)
                }
            }
        }
    }
    
    enum FlushReason {
        case timer
        case batchSizeLimit
        case manual
    }
}

// 在 CKObservabilityKit 的主类中,每次添加事件后检查缓冲区大小
func track(event: CKEvent) {
    Task {
        await buffer.add(event: event)
        let count = await buffer.count()
        if count >= config.batchSize {
            processor.flush(reason: .batchSizeLimit)
        }
    }
}
  • DispatchQueue vs Task: 我们使用一个专用的后台 DispatchQueue 来调度 flush 操作,确保所有批处理逻辑都在同一个串行队列上执行,避免竞态条件。内部则使用 Task 来利用 Swift 的现代结构化并发。
  • 触发时机flush 操作由两个条件触发:一是定时器 (flushInterval),确保数据不会在缓冲区停留太久;二是缓冲区大小 (batchSize),当事件积压到一定程度时立即上报,以平滑网络流量。

第三步: 实现健壮的网络上传与重试机制

这是决定数据可靠性的关键一环。CKUploader 负责序列化、压缩和网络传输。

// CKUploader.swift

import Foundation

final class CKUploader {
    private let config: CKConfig
    private let buffer: CKEventBuffer // 需要引用buffer以便上传失败时重新入队
    private let session: URLSession

    init(config: CKConfig, buffer: CKEventBuffer) {
        self.config = config
        self.buffer = buffer
        let sessionConfig = URLSessionConfiguration.default
        sessionConfig.timeoutIntervalForRequest = 30.0
        sessionConfig.timeoutIntervalForResource = 60.0
        self.session = URLSession(configuration: sessionConfig)
    }

    func upload(events: [CKEvent]) async {
        guard !events.isEmpty else { return }

        do {
            let body = try serialize(events: events)
            let compressedBody = try (body as NSData).compressed(using: .gzip)
            
            var request = URLRequest(url: config.clickhouseEndpoint)
            request.httpMethod = "POST"
            request.setValue("gzip", forHTTPHeaderField: "Content-Encoding")
            request.setValue("application/json", forHTTPHeaderField: "Accept")
            // 在生产环境中,应包含认证信息
            // request.setValue("Bearer \(config.apiKey)", forHTTPHeaderField: "Authorization")
            request.httpBody = compressedBody as Data

            let (_, response) = try await session.data(for: request)
            
            guard let httpResponse = response as? HTTPURLResponse else {
                throw UploadError.nonHttpResponse
            }

            // ClickHouse 成功响应码为 200
            if httpResponse.statusCode != 200 {
                // 如果是 4xx 客户端错误,说明数据格式可能有问题,重试无用,直接丢弃并记录日志
                if (400...499).contains(httpResponse.statusCode) {
                    print("[CKKit Error] Client error \(httpResponse.statusCode). Dropping batch.")
                    return 
                }
                // 如果是 5xx 服务端错误,可以重试
                throw UploadError.serverError(statusCode: httpResponse.statusCode)
            }
            // print("[CKKit] Successfully uploaded \(events.count) events.")

        } catch {
            print("[CKKit Error] Upload failed: \(error). Re-queuing \(events.count) events.")
            await requeue(events: events)
        }
    }
    
    private func serialize(events: [CKEvent]) throws -> Data {
        // ClickHouse 的 JSONEachRow 格式要求每个 JSON 对象占一行
        let jsonStrings = try events.map { event -> String in
            let data = try JSONEncoder().encode(event)
            return String(data: data, encoding: .utf8) ?? ""
        }
        return jsonStrings.joined(separator: "\n").data(using: .utf8) ?? Data()
    }
    
    private func requeue(events: [CKEvent]) async {
        // 这里的坑是,不能直接把失败的批次放回队首,否则可能导致后续事件一直无法上报
        // 一个简单的策略是放回队尾,或者实现一个专门的重试队列。
        // 这里为了简化,我们放回队尾。
        for event in events {
            await buffer.add(event: event)
        }
    }
    
    enum UploadError: Error {
        case nonHttpResponse
        case serverError(statusCode: Int)
    }
}
  • JSONEachRow 格式:这是ClickHouse推荐的高性能JSON摄入格式。它要求每个JSON对象都是一个独立的行,用换行符分隔。这比一个大的JSON数组效率更高,因为它允许ClickHouse流式解析,而不需要将整个请求体加载到内存。
  • GZIP压缩:对于文本类的JSON数据,GZIP压缩效果非常显著,可以节省大量的网络带宽,尤其是在移动网络环境下。ClickHouse原生支持解压GZIP编码的请求体。
  • 错误处理与重试:这是生产级代码与玩具代码的核心区别。我们区分了可恢复的错误(如5xx服务器错误、网络超时)和不可恢复的错误(4xx客户端错误)。对于可恢复的错误,我们将事件重新放回缓冲区。一个更复杂的实现会引入指数退避(Exponential Backoff)策略,并为重试次数设置上限,避免无限重试耗尽设备资源。

第四步: 封装统一的公开API

最后,我们将所有组件组装起来,提供一个简洁易用的单例接口。

// CKObservabilityKit.swift

import Foundation

public final class CKObservabilityKit {
    public static let shared = CKObservabilityKit()

    private var buffer: CKEventBuffer!
    private var processor: CKBatchProcessor!
    private var uploader: CKUploader!
    private var config: CKConfig!
    
    // 会话管理
    private var sessionId: String = UUID().uuidString
    private var userId: String?

    private init() {}

    public func configure(with config: CKConfig) {
        guard self.config == nil else {
            print("[CKKit Warning] Already configured. Ignoring.")
            return
        }
        self.config = config
        self.buffer = CKEventBuffer()
        self.uploader = CKUploader(config: config, buffer: buffer)
        self.processor = CKBatchProcessor(buffer: buffer, uploader: uploader, config: config)
        self.processor.start()
        
        // 监听应用生命周期,在应用进入后台或终止时,尽力上报所有数据
        // NotificationCenter.default.addObserver(...)
    }

    public func identify(userId: String) {
        self.userId = userId
    }

    public func track(name: String, properties: [String: Any] = [:]) {
        guard let config = config else {
            print("[CKKit Error] You must call configure() before tracking events.")
            return
        }
        
        let event = CKEvent(name: name, sessionId: sessionId, userId: userId, properties: properties)
        
        Task(priority: .background) {
            await buffer.add(event: event)
            let count = await buffer.count()
            if count >= config.batchSize {
                processor.flush(reason: .batchSizeLimit)
            }
        }
    }
    
    // 提供手动触发上报的接口,用于应用退出等关键时机
    public func flush() {
        processor.flush(reason: .manual)
    }
}

public struct CKConfig {
    let clickhouseEndpoint: URL
    let batchSize: Int
    let flushInterval: TimeInterval
    
    public init(clickhouseEndpoint: URL, batchSize: Int = 100, flushInterval: TimeInterval = 15.0) {
        self.clickhouseEndpoint = clickhouseEndpoint
        self.batchSize = batchSize
        self.flushInterval = flushInterval
    }
}

通过这种方式,业务代码只需要在 AppDelegateSceneDelegate 中进行一次配置,之后就可以在任何地方通过 CKObservabilityKit.shared.track(...) 来记录事件,完全无需关心底层的缓冲、批处理和网络细节。

成果与验证

部署这套Kit后,我们获得了前所未有的客户端洞察力。例如,我们可以轻松地执行这样的ClickHouse查询:

-- 计算不同App版本下,首页(home_view)加载时间的 P90 和 P99 分位数
SELECT
    properties.app_version AS app_version,
    quantile(0.90)(JSONExtractUInt(properties, 'load_duration_ms')) AS p90_load_time,
    quantile(0.99)(JSONExtractUInt(properties, 'load_duration_ms')) AS p99_load_time
FROM default.observability_events
WHERE eventName = 'view_loaded' AND properties.view_name = 'home_view'
GROUP BY app_version
ORDER BY app_version DESC;

-- 查找导致API请求失败次数最多的用户
SELECT
    userId,
    count() AS failed_requests
FROM default.observability_events
WHERE eventName = 'api_request_failed'
GROUP BY userId
ORDER BY failed_requests DESC
LIMIT 10;

这种灵活性和查询性能是任何第三方SDK都无法比拟的。

局限性与未来迭代方向

当前这套实现并非完美,它是一个务实的起点,依然存在一些可以迭代优化的方向:

  1. 离线缓存:目前的实现是纯内存缓冲,如果应用在数据未上报前被强制杀死,这部分数据会丢失。一个关键的改进是增加一个磁盘缓存层,比如使用一个简单的文件队列或嵌入式数据库(如SQLite),将事件在入队时就持久化,上传成功后再删除。
  2. 动态配置与采样:对于某些高频事件(如滑动操作),全量上报是不现实的。未来可以从服务端下发动态配置,实现对特定事件的客户端采样,或者在后端实现尾部采样,以在保证数据代表性的前提下控制数据总量和成本。
  3. 网络感知:Kit可以感知当前的网络状态(Wi-Fi/蜂窝网络),在蜂窝网络下采用更小的批次、更长的上报间隔,或只上报高优事件,以节省用户流量。
  4. 数据模式一致性properties 字典的灵活性是一把双刃剑,它可能导致数据质量问题。引入某种形式的Schema注册和校验机制,或者在团队内部推广数据埋点规范,是规模化后必须考虑的问题。

  目录