在Hadoop生态中利用Scala复用MyBatis作为声明式ETL引擎的架构实践


我们面临一个典型的技术债务与现代化改造的冲突场景。数据平台团队已经成功在Hadoop上构建了新的数据湖,技术栈统一为Scala和Spark。然而,业务的核心价值——数百个经过多年迭代、复杂且高度优化的SQL查询,被牢牢地锁在旧有Java单体应用的MyBatis XML映射文件中。直接将这些SQL翻译成Spark SQL或DataFrame API的成本是惊人的,不仅仅是工作量,更关键的是翻译过程中引入业务逻辑错误的风险极高。任何一个微小的差异都可能导致下游报表和风控模型的灾难性偏差。

常规的解决方案,即“完全重写”,在这种背景下显得鲁莽且不切实际。这会造成两个独立的技术团队(Java业务团队和Scala数据团队)在相当长的一段时间内维护两套逻辑几乎完全相同的代码库,这在任何一个真实项目中都是不可接受的。我们需要一个能连接这两个世界的桥梁。

方案A:原生大数据技术栈重构

这是最直接,也是技术上最“纯粹”的方案。核心思想是将所有MyBatis中的SQL逻辑,逐一迁移到Spark SQL中,并用Scala重写所有相关的业务逻辑。

  • 优势:

    1. 性能最优: 所有计算都发生在Spark引擎内部,可以充分利用Catalyst优化器、Tungsten执行引擎和全阶段代码生成,获得极致的执行性能。
    2. 技术栈统一: 整个数据处理链路都在Hadoop和Spark生态内,便于统一管理、监控和调优。
    3. 类型安全: 使用Scala的Dataset API可以获得编译期的类型安全检查,减少运行时错误。
  • 劣势:

    1. 极高的迁移成本与风险: 这是该方案的致命弱点。人工翻译SQL,尤其是包含大量动态标签(<if>, <foreach>)的复杂查询,极易出错。验证其业务逻辑一致性需要投入大量测试资源,周期漫长。
    2. 知识壁垒: 数据工程师需要深入理解遗留系统的业务逻辑,而这些逻辑往往没有清晰的文档,只体现在代码和SQL中。
    3. 维护双重逻辑: 在漫长的迁移过渡期,业务逻辑同时存在于MyBatis XML和Spark代码中,任何业务变更都需要同步修改两处,这会成为混乱的根源。

方案B:构建混合式声明性ETL执行器

这个方案的核心思想是“复用而非重造”。我们不触碰那些经过千锤百炼的MyBatis XML文件,而是构建一个通用的执行引擎,使其能够在Scala编写的Hadoop作业中,直接加载并执行指定的MyBatis查询,并将结果高效地写入HDFS。本质上,我们将MyBatis从一个应用层ORM框架,降维成一个“声明式SQL执行引擎”。

  • 优势:

    1. 风险极低,上线速度快: 完美复用现有业务逻辑,从根本上消除了逻辑翻译错误的风险。新旧系统的查询逻辑源自同一份文件,保证了绝对的一致性。
    2. 职责清晰: Java业务团队继续负责维护核心的业务SQL(他们最擅长的领域),数据平台团队则专注于构建稳定、高效的数据管道和执行框架。
    3. 迁移成本可控: 开发工作量从“重写所有业务逻辑”转变为“开发一个通用执行框架”,工作量从N(N为查询数量)降低到1。
  • 劣势:

    1. 非主流架构: 这种组合在业界并不常见,需要团队具备一定的技术整合与定制开发能力。
    2. 性能瓶颈: 数据抽取过程的瓶颈将从Spark的分布式计算能力转移到源端关系型数据库的JDBC吞吐能力。它无法像Spark的JDBC数据源那样进行谓词下推等高级优化。
    3. 依赖管理复杂: Hadoop作业的Classpath需要同时包含Hadoop、Scala、MyBatis以及JDBC驱动的依赖。

决策与理由

在真实的项目中,时间、成本和风险往往是比“技术纯粹性”更重要的考量因素。方案A虽然在技术上更优雅,但其高昂的成本和不可控的风险使其在当前阶段不具备可行性。我们最终选择了方案B。我们的目标是在保证数据质量和逻辑一致性的前提下,以最快的速度将数据从遗留系统安全地同步到数据湖。该方案将风险降至最低,并最大化地利用了团队现有的知识资产。

核心实现概览

我们将构建一个可配置的、由Scala驱动的YARN应用。这个应用不依赖Spark,它就是一个纯粹的Hadoop客户端程序,负责读取任务配置、执行MyBatis查询,并将结果以Parquet格式写入HDFS。

其核心工作流程可以用下面的图表来描述:

