我们面临一个极具挑战性的任务:为一个包含数百万视觉素材的数据库构建一个具备实时“灵感探索”功能的原生应用。用户在 SwiftUI 界面上进行文本输入、拖拽图片甚至涂鸦时,系统必须在毫秒级延迟内返回一系列语义相关、风格匹配的素材。这彻底排除了传统的基于元数据标签的搜索方案。我们需要的是一个能够理解内容本身的多模态向量检索系统,而且整个交互体验必须如丝般顺滑。
这种对极致响应速度的要求,意味着整个技术栈的每一环都必须为低延迟而设计。我们的初步构想是一个清晰的三层架构:一个用 SwiftUI 构建的、体验优先的 iOS 客户端;一个用 Python 构建的、承载核心 AI 逻辑的高性能后端;以及一个专为向量检索优化的数据库 Milvus。
最酷的是,为了让不同技术栈的团队(iOS 团队和 AI/后端团队)能够独立、高效地迭代,我们从第一天起就决定采用 Polyrepo 的代码组织策略。这带来了清晰的边界,但也引入了新的挑战:如何管理跨仓库的 API 契约和依赖关系。
架构决策与通信契约
Polyrepo 策略下,我们的代码库被拆分为三个独立的 Git 仓库:
-
inspire-swiftui-client
: 存放所有 SwiftUI 客户端代码。 -
inspire-backend-api
: 存放 FastAPI 应用、AI 模型加载与推理、Milvus 交互逻辑。 -
inspire-api-protos
: 一个专门用于定义服务间通信契约的仓库,使用 Protocol Buffers。
这个 inspire-api-protos
仓库是整个架构的粘合剂。它定义了客户端与后端之间的所有数据结构和服务接口,是唯一被其他两个仓库共同依赖的部分。这种做法强制实现接口先行,避免了大量的集成问题。
这是我们定义的核心检索服务的 .proto
文件:
// inspire-api-protos/protos/v1/retrieval.proto
syntax = "proto3";
package retrieval.v1;
// 服务定义
service RetrievalService {
// 核心的多模态搜索接口
rpc Search(SearchRequest) returns (SearchResponse) {}
}
// 搜索请求的模式
enum SearchMode {
MODE_UNSPECIFIED = 0;
TEXT_TO_IMAGE = 1; // 以文搜图
IMAGE_TO_IMAGE = 2; // 以图搜图
}
// 搜索请求体
message SearchRequest {
string request_id = 1; // 用于日志追踪
// 查询内容
oneof query {
string text_query = 2;
bytes image_query = 3; // 图片的二进制数据
}
SearchMode mode = 4;
// 过滤条件
repeated string style_filters = 5; // e.g., "minimalist", "vintage"
int32 top_k = 6; // 返回结果数量
}
// 单个搜索结果
message SearchResultItem {
string item_id = 1;
float score = 2; // 相似度得分
string asset_url = 3;
}
// 搜索响应体
message SearchResponse {
string request_id = 1;
repeated SearchResultItem results = 2;
}
通过 Makefile,我们可以为 Python 和 Swift 生成各自的客户端/服务端代码,这彻底改变了跨团队协作的方式。任何 API 的变更都必须先在 inspire-api-protos
中完成,经过评审后,各团队再拉取更新并重新生成代码,流程清晰且不易出错。
后端框架:FastAPI 与 Milvus 的异步协奏
后端的性能是整个系统响应速度的瓶颈。我们选择 FastAPI 的理由非常明确:它的 async/await
语法天生适合处理 I/O 密集型任务,比如等待 AI 模型推理、查询 Milvus 数据库等,而不会阻塞整个服务。
我们的 Milvus collection 设计如下,它支持混合搜索,即同时进行向量相似度匹配和基于风格标签的标量过滤。
# inspire-backend-api/app/core/milvus_client.py
from pymilvus import (
Collection, CollectionSchema, FieldSchema, DataType,
utility, connections
)
import logging
# ... 配置项从环境变量读取 ...
MILVUS_ALIAS = "default"
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
COLLECTION_NAME = "creative_assets"
DIMENSION = 512 # CLIP ViT-B/32 模型的向量维度
logger = logging.getLogger(__name__)
def get_milvus_connection():
"""管理 Milvus 连接"""
if not utility.has_collection(COLLECTION_NAME, using=MILVUS_ALIAS):
logger.info(f"Collection '{COLLECTION_NAME}' not found. Attempting to create.")
_create_collection()
return Collection(COLLECTION_NAME, using=MILVUS_ALIAS)
def _create_collection():
"""
创建 Milvus Collection,这是整个系统的核心数据结构。
"""
try:
connections.connect(alias=MILVUS_ALIAS, host=MILVUS_HOST, port=MILVUS_PORT)
# 1. 定义字段
# 主键,必须有
pk_field = FieldSchema(
name="item_id", dtype=DataType.VARCHAR, is_primary=True,
auto_id=False, max_length=100
)
# 向量字段
vector_field = FieldSchema(
name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIMENSION
)
# 标量字段,用于过滤
style_field = FieldSchema(
name="style_tag", dtype=DataType.VARCHAR, max_length=50
)
schema = CollectionSchema(
fields=[pk_field, vector_field, style_field],
description="Creative assets collection for multimodal search"
)
# 2. 创建 Collection
collection = Collection(
name=COLLECTION_NAME, schema=schema, using=MILVUS_ALIAS
)
logger.info(f"Successfully created collection: {COLLECTION_NAME}")
# 3. 创建索引,这是性能的关键
# HNSW 是性能和召回率之间权衡得非常好的索引类型
index_params = {
"metric_type": "L2", # 欧式距离
"index_type": "HNSW",
"params": {"M": 16, "efConstruction": 256}
}
collection.create_index(
field_name="embedding", index_params=index_params
)
logger.info(f"Successfully created index for field 'embedding'")
# 为标量字段也创建索引,加速过滤
collection.create_index(field_name="style_tag")
logger.info(f"Successfully created index for field 'style_tag'")
# 加载到内存以备查询
collection.load()
logger.info(f"Collection '{COLLECTION_NAME}' loaded into memory.")
except Exception as e:
logger.error(f"Failed to create or load Milvus collection: {e}")
raise
finally:
connections.disconnect(MILVUS_ALIAS)
# 在应用启动时,确保连接和 Collection 存在
if MILVUS_HOST and MILVUS_PORT:
connections.connect(alias=MILVUS_ALIAS, host=MILVUS_HOST, port=MILVUS_PORT)
_create_collection()
这里的关键在于为向量字段 embedding
和标量字段 style_tag
都创建了索引。这使得 Milvus 能够高效地执行 “先过滤再搜索” 的混合查询策略。
接下来是 FastAPI 的核心路由实现。这段代码展示了如何将模型推理、向量生成和 Milvus 查询异步地串联起来。
# inspire-backend-api/app/api/v1/endpoints/retrieval.py
import uuid
import asyncio
from fastapi import FastAPI, APIRouter, HTTPException, Depends
from starlette.status import HTTP_400_BAD_REQUEST, HTTP_500_INTERNAL_SERVER_ERROR
from PIL import Image
import io
# 假设我们有一个模型服务,负责加载 CLIP 模型并提供 embedding 功能
# 在真实项目中,这可能是一个独立的类,通过依赖注入传入
from app.services.embedding_service import EmbeddingService, get_embedding_service
from app.core.milvus_client import get_milvus_connection
from app.schemas.retrieval_pb2 import SearchRequest, SearchResponse, SearchResultItem, TEXT_TO_IMAGE, IMAGE_TO_IMAGE
router = APIRouter()
# 这是一个非常强大的设计模式: 使用 asyncio.to_thread 将同步的、CPU密集型的
# 模型推理操作,包装成一个异步可等待的任务,避免阻塞 FastAPI 的事件循环。
async def run_in_threadpool(func, *args, **kwargs):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, func, *args, **kwargs)
@router.post("/search", response_model=None) # response is protobuf
async def search(
request: SearchRequest, # FastAPI 尚不直接支持 Protobuf,需要中间件或手动解析
collection: Collection = Depends(get_milvus_connection),
embed_service: EmbeddingService = Depends(get_embedding_service),
) -> SearchResponse:
request_id = request.request_id or str(uuid.uuid4())
search_vector = None
try:
# 1. 根据请求类型,异步生成查询向量
if request.mode == TEXT_TO_IMAGE and request.text_query:
search_vector = await run_in_threadpool(
embed_service.get_text_embedding, request.text_query
)
elif request.mode == IMAGE_TO_IMAGE and request.image_query:
image = Image.open(io.BytesIO(request.image_query))
search_vector = await run_in_threadpool(
embed_service.get_image_embedding, image
)
else:
raise HTTPException(
status_code=HTTP_400_BAD_REQUEST, detail="Invalid search mode or missing query."
)
if search_vector is None:
raise HTTPException(
status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to generate embedding vector."
)
# 2. 构建 Milvus 查询
# 这里的 bool_expr 是混合搜索的精髓所在
expr = ""
if request.style_filters:
# 构建一个 'style_tag IN ["filter1", "filter2"]' 表达式
filters_str = ", ".join([f'"{f}"' for f in request.style_filters])
expr = f"style_tag in [{filters_str}]"
search_params = {
"metric_type": "L2",
"params": {"ef": 128} # ef 搜索参数,值越大越准但越慢
}
# 3. 异步执行搜索
# pymilvus 的 search 接口本身是同步的,所以也用 to_thread 包装
milvus_results = await run_in_threadpool(
collection.search,
data=[search_vector.tolist()],
anns_field="embedding",
param=search_params,
limit=request.top_k,
expr=expr,
output_fields=["item_id", "style_tag"] # 指定需要返回的字段
)
# 4. 组装 Protobuf 响应
response = SearchResponse(request_id=request_id)
# Milvus 返回的结果结构比较复杂,需要小心解析
hits = milvus_results[0]
for hit in hits:
item = SearchResultItem(
item_id=hit.id,
score=hit.distance,
# 在真实应用中,这里会根据 item_id 从另一个服务或数据库查询 URL
asset_url=f"https://cdn.example.com/assets/{hit.id}.jpg"
)
response.results.append(item)
return response
except HTTPException as http_exc:
# 直接抛出 HTTP 异常,让 FastAPI 处理
raise http_exc
except Exception as e:
# 捕获其他所有异常,记录日志并返回通用错误
logger.error(f"Search failed for request {request_id}: {e}", exc_info=True)
raise HTTPException(
status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail="An internal error occurred."
)
这个实现有几个值得注意的细节:
- 线程池封装: AI 模型的
embedding
计算是 CPU 密集型任务,直接在async
函数中调用会阻塞事件循环。通过asyncio.to_thread
将其抛到线程池中执行,是 FastAPI 处理此类任务的最佳实践。 - 依赖注入:
Depends
的使用让我们可以轻松地管理和替换 Milvus 连接、模型服务等依赖,这对于单元测试至关重要。 - 混合搜索表达式:
expr
参数的构建展示了 Milvus 混合搜索的强大之处。我们可以动态构建复杂的布尔表达式,实现任意的元数据过滤组合。 - 详尽的错误处理: 代码区分了客户端错误(HTTP 400)和服务器端错误(HTTP 500),并对后者进行了详细的日志记录。
客户端框架:SwiftUI 的响应式艺术
在 iOS 端,我们的核心目标是提供一个零延迟感的交互界面。用户在搜索框输入时,结果列表应该实时更新。这正是 SwiftUI 和 Combine 框架(或新的 async/await
)的用武之地。
首先,我们需要一个网络服务层来处理与后端 Protobuf API 的通信。
// inspire-swiftui-client/Services/RetrievalAPIService.swift
import Foundation
import GRPC
import NIOCore
// 假设我们已经通过 Swift Protobuf 插件生成了代码
typealias RetrievalService = Retrieval_V1_RetrievalServiceAsyncClient
final class RetrievalAPIService {
private let client: RetrievalService
// 初始化 gRPC 连接
init() {
// 在生产环境中,这里应该是安全的 TLS 连接
let group = PlatformSupport.makeEventLoopGroup(loopCount: 1)
let channel = try! GRPCChannelPool.with(
target: .host("localhost", port: 50051),
transportSecurity: .plaintext,
eventLoopGroup: group
)
self.client = RetrievalService(channel: channel)
}
func search(query: String, styles: [String]) async throws -> [SearchResultItem] {
var request = Retrieval_V1_SearchRequest()
request.requestID = UUID().uuidString
request.textQuery = query
request.mode = .textToImage
request.topK = 30
request.styleFilters = styles
do {
let response = try await client.search(request)
// 将 Protobuf 模型转换为我们 SwiftUI View Model 使用的本地模型
return response.results.map { protoItem in
SearchResultItem(
id: protoItem.itemID,
score: protoItem.score,
assetURL: URL(string: protoItem.assetURL)
)
}
} catch let error as GRPCStatus {
// 对 gRPC 错误进行更细致的处理
print("gRPC Error: \(error.code) - \(error.message ?? "No message")")
throw APIError.grpcError(status: error)
} catch {
print("Unknown API error: \(error)")
throw APIError.unknown
}
}
// ... 以图搜图的函数类似 ...
}
// 定义一些本地模型和错误类型
struct SearchResultItem: Identifiable, Hashable {
let id: String
let score: Float
let assetURL: URL?
}
enum APIError: Error {
case grpcError(status: GRPCStatus)
case unknown
}
有了 API 服务,我们就可以构建 ViewModel。这个 ViewModel 将使用 async/await
和 @Published
属性来驱动 SwiftUI 视图的更新。最关键的技术点在于使用 debounce
来防止用户每输入一个字符就触发一次网络请求。
// inspire-swiftui-client/ViewModels/SearchViewModel.swift
import SwiftUI
import Combine
@MainActor
final class SearchViewModel: ObservableObject {
@Published var searchText: String = ""
@Published var searchResults: [SearchResultItem] = []
@Published var isLoading: Bool = false
@Published var errorMessage: String?
private let apiService = RetrievalAPIService()
private var cancellables = Set<AnyCancellable>()
init() {
// 这是实现实时搜索的关键:debounce
// 它会等待用户停止输入 500 毫秒后,才真正触发搜索
$searchText
.debounce(for: .milliseconds(500), scheduler: DispatchQueue.main)
.removeDuplicates() // 只有在文本变化时才触发
.filter { !$0.isEmpty } // 不搜索空字符串
.sink { [weak self] text in
self?.performSearch(query: text)
}
.store(in: &cancellables)
}
private func performSearch(query: String) {
isLoading = true
errorMessage = nil
Task {
do {
let results = try await apiService.search(query: query, styles: [])
self.searchResults = results
} catch {
self.errorMessage = "Search failed. Please try again."
self.searchResults = []
}
self.isLoading = false
}
}
}
debounce
操作符是构建响应式、高性能 UI 的利器。它极大地改善了用户体验并减轻了后端服务的压力。
最后,SwiftUI 视图本身的代码就变得异常简洁和声明式。
// inspire-swiftui-client/Views/SearchView.swift
import SwiftUI
struct SearchView: View {
@StateObject private var viewModel = SearchViewModel()
// 使用 LazyVGrid 高效展示大量图片
private let columns = [
GridItem(.adaptive(minimum: 100))
]
var body: some View {
NavigationView {
VStack {
// 搜索框
TextField("Search for inspiration...", text: $viewModel.searchText)
.textFieldStyle(RoundedBorderTextFieldStyle())
.padding()
// 结果展示区域
if viewModel.isLoading {
ProgressView()
Spacer()
} else if let errorMessage = viewModel.errorMessage {
Text(errorMessage)
.foregroundColor(.red)
Spacer()
} else {
ScrollView {
LazyVGrid(columns: columns, spacing: 10) {
ForEach(viewModel.searchResults, id: \.self) { item in
// AsyncImage 是 iOS 15+ 的原生异步图片加载组件
AsyncImage(url: item.assetURL) { image in
image.resizable()
.aspectRatio(contentMode: .fill)
} placeholder: {
Color.gray.opacity(0.3)
}
.frame(width: 100, height: 100)
.cornerRadius(8)
}
}
.padding(.horizontal)
}
}
}
.navigationTitle("Inspire Search")
}
}
}
这个视图完全由 SearchViewModel
的状态驱动。当 isLoading
变为 true
时,显示加载指示器。当 searchResults
更新时,LazyVGrid
会高效地只渲染屏幕上可见的单元格。这就是 SwiftUI 声明式编程的魅力。
sequenceDiagram participant SwiftUI_Client as SwiftUI Client participant FastAPI_Backend as FastAPI Backend participant Embedding_Service as Embedding Service participant Milvus_DB as Milvus SwiftUI_Client->>FastAPI_Backend: POST /search (SearchRequest Proto) Note over FastAPI_Backend: Request received, async task starts FastAPI_Backend->>Embedding_Service: Generate embedding for query Note over Embedding_Service: CPU-bound task runs in thread pool Embedding_Service-->>FastAPI_Backend: Return query_vector FastAPI_Backend->>Milvus_DB: search(vector, expr="style_tag in [...]") Note over Milvus_DB: Performs hybrid search on indexed data Milvus_DB-->>FastAPI_Backend: Return top_k results (IDs & distances) Note over FastAPI_Backend: Assemble SearchResponse Proto FastAPI_Backend-->>SwiftUI_Client: 200 OK (SearchResponse Proto) Note over SwiftUI_Client: Decode protobuf, update @Published property Note over SwiftUI_Client: SwiftUI view automatically re-renders
方案局限性与未来展望
这套架构虽然实现了核心目标,但并非完美。当前方案的性能瓶颈主要在于 embedding 模型的推理速度,虽然我们用了线程池,但对于真正的海量并发,需要将模型服务部署到专用的 GPU 服务器上,并考虑模型量化、蒸馏等优化手段。
其次,数据注入流程尚未覆盖。一个生产级的系统需要一个健壮的数据管道,能够实时地将新增的素材 embedding 化并写入 Milvus,这个过程可能需要借助 Kafka 和一个专门的数据处理服务来完成,以实现写入和查询路径的分离。
最后,Polyrepo 策略虽然带来了团队自治,但也增加了 CI/CD 的复杂性。对 inspire-api-protos
的任何改动都需要触发下游两个仓库的流水线重新构建和测试,这需要一套成熟的跨仓库流水线编排工具来管理。未来的迭代方向之一就是构建这样一个统一的发布与版本管理平台,以降低协作成本。