We were facing a moderately annoying problem: dozens of DigitalOcean Droplets produce several gigabytes of unstructured text logs every day. These logs need ad-hoc aggregation analysis, such as counting the frequency of a specific error or tracing the behavior path of a given user ID. In a real project, introducing a full ELK Stack or standing up a Spark cluster for this kind of spur-of-the-moment analysis is too costly to run and too complex to maintain. What we need is a Swiss Army knife, not a heavy truck.
The initial idea was simple: could we implement an extremely lightweight MapReduce computation model ourselves? It does not need all of Hadoop's features, only the core distributed-computation capability: a master node responsible for task distribution and coordination, and a number of workers spread across the existing Droplets doing the actual computation. To let operators launch jobs and monitor progress from anywhere, a mobile control panel is a must.
That was the starting point for the technology choices:
- Backend: Go. Its concurrency model (goroutines and channels) is a natural fit for network services and distributed coordination, and the statically linked binary can be dropped onto any Droplet and run without complex environment dependencies.
- Infrastructure: DigitalOcean Droplets. Reuse the compute we already have and keep costs low.
- Control panel: Jetpack Compose. Declarative UI makes it quick to build a responsive, data-driven interface, which is ideal for visualizing job progress in real time.
The goal of this combination: a purpose-built, efficient distributed text-processing tool at the lowest possible cost in resources and maintenance.
# Architecture and Core Conventions
Before writing any code, we have to nail down the system's boundaries and flow. A MapReduce job runs through Input, Map, Shuffle, Reduce, and Output phases. Our lightweight implementation simplifies some of these steps, but the core logic stays the same.
```mermaid
graph TD
    A[Client: Jetpack Compose App] -- 1. Submit Job (Input Files, JobType) --> B(Master Node);
    B -- 2. Split Input into N Map Tasks --> C{Task Queue};
    D[Worker 1] -- 3. Request Task --> B;
    E[Worker 2] -- 3. Request Task --> B;
    F[...] -- 3. Request Task --> B;
    B -- 4. Assign Map Task --> D;
    B -- 4. Assign Map Task --> E;
    C --> B;
    subgraph Map Phase
        D -- 5. Execute Map --> G1[Intermediate Files on Worker 1];
        E -- 5. Execute Map --> G2[Intermediate Files on Worker 2];
    end
    G1 -- 6. Notify Completion --> B;
    G2 -- 6. Notify Completion --> B;
    B -- 7. All Maps Done, Create Reduce Tasks --> C;
    H[Worker 3] -- 8. Request Task --> B;
    B -- 9. Assign Reduce Task (Key Range) --> H;
    subgraph Reduce Phase
        H -- 10. Fetch Intermediate Data --> G1;
        H -- 10. Fetch Intermediate Data --> G2;
        H -- 11. Execute Reduce --> I[Final Output on Worker 3];
    end
    I -- 12. Notify Completion --> B;
    A -- Periodically Poll Status --> B;
```
Core conventions:
- Stateless workers: Worker nodes keep no long-lived state. After startup they register with the master and enter an infinite loop: request a task -> execute it -> report the result.
- The master as the "brain": The master owns all state, including job progress, task states, and the worker list. That also makes it a single point of failure, which is acceptable for our scenario.
- Simplified Map/Reduce functions: To avoid the complexity of dynamically loading code, the Map and Reduce functions are compiled into the worker binary ahead of time. When submitting a job, the client only specifies a `JobType` string (such as "word_count" or "error_aggregation"), and the worker dispatches to the corresponding built-in function based on that type (see the sketch after this list).
- Communication protocol: The master and workers talk over a simple HTTP REST API. gRPC would perform better, but HTTP's debuggability and ubiquity are the bigger advantage in this scenario.
- Intermediate files: Intermediate files produced by the map phase are stored in a local temporary directory on the worker. Reduce workers pull them directly from the map workers over HTTP when needed; the master tells each reduce worker where to find the files.
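To make the `JobType` dispatch concrete, here is a minimal sketch of what the worker-side registry of built-in functions could look like. The `functions` package name matches the import used in the worker code later on, but the `KeyValue` type, the function signatures, and the registry itself are assumptions for illustration, not a definitive implementation.

```go
// file: functions/functions.go (a sketch)
package functions

import (
	"strconv"
	"strings"
)

// KeyValue is the unit exchanged between the map and reduce phases.
type KeyValue struct {
	Key   string
	Value string
}

type MapFunc func(content string) []KeyValue
type ReduceFunc func(key string, values []string) string

// WordCountMap emits ("word", "1") for every whitespace-separated token.
func WordCountMap(content string) []KeyValue {
	var out []KeyValue
	for _, w := range strings.Fields(content) {
		out = append(out, KeyValue{Key: w, Value: "1"})
	}
	return out
}

// WordCountReduce counts how many times a word was emitted.
func WordCountReduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

// Registry is what the worker consults with the JobType string from the master.
var Registry = map[string]struct {
	Map    MapFunc
	Reduce ReduceFunc
}{
	"word_count": {Map: WordCountMap, Reduce: WordCountReduce},
	// "error_aggregation" and other built-ins would be registered here.
}
```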
# The Master Node in Go
The master is the heart of the whole system. Its main responsibilities are managing state and scheduling tasks.
```go
// file: master/main.go
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
)
// TaskStatus defines the state of a task.
type TaskStatus int
const (
Idle TaskStatus = iota
InProgress
Completed
Failed
)
// Task represents a single unit of work (either Map or Reduce).
type Task struct {
ID int
Type string // "map" or "reduce"
Status TaskStatus
InputFile string // For Map tasks
WorkerID string // Worker currently assigned to this task
StartTime time.Time
}
// Job represents a full MapReduce job.
type Job struct {
ID string
JobType string // e.g., "word_count"
InputFiles []string
MapTasks []*Task
ReduceTasks []*Task
Status string // "running", "completed", "failed"
mu sync.Mutex
}
// WorkerInfo holds metadata about a worker.
type WorkerInfo struct {
ID string
Address string
LastHeartbeat time.Time
}
// Coordinator is the central state management struct for the master.
type Coordinator struct {
jobs map[string]*Job
workers map[string]*WorkerInfo
jobCounter int
taskCounter int
mu sync.Mutex
taskQueue chan *Task
}
var coord *Coordinator
func main() {
coord = &Coordinator{
jobs: make(map[string]*Job),
workers: make(map[string]*WorkerInfo),
taskQueue: make(chan *Task, 100), // Buffered channel for tasks
}
go coord.monitorWorkers()
http.HandleFunc("/submit", handleSubmitJob)
http.HandleFunc("/register", handleRegisterWorker)
http.HandleFunc("/heartbeat", handleHeartbeat)
http.HandleFunc("/requestTask", handleRequestTask)
http.HandleFunc("/updateTask", handleUpdateTask)
http.HandleFunc("/status", handleGetStatus)
log.Println("Master node listening on :8080...")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
}
// handleSubmitJob creates a new job and its initial map tasks.
func handleSubmitJob(w http.ResponseWriter, r *http.Request) {
// ... (request body parsing logic)
// For brevity, let's assume we receive a simple structure
var req struct {
JobType string `json:"job_type"`
InputFiles []string `json:"input_files"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
coord.mu.Lock()
defer coord.mu.Unlock()
coord.jobCounter++
jobID := fmt.Sprintf("job-%d", coord.jobCounter)
newJob := &Job{
ID: jobID,
JobType: req.JobType,
InputFiles: req.InputFiles,
Status: "running",
}
// Create map tasks for each input file
for _, file := range req.InputFiles {
coord.taskCounter++
task := &Task{
ID: coord.taskCounter,
Type: "map",
Status: Idle,
InputFile: file,
}
newJob.MapTasks = append(newJob.MapTasks, task)
coord.taskQueue <- task // Push task to the queue
}
// Assume 1 reduce task for simplicity for now
coord.taskCounter++
reduceTask := &Task{
ID: coord.taskCounter,
Type: "reduce",
Status: Idle,
}
newJob.ReduceTasks = append(newJob.ReduceTasks, reduceTask)
coord.jobs[jobID] = newJob
log.Printf("Submitted new job: %s with %d map tasks", jobID, len(newJob.MapTasks))
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(map[string]string{"job_id": jobID})
}
// handleRequestTask gives a worker an available task from the queue.
func handleRequestTask(w http.ResponseWriter, r *http.Request) {
workerID := r.URL.Query().Get("workerId")
if workerID == "" {
http.Error(w, "workerId is required", http.StatusBadRequest)
return
}
select {
case task := <-coord.taskQueue:
task.WorkerID = workerID
task.Status = InProgress
task.StartTime = time.Now()
log.Printf("Assigning task %d (%s) to worker %s", task.ID, task.Type, workerID)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(task)
case <-time.After(5 * time.Second): // Timeout if no task is available
w.WriteHeader(http.StatusNoContent)
}
}
// monitorWorkers periodically checks for dead workers and reschedules their tasks.
func (c *Coordinator) monitorWorkers() {
for {
time.Sleep(10 * time.Second)
c.mu.Lock()
now := time.Now()
for id, worker := range c.workers {
// A worker is considered dead if no heartbeat for 30 seconds
if now.Sub(worker.LastHeartbeat) > 30*time.Second {
log.Printf("Worker %s is dead. Re-scheduling its tasks.", id)
delete(c.workers, id)
// Find and reschedule tasks assigned to the dead worker
for _, job := range c.jobs {
job.mu.Lock()
for _, task := range append(job.MapTasks, job.ReduceTasks...) {
if task.WorkerID == id && task.Status == InProgress {
log.Printf("Re-queueing task %d from dead worker %s", task.ID, id)
task.Status = Idle
task.WorkerID = ""
c.taskQueue <- task
}
}
job.mu.Unlock()
}
}
}
c.mu.Unlock()
}
}
// ... other handlers like handleRegisterWorker, handleHeartbeat, handleUpdateTask, handleGetStatus are omitted for brevity
// handleUpdateTask would be crucial: worker posts updates, master checks if all maps are done to queue the reduce tasks.
```
The core here is the `Coordinator` struct and the `taskQueue` channel. All state is protected by mutexes (`sync.Mutex`) to keep concurrent access safe. The `monitorWorkers` goroutine implements a simple failure detector, an indispensable part of any production-grade system: if a worker drops off, the tasks it was processing are put back into the queue for a healthy worker to pick up.
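The `handleUpdateTask` handler mentioned in the code comment is the hinge of the whole flow, so here is a minimal sketch of how it could look given the structs above and the JSON payload sent by the worker's `updateTaskStatus` below. The payload shape and the `allCompleted` helper are assumptions; a real handler would also record where the intermediate files live so the master can pass those addresses to the reduce workers.

```go
// A sketch of the omitted handler. It marks the reported task as completed
// (or re-queues it on failure) and, once every map task of a job is done,
// pushes that job's reduce tasks into the queue.
func handleUpdateTask(w http.ResponseWriter, r *http.Request) {
	var req struct {
		TaskID int    `json:"task_id"`
		Status string `json:"status"`
		Error  string `json:"error"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	coord.mu.Lock()
	defer coord.mu.Unlock()
	for _, job := range coord.jobs {
		job.mu.Lock()
		for _, task := range job.MapTasks {
			if task.ID != req.TaskID {
				continue
			}
			if req.Status == "completed" {
				task.Status = Completed
				// All map tasks done: the job's reduce tasks become runnable.
				if allCompleted(job.MapTasks) {
					for _, rt := range job.ReduceTasks {
						if rt.Status == Idle {
							coord.taskQueue <- rt
						}
					}
				}
			} else {
				// A failed map task goes back into the queue for another worker.
				task.Status = Idle
				task.WorkerID = ""
				coord.taskQueue <- task
			}
		}
		// Updates for reduce tasks would be handled analogously, marking the
		// job as "completed" once every reduce task has finished.
		job.mu.Unlock()
	}
	w.WriteHeader(http.StatusOK)
}

// allCompleted reports whether every task in the slice has finished.
func allCompleted(tasks []*Task) bool {
	for _, t := range tasks {
		if t.Status != Completed {
			return false
		}
	}
	return true
}
```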
# The Worker Node in Go
The worker is the unit that does the actual computation. Its logic is simple: start up, register, then repeatedly request tasks from the master in a loop.
```go
// file: worker/main.go
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"time"
// Fictional package for our map/reduce functions
"our-mr-library/functions"
)
var masterAddr string
var workerID string
// Task represents a task received from the master.
type Task struct {
ID int `json:"ID"`
Type string `json:"Type"`
InputFile string `json:"InputFile"`
}
func main() {
if len(os.Args) < 2 {
log.Fatal("Usage: worker <master_address>")
}
masterAddr = os.Args[1]
workerID = fmt.Sprintf("worker-%d", time.Now().UnixNano())
register()
go sendHeartbeats()
log.Printf("Worker %s started. Polling master for tasks...", workerID)
pollForTasks()
}
func sendHeartbeats() {
for {
// Simple HTTP POST to notify master we are alive
// ... implementation omitted
time.Sleep(10 * time.Second)
}
}
func pollForTasks() {
for {
resp, err := http.Get(fmt.Sprintf("%s/requestTask?workerId=%s", masterAddr, workerID))
if err != nil {
log.Printf("Error requesting task: %v. Retrying...", err)
time.Sleep(5 * time.Second)
continue
}
if resp.StatusCode == http.StatusNoContent {
// No task available, wait and poll again
time.Sleep(5 * time.Second)
resp.Body.Close()
continue
}
var task Task
if err := json.NewDecoder(resp.Body).Decode(&task); err != nil {
log.Printf("Error decoding task: %v", err)
resp.Body.Close()
continue
}
resp.Body.Close()
log.Printf("Received task %d of type %s", task.ID, task.Type)
executeTask(&task)
}
}
func executeTask(task *Task) {
var err error
if task.Type == "map" {
err = executeMapTask(task)
} else if task.Type == "reduce" {
err = executeReduceTask(task)
} else {
err = fmt.Errorf("unknown task type: %s", task.Type)
}
updateTaskStatus(task.ID, err)
}
func executeMapTask(task *Task) error {
// In a real scenario, we'd fetch the file from a shared storage like S3
// or another Droplet. Here, we assume it's locally accessible for simplicity.
content, err := os.ReadFile(task.InputFile)
if err != nil {
return fmt.Errorf("failed to read input file %s: %w", task.InputFile, err)
}
// Call the pre-compiled function based on job type (not shown, but would be retrieved from master)
// For this example, let's assume it's always word count.
intermediate := functions.WordCountMap(string(content))
// Write intermediate results to local files, partitioned by key.
// This is a simplified shuffle. In reality, we'd hash the key
// to determine which reducer it belongs to.
// e.g., output-map-<task_id>-<reduce_task_id>
// ... file writing logic omitted
log.Printf("Map task %d completed.", task.ID)
return nil
}
func updateTaskStatus(taskID int, taskErr error) {
status := "completed"
errMsg := ""
if taskErr != nil {
status = "failed"
errMsg = taskErr.Error()
}
payload, _ := json.Marshal(map[string]interface{}{
"task_id": taskID,
"status": status,
"error": errMsg,
})
resp, err := http.Post(fmt.Sprintf("%s/updateTask", masterAddr), "application/json", bytes.NewBuffer(payload))
if err != nil {
log.Printf("Failed to report status for task %d: %v", taskID, err)
return
}
resp.Body.Close()
log.Printf("Reported status for task %d: %s", taskID, status)
}
// ... executeReduceTask and other functions are similar in structure
```
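The file-writing logic elided in `executeMapTask` is where the simplified shuffle happens. Below is a minimal sketch of how it could work, assuming the `functions.KeyValue` type sketched earlier and a known number of reduce tasks (`nReduce`): the key is hashed to pick a reduce partition, and each partition is written to its own JSON-lines file that a reduce worker can later fetch over HTTP. The file-naming scheme and the extra `hash/fnv` import are assumptions.

```go
// writeIntermediate partitions map output by hashed key and writes one
// intermediate file per reduce task, e.g. /tmp/mr-intermediate-<map_id>-<reduce_id>.
func writeIntermediate(mapTaskID, nReduce int, intermediate []functions.KeyValue) error {
	// Group key/value pairs by the reduce partition their key hashes to.
	buckets := make(map[int][]functions.KeyValue)
	for _, kv := range intermediate {
		p := ihash(kv.Key) % nReduce
		buckets[p] = append(buckets[p], kv)
	}
	for r, kvs := range buckets {
		name := fmt.Sprintf("/tmp/mr-intermediate-%d-%d", mapTaskID, r)
		f, err := os.Create(name)
		if err != nil {
			return err
		}
		enc := json.NewEncoder(f)
		for _, kv := range kvs {
			if err := enc.Encode(&kv); err != nil {
				f.Close()
				return err
			}
		}
		f.Close()
	}
	return nil
}

// ihash maps a key to a non-negative int used to pick a reduce partition.
// Requires the "hash/fnv" import.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}
```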
The worker code is very straightforward: it is a tireless executor that does exactly what the master tells it to. Unit testing effort should concentrate on `executeMapTask` and `executeReduceTask`, making sure they either handle correctly, or fail gracefully on, all kinds of boundary inputs (empty files, very large files, malformed content).
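As a sketch of that testing approach, a table-driven test around the map function could look like the following. It assumes the `WordCountMap` behavior sketched earlier (whitespace tokenization, one pair per token); the exact cases and expectations are illustrative.

```go
// file: worker/map_test.go (a sketch)
package main

import (
	"testing"

	"our-mr-library/functions"
)

func TestWordCountMapEdgeCases(t *testing.T) {
	cases := []struct {
		name    string
		input   string
		wantLen int
	}{
		{name: "empty file", input: "", wantLen: 0},
		{name: "whitespace only", input: " \n\t ", wantLen: 0},
		{name: "single word", input: "error", wantLen: 1},
		{name: "mixed separators", input: "error\twarn error\n", wantLen: 3},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := functions.WordCountMap(tc.input)
			if len(got) != tc.wantLen {
				t.Errorf("WordCountMap(%q) emitted %d pairs, want %d", tc.input, len(got), tc.wantLen)
			}
		})
	}
}
```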
# A Real-Time Monitoring Dashboard in Jetpack Compose
The mobile control panel is the "face" of this system. It has to show the state of the cluster and the progress of each job clearly and in real time. Jetpack Compose's declarative, reactive nature makes this very efficient.
1. ViewModel and data flow
We use a `ViewModel` to manage UI state and network requests, exposing the data stream to the UI layer as a `StateFlow`. The ViewModel periodically polls the master's `/status` endpoint.
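On the master side, the `/status` handler was omitted earlier, so the exact JSON shape is an assumption. Here is a minimal Go sketch of the response structs and handler that would line up with the `ClusterStatus`/`JobStatus`/`TaskStatus` classes the app decodes below; field names are illustrative, not a fixed protocol.

```go
// A sketch of the payload the Compose app polls. Field names are chosen to
// match the Kotlin data classes used by the dashboard.
type TaskStatusDTO struct {
	ID     int    `json:"id"`
	Status string `json:"status"` // "Idle", "InProgress", "Completed", "Failed"
}

type JobStatusDTO struct {
	ID          string          `json:"id"`
	Status      string          `json:"status"`
	MapTasks    []TaskStatusDTO `json:"mapTasks"`
	ReduceTasks []TaskStatusDTO `json:"reduceTasks"`
}

type ClusterStatusDTO struct {
	Jobs []JobStatusDTO `json:"jobs"`
}

func handleGetStatus(w http.ResponseWriter, r *http.Request) {
	coord.mu.Lock()
	defer coord.mu.Unlock()

	var resp ClusterStatusDTO
	for _, job := range coord.jobs {
		js := JobStatusDTO{ID: job.ID, Status: job.Status}
		for _, t := range job.MapTasks {
			js.MapTasks = append(js.MapTasks, TaskStatusDTO{ID: t.ID, Status: statusName(t.Status)})
		}
		for _, t := range job.ReduceTasks {
			js.ReduceTasks = append(js.ReduceTasks, TaskStatusDTO{ID: t.ID, Status: statusName(t.Status)})
		}
		resp.Jobs = append(resp.Jobs, js)
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(resp)
}

// statusName converts the numeric TaskStatus into the strings the UI expects.
func statusName(s TaskStatus) string {
	switch s {
	case Idle:
		return "Idle"
	case InProgress:
		return "InProgress"
	case Completed:
		return "Completed"
	case Failed:
		return "Failed"
	}
	return "Unknown"
}
```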
```kotlin
// file: ui/JobViewModel.kt
package com.example.mrcontroller.ui
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import com.example.mrcontroller.data.ClusterStatus
import com.example.mrcontroller.data.JobStatus
import com.example.mrcontroller.data.MrApi // Retrofit service interface
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
// Represents the entire UI state for the dashboard.
data class DashboardUiState(
val jobs: List<JobStatus> = emptyList(),
val isLoading: Boolean = true,
val error: String? = null
)
class JobViewModel : ViewModel() {
private val _uiState = MutableStateFlow(DashboardUiState())
val uiState: StateFlow<DashboardUiState> = _uiState
private val apiService = MrApi.create() // Your Retrofit instance
init {
// Start polling for status updates as soon as the ViewModel is created.
startPolling()
}
private fun startPolling() {
viewModelScope.launch(Dispatchers.IO) {
while (true) {
try {
val status = apiService.getClusterStatus()
_uiState.value = DashboardUiState(jobs = status.jobs, isLoading = false)
} catch (e: Exception) {
// In a real app, handle different network errors specifically.
_uiState.value = _uiState.value.copy(error = "Failed to connect to master: ${e.message}", isLoading = false)
}
// Poll every 3 seconds. Adjust based on desired real-time feel vs. network load.
delay(3000)
}
}
}
// A function to submit a new job, called from the UI.
fun submitNewJob(jobType: String, inputFiles: List<String>) {
viewModelScope.launch(Dispatchers.IO) {
try {
// Call the API to submit the job.
// Upon success, the next poll will automatically pick it up.
// We could also optimistically update the UI here.
apiService.submitJob(jobType, inputFiles)
} catch (e: Exception) {
_uiState.value = _uiState.value.copy(error = "Job submission failed: ${e.message}")
}
}
}
}
```
2. The core composable: visualizing task progress
The most valuable part of the UI is the detail page for a single job, which visualizes the state of every map and reduce task. A `LazyVerticalGrid` works well for laying out a large number of tasks.
```kotlin
// file: ui/JobDetailsScreen.kt
package com.example.mrcontroller.ui
import androidx.compose.foundation.background
import androidx.compose.foundation.border
import androidx.compose.foundation.layout.*
import androidx.compose.foundation.lazy.grid.GridCells
import androidx.compose.foundation.lazy.grid.LazyVerticalGrid
import androidx.compose.foundation.lazy.grid.items
import androidx.compose.material3.Text
import androidx.compose.runtime.Composable
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.unit.dp
import androidx.compose.ui.unit.sp
import com.example.mrcontroller.data.JobStatus
import com.example.mrcontroller.data.TaskStatus
@Composable
fun JobDetailsScreen(job: JobStatus) {
Column(modifier = Modifier.padding(16.dp)) {
Text("Job ID: ${job.id}", fontSize = 20.sp)
Text("Status: ${job.status}", color = if (job.status == "completed") Color.Green else Color.Yellow)
Spacer(modifier = Modifier.height(16.dp))
// Map Tasks Visualization
Text("Map Tasks (${job.mapTasks.count { it.status == "Completed" }} / ${job.mapTasks.size})")
TaskGrid(tasks = job.mapTasks)
Spacer(modifier = Modifier.height(16.dp))
// Reduce Tasks Visualization
Text("Reduce Tasks (${job.reduceTasks.count { it.status == "Completed" }} / ${job.reduceTasks.size})")
TaskGrid(tasks = job.reduceTasks)
}
}
@Composable
fun TaskGrid(tasks: List<TaskStatus>) {
LazyVerticalGrid(
columns = GridCells.Adaptive(minSize = 48.dp), // Responsive grid
horizontalArrangement = Arrangement.spacedBy(4.dp),
verticalArrangement = Arrangement.spacedBy(4.dp),
modifier = Modifier.fillMaxWidth()
) {
items(tasks) { task ->
TaskCell(task = task)
}
}
}
@Composable
fun TaskCell(task: TaskStatus) {
val backgroundColor = when (task.status) {
"Idle" -> Color.Gray
"InProgress" -> Color.Blue
"Completed" -> Color.Green
"Failed" -> Color.Red
else -> Color.DarkGray
}
Box(
modifier = Modifier
.size(48.dp)
.background(backgroundColor)
.border(1.dp, Color.White),
contentAlignment = Alignment.Center
) {
Text(
text = task.id.toString(),
color = Color.White,
fontSize = 12.sp
)
}
}
```
The power of this Compose code is that the UI is driven entirely by the `JobStatus` data structure. When the `ViewModel` fetches fresh data from the backend, the `StateFlow` triggers recomposition, and each `TaskCell` recolors itself according to its task's state, with no manual UI manipulation. This data-driven paradigm keeps even complex, dynamic interfaces remarkably clear and maintainable.
# Remaining Issues and Future Iterations
This system solves our original pain point, but it is far from perfect. In a real project you have to stay clear-eyed about where a technology's limits are.
- Master single point of failure (SPOF): The current master node is the Achilles' heel of the whole system. If it goes down, every running job fails and its state is lost. A production-grade solution would typically use Raft or Paxos to elect a leader among multiple master nodes for high availability.
- State persistence: All of the master's state (job information, task progress) lives in memory and is lost on restart. A simple improvement is an embedded key/value store (such as BoltDB or LevelDB) to persist state, so the master can resume jobs after a restart (see the sketch after this list).
- Simplified shuffle phase: We simplified shuffle to reduce workers pulling data directly from map workers. At larger scale this creates a lot of network connections and can turn the map workers into a bottleneck. A more robust design would use a dedicated shuffle service, or write intermediate results to a distributed file system the way Hadoop does.
- Hard-coded computation logic: The map and reduce functions are compiled into the worker, which is inflexible. A future iteration could use WASM (WebAssembly) or a gRPC plugin system to let users submit computation logic dynamically, decoupling the framework from specific business logic.
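As a rough sketch of the persistence idea, the master could serialize each job to an embedded store on every state change and reload everything at startup. The example below uses bbolt (the maintained fork of BoltDB); the bucket name, key scheme, and whole-job JSON serialization are assumptions, not a worked-out design.

```go
// A sketch of persisting and restoring job state with bbolt (go.etcd.io/bbolt).
package main

import (
	"encoding/json"

	bolt "go.etcd.io/bbolt"
)

// saveJob writes one job's full state under its ID in the "jobs" bucket.
func saveJob(db *bolt.DB, job *Job) error {
	data, err := json.Marshal(job)
	if err != nil {
		return err
	}
	return db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("jobs"))
		if err != nil {
			return err
		}
		return b.Put([]byte(job.ID), data)
	})
}

// loadJobs restores previously persisted jobs into the coordinator's map at startup.
func loadJobs(db *bolt.DB, into map[string]*Job) error {
	return db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("jobs"))
		if b == nil {
			return nil // nothing persisted yet
		}
		return b.ForEach(func(k, v []byte) error {
			var job Job
			if err := json.Unmarshal(v, &job); err != nil {
				return err
			}
			into[string(k)] = &job
			return nil
		})
	})
}
```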
The value of this project is not in reinventing Hadoop. It is in showing how modern tools (Go, Jetpack Compose) and a pragmatic cloud platform (DigitalOcean) can be used, from first principles and at very low cost, to build a distributed system that does exactly what a specific need requires. It backs up a simple point: not every problem needs the heavyweight "standard answer"; sometimes a custom, sharp little knife is more effective.