graph TD
    A[Yarn App Master - Scala Driver] --> B{读取ETL作业配置};
    B --> C[加载MyBatis全局配置];
    C --> D{创建SqlSessionFactory};
    D --> E[根据作业配置, 获取Mapper和Statement ID];
    E --> F{执行sqlSession.select};
    subgraph "数据处理与写入"
        F -- 传递ResultHandler --> G[流式处理JDBC结果集];
        G --> H[将每行数据转换为中间格式 Avro Record];
        H --> I[使用ParquetWriter写入HDFS临时文件];
    end
    I --> J{任务完成, 文件重命名};

    style A fill:#d4e6f1,stroke:#333,stroke-width:2px
    style J fill:#d5f5e3,stroke:#333,stroke-width:2px

这个架构的关键在于两点:

  1. 配置驱动: 整个流程是元数据驱动的,执行哪个查询、数据源是什么、目标路径在哪,都由外部配置文件决定。
  2. 流式处理: 必须使用MyBatis的ResultHandler机制来处理查询结果,避免将千万级甚至上亿级的数据一次性加载到Driver或单一Task的内存中,从而引发OOM。

关键代码与原理解析

1. 项目依赖与构建 (build.sbt)

SBT配置文件是整个项目的基础,它负责管理所有必要的依赖项。这里的关键是正确引入Hadoop客户端、MyBatis核心库、数据库驱动、Parquet写入库以及配置管理库。

// build.sbt
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.12.15"

lazy val root = (project in file("."))
  .settings(
    name := "mybatis-hadoop-etl",
    libraryDependencies ++= Seq(
      // MyBatis Core
      "org.mybatis" % "mybatis" % "3.5.11",
      // Hadoop Client for HDFS access and YARN
      "org.apache.hadoop" % "hadoop-client" % "3.3.4" % "provided",
      // PostgreSQL JDBC Driver (example)
      "org.postgresql" % "postgresql" % "42.5.1",
      // Logging
      "ch.qos.logback" % "logback-classic" % "1.2.11",
      // Configuration Library
      "com.typesafe" % "config" % "1.4.2",
      // Parquet for writing to HDFS
      "org.apache.parquet" % "parquet-avro" % "1.12.3",
      "org.apache.hadoop" % "hadoop-common" % "3.3.4" % "provided",
      // Avro for schema definition
      "org.apache.avro" % "avro" % "1.11.1"
    )
  )

注意,hadoop-clienthadoop-common被标记为provided。这是因为当我们将应用打包成jar并提交到YARN集群时,这些库已经存在于集群的classpath中,无需重复打包,可以减小jar包体积。

2. 声明式作业配置 (application.conf)

使用Typesafe Config库(HOCON格式)可以极大地提高配置的灵活性和可读性。

// src/main/resources/application.conf
etl-job {
  source {
    db-type = "postgresql"
    jdbc-url = "jdbc:postgresql://localhost:5432/legacy_db"
    username = "user"
    password = "password"
    driver-class = "org.postgresql.Driver"
  }

  // Configuration for MyBatis itself
  mybatis {
    // Path to the main mybatis config XML inside the jar
    config-resource = "mybatis-config.xml"
    environment-id = "production"
  }

  // Task specific settings
  task {
    // The fully qualified ID for the mybatis statement to execute
    mapper-statement-id = "com.example.mappers.UserMapper.getActiveUsers"
    
    // Parameters to pass to the mybatis query
    parameters = {
      status: "ACTIVE",
      minRegistrationDate: "2023-01-01"
    }
  }

  destination {
    // HDFS path to write the output Parquet file
    hdfs-path = "/user/data/warehouse/users_active"
    // Parquet specific settings
    parquet {
      compression-codec = "SNAPPY"
      block-size = 134217728 // 128 MB
    }
  }
}

3. MyBatis核心初始化 (MyBatisSessionFactory.scala)

我们需要一个单例对象来管理SqlSessionFactory的生命周期。它只应被初始化一次,以避免重复解析XML配置文件的开销。

import com.typesafe.config.Config
import org.apache.ibatis.io.Resources
import org.apache.ibatis.session.{SqlSession, SqlSessionFactory, SqlSessionFactoryBuilder}
import java.util.Properties

object MyBatisSessionFactory {

  private var sqlSessionFactory: Option[SqlSessionFactory] = None

