Polyrepo 架构下集成 Milvus 与 SwiftUI 的实时多模态检索框架实现


我们面临一个极具挑战性的任务:为一个包含数百万视觉素材的数据库构建一个具备实时“灵感探索”功能的原生应用。用户在 SwiftUI 界面上进行文本输入、拖拽图片甚至涂鸦时,系统必须在毫秒级延迟内返回一系列语义相关、风格匹配的素材。这彻底排除了传统的基于元数据标签的搜索方案。我们需要的是一个能够理解内容本身的多模态向量检索系统,而且整个交互体验必须如丝般顺滑。

这种对极致响应速度的要求,意味着整个技术栈的每一环都必须为低延迟而设计。我们的初步构想是一个清晰的三层架构:一个用 SwiftUI 构建的、体验优先的 iOS 客户端;一个用 Python 构建的、承载核心 AI 逻辑的高性能后端;以及一个专为向量检索优化的数据库 Milvus。

最酷的是,为了让不同技术栈的团队(iOS 团队和 AI/后端团队)能够独立、高效地迭代,我们从第一天起就决定采用 Polyrepo 的代码组织策略。这带来了清晰的边界,但也引入了新的挑战:如何管理跨仓库的 API 契约和依赖关系。

架构决策与通信契约

Polyrepo 策略下,我们的代码库被拆分为三个独立的 Git 仓库:

  1. inspire-swiftui-client: 存放所有 SwiftUI 客户端代码。
  2. inspire-backend-api: 存放 FastAPI 应用、AI 模型加载与推理、Milvus 交互逻辑。
  3. inspire-api-protos: 一个专门用于定义服务间通信契约的仓库,使用 Protocol Buffers。

这个 inspire-api-protos 仓库是整个架构的粘合剂。它定义了客户端与后端之间的所有数据结构和服务接口,是唯一被其他两个仓库共同依赖的部分。这种做法强制实现接口先行,避免了大量的集成问题。

这是我们定义的核心检索服务的 .proto 文件:

// inspire-api-protos/protos/v1/retrieval.proto

syntax = "proto3";

package retrieval.v1;

// 服务定义
service RetrievalService {
  // 核心的多模态搜索接口
  rpc Search(SearchRequest) returns (SearchResponse) {}
}

// 搜索请求的模式
enum SearchMode {
  MODE_UNSPECIFIED = 0;
  TEXT_TO_IMAGE = 1;  // 以文搜图
  IMAGE_TO_IMAGE = 2; // 以图搜图
}

// 搜索请求体
message SearchRequest {
  string request_id = 1; // 用于日志追踪

  // 查询内容
  oneof query {
    string text_query = 2;
    bytes image_query = 3; // 图片的二进制数据
  }

  SearchMode mode = 4;

  // 过滤条件
  repeated string style_filters = 5; // e.g., "minimalist", "vintage"
  int32 top_k = 6; // 返回结果数量
}

// 单个搜索结果
message SearchResultItem {
  string item_id = 1;
  float score = 2; // 相似度得分
  string asset_url = 3;
}

// 搜索响应体
message SearchResponse {
  string request_id = 1;
  repeated SearchResultItem results = 2;
}

通过 Makefile,我们可以为 Python 和 Swift 生成各自的客户端/服务端代码,这彻底改变了跨团队协作的方式。任何 API 的变更都必须先在 inspire-api-protos 中完成,经过评审后,各团队再拉取更新并重新生成代码,流程清晰且不易出错。

后端框架:FastAPI 与 Milvus 的异步协奏

后端的性能是整个系统响应速度的瓶颈。我们选择 FastAPI 的理由非常明确:它的 async/await 语法天生适合处理 I/O 密集型任务,比如等待 AI 模型推理、查询 Milvus 数据库等,而不会阻塞整个服务。

我们的 Milvus collection 设计如下,它支持混合搜索,即同时进行向量相似度匹配和基于风格标签的标量过滤。

# inspire-backend-api/app/core/milvus_client.py

