我们面临一个典型的技术债务与现代化改造的冲突场景。数据平台团队已经成功在Hadoop上构建了新的数据湖,技术栈统一为Scala和Spark。然而,业务的核心价值——数百个经过多年迭代、复杂且高度优化的SQL查询,被牢牢地锁在旧有Java单体应用的MyBatis XML映射文件中。直接将这些SQL翻译成Spark SQL或DataFrame API的成本是惊人的,不仅仅是工作量,更关键的是翻译过程中引入业务逻辑错误的风险极高。任何一个微小的差异都可能导致下游报表和风控模型的灾难性偏差。
常规的解决方案,即“完全重写”,在这种背景下显得鲁莽且不切实际。这会造成两个独立的技术团队(Java业务团队和Scala数据团队)在相当长的一段时间内维护两套逻辑几乎完全相同的代码库,这在任何一个真实项目中都是不可接受的。我们需要一个能连接这两个世界的桥梁。
方案A:原生大数据技术栈重构
这是最直接,也是技术上最“纯粹”的方案。核心思想是将所有MyBatis中的SQL逻辑,逐一迁移到Spark SQL中,并用Scala重写所有相关的业务逻辑。
优势:
- 性能最优: 所有计算都发生在Spark引擎内部,可以充分利用Catalyst优化器、Tungsten执行引擎和全阶段代码生成,获得极致的执行性能。
- 技术栈统一: 整个数据处理链路都在Hadoop和Spark生态内,便于统一管理、监控和调优。
- 类型安全: 使用Scala的Dataset API可以获得编译期的类型安全检查,减少运行时错误。
劣势:
- 极高的迁移成本与风险: 这是该方案的致命弱点。人工翻译SQL,尤其是包含大量动态标签(
<if>
,<foreach>
)的复杂查询,极易出错。验证其业务逻辑一致性需要投入大量测试资源,周期漫长。 - 知识壁垒: 数据工程师需要深入理解遗留系统的业务逻辑,而这些逻辑往往没有清晰的文档,只体现在代码和SQL中。
- 维护双重逻辑: 在漫长的迁移过渡期,业务逻辑同时存在于MyBatis XML和Spark代码中,任何业务变更都需要同步修改两处,这会成为混乱的根源。
- 极高的迁移成本与风险: 这是该方案的致命弱点。人工翻译SQL,尤其是包含大量动态标签(
方案B:构建混合式声明性ETL执行器
这个方案的核心思想是“复用而非重造”。我们不触碰那些经过千锤百炼的MyBatis XML文件,而是构建一个通用的执行引擎,使其能够在Scala编写的Hadoop作业中,直接加载并执行指定的MyBatis查询,并将结果高效地写入HDFS。本质上,我们将MyBatis从一个应用层ORM框架,降维成一个“声明式SQL执行引擎”。
优势:
- 风险极低,上线速度快: 完美复用现有业务逻辑,从根本上消除了逻辑翻译错误的风险。新旧系统的查询逻辑源自同一份文件,保证了绝对的一致性。
- 职责清晰: Java业务团队继续负责维护核心的业务SQL(他们最擅长的领域),数据平台团队则专注于构建稳定、高效的数据管道和执行框架。
- 迁移成本可控: 开发工作量从“重写所有业务逻辑”转变为“开发一个通用执行框架”,工作量从N(N为查询数量)降低到1。
劣势:
- 非主流架构: 这种组合在业界并不常见,需要团队具备一定的技术整合与定制开发能力。
- 性能瓶颈: 数据抽取过程的瓶颈将从Spark的分布式计算能力转移到源端关系型数据库的JDBC吞吐能力。它无法像Spark的JDBC数据源那样进行谓词下推等高级优化。
- 依赖管理复杂: Hadoop作业的Classpath需要同时包含Hadoop、Scala、MyBatis以及JDBC驱动的依赖。
决策与理由
在真实的项目中,时间、成本和风险往往是比“技术纯粹性”更重要的考量因素。方案A虽然在技术上更优雅,但其高昂的成本和不可控的风险使其在当前阶段不具备可行性。我们最终选择了方案B。我们的目标是在保证数据质量和逻辑一致性的前提下,以最快的速度将数据从遗留系统安全地同步到数据湖。该方案将风险降至最低,并最大化地利用了团队现有的知识资产。
核心实现概览
我们将构建一个可配置的、由Scala驱动的YARN应用。这个应用不依赖Spark,它就是一个纯粹的Hadoop客户端程序,负责读取任务配置、执行MyBatis查询,并将结果以Parquet格式写入HDFS。
其核心工作流程可以用下面的图表来描述:
graph TD A[Yarn App Master - Scala Driver] --> B{读取ETL作业配置}; B --> C[加载MyBatis全局配置]; C --> D{创建SqlSessionFactory}; D --> E[根据作业配置, 获取Mapper和Statement ID]; E --> F{执行sqlSession.select}; subgraph "数据处理与写入" F -- 传递ResultHandler --> G[流式处理JDBC结果集]; G --> H[将每行数据转换为中间格式 Avro Record]; H --> I[使用ParquetWriter写入HDFS临时文件]; end I --> J{任务完成, 文件重命名}; style A fill:#d4e6f1,stroke:#333,stroke-width:2px style J fill:#d5f5e3,stroke:#333,stroke-width:2px
这个架构的关键在于两点:
- 配置驱动: 整个流程是元数据驱动的,执行哪个查询、数据源是什么、目标路径在哪,都由外部配置文件决定。
- 流式处理: 必须使用MyBatis的
ResultHandler
机制来处理查询结果,避免将千万级甚至上亿级的数据一次性加载到Driver或单一Task的内存中,从而引发OOM。
关键代码与原理解析
1. 项目依赖与构建 (build.sbt
)
SBT配置文件是整个项目的基础,它负责管理所有必要的依赖项。这里的关键是正确引入Hadoop客户端、MyBatis核心库、数据库驱动、Parquet写入库以及配置管理库。
// build.sbt
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.12.15"
lazy val root = (project in file("."))
.settings(
name := "mybatis-hadoop-etl",
libraryDependencies ++= Seq(
// MyBatis Core
"org.mybatis" % "mybatis" % "3.5.11",
// Hadoop Client for HDFS access and YARN
"org.apache.hadoop" % "hadoop-client" % "3.3.4" % "provided",
// PostgreSQL JDBC Driver (example)
"org.postgresql" % "postgresql" % "42.5.1",
// Logging
"ch.qos.logback" % "logback-classic" % "1.2.11",
// Configuration Library
"com.typesafe" % "config" % "1.4.2",
// Parquet for writing to HDFS
"org.apache.parquet" % "parquet-avro" % "1.12.3",
"org.apache.hadoop" % "hadoop-common" % "3.3.4" % "provided",
// Avro for schema definition
"org.apache.avro" % "avro" % "1.11.1"
)
)
注意,hadoop-client
和hadoop-common
被标记为provided
。这是因为当我们将应用打包成jar并提交到YARN集群时,这些库已经存在于集群的classpath中,无需重复打包,可以减小jar包体积。
2. 声明式作业配置 (application.conf
)
使用Typesafe Config库(HOCON格式)可以极大地提高配置的灵活性和可读性。
// src/main/resources/application.conf
etl-job {
source {
db-type = "postgresql"
jdbc-url = "jdbc:postgresql://localhost:5432/legacy_db"
username = "user"
password = "password"
driver-class = "org.postgresql.Driver"
}
// Configuration for MyBatis itself
mybatis {
// Path to the main mybatis config XML inside the jar
config-resource = "mybatis-config.xml"
environment-id = "production"
}
// Task specific settings
task {
// The fully qualified ID for the mybatis statement to execute
mapper-statement-id = "com.example.mappers.UserMapper.getActiveUsers"
// Parameters to pass to the mybatis query
parameters = {
status: "ACTIVE",
minRegistrationDate: "2023-01-01"
}
}
destination {
// HDFS path to write the output Parquet file
hdfs-path = "/user/data/warehouse/users_active"
// Parquet specific settings
parquet {
compression-codec = "SNAPPY"
block-size = 134217728 // 128 MB
}
}
}
3. MyBatis核心初始化 (MyBatisSessionFactory.scala
)
我们需要一个单例对象来管理SqlSessionFactory
的生命周期。它只应被初始化一次,以避免重复解析XML配置文件的开销。
import com.typesafe.config.Config
import org.apache.ibatis.io.Resources
import org.apache.ibatis.session.{SqlSession, SqlSessionFactory, SqlSessionFactoryBuilder}
import java.util.Properties
object MyBatisSessionFactory {
private var sqlSessionFactory: Option[SqlSessionFactory] = None
/**
* Initializes the SqlSessionFactory. Must be called once before any sessions are opened.
* This method is designed to be thread-safe, though in our single-threaded driver,
* it's less of a concern.
*
* @param mybatisConfig The configuration subtree for mybatis settings.
* @param dbConfig The configuration subtree for database connection details.
*/
def initialize(mybatisConfig: Config, dbConfig: Config): Unit = synchronized {
if (sqlSessionFactory.isEmpty) {
val resource = mybatisConfig.getString("config-resource")
val environmentId = mybatisConfig.getString("environment-id")
val properties = new Properties()
properties.setProperty("jdbc.driver", dbConfig.getString("driver-class"))
properties.setProperty("jdbc.url", dbConfig.getString("jdbc-url"))
properties.setProperty("jdbc.username", dbConfig.getString("username"))
properties.setProperty("jdbc.password", dbConfig.getString("password"))
val inputStream = Resources.getResourceAsStream(resource)
try {
val factory = new SqlSessionFactoryBuilder().build(inputStream, environmentId, properties)
sqlSessionFactory = Some(factory)
} finally {
inputStream.close()
}
println("MyBatis SqlSessionFactory initialized successfully.")
}
}
/**
* Opens a new SqlSession. Throws an IllegalStateException if initialize() was not called.
*
* @return A new SqlSession instance.
*/
def openSession(): SqlSession = {
sqlSessionFactory.getOrElse(throw new IllegalStateException("SqlSessionFactory has not been initialized. Call initialize() first."))
.openSession()
}
}
这里的关键在于,我们将JDBC连接属性从mybatis-config.xml
中剥离出来,通过代码动态传入。这使得数据库连接信息可以完全由外部的application.conf
控制,增强了部署的灵活性。
4. 核心执行器与流式结果处理器 (MyBatisStreamProcessor.scala
)
这是整个框架的核心。它封装了查询执行、结果处理和文件写入的全部逻辑。
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.ibatis.session.{ResultContext, ResultHandler, SqlSession}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import java.util.{Map => JavaMap}
import scala.collection.JavaConverters._
class MyBatisStreamProcessor(hadoopConf: Configuration, jobConfig: com.typesafe.config.Config) {
def execute(): Unit = {
val statementId = jobConfig.getString("task.mapper-statement-id")
val hdfsRawPath = jobConfig.getString("destination.hdfs-path")
val params = configToMap(jobConfig.getConfig("task.parameters"))
// A temporary path to prevent consumers from reading partial files
val tempHdfsPath = new Path(s"${hdfsRawPath}_tmp_${System.currentTimeMillis()}")
val finalHdfsPath = new Path(hdfsRawPath)
val fs = FileSystem.get(hadoopConf)
if(fs.exists(finalHdfsPath)) {
println(s"Final destination path ${finalHdfsPath.toString} already exists. Deleting.")
fs.delete(finalHdfsPath, true)
}
var sqlSession: SqlSession = null
try {
sqlSession = MyBatisSessionFactory.openSession()
// The ResultHandler is the key to streaming
val handler = new ParquetWritingResultHandler(fs, tempHdfsPath)
println(s"Executing MyBatis statement: $statementId with params: $params")
sqlSession.select(statementId, params, handler)
// After processing, rename the temp file to the final destination
println(s"Processed ${handler.getRowCount} rows. Renaming ${tempHdfsPath} to ${finalHdfsPath}")
fs.rename(tempHdfsPath, finalHdfsPath)
} finally {
if (sqlSession != null) {
sqlSession.close()
}
// Don't close FileSystem here if it's shared
}
}
private def configToMap(config: com.typesafe.config.Config): JavaMap[String, Any] = {
config.entrySet().asScala.map(entry => (entry.getKey, entry.getValue.unwrapped())).toMap.asJava
}
// Inner class to handle results row-by-row
private class ParquetWritingResultHandler(fs: FileSystem, path: Path) extends ResultHandler[JavaMap[String, AnyRef]] {
private var writer: ParquetWriter[GenericRecord] = null
private var avroSchema: Schema = null
private var rowCount: Long = 0
override def handleResult(resultContext: ResultContext[_ <: JavaMap[String, AnyRef]]): Unit = {
val rowMap = resultContext.getResultObject
if (rowCount == 0) {
// First row, derive schema and initialize writer
initializeWriter(rowMap)
}
val record = new GenericData.Record(avroSchema)
rowMap.asScala.foreach { case (key, value) =>
// Avro schema field names are typically case-sensitive and follow conventions.
// MyBatis returns keys in uppercase on some DBs. We normalize them.
val schemaField = avroSchema.getField(key.toLowerCase)
if (schemaField != null) {
record.put(key.toLowerCase(), value)
}
}
writer.write(record)
rowCount += 1
if(rowCount % 10000 == 0) {
println(s"Processed ${rowCount} rows...")
}
}
def getRowCount: Long = rowCount
private def initializeWriter(sampleRow: JavaMap[String, AnyRef]): Unit = {
this.avroSchema = deriveAvroSchema(sampleRow)
val compression = CompressionCodecName.valueOf(jobConfig.getString("destination.parquet.compression-codec").toUpperCase)
val blockSize = jobConfig.getInt("destination.parquet.block-size")
this.writer = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(avroSchema)
.withConf(fs.getConf)
.withCompressionCodec(compression)
.withBlockSize(blockSize)
.build()
}
// A simplified schema derivation. In a production system, this should be more robust.
private def deriveAvroSchema(sampleRow: JavaMap[String, AnyRef]): Schema = {
val fields = sampleRow.asScala.map { case (key, value) =>
// Type mapping logic here needs to be very robust. This is a simplification.
val fieldType = value match {
case _: String => Schema.create(Schema.Type.STRING)
case _: java.lang.Integer => Schema.create(Schema.Type.INT)
case _: java.lang.Long => Schema.create(Schema.Type.LONG)
case _: java.math.BigDecimal => Schema.create(Schema.Type.STRING) // Safer to handle as string
case _: java.sql.Timestamp => Schema.create(Schema.Type.LONG)
case _ => Schema.create(Schema.Type.STRING) // Default to string for unknown types
}
// Create a union with NULL to allow nullable fields
val nullableType = Schema.createUnion(Schema.create(Schema.Type.NULL), fieldType)
new Schema.Field(key.toLowerCase(), nullableType, null, null)
}.toList.asJava
val schema = Schema.createRecord("ETLRecord", "Auto-generated schema from JDBC result set", "com.example.etl", false)
schema.setFields(fields)
println(s"Derived Avro Schema: ${schema.toString(true)}")
schema
}
// This method is called after the last result, but we close the writer in the outer scope
// to ensure it happens even if there are no results.
}
}
这段代码的精髓在于ParquetWritingResultHandler
。MyBatis每从JDBC ResultSet
中获取一行数据,就会调用一次handleResult
方法。在第一行数据到达时,我们动态地推断出数据的Schema(这里简化为Avro Schema),并初始化ParquetWriter
。随后的每一行都直接转换成GenericRecord
并写入HDFS。这个过程是纯流式的,内存占用极低,只与单行数据的大小有关。
5. 单元测试思路
对这样的框架进行测试至关重要。
- 数据库集成测试: 使用H2内存数据库,在测试代码中创建表、插入数据。然后运行
MyBatisStreamProcessor
,让它查询H2数据库。 - 输出验证:
MyBatisStreamProcessor
执行完毕后,使用parquet-tools
或parquet-mr
库读出生成的Parquet文件,断言其Schema和内容是否与预期一致。 - 异常处理测试: 模拟数据库连接失败、SQL语法错误、HDFS路径不存在等场景,验证程序的健壮性和错误处理逻辑。
架构的扩展性与局限性
这个架构虽然解决了眼前的痛点,但它的适用边界也必须清晰。
可扩展性:
- 多格式支持:
ParquetWritingResultHandler
可以被抽象成一个接口,通过配置轻松实现写入ORC、Avro或纯文本等不同格式。 - 执行引擎集成: 当前的实现是一个独立的YARN应用。这个核心逻辑可以被封装后,在一个Spark作业的
mapPartitions
中执行,从而利用Spark进行任务调度和并行化,比如按日期分区并行抽取数据。 - 动态SQL增强: 可以结合配置,动态地为MyBatis的参数Map添加额外的过滤条件,例如
etl_date
,实现增量抽取。
局限性:
- 无计算下推: 这是最大的局限。该框架本质上是一个
SELECT * FROM (...)
的执行器。任何复杂的聚合、连接、转换都无法利用数据库或Spark的优化能力,所有数据都被原样拉到执行端。因此,它最适合作为ETL流程中的“E(Extract)”和“L(Load)”阶段。重量级的“T(Transform)”操作应该在数据写入HDFS后,交由Spark来完成。 - 源数据库压力: 大规模数据抽取的并发度直接转化为对源数据库的连接和查询压力。必须谨慎配置并发任务数,并与DBA团队协作,确保不会影响在线业务系统的稳定性。
- Schema变更处理: 当前简化的Schema自动推断机制比较脆弱。在生产环境中,最好是建立一套独立的元数据管理系统,或者从配置中显式声明Schema,以应对源表结构变更的情况。
最终,这个方案并非银弹,而是一种务实的工程权衡。它以一种非典型但高效的方式,在现代数据平台与遗留系统之间架起了一座桥梁,用最小的代价换取了最大的业务价值交付速度。