基于 Lua 和 SSE 构建用于全链路压测的轻量级实时指标中继


在一次复杂的全链路压测中,我们面临一个棘手的问题:如何实时、低延迟地聚合来自数十个微服务的海量性能指标(QPS、Latency P99、错误率等),并将其推送至一个统一的监控仪表盘,同时确保这个指标收集过程本身不会成为新的性能瓶颈。传统的日志聚合方案(ELK)延迟太高,而引入重量级的消息队列(Kafka)又显得杀鸡用牛刀,增加了架构复杂度和运维成本。我们需要一个轻量、高效、且具备一定动态处理能力的解决方案。

最初的构想是构建一个独立的指标中继(Relay)服务。这个服务通过 UDP 接收各个业务服务“即发即弃”的指标数据,然后通过 Server-Sent Events (SSE) 将处理后的数据流式推送到前端仪表盘。技术选型的关键在于这个 Relay 的实现。它必须能处理极高的并发连接和数据吞吐。最终,我们选择了 OpenResty,一个基于 Nginx 并集成了 LuaJIT 的高性能 Web 平台。Nginx 的事件驱动模型天然适合处理大量并发连接,而 LuaJIT 的性能则足以胜任实时的数据处理。

更核心的决策是引入 Lua 作为动态处理引擎。压测场景多变,有时我们关心延迟,有时关心错误码分布。如果将处理逻辑硬编码在 Relay 服务中,每次变更都需要重新部署,这在快节奏的压测过程中是不可接受的。通过将核心处理逻辑抽离成一个可动态加载的 Lua 脚本,我们可以随时调整指标的过滤、聚合和转换规则,而无需重启服务。

最终的架构示意如下:

graph TD
    subgraph "压测目标集群"
        ServiceA[服务 A] -- UDP --> Relay
        ServiceB[服务 B] -- UDP --> Relay
        ServiceC[服务 C] -- UDP --> Relay
    end

    subgraph "指标中继 (OpenResty)"
        Relay(UDP Listener) -. In-memory Channel .-> Processor
        Processor -- "执行动态 Lua 脚本" --> Aggregator
        Aggregator -. Fan-out .-> SSE_Handler1
        Aggregator -. Fan-out .-> SSE_Handler2
        Aggregator -. Fan-out .-> SSE_HandlerN
    end

    subgraph "监控端"
        Dashboard1[压测仪表盘 1] <-- SSE --> SSE_Handler1
        Dashboard2[压测仪表盘 2] <-- SSE --> SSE_Handler2
        DashboardN[压测仪表盘 N] <-- SSE --> SSE_HandlerN
    end

    style Relay fill:#f9f,stroke:#333,stroke-width:2px
    style Processor fill:#ccf,stroke:#333,stroke-width:2px

一、环境与项目结构

我们选择 OpenResty 作为运行时环境。整个项目的核心是 nginx.conf 配置文件和 lua/ 目录下的 Lua 脚本。

目录结构:

/usr/local/openresty/nginx/
├── conf/
│   └── nginx.conf
└── lua/
    ├── relay_core.lua      # 核心逻辑:SSE连接管理、数据广播
    ├── processor.lua       # 动态脚本加载与执行模块
    └── user_scripts/
        └── default_aggregator.lua # 用户自定义的聚合脚本

二、Nginx 配置:搭建骨架

nginx.conf 是所有功能的入口,它定义了数据接收、数据推送和脚本管理三个关键部分。

# nginx.conf

worker_processes  auto;
error_log logs/error.log info;

events {
    worker_connections 10240;
}