from pymilvus import (
    Collection, CollectionSchema, FieldSchema, DataType,
    utility, connections
)
import logging

# ... 配置项从环境变量读取 ...
MILVUS_ALIAS = "default"
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"

COLLECTION_NAME = "creative_assets"
DIMENSION = 512 # CLIP ViT-B/32 模型的向量维度

logger = logging.getLogger(__name__)

def get_milvus_connection():
    """管理 Milvus 连接"""
    if not utility.has_collection(COLLECTION_NAME, using=MILVUS_ALIAS):
        logger.info(f"Collection '{COLLECTION_NAME}' not found. Attempting to create.")
        _create_collection()
    return Collection(COLLECTION_NAME, using=MILVUS_ALIAS)

def _create_collection():
    """
    创建 Milvus Collection,这是整个系统的核心数据结构。
    """
    try:
        connections.connect(alias=MILVUS_ALIAS, host=MILVUS_HOST, port=MILVUS_PORT)
        
        # 1. 定义字段
        # 主键,必须有
        pk_field = FieldSchema(
            name="item_id", dtype=DataType.VARCHAR, is_primary=True, 
            auto_id=False, max_length=100
        )
        # 向量字段
        vector_field = FieldSchema(
            name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIMENSION
        )
        # 标量字段,用于过滤
        style_field = FieldSchema(
            name="style_tag", dtype=DataType.VARCHAR, max_length=50
        )
        
        schema = CollectionSchema(
            fields=[pk_field, vector_field, style_field],
            description="Creative assets collection for multimodal search"
        )
        
        # 2. 创建 Collection
        collection = Collection(
            name=COLLECTION_NAME, schema=schema, using=MILVUS_ALIAS
        )
        logger.info(f"Successfully created collection: {COLLECTION_NAME}")

        # 3. 创建索引,这是性能的关键
        # HNSW 是性能和召回率之间权衡得非常好的索引类型
        index_params = {
            "metric_type": "L2", # 欧式距离
            "index_type": "HNSW",
            "params": {"M": 16, "efConstruction": 256}
        }
        collection.create_index(
            field_name="embedding", index_params=index_params
        )
        logger.info(f"Successfully created index for field 'embedding'")

        # 为标量字段也创建索引,加速过滤
        collection.create_index(field_name="style_tag")
        logger.info(f"Successfully created index for field 'style_tag'")
        
        # 加载到内存以备查询
        collection.load()
        logger.info(f"Collection '{COLLECTION_NAME}' loaded into memory.")

    except Exception as e:
        logger.error(f"Failed to create or load Milvus collection: {e}")
        raise
    finally:
        connections.disconnect(MILVUS_ALIAS)

# 在应用启动时,确保连接和 Collection 存在
if MILVUS_HOST and MILVUS_PORT:
    connections.connect(alias=MILVUS_ALIAS, host=MILVUS_HOST, port=MILVUS_PORT)
    _create_collection()

这里的关键在于为向量字段 embedding 和标量字段 style_tag 都创建了索引。这使得 Milvus 能够高效地执行 “先过滤再搜索” 的混合查询策略。

接下来是 FastAPI 的核心路由实现。这段代码展示了如何将模型推理、向量生成和 Milvus 查询异步地串联起来。

# inspire-backend-api/app/api/v1/endpoints/retrieval.py

import uuid
import asyncio
from fastapi import FastAPI, APIRouter, HTTPException, Depends
from starlette.status import HTTP_400_BAD_REQUEST, HTTP_500_INTERNAL_SERVER_ERROR
from PIL import Image
import io

# 假设我们有一个模型服务,负责加载 CLIP 模型并提供 embedding 功能
# 在真实项目中,这可能是一个独立的类,通过依赖注入传入
from app.services.embedding_service import EmbeddingService, get_embedding_service
from app.core.milvus_client import get_milvus_connection
from app.schemas.retrieval_pb2 import SearchRequest, SearchResponse, SearchResultItem, TEXT_TO_IMAGE, IMAGE_TO_IMAGE