  /**
   * Initializes the SqlSessionFactory. Must be called once before any sessions are opened.
   * This method is designed to be thread-safe, though in our single-threaded driver,
   * it's less of a concern.
   *
   * @param mybatisConfig The configuration subtree for mybatis settings.
   * @param dbConfig The configuration subtree for database connection details.
   */
  def initialize(mybatisConfig: Config, dbConfig: Config): Unit = synchronized {
    if (sqlSessionFactory.isEmpty) {
      val resource = mybatisConfig.getString("config-resource")
      val environmentId = mybatisConfig.getString("environment-id")
      
      val properties = new Properties()
      properties.setProperty("jdbc.driver", dbConfig.getString("driver-class"))
      properties.setProperty("jdbc.url", dbConfig.getString("jdbc-url"))
      properties.setProperty("jdbc.username", dbConfig.getString("username"))
      properties.setProperty("jdbc.password", dbConfig.getString("password"))

      val inputStream = Resources.getResourceAsStream(resource)
      try {
        val factory = new SqlSessionFactoryBuilder().build(inputStream, environmentId, properties)
        sqlSessionFactory = Some(factory)
      } finally {
        inputStream.close()
      }
      println("MyBatis SqlSessionFactory initialized successfully.")
    }
  }

  /**
   * Opens a new SqlSession. Throws an IllegalStateException if initialize() was not called.
   *
   * @return A new SqlSession instance.
   */
  def openSession(): SqlSession = {
    sqlSessionFactory.getOrElse(throw new IllegalStateException("SqlSessionFactory has not been initialized. Call initialize() first."))
      .openSession()
  }
}

这里的关键在于,我们将JDBC连接属性从mybatis-config.xml中剥离出来,通过代码动态传入。这使得数据库连接信息可以完全由外部的application.conf控制,增强了部署的灵活性。

4. 核心执行器与流式结果处理器 (MyBatisStreamProcessor.scala)

这是整个框架的核心。它封装了查询执行、结果处理和文件写入的全部逻辑。

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.ibatis.session.{ResultContext, ResultHandler, SqlSession}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import java.util.{Map => JavaMap}
import scala.collection.JavaConverters._

class MyBatisStreamProcessor(hadoopConf: Configuration, jobConfig: com.typesafe.config.Config) {

  def execute(): Unit = {
    val statementId = jobConfig.getString("task.mapper-statement-id")
    val hdfsRawPath = jobConfig.getString("destination.hdfs-path")
    val params = configToMap(jobConfig.getConfig("task.parameters"))

    // A temporary path to prevent consumers from reading partial files
    val tempHdfsPath = new Path(s"${hdfsRawPath}_tmp_${System.currentTimeMillis()}")
    val finalHdfsPath = new Path(hdfsRawPath)
    
    val fs = FileSystem.get(hadoopConf)
    if(fs.exists(finalHdfsPath)) {
        println(s"Final destination path ${finalHdfsPath.toString} already exists. Deleting.")
        fs.delete(finalHdfsPath, true)
    }

    var sqlSession: SqlSession = null
    try {
      sqlSession = MyBatisSessionFactory.openSession()
      
      // The ResultHandler is the key to streaming
      val handler = new ParquetWritingResultHandler(fs, tempHdfsPath)
      
      println(s"Executing MyBatis statement: $statementId with params: $params")
      sqlSession.select(statementId, params, handler)
      
      // After processing, rename the temp file to the final destination
      println(s"Processed ${handler.getRowCount} rows. Renaming ${tempHdfsPath} to ${finalHdfsPath}")
      fs.rename(tempHdfsPath, finalHdfsPath)

    } finally {
      if (sqlSession != null) {
        sqlSession.close()
      }
      // Don't close FileSystem here if it's shared
    }
  }

  private def configToMap(config: com.typesafe.config.Config): JavaMap[String, Any] = {
    config.entrySet().asScala.map(entry => (entry.getKey, entry.getValue.unwrapped())).toMap.asJava
  }
  
  // Inner class to handle results row-by-row
  private class ParquetWritingResultHandler(fs: FileSystem, path: Path) extends ResultHandler[JavaMap[String, AnyRef]] {
    private var writer: ParquetWriter[GenericRecord] = null
    private var avroSchema: Schema = null
    private var rowCount: Long = 0

    override def handleResult(resultContext: ResultContext[_ <: JavaMap[String, AnyRef]]): Unit = {
      val rowMap = resultContext.getResultObject
      if (rowCount == 0) {
        // First row, derive schema and initialize writer
        initializeWriter(rowMap)
      }

      val record = new GenericData.Record(avroSchema)
      rowMap.asScala.foreach { case (key, value) =>
        // Avro schema field names are typically case-sensitive and follow conventions.
        // MyBatis returns keys in uppercase on some DBs. We normalize them.
        val schemaField = avroSchema.getField(key.toLowerCase)
        if (schemaField != null) {
          record.put(key.toLowerCase(), value)
        }
      }
      writer.write(record)
      rowCount += 1
      if(rowCount % 10000 == 0) {
          println(s"Processed ${rowCount} rows...")
      }
    }

    def getRowCount: Long = rowCount