http {
    # Lua 模块路径
    lua_package_path "/usr/local/openresty/nginx/lua/?.lua;;";

    # 定义一个共享内存区域,用于存储SSE客户端连接信息
    # 虽然worker间内存不共享,但我们可以用一些技巧来通信
    # 这里我们采用一个简化的模型,每个worker独立管理自己的客户端
    # 更复杂的场景需要借助 redis pub/sub
    lua_shared_dict sse_clients 10m;

    # 预加载核心Lua模块,提升性能
    init_by_lua_block {
        -- 在Master进程启动时加载,所有worker共享
        -- 避免在每个请求中重复加载和编译
        require "relay_core"
        require "processor"
    }

    server {
        listen 8080; # HTTP 端口,用于SSE连接

        # SSE 推送端点
        location /events {
            # 设置SSE的MIME类型和头部
            default_type 'text/event-stream';
            header_filter_by_lua_block {
                ngx.header['Content-Type'] = 'text/event-stream; charset=utf-8';
                ngx.header['Cache-Control'] = 'no-cache';
                ngx.header['Connection'] = 'keep-alive';
            }

            # 核心处理逻辑
            content_by_lua_block {
                relay_core.handle_sse_request()
            }
        }

        # 一个简单的管理接口,用于热重载聚合脚本
        location /reload_script {
            content_by_lua_block {
                local script_name = ngx.var.arg_script or "default_aggregator.lua"
                local ok, err = processor.load_script(script_name)
                if ok then
                    ngx.say("Script reloaded successfully: ", script_name)
                else
                    ngx.status = 500
                    ngx.say("Failed to reload script: ", err)
                end
            }
        }
    }

    # UDP 服务器,用于接收指标
    server {
        listen 9090 udp; # UDP 端口

        # UDP数据包处理
        preread_by_lua_block {
            relay_core.handle_udp_packet()
        }
    }
}

这份配置中有几个关键点:

  1. worker_processes auto;: 让 Nginx 根据 CPU核心数自动决定 worker 数量,充分利用多核性能。
  2. lua_package_path: 指定 Lua 模块的搜索路径。
  3. init_by_lua_block: 在 Nginx Master 进程启动时执行。这里我们用它来预加载核心的 Lua 模块。这是一种重要的性能优化,因为 Lua 代码只会被编译一次,所有 worker 进程共享这份只读的字节码。
  4. /events location: 这是 SSE 客户端连接的端点。content_by_lua_block 将请求完全交给 relay_core.lua 脚本处理。
  5. UDP server: Nginx 的 stream 模块(虽然这里配置在 http block 外部更规范,但为简化示例放在一起,实际生产应使用 stream block)可以监听 UDP 端口。preread_by_lua_block 让我们能在 TCP/IP 协议栈的更早阶段处理数据,性能极高。

三、核心实现:连接管理与数据广播

relay_core.lua 是系统的中枢神经。它负责管理所有活跃的 SSE 连接,并把从 UDP 收到的数据广播给它们。

由于 Nginx 的多 worker 模型,每个 worker 进程都有自己独立的内存空间。一个 worker 无法直接访问另一个 worker 的变量。这意味着在一个 worker 中建立的 SSE 连接,无法被另一个接收到 UDP 包的 worker 直接推送数据。

一个简单且实用的方案是让每个 worker 独立管理自己持有的 SSE 客户端。UDP 数据包会被 Nginx 分发给某个空闲的 worker,这个 worker 再将数据“广播”给自己名下的所有客户端。这种模式虽然不是全局广播,但在压测场景下,指标数据量巨大且均匀,每个客户端收到的数据样本在统计意义上是足够一致的。对于需要全局精确广播的场景,则需要引入外部中间件如 Redis Pub/Sub。

-- lua/relay_core.lua

local server = require "resty.core.preread"
local cjson = require "cjson.safe"

local M = {}

-- 每个 worker 进程私有的客户端列表
-- key 是客户端的唯一ID,value 是一个包含 ngx.socket 对象的 table
local clients = {}
local client_id_counter = 0

-- 广播消息给当前 worker 管理的所有客户端
function M.broadcast(message)
    -- ngx.log(ngx.INFO, "Broadcasting message: ", message)
    local dead_clients = {}
    for cid, client in pairs(clients) do
        -- 在写入前检查 socket 是否仍然有效
        -- 使用 pcall 捕获可能因客户端断开连接而产生的错误
        local ok, err = pcall(function()
            -- SSE 格式: "data: <json_string>\n\n"
            local sse_message = "data: " .. message .. "\n\n"
            local bytes, err = client.sock:send(sse_message)
            if not bytes then
                ngx.log(ngx.ERR, "Failed to send to client ", cid, ": ", err)
                table.insert(dead_clients, cid)
            end
        end)

        if not ok then
            ngx.log(ngx.ERR, "Error during broadcast to client ", cid, ": ", err)
            table.insert(dead_clients, cid)
        end
    end

    -- 清理已断开的客户端
    for _, cid in ipairs(dead_clients) do
        ngx.log(ngx.INFO, "Removing dead client: ", cid)
        clients[cid] = nil
    end