router = APIRouter()

# 这是一个非常强大的设计模式: 使用 asyncio.to_thread 将同步的、CPU密集型的
# 模型推理操作,包装成一个异步可等待的任务,避免阻塞 FastAPI 的事件循环。
async def run_in_threadpool(func, *args, **kwargs):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args, **kwargs)

@router.post("/search", response_model=None) # response is protobuf
async def search(
    request: SearchRequest, # FastAPI 尚不直接支持 Protobuf,需要中间件或手动解析
    collection: Collection = Depends(get_milvus_connection),
    embed_service: EmbeddingService = Depends(get_embedding_service),
) -> SearchResponse:

    request_id = request.request_id or str(uuid.uuid4())
    search_vector = None
    
    try:
        # 1. 根据请求类型,异步生成查询向量
        if request.mode == TEXT_TO_IMAGE and request.text_query:
            search_vector = await run_in_threadpool(
                embed_service.get_text_embedding, request.text_query
            )
        elif request.mode == IMAGE_TO_IMAGE and request.image_query:
            image = Image.open(io.BytesIO(request.image_query))
            search_vector = await run_in_threadpool(
                embed_service.get_image_embedding, image
            )
        else:
            raise HTTPException(
                status_code=HTTP_400_BAD_REQUEST, detail="Invalid search mode or missing query."
            )
        
        if search_vector is None:
            raise HTTPException(
                status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to generate embedding vector."
            )

        # 2. 构建 Milvus 查询
        # 这里的 bool_expr 是混合搜索的精髓所在
        expr = ""
        if request.style_filters:
            # 构建一个 'style_tag IN ["filter1", "filter2"]' 表达式
            filters_str = ", ".join([f'"{f}"' for f in request.style_filters])
            expr = f"style_tag in [{filters_str}]"
        
        search_params = {
            "metric_type": "L2",
            "params": {"ef": 128} # ef 搜索参数,值越大越准但越慢
        }
        
        # 3. 异步执行搜索
        # pymilvus 的 search 接口本身是同步的,所以也用 to_thread 包装
        milvus_results = await run_in_threadpool(
            collection.search,
            data=[search_vector.tolist()],
            anns_field="embedding",
            param=search_params,
            limit=request.top_k,
            expr=expr,
            output_fields=["item_id", "style_tag"] # 指定需要返回的字段
        )
        
        # 4. 组装 Protobuf 响应
        response = SearchResponse(request_id=request_id)
        
        # Milvus 返回的结果结构比较复杂,需要小心解析
        hits = milvus_results[0]
        for hit in hits:
            item = SearchResultItem(
                item_id=hit.id,
                score=hit.distance,
                # 在真实应用中,这里会根据 item_id 从另一个服务或数据库查询 URL
                asset_url=f"https://cdn.example.com/assets/{hit.id}.jpg" 
            )
            response.results.append(item)
            
        return response

    except HTTPException as http_exc:
        # 直接抛出 HTTP 异常,让 FastAPI 处理
        raise http_exc
    except Exception as e:
        # 捕获其他所有异常,记录日志并返回通用错误
        logger.error(f"Search failed for request {request_id}: {e}", exc_info=True)
        raise HTTPException(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail="An internal error occurred."
        )

这个实现有几个值得注意的细节:

  • 线程池封装: AI 模型的 embedding 计算是 CPU 密集型任务,直接在 async 函数中调用会阻塞事件循环。通过 asyncio.to_thread 将其抛到线程池中执行,是 FastAPI 处理此类任务的最佳实践。
  • 依赖注入: Depends 的使用让我们可以轻松地管理和替换 Milvus 连接、模型服务等依赖,这对于单元测试至关重要。
  • 混合搜索表达式: expr 参数的构建展示了 Milvus 混合搜索的强大之处。我们可以动态构建复杂的布尔表达式,实现任意的元数据过滤组合。
  • 详尽的错误处理: 代码区分了客户端错误(HTTP 400)和服务器端错误(HTTP 500),并对后者进行了详细的日志记录。

