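We faced a tricky integration scenario. The core business flow is owned by a mature Spring Boot application that handles order processing and account updates. A new high-performance computing module, written in Rust, performs real-time risk assessment and credit-limit freezing. Both sides operate on the same SQL Server database. The business requirement is that order creation and account debiting (Java side) and the credit freeze (Rust side) form a single atomic operation: either all of them succeed or all of them are rolled back.
A plain chain of API calls cannot provide this atomicity. If the call to the Rust service fails after Java has already committed, the data is left inconsistent. That leads straight into distributed transactions. Given how strict the consistency requirement is, we decided to explore a two-phase commit (2PC) approach based on the XA protocol.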
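The challenge is that JTA (Java Transaction API) is the standard in the Java ecosystem, while Rust, as a systems language, has no native JTA participant implementation. Our task is to adapt the Rust service so that it responds to the Java transaction manager's `prepare` and `commit`/`rollback` commands and can therefore be enlisted in one global transaction.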
Initial idea: wrapping the Rust service in a custom XAResource
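Our stack is centered on Spring Boot, which supports JTA and XA well once it is integrated with a standalone transaction manager such as Atomikos. The transaction manager coordinates every resource manager taking part in a transaction, such as database connections and JMS queues.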
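The key idea is to create a custom `XAResource` implementation inside the Spring Boot application. It does not manage a database connection itself; instead, it forwards the `prepare`, `commit`, and `rollback` commands to the Rust service over the network (for example through a REST API). The Rust service exposes matching endpoints and implements an internal state machine that handles these two-phase commit commands correctly.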
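To make that internal state machine concrete, here is a minimal, std-only sketch of the states a single branch could move through on the Rust side. The names `Branch` and `BranchState` are hypothetical and do not appear in the service shown later. The sketch encodes the usual 2PC rules: commit is only legal after a successful prepare (unless the coordinator asks for a one-phase commit), a committed branch can no longer be rolled back, and repeated commit or rollback requests for a finished branch are answered idempotently so that coordinator retries are harmless.

```rust
/// Lifecycle of one transaction branch on the participant side (hypothetical model).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BranchState {
    Active,     // business SQL executed, waiting for the coordinator
    Prepared,   // voted "yes" in phase 1
    Committed,  // phase-2 outcome: commit
    RolledBack, // phase-2 outcome: rollback
}

#[derive(Debug)]
struct Branch {
    state: BranchState,
}

impl Branch {
    fn new() -> Self {
        Branch { state: BranchState::Active }
    }

    /// Phase 1: only an active branch may vote "yes".
    fn prepare(&mut self) -> Result<(), String> {
        match self.state {
            BranchState::Active => {
                self.state = BranchState::Prepared;
                Ok(())
            }
            s => Err(format!("cannot prepare a branch in state {:?}", s)),
        }
    }

    /// Phase 2 commit; `one_phase` mirrors the flag of XAResource.commit(xid, onePhase).
    fn commit(&mut self, one_phase: bool) -> Result<(), String> {
        match self.state {
            BranchState::Prepared => {
                self.state = BranchState::Committed;
                Ok(())
            }
            BranchState::Active if one_phase => {
                self.state = BranchState::Committed;
                Ok(())
            }
            BranchState::Committed => Ok(()), // retried commit: idempotent
            s => Err(format!("cannot commit a branch in state {:?}", s)),
        }
    }

    /// Rollback is allowed until the branch has committed, and is idempotent.
    fn rollback(&mut self) -> Result<(), String> {
        match self.state {
            BranchState::Active | BranchState::Prepared => {
                self.state = BranchState::RolledBack;
                Ok(())
            }
            BranchState::RolledBack => Ok(()),
            BranchState::Committed => Err("branch already committed".to_string()),
        }
    }
}

fn main() {
    let mut b = Branch::new();
    assert!(b.commit(false).is_err()); // two-phase commit without prepare is rejected
    b.prepare().unwrap();
    b.commit(false).unwrap();
    b.commit(false).unwrap(); // a retried commit succeeds again
    assert!(b.rollback().is_err());
    println!("final state: {:?}", b.state);
}
```

Making retries idempotent matters because a lost HTTP response leaves the coordinator unsure whether its command arrived, so it may send the same command again.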
The overall flow looks like this:
```mermaid
sequenceDiagram
    participant Client
    participant App as Spring Boot App (Coordinator)
    participant TM as Atomikos (Transaction Manager)
    participant DB as SQL Server (Java ORM)
    participant Rust as Rust Service (Participant)

    Client->>App: POST /create-order
    App->>TM: Begin Global Transaction
    TM->>App: Return Transaction Context
    par Java Operation
        App->>DB: Update account table (enlisted in transaction)
    and Rust Operation
        App->>Rust: POST /freeze-credit (enlisted via custom XAResource)
    end
    App->>TM: Attempt to commit transaction
    Note over TM: --- Phase 1: Prepare ---
    TM->>DB: XA PREPARE
    DB-->>TM: VOTE_OK
    TM->>App: XA PREPARE (for custom Rust resource)
    App->>Rust: POST /transaction/prepare
    Rust-->>App: 200 OK
    App-->>TM: VOTE_OK
    Note over TM: --- Phase 2: Commit ---
    TM->>DB: XA COMMIT
    DB-->>TM: OK
    TM->>App: XA COMMIT (for custom Rust resource)
    App->>Rust: POST /transaction/commit
    Rust-->>App: 200 OK
    App-->>Client: 201 Created
```
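The heart of this scheme is the interaction protocol between the `Spring Boot App` and the `Rust Service`, together with the transaction state management inside each of them.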
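The exchange in the diagram boils down to the classic 2PC decision rule, which Atomikos applies on our behalf: ask every participant to prepare, commit only if all of them vote yes, and otherwise roll every participant back. The std-only sketch below models that rule with a hypothetical `Participant` trait and an in-memory `Recorder` standing in for the SQL Server branch and the Rust branch. It deliberately leaves out timeouts, the coordinator's own log, and recovery, all of which a real transaction manager must also handle.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// A phase-1 vote.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Vote {
    Yes,
    No,
}

/// Hypothetical participant interface, loosely mirroring XAResource.
trait Participant {
    fn prepare(&mut self) -> Vote;
    fn commit(&mut self);
    fn rollback(&mut self);
}

/// Test double that votes a fixed way and records every call in a shared log.
struct Recorder {
    name: &'static str,
    vote: Vote,
    log: Rc<RefCell<Vec<String>>>,
}

impl Participant for Recorder {
    fn prepare(&mut self) -> Vote {
        self.log.borrow_mut().push(format!("prepare {}", self.name));
        self.vote
    }
    fn commit(&mut self) {
        self.log.borrow_mut().push(format!("commit {}", self.name));
    }
    fn rollback(&mut self) {
        self.log.borrow_mut().push(format!("rollback {}", self.name));
    }
}

/// Runs both phases and returns true if the transaction committed.
fn two_phase_commit(participants: &mut [Box<dyn Participant>]) -> bool {
    // Phase 1: collect votes; `all` stops asking as soon as one participant votes No.
    let all_yes = participants.iter_mut().all(|p| p.prepare() == Vote::Yes);
    // Phase 2: the single decision is applied to every participant.
    for p in participants.iter_mut() {
        if all_yes { p.commit() } else { p.rollback() }
    }
    all_yes
}

fn main() {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut ps: Vec<Box<dyn Participant>> = vec![
        Box::new(Recorder { name: "sqlserver", vote: Vote::Yes, log: Rc::clone(&log) }),
        Box::new(Recorder { name: "rust", vote: Vote::No, log: Rc::clone(&log) }),
    ];
    let committed = two_phase_commit(&mut ps);
    println!("committed = {}, calls = {:?}", committed, log.borrow());
}
```

The key property is that no participant receives `commit` until every participant has voted yes, which is why a yes vote must mean "I can still commit whatever happens".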
Step 1: Building the Rust transaction participant
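The Rust service has to be an HTTP service that understands the 2PC flow. We use `actix-web` as the web framework and `sqlx` for database access, because it offers solid async support and transaction management.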
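The catch is that the Rust service cannot open an XA transaction itself. Instead, it has to hand control over a local database transaction (whether to commit or roll back) to an external caller.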
1. Project setup: Cargo.toml
```toml
[package]
name = "rust_xa_participant"
version = "0.1.0"
edition = "2021"

[dependencies]
actix-web = "4"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0", features = ["derive"] }
# The MSSQL driver was removed from sqlx in 0.7, so this stays on the 0.6 line
sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "mssql", "macros"] }
uuid = { version = "1", features = ["v4", "serde"] }
dashmap = "5" # thread-safe in-memory state store
dotenvy = "0.15"
# Used by main.rs for logging
log = "0.4"
env_logger = "0.10"
```
2. Transaction state management
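We need somewhere to keep pending transactions. In production this should be durable storage (for example Redis or a dedicated state table), so that a service restart does not lose transaction state and leave the coordinator unable to complete in-doubt transactions. To keep the demo simple, we use `DashMap` as a thread-safe in-memory key-value store.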
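As a rough illustration of what "durable" means here, the sketch below keeps an append-only log file in which every state change of a branch is written and forced to disk with `File::sync_all` before the service answers the coordinator. On restart, replaying the file yields the branches that were prepared but never resolved, which is exactly the list a coordinator's recovery step needs. The file format, the record layout (xid, state, plus the `user_id` and `amount` needed to redo the freeze, because the open SQL Server transaction itself does not survive a restart), and the function names are assumptions for illustration; the sketch ignores concurrent writers and log compaction.

```rust
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, BufRead, BufReader, Write};
use std::path::Path;

/// Appends one record and forces it to disk before returning.
/// Hypothetical record format: "<xid> <STATE> <user_id> <amount>".
fn append_record(path: &Path, xid: &str, state: &str, user_id: i32, amount: f64) -> io::Result<()> {
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    writeln!(file, "{} {} {} {}", xid, state, user_id, amount)?;
    file.sync_all() // durability point: only after this may the service vote "yes"
}

/// Replays the log and returns the branches whose latest record is PREPARED,
/// i.e. the in-doubt branches that recovery has to report to the coordinator.
fn in_doubt(path: &Path) -> io::Result<Vec<(String, i32, f64)>> {
    if !path.exists() {
        return Ok(Vec::new());
    }
    // Latest record per xid; later lines override earlier ones.
    let mut last: HashMap<String, (String, i32, f64)> = HashMap::new();
    for line in BufReader::new(File::open(path)?).lines() {
        let line = line?;
        let parts: Vec<&str> = line.split_whitespace().collect();
        // Skip malformed lines, e.g. a torn write at the end of the file.
        if let &[xid, state, user_id, amount] = parts.as_slice() {
            if let (Ok(user_id), Ok(amount)) = (user_id.parse::<i32>(), amount.parse::<f64>()) {
                last.insert(xid.to_string(), (state.to_string(), user_id, amount));
            }
        }
    }
    let mut pending: Vec<(String, i32, f64)> = last
        .into_iter()
        .filter(|(_, (state, _, _))| state == "PREPARED")
        .map(|(xid, (_, user_id, amount))| (xid, user_id, amount))
        .collect();
    pending.sort_by(|a, b| a.0.cmp(&b.0)); // deterministic order
    Ok(pending)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("rust_xa_participant_demo.log");
    let _ = std::fs::remove_file(&path); // start from an empty log
    append_record(&path, "b1", "ACTIVE", 7, 100.0)?;
    append_record(&path, "b1", "PREPARED", 7, 100.0)?;
    append_record(&path, "b2", "PREPARED", 8, 50.0)?;
    append_record(&path, "b2", "COMMITTED", 8, 50.0)?;
    println!("in-doubt after restart: {:?}", in_doubt(&path)?);
    Ok(())
}
```

Opening the file per record keeps the sketch short; a real log would keep the handle open and batch fsyncs.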
`src/transaction_manager.rs`:
```rust
use dashmap::DashMap;
use sqlx::{Mssql, Transaction};
use std::sync::Arc;

// A local transaction that has run its business SQL and is waiting for the coordinator
pub struct PreparedTransaction<'a> {
    pub db_transaction: Transaction<'a, Mssql>,
}

// Transaction state
pub enum TransactionState<'a> {
    Active(PreparedTransaction<'a>),
    Committed,
    RolledBack,
}

// Thread-safe transaction registry
// Key: global transaction ID (xid); value: the local database transaction of that branch
#[derive(Clone)]
pub struct TransactionManager {
    // Transactions started with Pool::begin() are Transaction<'static, Mssql>: each one owns
    // its pooled connection, so it can be moved into this shared map without lifetime tricks.
    // The real limitation is durability: the map lives only in memory, so a restart loses
    // every pending transaction. A production system needs a durable transaction log.
    pub pending_transactions: Arc<DashMap<String, Transaction<'static, Mssql>>>,
}

impl TransactionManager {
    pub fn new() -> Self {
        Self {
            pending_transactions: Arc::new(DashMap::new()),
        }
    }
}
```
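Note: `Transaction<'a, Mssql>` does carry a lifetime `'a`, and a transaction that borrows a connection could not be stored in a shared `DashMap`. A transaction obtained from `Pool::begin()`, however, is `Transaction<'static, Mssql>` because it owns its pooled connection, so it can be stored and moved across threads directly. The real limitations lie elsewhere: every pending transaction pins one pooled connection (and its row locks) until commit or rollback, and the map lives only in memory. A production-grade implementation has to replace it with a durable log and a more robust recovery mechanism.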
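One consequence of parking open transactions is that a branch whose coordinator disappears before sending `prepare` would hold its connection and row locks indefinitely. Under the usual 2PC rules, a participant may unilaterally roll back a branch that has not voted yet, but it must keep a prepared branch until the coordinator decides. The std-only sketch below shows only that selection logic. `PendingBranch`, the timeout value, and the idea of running it periodically (for example from a background task) are assumptions, not part of the service shown in this article.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum Phase {
    Active,   // business SQL done, no vote yet: may still be aborted locally
    Prepared, // voted "yes": only the coordinator may decide now
}

/// Hypothetical entry of the pending-transaction store.
struct PendingBranch<T> {
    phase: Phase,
    started: Instant,
    tx: T, // e.g. the parked database transaction
}

/// Removes and returns the branches that are still Active after `timeout`, sorted by xid.
/// The caller is expected to roll back each returned transaction.
/// Prepared branches are never touched, however old they are.
fn reap_unprepared<T>(
    pending: &mut HashMap<String, PendingBranch<T>>,
    now: Instant,
    timeout: Duration,
) -> Vec<(String, T)> {
    let mut expired: Vec<String> = pending
        .iter()
        .filter(|(_, b)| b.phase == Phase::Active && now.duration_since(b.started) >= timeout)
        .map(|(xid, _)| xid.clone())
        .collect();
    expired.sort();
    expired
        .into_iter()
        .filter_map(|xid| pending.remove(&xid).map(|b| (xid, b.tx)))
        .collect()
}

fn main() {
    let start = Instant::now();
    let mut pending = HashMap::new();
    pending.insert("a".to_string(), PendingBranch { phase: Phase::Active, started: start, tx: "tx-a" });
    pending.insert("p".to_string(), PendingBranch { phase: Phase::Prepared, started: start, tx: "tx-p" });
    // Pretend 60 s have passed; the assumed timeout is 30 s.
    let later = start + Duration::from_secs(60);
    let reaped = reap_unprepared(&mut pending, later, Duration::from_secs(30));
    println!("roll back: {:?}; still pending: {}", reaped, pending.len());
}
```

Passing `now` in explicitly keeps the function deterministic and easy to test; a background task would simply call it with `Instant::now()`.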
3. API endpoint implementation
`src/main.rs`:
```rust
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use dashmap::DashMap;
use serde::Deserialize;
use sqlx::{mssql::MssqlPoolOptions, MssqlPool};
use std::env;
use std::sync::Arc;

// Pending transactions: key is the global transaction ID, value the open sqlx::Transaction.
// This is a heavily simplified model: state lives only in memory and is lost on restart.
// A real system needs a durable transaction log.
type TransactionStore = Arc<DashMap<String, sqlx::Transaction<'static, sqlx::Mssql>>>;

#[derive(Deserialize)]
struct TransactionRequest {
    xid: String, // global transaction ID
}

#[derive(Deserialize)]
struct FreezeRequest {
    xid: String,
    user_id: i32,
    amount: f64,
}

// Initial business operation: begin a local transaction and run the SQL, but do not commit
async fn freeze_credit(
    pool: web::Data<MssqlPool>,
    tx_store: web::Data<TransactionStore>,
    req: web::Json<FreezeRequest>,
) -> impl Responder {
    // Pool::begin() yields Transaction<'static, Mssql>: it owns its pooled connection,
    // so it can be stored below without any unsafe lifetime workaround
    let mut tx = match pool.begin().await {
        Ok(tx) => tx,
        Err(_) => return HttpResponse::InternalServerError().body("Failed to begin transaction"),
    };

    // Business SQL; the sqlx MSSQL driver uses @p1, @p2, ... as placeholders
    let result = sqlx::query(
        "UPDATE Credit SET frozen_amount = frozen_amount + @p1 WHERE user_id = @p2",
    )
    .bind(req.amount)
    .bind(req.user_id)
    .execute(&mut *tx)
    .await;

    match result {
        Ok(r) if r.rows_affected() > 0 => {}
        _ => {
            // The business step failed (or matched no row): roll back now, nothing to prepare
            tx.rollback().await.ok();
            return HttpResponse::BadRequest().body("Credit freeze failed during execution");
        }
    }

    // Park the open transaction until prepare/commit/rollback arrives for this xid
    tx_store.insert(req.xid.clone(), tx);
    HttpResponse::Ok().body("Credit freeze initiated and pending prepare.")
}

// Phase 1: prepare
async fn prepare_transaction(
    tx_store: web::Data<TransactionStore>,
    req: web::Json<TransactionRequest>,
) -> impl Responder {
    // In this simplified model, prepare is only a confirmation that the transaction is
    // still pending. A real XA implementation would durably write a prepare record here.
    if tx_store.contains_key(&req.xid) {
        HttpResponse::Ok().body("Transaction prepared")
    } else {
        HttpResponse::NotFound().body("Transaction not found")
    }
}

// Phase 2: commit
async fn commit_transaction(
    tx_store: web::Data<TransactionStore>,
    req: web::Json<TransactionRequest>,
) -> impl Responder {
    match tx_store.remove(&req.xid) {
        Some((_, tx)) => {
            if let Err(e) = tx.commit().await {
                // Catastrophic: the branch voted yes in prepare, but the commit failed.
                // This requires manual intervention or a dedicated recovery procedure.
                log::error!("CRITICAL: Failed to commit prepared transaction {}: {}", req.xid, e);
                return HttpResponse::InternalServerError().body("Failed to commit transaction");
            }
            HttpResponse::Ok().body("Transaction committed")
        }
        None => HttpResponse::NotFound().body("Transaction not found or already handled"),
    }
}

// Phase 2: rollback
async fn rollback_transaction(
    tx_store: web::Data<TransactionStore>,
    req: web::Json<TransactionRequest>,
) -> impl Responder {
    match tx_store.remove(&req.xid) {
        Some((_, tx)) => {
            if let Err(e) = tx.rollback().await {
                log::warn!("Failed to rollback transaction {}: {}", req.xid, e);
                return HttpResponse::InternalServerError().body("Failed to rollback transaction");
            }
            HttpResponse::Ok().body("Transaction rolled back")
        }
        None => HttpResponse::NotFound().body("Transaction not found or already handled"),
    }
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    dotenvy::dotenv().ok();
    env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

    let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    // Every pending (not yet committed) transaction pins one of these connections
    let pool = MssqlPoolOptions::new()
        .max_connections(5)
        .connect(&database_url)
        .await
        .expect("Failed to create pool.");

    let transaction_store: TransactionStore = Arc::new(DashMap::new());

    log::info!("Starting server at http://127.0.0.1:8081");
    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(pool.clone()))
            .app_data(web::Data::new(transaction_store.clone()))
            .route("/freeze-credit", web::post().to(freeze_credit))
            .route("/transaction/prepare", web::post().to(prepare_transaction))
            .route("/transaction/commit", web::post().to(commit_transaction))
            .route("/transaction/rollback", web::post().to(rollback_transaction))
    })
    .bind(("127.0.0.1", 8081))?
    .run()
    .await
}
```
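Important: it may look necessary to force the transaction into the `'static` store with `unsafe { std::mem::transmute(tx) }`, which bypasses Rust's lifetime checks, but such a cast is both dangerous and unnecessary. `Pool::begin()` already returns `sqlx::Transaction<'static, Mssql>`, because the transaction owns its pooled connection instead of borrowing it, so it can be inserted into the store as-is. What the in-memory store genuinely lacks is durability. A robust design keeps a transaction log that records the transaction ID and its state and, on recovery, reconnects to the database to finish each in-doubt transaction. Because SQL Server rolls back an open transaction when its connection drops, that log must also carry enough data (for example `user_id` and `amount`) to redo the freeze if the coordinator later decides to commit.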
Step 2: Building the Spring Boot transaction coordinator
Now for the Java side. We need to configure Atomikos, define the data source, and create the crucial custom `XAResource`.
1. Maven dependencies: pom.xml
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <!-- JTA & Atomikos for distributed transactions.
         This starter (and the javax.transaction API used below) belongs to Spring Boot 2.x;
         it was removed in Spring Boot 3.0. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>
    <!-- SQL Server Driver -->
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- For making HTTP calls to Rust service -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
```
2. Configuring application.yml
We configure Atomikos to manage the SQL Server XA data source.
```yaml
server:
  port: 8080

spring:
  application:
    name: java-xa-coordinator
  # Atomikos JTA configuration
  jta:
    atomikos:
      properties:
        service: com.atomikos.icatch.standalone.UserTransactionServiceFactory
        max-timeout: 300000
        enable-logging: true
        log-base-name: tm-log
        log-base-dir: target/atomikos

# Configuration for the primary XA data source (managed by Atomikos).
# "app.datasource" is a custom prefix: it only takes effect when bound to your own
# AtomikosDataSourceBean bean, e.g. via @ConfigurationProperties("app.datasource").
app:
  datasource:
    # The SQL Server driver's XA DataSource class; Atomikos pools connections around it
    xa-data-source-class-name: com.microsoft.sqlserver.jdbc.SQLServerXADataSource
    unique-resource-name: sqlserver1
    xa-properties:
      user: your_user
      password: your_password
      serverName: localhost
      portNumber: 1433
      databaseName: your_database
    # Pool size settings for Atomikos
    min-pool-size: 5
    max-pool-size: 20

# Rust service endpoint
rust-service:
  base-url: "http://localhost:8081"
```
3. Custom RustXAResource
This is the bridge between the Java JTA world and our custom Rust service. It implements the `javax.transaction.xa.XAResource` interface.
`RustXAResource.java`:
```java
import org.springframework.web.reactive.function.client.WebClient;

import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

public class RustXAResource implements XAResource {
    // Key under which the Rust service parked its local transaction (the xid sent to /freeze-credit)
    private final String branchId;
    private final WebClient webClient;

    public RustXAResource(String branchId, String rustServiceBaseUrl) {
        this.branchId = branchId;
        this.webClient = WebClient.builder().baseUrl(rustServiceBaseUrl).build();
    }

    // Sends one 2PC command for this branch to the Rust service.
    // Blocking is acceptable here: the transaction manager calls XAResource methods synchronously.
    private void send(String path) {
        webClient.post()
                .uri(path)
                .bodyValue(new TransactionCommand(branchId))
                .retrieve()
                .toBodilessEntity()
                .block();
    }

    @Override
    public int prepare(Xid xid) throws XAException {
        System.out.println("--> RUST_XA: Preparing transaction: " + branchId);
        try {
            send("/transaction/prepare");
            return XA_OK;
        } catch (Exception e) {
            throw new XAException(XAException.XAER_RMERR);
        }
    }

    @Override
    public void commit(Xid xid, boolean onePhase) throws XAException {
        // A real implementation should check that xid is the branch this resource was enlisted for.
        System.out.println("--> RUST_XA: Committing transaction: " + branchId);
        try {
            send("/transaction/commit");
        } catch (Exception e) {
            throw new XAException(XAException.XAER_RMERR);
        }
    }

    @Override
    public void rollback(Xid xid) throws XAException {
        System.out.println("--> RUST_XA: Rolling back transaction: " + branchId);
        try {
            send("/transaction/rollback");
        } catch (Exception e) {
            throw new XAException(XAException.XAER_RMERR);
        }
    }

    // --- The remaining XAResource methods are no-ops in this demo ---
    @Override
    public void start(Xid xid, int flags) throws XAException {}

    @Override
    public void end(Xid xid, int flags) throws XAException {}

    @Override
    public void forget(Xid xid) throws XAException {}

    @Override
    public int getTransactionTimeout() throws XAException { return 0; }

    @Override
    public boolean setTransactionTimeout(int seconds) throws XAException { return false; }

    @Override
    public boolean isSameRM(XAResource xares) throws XAException { return this == xares; }

    @Override
    public Xid[] recover(int flag) throws XAException { return new Xid[0]; } // IMPORTANT: production needs real recovery!

    // Request body, serialized as {"xid": "..."} to match the Rust TransactionRequest
    static class TransactionCommand {
        public String xid;

        public TransactionCommand(String xid) { this.xid = xid; }
    }
}
```

The `recover` pitfall: in this example `recover` returns an empty array, so if the coordinator crashes and restarts it cannot learn about any in-doubt transactions in the Rust service. A production implementation must call the Rust service here, fetch every transaction in the `prepared` state, and return their `Xid`s so that Atomikos can complete them. That in turn means the Rust side has to store the `Xid` Atomikos assigned to the branch (the one passed to `start`), not just the locally generated branch key.
4. Business logic
Now we wire everything together. We need a component that manually enlists our `RustXAResource` in the current JTA transaction.
`RustTransactionParticipant.java`:

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;

import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import java.util.UUID;

@Component
public class RustTransactionParticipant {
    private final TransactionManager transactionManager; // JTA TransactionManager bean provided by Atomikos
    private final WebClient webClient;
    private final String rustServiceBaseUrl;

    public RustTransactionParticipant(TransactionManager transactionManager,
                                      @Value("${rust-service.base-url}") String rustServiceBaseUrl) {
        this.transactionManager = transactionManager;
        this.rustServiceBaseUrl = rustServiceBaseUrl;
        this.webClient = WebClient.builder().baseUrl(rustServiceBaseUrl).build();
    }

    public void freezeCredit(int userId, double amount) throws SystemException {
        // The JTA transaction bound to the current thread by @Transactional
        Transaction tx = transactionManager.getTransaction();
        if (tx == null) {
            throw new IllegalStateException("Not in a JTA transaction");
        }

        // Unique key for this Rust-side branch; the same value is used by RustXAResource
        String branchId = "rust-branch-" + UUID.randomUUID();

        // Business call: the Rust service runs its SQL and parks the open local transaction
        webClient.post()
                .uri("/freeze-credit")
                .bodyValue(new FreezeRequest(branchId, userId, amount))
                .retrieve()
                .toBodilessEntity()
                .block();

        // Create and enlist our custom XA resource so Atomikos drives prepare/commit/rollback on it
        XAResource rustXAResource = new RustXAResource(branchId, rustServiceBaseUrl);
        try {
            tx.enlistResource(rustXAResource);
        } catch (Exception e) {
            // Enlistment failed: release the transaction already parked in the Rust service
            try {
                rustXAResource.rollback(null);
            } catch (XAException ignored) {
                // best effort
            }
            throw new RuntimeException("Failed to enlist Rust XA resource", e);
        }
    }

    // DTO; field names match the Rust FreezeRequest
    static class FreezeRequest {
        public String xid;
        public int user_id;
        public double amount;

        FreezeRequest(String xid, int userId, double amount) {
            this.xid = xid;
            this.user_id = userId;
            this.amount = amount;
        }
    }
}
```
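In the bridge above, every failure becomes `XAER_RMERR`, which gives Atomikos little to act on. A finer mapping from the Rust service's HTTP outcome to XA codes could look like the decision table below. It is written in Rust only to keep this article's sketches in one language; the numeric values mirror the constants in `javax.transaction.xa.XAException` and `XAResource`, while the policy itself (for example treating a 404 on `rollback` as "nothing left to undo", or a missing response as `XAER_RMFAIL` so that the transaction manager retries later) is an assumption to adapt, not behavior prescribed by Atomikos.

```rust
// Values copied from javax.transaction.xa.XAResource / XAException.
const XA_OK: i32 = 0;
const XA_RBROLLBACK: i32 = 100;
const XAER_RMERR: i32 = -3;
const XAER_NOTA: i32 = -4;
const XAER_RMFAIL: i32 = -7;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Command {
    Prepare,
    Commit,
    Rollback,
}

/// `status` is the HTTP status returned by the Rust service, or None when no response
/// arrived at all (connection refused, timeout). Returns XA_OK or the code the Java
/// bridge would put into the XAException it throws (hypothetical policy).
fn xa_code(cmd: Command, status: Option<u16>) -> i32 {
    match (cmd, status) {
        (_, Some(200..=299)) => XA_OK,
        // No response: the resource manager is unreachable, let the TM retry later.
        (_, None) => XAER_RMFAIL,
        // Branch unknown at prepare: it is already gone, so report "rolled back".
        (Command::Prepare, Some(404)) => XA_RBROLLBACK,
        // Branch unknown at rollback: nothing left to undo.
        (Command::Rollback, Some(404)) => XA_OK,
        // Branch unknown at commit: the outcome cannot be confirmed.
        (Command::Commit, Some(404)) => XAER_NOTA,
        _ => XAER_RMERR,
    }
}

fn main() {
    for (cmd, status) in [
        (Command::Prepare, Some(200)),
        (Command::Prepare, Some(404)),
        (Command::Commit, None),
    ] {
        println!("{:?} {:?} -> {}", cmd, status, xa_code(cmd, status));
    }
}
```

On the Java side, the status would roughly come from a `WebClientResponseException`, while a failure before any response would correspond to `None`.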
`OrderService.java`:

```java
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.transaction.SystemException;

@Service
public class OrderService {
    private final AccountRepository accountRepository; // A standard Spring Data JPA repository
    private final RustTransactionParticipant rustParticipant;

    public OrderService(AccountRepository accountRepository, RustTransactionParticipant rustParticipant) {
        this.accountRepository = accountRepository;
        this.rustParticipant = rustParticipant;
    }

    // A JTA transaction managed by Atomikos. rollbackFor is required because, by default,
    // Spring does not roll back on checked exceptions such as SystemException.
    @Transactional(rollbackFor = Exception.class)
    public void createOrder(int userId, double orderAmount, double creditAmount) throws SystemException {
        // 1. Perform the local database operation via JPA/ORM
        System.out.println("--> JAVA: Debiting account for user " + userId);
        Account account = accountRepository.findById(userId).orElseThrow();
        account.setBalance(account.getBalance() - orderAmount);
        accountRepository.save(account);

        // 2. Perform the operation on the Rust service, enlisted in the same transaction
        System.out.println("--> JAVA: Freezing credit via Rust service for user " + userId);
        rustParticipant.freezeCredit(userId, creditAmount);

        // When this method returns, the @Transactional proxy asks Atomikos to commit,
        // which runs the 2PC protocol across both resources
        System.out.println("--> JAVA: Business logic complete. Triggering commit.");

        // Uncomment to simulate a failure and test rollback
        // if (true) { throw new RuntimeException("Simulated failure before commit!"); }
    }
}
```
Limitations and outlook
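This implementation brings a non-JVM Rust service without JTA support under Spring Boot's distributed transaction management and makes the cross-language operation atomic, provided the Rust service stays up between `prepare` and `commit` (its `prepare` is not durable). The complexity of the solution, however, also exposes the inherent weaknesses of the 2PC model.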
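First and most serious is synchronous blocking. Row locks taken by the business statements on both sides (the database rows and the transaction held open by the Rust service) are held through the `prepare` phase until the whole global transaction completes. If the coordinator (the Spring Boot application) crashes, these resources stay locked until it recovers and uses its transaction log to commit or roll back. Our `recover` method is empty, which would be fatal in production; a complete recovery mechanism is mandatory.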
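Second, the coordinator is a single point of failure. The outcome of every transaction depends on the Spring Boot application hosting Atomikos. Although Atomikos keeps a persistent log so that it stays consistent across restarts, an outage of the coordinator node stalls transaction processing for the whole system.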
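Finally, there is the implementation complexity. We hand-wrote an `XAResource` and designed a transaction-control API for the Rust service. The robustness, error handling, and recovery logic of this machinery all need extensive testing and hardening, and the maintenance cost is high.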
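Where consistency requirements are less strict but availability and performance matter more, a Saga pattern based on eventual consistency may be the more pragmatic choice. A Saga completes the business flow through a series of local transactions plus compensating actions, which avoids global locks and improves throughput and resilience. In our scenario, however, where funds and credit limits must stay strictly in sync, 2PC, despite its complexity, is an effective way to guarantee data correctness.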
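For comparison, the core of an orchestrated Saga fits in a few lines: run the local steps in order and, if one fails, run the compensations of the steps that already succeeded in reverse order. The `Step` type, the closure-based design, and the step names below are hypothetical. A real Saga would also persist its progress so that compensation survives a crash, and its compensations must be idempotent.

```rust
use std::cell::RefCell;

/// One local transaction plus the action that semantically undoes it.
struct Step<'a> {
    name: &'static str,
    action: Box<dyn FnMut() -> Result<(), String> + 'a>,
    compensate: Box<dyn FnMut() + 'a>,
}

/// Runs the steps in order. On failure, compensates the completed steps in reverse
/// order and returns the name of the step that failed.
fn run_saga(steps: &mut [Step]) -> Result<(), &'static str> {
    for i in 0..steps.len() {
        if (steps[i].action)().is_err() {
            for done in steps[..i].iter_mut().rev() {
                (done.compensate)();
            }
            return Err(steps[i].name);
        }
    }
    Ok(())
}

fn main() {
    let log = RefCell::new(Vec::new());
    let mut steps = vec![
        Step {
            name: "debit_account",
            action: Box::new(|| -> Result<(), String> {
                log.borrow_mut().push("debit");
                Ok(())
            }),
            compensate: Box::new(|| log.borrow_mut().push("refund")),
        },
        Step {
            name: "freeze_credit",
            action: Box::new(|| -> Result<(), String> { Err("credit limit exceeded".to_string()) }),
            compensate: Box::new(|| log.borrow_mut().push("unfreeze")),
        },
    ];
    let result = run_saga(&mut steps);
    println!("{:?} {:?}", result, log.borrow());
}
```

Unlike 2PC, the debit is visible to other transactions between the first step and its compensation, which is exactly the isolation trade-off that rules Saga out for this scenario.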