end

-- 处理新的 SSE 连接请求
function M.handle_sse_request()
    -- 获取底层的 TCP socket
    local sock, err = ngx.req.socket(true)
    if not sock then
        ngx.log(ngx.ERR, "Failed to get request socket: ", err)
        return ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
    end
    
    -- 设置 TCP KeepAlive 以维持长连接
    sock:setkeepalive(60000, 10)
    
    -- 为这个新客户端生成一个唯一ID
    client_id_counter = client_id_counter + 1
    local cid = ngx.worker.pid() .. ":" .. client_id_counter
    clients[cid] = { sock = sock }
    
    ngx.log(ngx.INFO, "New SSE client connected: ", cid)

    -- 进入一个循环,保持连接开放,直到客户端断开
    -- 这是 SSE 服务的核心:一个永不退出的循环
    while true do
        -- 这里的 sleep 很重要,它将控制权交还给 Nginx 的事件循环
        -- 避免了阻塞 worker 进程
        ngx.sleep(1)
        
        -- 检查客户端是否仍然连接
        -- 通过一个无操作的发送来探测连接状态
        local ok, err = sock:send(":\n\n") -- SSE 心跳/注释行
        if not ok then
            ngx.log(ngx.INFO, "Client ", cid, " disconnected: ", err)
            clients[cid] = nil -- 从列表中移除
            break
        end
    end
end

-- 处理 UDP 数据包
function M.handle_udp_packet()
    -- 从 preread 阶段获取 UDP socket
    local sock = server.get_socket()
    if not sock then
        ngx.log(ngx.ERR, "Failed to get preread socket")
        return
    end

    -- 接收数据,设置一个超时
    local data, err = sock:receive()
    if not data then
        ngx.log(ngx.ERR, "Failed to receive UDP data: ", err)
        return
    end
    
    if data == "" then
        return -- 忽略空包
    end
    
    -- 将原始数据交给 processor 模块处理
    -- processor 会调用用户脚本并返回处理后的数据
    local processed_data, err = processor.process(data)
    
    if not processed_data then
        ngx.log(ngx.WARN, "Metric dropped by processor script: ", err or "N/A")
        return
    end

    -- 将处理后的数据广播出去
    -- 注意:这里广播的只是当前 worker 的客户端
    M.broadcast(processed_data)
end

return M

代码中的几个关键设计:

  1. 非阻塞循环handle_sse_request 中的 while true 循环通过 ngx.sleep(1) 将执行权交还给 Nginx,这使得单个 worker 进程能高效地处理成千上万的并发连接,而不会被任何一个连接阻塞。
  2. 连接健康检查:通过定期发送 SSE 注释行 :\n\n 作为心跳,我们可以及时检测到已经断开的客户端连接并进行清理,防止内存泄漏。
  3. 错误处理pcall 的使用确保了在向一个已经关闭的 socket 发送数据时,程序不会崩溃,而是优雅地将其标记为“死亡”并后续清理。这是生产级代码必须考虑的。
  4. 职责分离relay_core 只负责网络I/O和连接管理。它不关心数据的具体内容,而是将数据处理的职责委托给 processor 模块。

四、动态处理引擎:加载和执行用户脚本

processor.lua 是实现动态性的核心。它负责加载用户指定的 Lua 脚本,并提供一个统一的接口来执行脚本中的处理函数。

-- lua/processor.lua

local cjson = require "cjson.safe"

local M = {}

-- 当前加载的用户处理函数
local user_process_func = nil
local last_error = nil

-- 用户脚本必须实现一个名为 'process' 的函数
-- function process(raw_data) -> processed_json_string or nil, error_string
local SCRIPT_ENTRY_FUNCTION = "process"