客户端框架:SwiftUI 的响应式艺术

在 iOS 端,我们的核心目标是提供一个零延迟感的交互界面。用户在搜索框输入时,结果列表应该实时更新。这正是 SwiftUI 和 Combine 框架(或新的 async/await)的用武之地。

首先,我们需要一个网络服务层来处理与后端 Protobuf API 的通信。

// inspire-swiftui-client/Services/RetrievalAPIService.swift

import Foundation
import GRPC
import NIOCore

// 假设我们已经通过 Swift Protobuf 插件生成了代码
typealias RetrievalService = Retrieval_V1_RetrievalServiceAsyncClient

final class RetrievalAPIService {
    private let client: RetrievalService

    // 初始化 gRPC 连接
    init() {
        // 在生产环境中,这里应该是安全的 TLS 连接
        let group = PlatformSupport.makeEventLoopGroup(loopCount: 1)
        let channel = try! GRPCChannelPool.with(
            target: .host("localhost", port: 50051),
            transportSecurity: .plaintext,
            eventLoopGroup: group
        )
        self.client = RetrievalService(channel: channel)
    }

    func search(query: String, styles: [String]) async throws -> [SearchResultItem] {
        var request = Retrieval_V1_SearchRequest()
        request.requestID = UUID().uuidString
        request.textQuery = query
        request.mode = .textToImage
        request.topK = 30
        request.styleFilters = styles
        
        do {
            let response = try await client.search(request)
            
            // 将 Protobuf 模型转换为我们 SwiftUI View Model 使用的本地模型
            return response.results.map { protoItem in
                SearchResultItem(
                    id: protoItem.itemID, 
                    score: protoItem.score, 
                    assetURL: URL(string: protoItem.assetURL)
                )
            }
        } catch let error as GRPCStatus {
            // 对 gRPC 错误进行更细致的处理
            print("gRPC Error: \(error.code) - \(error.message ?? "No message")")
            throw APIError.grpcError(status: error)
        } catch {
            print("Unknown API error: \(error)")
            throw APIError.unknown
        }
    }
    
    // ... 以图搜图的函数类似 ...
}

// 定义一些本地模型和错误类型
struct SearchResultItem: Identifiable, Hashable {
    let id: String
    let score: Float
    let assetURL: URL?
}

enum APIError: Error {
    case grpcError(status: GRPCStatus)
    case unknown
}

有了 API 服务,我们就可以构建 ViewModel。这个 ViewModel 将使用 async/await@Published 属性来驱动 SwiftUI 视图的更新。最关键的技术点在于使用 debounce 来防止用户每输入一个字符就触发一次网络请求。

// inspire-swiftui-client/ViewModels/SearchViewModel.swift

import SwiftUI
import Combine

@MainActor
final class SearchViewModel: ObservableObject {
    @Published var searchText: String = ""
    @Published var searchResults: [SearchResultItem] = []
    @Published var isLoading: Bool = false
    @Published var errorMessage: String?
    
    private let apiService = RetrievalAPIService()
    private var cancellables = Set<AnyCancellable>()
    
    init() {
        // 这是实现实时搜索的关键:debounce
        // 它会等待用户停止输入 500 毫秒后,才真正触发搜索
        $searchText
            .debounce(for: .milliseconds(500), scheduler: DispatchQueue.main)
            .removeDuplicates() // 只有在文本变化时才触发
            .filter { !$0.isEmpty } // 不搜索空字符串
            .sink { [weak self] text in
                self?.performSearch(query: text)
            }
            .store(in: &cancellables)
    }
    
    private func performSearch(query: String) {
        isLoading = true
        errorMessage = nil
        
        Task {
            do {
                let results = try await apiService.search(query: query, styles: [])
                self.searchResults = results
            } catch {
                self.errorMessage = "Search failed. Please try again."
                self.searchResults = []
            }
            self.isLoading = false
        }
    }
}

debounce 操作符是构建响应式、高性能 UI 的利器。它极大地改善了用户体验并减轻了后端服务的压力。