    private def initializeWriter(sampleRow: JavaMap[String, AnyRef]): Unit = {
      this.avroSchema = deriveAvroSchema(sampleRow)
      val compression = CompressionCodecName.valueOf(jobConfig.getString("destination.parquet.compression-codec").toUpperCase)
      val blockSize = jobConfig.getInt("destination.parquet.block-size")
      
      this.writer = AvroParquetWriter.builder[GenericRecord](path)
        .withSchema(avroSchema)
        .withConf(fs.getConf)
        .withCompressionCodec(compression)
        .withBlockSize(blockSize)
        .build()
    }
    
    // A simplified schema derivation. In a production system, this should be more robust.
    private def deriveAvroSchema(sampleRow: JavaMap[String, AnyRef]): Schema = {
        val fields = sampleRow.asScala.map { case (key, value) =>
            // Type mapping logic here needs to be very robust. This is a simplification.
            val fieldType = value match {
                case _: String => Schema.create(Schema.Type.STRING)
                case _: java.lang.Integer => Schema.create(Schema.Type.INT)
                case _: java.lang.Long => Schema.create(Schema.Type.LONG)
                case _: java.math.BigDecimal => Schema.create(Schema.Type.STRING) // Safer to handle as string
                case _: java.sql.Timestamp => Schema.create(Schema.Type.LONG)
                case _ => Schema.create(Schema.Type.STRING) // Default to string for unknown types
            }
            // Create a union with NULL to allow nullable fields
            val nullableType = Schema.createUnion(Schema.create(Schema.Type.NULL), fieldType)
            new Schema.Field(key.toLowerCase(), nullableType, null, null)
        }.toList.asJava
        
        val schema = Schema.createRecord("ETLRecord", "Auto-generated schema from JDBC result set", "com.example.etl", false)
        schema.setFields(fields)
        println(s"Derived Avro Schema: ${schema.toString(true)}")
        schema
    }

    // This method is called after the last result, but we close the writer in the outer scope
    // to ensure it happens even if there are no results.
  }
}

这段代码的精髓在于ParquetWritingResultHandler。MyBatis每从JDBC ResultSet中获取一行数据,就会调用一次handleResult方法。在第一行数据到达时,我们动态地推断出数据的Schema(这里简化为Avro Schema),并初始化ParquetWriter。随后的每一行都直接转换成GenericRecord并写入HDFS。这个过程是纯流式的,内存占用极低,只与单行数据的大小有关。

5. 单元测试思路

对这样的框架进行测试至关重要。

  1. 数据库集成测试: 使用H2内存数据库,在测试代码中创建表、插入数据。然后运行MyBatisStreamProcessor,让它查询H2数据库。
  2. 输出验证: MyBatisStreamProcessor执行完毕后,使用parquet-toolsparquet-mr库读出生成的Parquet文件,断言其Schema和内容是否与预期一致。
  3. 异常处理测试: 模拟数据库连接失败、SQL语法错误、HDFS路径不存在等场景,验证程序的健壮性和错误处理逻辑。

架构的扩展性与局限性

这个架构虽然解决了眼前的痛点,但它的适用边界也必须清晰。

可扩展性:

  • 多格式支持: ParquetWritingResultHandler可以被抽象成一个接口,通过配置轻松实现写入ORC、Avro或纯文本等不同格式。
  • 执行引擎集成: 当前的实现是一个独立的YARN应用。这个核心逻辑可以被封装后,在一个Spark作业的mapPartitions中执行,从而利用Spark进行任务调度和并行化,比如按日期分区并行抽取数据。
  • 动态SQL增强: 可以结合配置,动态地为MyBatis的参数Map添加额外的过滤条件,例如etl_date,实现增量抽取。

局限性:

  • 无计算下推: 这是最大的局限。该框架本质上是一个SELECT * FROM (...)的执行器。任何复杂的聚合、连接、转换都无法利用数据库或Spark的优化能力,所有数据都被原样拉到执行端。因此,它最适合作为ETL流程中的“E(Extract)”和“L(Load)”阶段。重量级的“T(Transform)”操作应该在数据写入HDFS后,交由Spark来完成。
  • 源数据库压力: 大规模数据抽取的并发度直接转化为对源数据库的连接和查询压力。必须谨慎配置并发任务数,并与DBA团队协作,确保不会影响在线业务系统的稳定性。
  • Schema变更处理: 当前简化的Schema自动推断机制比较脆弱。在生产环境中,最好是建立一套独立的元数据管理系统,或者从配置中显式声明Schema,以应对源表结构变更的情况。

最终,这个方案并非银弹,而是一种务实的工程权衡。它以一种非典型但高效的方式,在现代数据平台与遗留系统之间架起了一座桥梁,用最小的代价换取了最大的业务价值交付速度。


  目录