-- 加载并验证用户脚本
function M.load_script(script_name)
    local script_path = "user_scripts." .. script_name:gsub("%.lua$", "")
    
    -- 清除旧的包缓存,确保能加载最新的代码
    package.loaded[script_path] = nil
    
    local ok, user_module = pcall(require, script_path)

    if not ok then
        last_error = "Failed to require script '" .. script_path .. "': " .. tostring(user_module)
        ngx.log(ngx.ERR, last_error)
        user_process_func = nil
        return false, last_error
    end

    if type(user_module[SCRIPT_ENTRY_FUNCTION]) ~= "function" then
        last_error = "Script '" .. script_path .. "' does not contain a '" .. SCRIPT_ENTRY_FUNCTION .. "' function."
        ngx.log(ngx.ERR, last_error)
        user_process_func = nil
        return false, last_error
    end
    
    ngx.log(ngx.INFO, "Successfully loaded user script: ", script_path)
    user_process_func = user_module[SCRIPT_ENTRY_FUNCTION]
    last_error = nil
    
    return true
end

-- 处理单条数据
function M.process(raw_data)
    if not user_process_func then
        return nil, "No user script loaded"
    end
    
    local ok, result, err_msg = pcall(user_process_func, raw_data)
    
    if not ok then
        -- pcall 捕获了用户脚本执行时的错误
        ngx.log(ngx.ERR, "Error executing user script: ", tostring(result))
        return nil, "Script execution error"
    end
    
    return result, err_msg
end

-- 在模块加载时,尝试加载默认脚本
M.load_script("default_aggregator.lua")

return M

这个模块的设计很简单但非常有效:

  1. load_script: 提供了热重载功能。它通过 package.loaded[script_path] = nil 清除 Lua 的模块缓存,确保下一次 require 会重新加载文件。这使得我们可以通过调用 /reload_script 接口来动态更换处理逻辑。
  2. pcall 保护M.process 函数中使用 pcall 来执行用户脚本。这是一个重要的安全和稳定性措施。即使用户脚本中有错误(例如,语法错误、空指针等),它也不会导致整个 Nginx worker 进程崩溃,而只会在日志中记录一个错误。

五、用户脚本示例:实现一个 P99 延迟计算器

现在,我们来编写一个实际的用户脚本。假设服务上报的 UDP 数据是 JSON 字符串,格式为 {"service": "user-api", "latency_ms": 120}。我们希望在 Relay 中实时计算每秒钟各个服务的 P99 延迟,并将结果推送到前端。

-- lua/user_scripts/default_aggregator.lua

local cjson = require "cjson.safe"

local M = {}

-- 用于存储每个服务的延迟数据
-- 每个 worker 都有自己独立的 state
local state = {
    -- service_name -> { latencies = {...}, timestamp = ... }
}

local BUCKET_DURATION = 1 -- 聚合窗口,单位:秒