最后,SwiftUI 视图本身的代码就变得异常简洁和声明式。

// inspire-swiftui-client/Views/SearchView.swift

import SwiftUI

struct SearchView: View {
    @StateObject private var viewModel = SearchViewModel()

    // 使用 LazyVGrid 高效展示大量图片
    private let columns = [
        GridItem(.adaptive(minimum: 100))
    ]

    var body: some View {
        NavigationView {
            VStack {
                // 搜索框
                TextField("Search for inspiration...", text: $viewModel.searchText)
                    .textFieldStyle(RoundedBorderTextFieldStyle())
                    .padding()
                
                // 结果展示区域
                if viewModel.isLoading {
                    ProgressView()
                    Spacer()
                } else if let errorMessage = viewModel.errorMessage {
                    Text(errorMessage)
                        .foregroundColor(.red)
                    Spacer()
                } else {
                    ScrollView {
                        LazyVGrid(columns: columns, spacing: 10) {
                            ForEach(viewModel.searchResults, id: \.self) { item in
                                // AsyncImage 是 iOS 15+ 的原生异步图片加载组件
                                AsyncImage(url: item.assetURL) { image in
                                    image.resizable()
                                         .aspectRatio(contentMode: .fill)
                                } placeholder: {
                                    Color.gray.opacity(0.3)
                                }
                                .frame(width: 100, height: 100)
                                .cornerRadius(8)
                            }
                        }
                        .padding(.horizontal)
                    }
                }
            }
            .navigationTitle("Inspire Search")
        }
    }
}

这个视图完全由 SearchViewModel 的状态驱动。当 isLoading 变为 true 时,显示加载指示器。当 searchResults 更新时,LazyVGrid 会高效地只渲染屏幕上可见的单元格。这就是 SwiftUI 声明式编程的魅力。

sequenceDiagram
    participant SwiftUI_Client as SwiftUI Client
    participant FastAPI_Backend as FastAPI Backend
    participant Embedding_Service as Embedding Service
    participant Milvus_DB as Milvus

    SwiftUI_Client->>FastAPI_Backend: POST /search (SearchRequest Proto)
    Note over FastAPI_Backend: Request received, async task starts
    
    FastAPI_Backend->>Embedding_Service: Generate embedding for query
    Note over Embedding_Service: CPU-bound task runs in thread pool
    Embedding_Service-->>FastAPI_Backend: Return query_vector
    
    FastAPI_Backend->>Milvus_DB: search(vector, expr="style_tag in [...]")
    Note over Milvus_DB: Performs hybrid search on indexed data
    Milvus_DB-->>FastAPI_Backend: Return top_k results (IDs & distances)
    
    Note over FastAPI_Backend: Assemble SearchResponse Proto
    FastAPI_Backend-->>SwiftUI_Client: 200 OK (SearchResponse Proto)

    Note over SwiftUI_Client: Decode protobuf, update @Published property
    Note over SwiftUI_Client: SwiftUI view automatically re-renders

方案局限性与未来展望

这套架构虽然实现了核心目标,但并非完美。当前方案的性能瓶颈主要在于 embedding 模型的推理速度,虽然我们用了线程池,但对于真正的海量并发,需要将模型服务部署到专用的 GPU 服务器上,并考虑模型量化、蒸馏等优化手段。

其次,数据注入流程尚未覆盖。一个生产级的系统需要一个健壮的数据管道,能够实时地将新增的素材 embedding 化并写入 Milvus,这个过程可能需要借助 Kafka 和一个专门的数据处理服务来完成,以实现写入和查询路径的分离。

最后,Polyrepo 策略虽然带来了团队自治,但也增加了 CI/CD 的复杂性。对 inspire-api-protos 的任何改动都需要触发下游两个仓库的流水线重新构建和测试,这需要一套成熟的跨仓库流水线编排工具来管理。未来的迭代方向之一就是构建这样一个统一的发布与版本管理平台,以降低协作成本。


  目录