-- 计算百分位数的核心函数
-- 使用最简单的排序方法,对于压测指标聚合场景足够
local function calculate_percentile(latencies, p)
    if not latencies or #latencies == 0 then
        return 0
    end
    table.sort(latencies)
    local index = math.ceil(#latencies * (p / 100))
    return latencies[index]
end

-- 用户脚本入口函数,由 processor.lua 调用
function M.process(raw_data)
    local now = ngx.now()
    
    local ok, metric = cjson.decode(raw_data)
    if not ok then
        ngx.log(ngx.WARN, "Failed to decode UDP data: ", raw_data)
        return nil, "Invalid JSON"
    end
    
    local service_name = metric.service
    local latency = tonumber(metric.latency_ms)
    
    if not service_name or not latency then
        return nil, "Missing service or latency field"
    end

    -- 初始化服务状态
    if not state[service_name] then
        state[service_name] = {
            latencies = {},
            timestamp = now,
            count = 0
        }
    end

    local service_state = state[service_name]
    
    -- 检查是否进入了新的时间窗口
    if now - service_state.timestamp >= BUCKET_DURATION then
        -- 计算上一个窗口的统计数据
        local p99 = calculate_percentile(service_state.latencies, 99)
        local p95 = calculate_percentile(service_state.latencies, 95)
        local avg = 0
        if #service_state.latencies > 0 then
            local sum = 0
            for _, v in ipairs(service_state.latencies) do sum = sum + v end
            avg = sum / #service_state.latencies
        end

        local result = {
            service = service_name,
            timestamp = math.floor(service_state.timestamp),
            qps = service_state.count / BUCKET_DURATION,
            p99_ms = p99,
            p95_ms = p95,
            avg_ms = avg
        }
        
        -- 重置状态以开始新的窗口
        service_state.latencies = {}
        service_state.timestamp = now
        service_state.count = 0
        
        -- 将聚合结果编码为 JSON 字符串并返回
        return cjson.encode(result)
    end
    
    -- 在当前窗口内,只收集数据
    table.insert(service_state.latencies, latency)
    service_state.count = service_state.count + 1
    
    -- 如果还未到上报时间,则返回 nil,表示这条原始数据不需要被广播
    return nil
end

return M

这个脚本展示了动态处理的威力:

  • 有状态的聚合:它在内存中维护了一个状态表 state,按服务和时间窗口聚合延迟数据。
  • 数据降频:它不是每收到一条数据就推送,而是每秒钟计算一次聚合结果再推送,极大地降低了推送给前端的数据量,减轻了浏览器渲染的压力。
  • 自定义计算:我们可以轻易地在 M.process 中添加 P50、错误码统计等任何我们需要的逻辑。

六、测试与验证

  1. 启动 OpenResty:

    /usr/local/openresty/bin/openresty -p /usr/local/openresty/nginx/ -c /usr/local/openresty/nginx/conf/nginx.conf
  2. 连接 SSE 客户端:
    打开一个终端,使用 curl 模拟仪表盘客户端。

    curl -N http://localhost:8080/events

    这个命令会保持连接,并实时打印接收到的数据。

  3. 发送 UDP 指标数据:
    打开另一个终端,使用 netcat 模拟业务服务上报指标。

    # 循环发送数据
    while true; do
      LATENCY=$((RANDOM % 200 + 50)) # 模拟 50-250ms 的延迟
      echo "{\"service\": \"user-api\", \"latency_ms\": ${LATENCY}}" | nc -u -w0 127.0.0.1 9090
      
      LATENCY=$((RANDOM % 400 + 100)) # 模拟 100-500ms 的延迟
      echo "{\"service\": \"order-api\", \"latency_ms\": ${LATENCY}}" | nc -u -w0 127.0.0.1 9090
      
      sleep 0.01
    done

curl 终端,你将看到类似下面这样的输出,每秒钟一条,包含了计算好的聚合指标:

data: {"service":"user-api","timestamp":1666863601,"qps":100,"p99_ms":248,"p95_ms":235,"avg_ms":149.5}

data: {"service":"order-api","timestamp":1666863601,"qps":100,"p99_ms":495,"p95_ms":475,"avg_ms":298.7}

data: {"service":"user-api","timestamp":1666863602,"qps":100,"p99_ms":249,"p95_ms":238,"avg_ms":151.2}

data: {"service":"order-api","timestamp":1666863602,"qps":100,"p99_ms":498,"p95_ms":480,"avg_ms":301.4}
...

局限性与未来展望

这个方案虽然轻量且高效,但在生产环境中应用仍有几个需要考量的点。首先,当前的广播机制是 worker 级别的,无法做到全局广播。若要实现严格的全局数据一致性,需要引入 Redis Pub/Sub,由 UDP 处理器 PUBLISH 指标,所有 worker 的 SSE 处理器 SUBSCRIBE 同一个频道。这将增加一个外部依赖,但在扩展性上是必要的。

其次,用户上传的 Lua 脚本存在安全风险。在多租户或安全性要求高的环境中,必须对用户脚本进行沙箱化处理,限制其可以访问的函数库(如禁用 os, io 库)和执行时间,防止恶意代码或死循环拖垮整个服务。

最后,可以为这个 Relay 增加更完善的管理 API,比如查询当前连接数、查看已加载脚本状态、动态调整聚合参数等,使其成为一个更完整的压测基础设施组件。


  目录