Building a Lightweight Go MapReduce Framework on DigitalOcean with a Jetpack Compose Real-Time Monitoring Dashboard


We faced a moderately annoying problem: dozens of DigitalOcean Droplets were producing several gigabytes of unstructured text logs every day. These logs needed ad-hoc aggregate analysis, such as counting the frequency of a specific error or tracing the activity path of a particular user ID. In a real project, standing up a full ELK stack or a Spark cluster for this kind of spur-of-the-moment analysis is too expensive to run and too complex to maintain. What we needed was a Swiss Army knife, not a heavy-duty truck.

The initial idea was simple: could we implement an extremely lightweight MapReduce computation model ourselves? It would not need all of Hadoop's features, only the core distributed-computation capability: a master node responsible for task distribution and coordination, plus a number of workers spread across the existing Droplets to do the computing. And to let operators launch jobs and monitor progress from anywhere, a mobile control panel was a must.

That was the starting point for the technology choices:

  • Backend: Go. Its concurrency model (goroutines and channels) is a natural fit for network services and distributed coordination, and the statically linked binaries it produces can be dropped onto any Droplet and run without complex environment dependencies.
  • Infrastructure: DigitalOcean Droplets. Reuse the compute we already pay for and keep costs low.
  • Control panel: Jetpack Compose. Its declarative UI makes it quick to build a reactive, data-driven interface, which is exactly what real-time visualization of job progress calls for.

The goal of this combination: a purpose-built, efficient distributed text-processing tool at the lowest possible resource and maintenance cost.

# Architecture and Core Conventions

Before writing any code, the system's boundaries and flow must be pinned down. A MapReduce job breaks into five stages: Input, Map, Shuffle, Reduce, and Output. Our lightweight implementation simplifies some of these stages, but the core logic stays intact.

graph TD
    A[Client: Jetpack Compose App] -- 1. Submit Job (Input Files, JobType) --> B(Master Node);
    B -- 2. Split Input into N Map Tasks --> C{Task Queue};
    D[Worker 1] -- 3. Request Task --> B;
    E[Worker 2] -- 3. Request Task --> B;
    F[...] -- 3. Request Task --> B;
    B -- 4. Assign Map Task --> D;
    B -- 4. Assign Map Task --> E;
    C --> B;

    subgraph Map Phase
        D -- 5. Execute Map --> G1[Intermediate Files on Worker 1];
        E -- 5. Execute Map --> G2[Intermediate Files on Worker 2];
    end

    G1 -- 6. Notify Completion --> B;
    G2 -- 6. Notify Completion --> B;
    
    B -- 7. All Maps Done, Create Reduce Tasks --> C;
    
    H[Worker 3] -- 8. Request Task --> B;
    B -- 9. Assign Reduce Task (Key Range) --> H;
    
    subgraph Reduce Phase
      H -- 10. Fetch Intermediate Data --> G1;
      H -- 10. Fetch Intermediate Data --> G2;
      H -- 11. Execute Reduce --> I[Final Output on Worker 3];
    end

    I -- 12. Notify Completion --> B;
    A -- Periodically Poll Status --> B;

Core conventions:

  1. Stateless workers: Workers hold no long-term state. After starting, they register with the master and enter an infinite loop: request a task -> execute it -> report the result.
  2. The master as the "brain": The master maintains all state, including job progress, task states, and the worker list. This also makes the master a single point of failure, which is acceptable for our use case.
  3. Simplified Map/Reduce functions: To avoid the complexity of dynamically loading code, the Map and Reduce functions are compiled into the worker binary ahead of time. When a client submits a job, it only specifies a JobType string (such as "word_count" or "error_aggregation"), and the worker dispatches to the corresponding built-in function (see the registry sketch after this list).
  4. Communication protocol: The master and workers talk over a simple HTTP REST API. gRPC would perform better, but HTTP's debuggability and ubiquity are the bigger win in this scenario.
  5. Intermediate files: The intermediate files produced by the Map phase are stored in a local temporary directory on each worker. A Reduce worker pulls them directly from the Map workers over HTTP when needed; the master tells the Reduce worker where to find them.
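
To make convention 3 concrete, here is one way the built-in function table inside the worker could be organized. This is a minimal sketch under stated assumptions: the package name, the KeyValue shape, and the word_count implementation are illustrative, not a fixed API (the worker code later in this post calls the functions directly; a registry keyed by JobType is the natural next step).

// file: functions/registry.go (illustrative sketch)
package functions

import (
	"strconv"
	"strings"
)

// KeyValue is the unit emitted by Map and consumed by Reduce.
type KeyValue struct {
	Key   string
	Value string
}

type MapFunc func(content string) []KeyValue
type ReduceFunc func(key string, values []string) string

// Registry maps the JobType string from the client to built-in functions.
var Registry = map[string]struct {
	Map    MapFunc
	Reduce ReduceFunc
}{
	"word_count": {
		Map: func(content string) []KeyValue {
			kvs := []KeyValue{}
			for _, w := range strings.Fields(content) {
				kvs = append(kvs, KeyValue{Key: w, Value: "1"})
			}
			return kvs
		},
		Reduce: func(key string, values []string) string {
			// Each value is "1", so the count is simply how many there are.
			return strconv.Itoa(len(values))
		},
	},
}

A worker would look up Registry[jobType] and fail the task if the type is unknown, which keeps "unsupported job type" an explicit, reportable error rather than a crash.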

# The Master Node in Go

The master is the heart of the system. Its main responsibilities are state management and task scheduling.

// file: master/main.go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"
)

// TaskStatus defines the state of a task.
type TaskStatus int

const (
	Idle TaskStatus = iota
	InProgress
	Completed
	Failed
)

// Task represents a single unit of work (either Map or Reduce).
type Task struct {
	ID        int
	Type      string // "map" or "reduce"
	Status    TaskStatus
	InputFile string   // For Map tasks
	WorkerID  string   // Worker currently assigned to this task
	StartTime time.Time
}

// Job represents a full MapReduce job.
type Job struct {
	ID          string
	JobType     string // e.g., "word_count"
	InputFiles  []string
	MapTasks    []*Task
	ReduceTasks []*Task
	Status      string // "running", "completed", "failed"
	mu          sync.Mutex
}

// WorkerInfo holds metadata about a worker.
type WorkerInfo struct {
	ID        string
	Address   string
	LastHeartbeat time.Time
}

// Coordinator is the central state management struct for the master.
type Coordinator struct {
	jobs         map[string]*Job
	workers      map[string]*WorkerInfo
	jobCounter   int
	taskCounter  int
	mu           sync.Mutex
	taskQueue    chan *Task
}

var coord *Coordinator

func main() {
	coord = &Coordinator{
		jobs:      make(map[string]*Job),
		workers:   make(map[string]*WorkerInfo),
		taskQueue: make(chan *Task, 100), // Buffered channel for tasks
	}

	go coord.monitorWorkers()

	http.HandleFunc("/submit", handleSubmitJob)
	http.HandleFunc("/register", handleRegisterWorker)
	http.HandleFunc("/heartbeat", handleHeartbeat)
	http.HandleFunc("/requestTask", handleRequestTask)
	http.HandleFunc("/updateTask", handleUpdateTask)
	http.HandleFunc("/status", handleGetStatus)

	log.Println("Master node listening on :8080...")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

// handleSubmitJob creates a new job and its initial map tasks.
func handleSubmitJob(w http.ResponseWriter, r *http.Request) {
	// For brevity, assume the client sends this simple structure.
	var req struct {
		JobType    string   `json:"job_type"`
		InputFiles []string `json:"input_files"`
	}
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	coord.mu.Lock()
	defer coord.mu.Unlock()

	coord.jobCounter++
	jobID := fmt.Sprintf("job-%d", coord.jobCounter)
	newJob := &Job{
		ID:         jobID,
		JobType:    req.JobType,
		InputFiles: req.InputFiles,
		Status:     "running",
	}

	// Create map tasks for each input file
	for _, file := range req.InputFiles {
		coord.taskCounter++
		task := &Task{
			ID:        coord.taskCounter,
			Type:      "map",
			Status:    Idle,
			InputFile: file,
		}
		newJob.MapTasks = append(newJob.MapTasks, task)
		coord.taskQueue <- task // Push task to the queue
	}

    // Assume 1 reduce task for simplicity for now
    coord.taskCounter++
    reduceTask := &Task{
        ID:     coord.taskCounter,
        Type:   "reduce",
        Status: Idle,
    }
    newJob.ReduceTasks = append(newJob.ReduceTasks, reduceTask)

	coord.jobs[jobID] = newJob
	log.Printf("Submitted new job: %s with %d map tasks", jobID, len(newJob.MapTasks))
	
	w.WriteHeader(http.StatusAccepted)
	json.NewEncoder(w).Encode(map[string]string{"job_id": jobID})
}

// handleRequestTask gives a worker an available task from the queue.
func handleRequestTask(w http.ResponseWriter, r *http.Request) {
	workerID := r.URL.Query().Get("workerId")
	if workerID == "" {
		http.Error(w, "workerId is required", http.StatusBadRequest)
		return
	}

	select {
	case task := <-coord.taskQueue:
		// Mutate task fields under the coordinator lock: monitorWorkers
		// reads WorkerID and Status while holding the same lock.
		coord.mu.Lock()
		task.WorkerID = workerID
		task.Status = InProgress
		task.StartTime = time.Now()
		coord.mu.Unlock()

		log.Printf("Assigning task %d (%s) to worker %s", task.ID, task.Type, workerID)
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(task)
	case <-time.After(5 * time.Second): // Timeout if no task is available
		w.WriteHeader(http.StatusNoContent)
	}
}

// monitorWorkers periodically checks for dead workers and reschedules their tasks.
func (c *Coordinator) monitorWorkers() {
	for {
		time.Sleep(10 * time.Second)
		c.mu.Lock()
		
		now := time.Now()
		for id, worker := range c.workers {
			// A worker is considered dead if no heartbeat for 30 seconds
			if now.Sub(worker.LastHeartbeat) > 30*time.Second {
				log.Printf("Worker %s is dead. Re-scheduling its tasks.", id)
				delete(c.workers, id)
				
				// Find and reschedule tasks assigned to the dead worker
				for _, job := range c.jobs {
					job.mu.Lock()
					// Build a fresh slice: appending ReduceTasks onto MapTasks
					// in place can clobber MapTasks' backing array if it has
					// spare capacity.
					tasks := make([]*Task, 0, len(job.MapTasks)+len(job.ReduceTasks))
					tasks = append(tasks, job.MapTasks...)
					tasks = append(tasks, job.ReduceTasks...)
					for _, task := range tasks {
						if task.WorkerID == id && task.Status == InProgress {
							log.Printf("Re-queueing task %d from dead worker %s", task.ID, id)
							task.Status = Idle
							task.WorkerID = ""
							c.taskQueue <- task
						}
					}
					job.mu.Unlock()
				}
			}
		}
		c.mu.Unlock()
	}
}

// ... other handlers like handleRegisterWorker, handleHeartbeat, handleUpdateTask, handleGetStatus are omitted for brevity
// handleUpdateTask would be crucial: worker posts updates, master checks if all maps are done to queue the reduce tasks.

The core here is the Coordinator struct and the taskQueue channel. All state is protected by mutexes (sync.Mutex) to guarantee concurrency safety. The monitorWorkers goroutine implements a simple failure detector, an indispensable piece of any production-grade system: if a worker goes offline, the tasks it was processing are put back on the queue for a healthy worker to pick up.
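
Of the omitted handlers, handleUpdateTask carries the key coordination step: when the last map task completes, the reduce tasks are released into the queue. A minimal sketch, assuming the worker posts the JSON payload produced by updateTaskStatus shown below (error paths trimmed):

func handleUpdateTask(w http.ResponseWriter, r *http.Request) {
	var req struct {
		TaskID int    `json:"task_id"`
		Status string `json:"status"`
		Error  string `json:"error"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	coord.mu.Lock()
	defer coord.mu.Unlock()

	for _, job := range coord.jobs {
		allMapsDone := true
		for _, t := range job.MapTasks {
			if t.ID == req.TaskID {
				if req.Status == "completed" {
					t.Status = Completed
				} else {
					// Failed: mark Idle and put it back for another worker.
					t.Status = Idle
					t.WorkerID = ""
					coord.taskQueue <- t
				}
			}
			if t.Status != Completed {
				allMapsDone = false
			}
		}
		// Release reduce tasks exactly once, when the map phase finishes.
		if allMapsDone && job.Status == "running" {
			job.Status = "reducing"
			for _, rt := range job.ReduceTasks {
				coord.taskQueue <- rt
			}
		}
		for _, t := range job.ReduceTasks {
			if t.ID == req.TaskID && req.Status == "completed" {
				t.Status = Completed
				// One reduce task per job (see handleSubmitJob), so the
				// job is done once it completes.
				job.Status = "completed"
			}
		}
	}
	w.WriteHeader(http.StatusOK)
}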

# The Worker Node in Go

Workers are the units that do the actual computing. Their logic is simple: start, register, then repeatedly request tasks from the master in a loop.

// file: worker/main.go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	// Fictional package for our map/reduce functions
	"our-mr-library/functions"
)

var masterAddr string
var workerID string

// Task represents a task received from the master.
type Task struct {
	ID        int    `json:"ID"`
	Type      string `json:"Type"`
	InputFile string `json:"InputFile"`
}

func main() {
	if len(os.Args) < 2 {
		log.Fatal("Usage: worker <master_address>")
	}
	masterAddr = os.Args[1]
	workerID = fmt.Sprintf("worker-%d", time.Now().UnixNano())

	register()
	go sendHeartbeats()

	log.Printf("Worker %s started. Polling master for tasks...", workerID)
	pollForTasks()
}

func sendHeartbeats() {
	for {
		// Notify the master we are alive. Assumes the /heartbeat handler
		// (omitted above) records time.Now() for this workerId.
		resp, err := http.Post(fmt.Sprintf("%s/heartbeat?workerId=%s", masterAddr, workerID), "application/json", nil)
		if err != nil {
			log.Printf("Heartbeat failed: %v", err)
		} else {
			resp.Body.Close()
		}
		time.Sleep(10 * time.Second)
	}
}

func pollForTasks() {
	for {
		resp, err := http.Get(fmt.Sprintf("%s/requestTask?workerId=%s", masterAddr, workerID))
		if err != nil {
			log.Printf("Error requesting task: %v. Retrying...", err)
			time.Sleep(5 * time.Second)
			continue
		}

		if resp.StatusCode == http.StatusNoContent {
			// No task available, wait and poll again
			time.Sleep(5 * time.Second)
			resp.Body.Close()
			continue
		}

		var task Task
		if err := json.NewDecoder(resp.Body).Decode(&task); err != nil {
			log.Printf("Error decoding task: %v", err)
			resp.Body.Close()
			continue
		}
		resp.Body.Close()

		log.Printf("Received task %d of type %s", task.ID, task.Type)
		executeTask(&task)
	}
}

func executeTask(task *Task) {
	var err error
	if task.Type == "map" {
		err = executeMapTask(task)
	} else if task.Type == "reduce" {
		err = executeReduceTask(task)
	} else {
		err = fmt.Errorf("unknown task type: %s", task.Type)
	}
	
	updateTaskStatus(task.ID, err)
}

func executeMapTask(task *Task) error {
    // In a real scenario, we'd fetch the file from a shared storage like S3
    // or another Droplet. Here, we assume it's locally accessible for simplicity.
	content, err := os.ReadFile(task.InputFile) // ioutil.ReadFile is deprecated since Go 1.16
	if err != nil {
		return fmt.Errorf("failed to read input file %s: %w", task.InputFile, err)
	}

    // Call the pre-compiled function based on job type (not shown, but would be retrieved from master)
    // For this example, let's assume it's always word count.
	intermediate := functions.WordCountMap(string(content))

    // Write intermediate results to local files, partitioned by key.
    // This is a simplified shuffle. In reality, we'd hash the key
    // to determine which reducer it belongs to.
    // e.g., output-map-<task_id>-<reduce_task_id>
    // ... file writing logic omitted
	log.Printf("Map task %d completed.", task.ID)
	return nil
}
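
// A sketch of the partitioning hinted at above: hash each key to pick the
// reduce task that owns it, so identical keys always land in the same
// partition file. nReduce would come from the master alongside the task
// (a hypothetical field, not shown in the Task struct above).
func ihash(key string) int {
	h := fnv.New32a() // requires importing "hash/fnv"
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// Usage inside the omitted file-writing loop:
//   partition := ihash(kv.Key) % nReduce
//   filename  := fmt.Sprintf("output-map-%d-%d", task.ID, partition)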

func updateTaskStatus(taskID int, taskErr error) {
	status := "completed"
	errMsg := ""
	if taskErr != nil {
		status = "failed"
		errMsg = taskErr.Error()
	}

	payload, _ := json.Marshal(map[string]interface{}{
		"task_id": taskID,
		"status":  status,
		"error":   errMsg,
	})

	resp, err := http.Post(fmt.Sprintf("%s/updateTask", masterAddr), "application/json", bytes.NewBuffer(payload))
	if err != nil {
		log.Printf("Failed to report status for task %d: %v", taskID, err)
		return
	}
	resp.Body.Close()
	log.Printf("Reported status for task %d: %s", taskID, status)
}
// ... executeReduceTask and other functions are similar in structure

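Since executeReduceTask was elided, here is a minimal sketch of its structure. It assumes the master extends Task with the URLs of the intermediate files to pull (a hypothetical IntermediateURLs field), per convention 5, and that each intermediate file stores one tab-separated key/value pair per line:

// Illustrative sketch; additionally requires importing "bufio" and "strings".
func executeReduceTask(task *Task) error {
	grouped := make(map[string][]string)
	for _, url := range task.IntermediateURLs { // hypothetical field
		resp, err := http.Get(url)
		if err != nil {
			return fmt.Errorf("failed to fetch intermediate file %s: %w", url, err)
		}
		scanner := bufio.NewScanner(resp.Body)
		for scanner.Scan() {
			parts := strings.SplitN(scanner.Text(), "\t", 2)
			if len(parts) == 2 {
				grouped[parts[0]] = append(grouped[parts[0]], parts[1])
			}
		}
		resp.Body.Close()
	}

	// Apply the built-in reduce function and write the final output locally.
	out, err := os.Create(fmt.Sprintf("output-reduce-%d", task.ID))
	if err != nil {
		return err
	}
	defer out.Close()
	for key, values := range grouped {
		// WordCountReduce is the assumed counterpart of WordCountMap.
		fmt.Fprintf(out, "%s\t%s\n", key, functions.WordCountReduce(key, values))
	}
	log.Printf("Reduce task %d completed.", task.ID)
	return nil
}
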
The worker code is very straightforward: it is a tireless executor that does exactly what the master tells it. Unit testing would focus on executeMapTask and executeReduceTask, making sure they handle all boundary inputs (empty files, huge files, malformed data) correctly or fail gracefully.

# A Real-Time Monitoring Dashboard in Jetpack Compose

The mobile control panel is the system's public face. It has to present the state of the whole cluster and the progress of every job clearly and in real time. Jetpack Compose's declarative, reactive nature makes this very efficient.

1. ViewModel and data flow

We use a ViewModel to manage UI state and network requests, exposing the data to the UI layer through a StateFlow. The ViewModel polls the master's /status endpoint at a fixed interval.
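
Before the Kotlin code, it is worth pinning down the /status contract it polls. Here is a minimal Go sketch of the master's handleGetStatus (omitted earlier); the JSON field names are assumptions chosen to match the Kotlin data classes below:

// A sketch of handleGetStatus on the master. It flattens internal state
// into DTOs, translating the TaskStatus enum into the strings the UI expects.
func handleGetStatus(w http.ResponseWriter, r *http.Request) {
	coord.mu.Lock()
	defer coord.mu.Unlock()

	type taskDTO struct {
		ID     int    `json:"id"`
		Status string `json:"status"`
	}
	type jobDTO struct {
		ID          string    `json:"id"`
		Status      string    `json:"status"`
		MapTasks    []taskDTO `json:"mapTasks"`
		ReduceTasks []taskDTO `json:"reduceTasks"`
	}

	statusNames := map[TaskStatus]string{Idle: "Idle", InProgress: "InProgress", Completed: "Completed", Failed: "Failed"}
	jobs := make([]jobDTO, 0, len(coord.jobs))
	for _, job := range coord.jobs {
		j := jobDTO{ID: job.ID, Status: job.Status}
		for _, t := range job.MapTasks {
			j.MapTasks = append(j.MapTasks, taskDTO{ID: t.ID, Status: statusNames[t.Status]})
		}
		for _, t := range job.ReduceTasks {
			j.ReduceTasks = append(j.ReduceTasks, taskDTO{ID: t.ID, Status: statusNames[t.Status]})
		}
		jobs = append(jobs, j)
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{"jobs": jobs})
}

Serializing the enum as a name rather than an int is what lets the Kotlin side compare against strings like "Completed" directly.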

// file: ui/JobViewModel.kt
package com.example.mrcontroller.ui

import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import com.example.mrcontroller.data.ClusterStatus
import com.example.mrcontroller.data.JobStatus
import com.example.mrcontroller.data.MrApi // Retrofit service interface
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch

// Represents the entire UI state for the dashboard.
data class DashboardUiState(
    val jobs: List<JobStatus> = emptyList(),
    val isLoading: Boolean = true,
    val error: String? = null
)

class JobViewModel : ViewModel() {

    private val _uiState = MutableStateFlow(DashboardUiState())
    val uiState: StateFlow<DashboardUiState> = _uiState

    private val apiService = MrApi.create() // Your Retrofit instance

    init {
        // Start polling for status updates as soon as the ViewModel is created.
        startPolling()
    }

    private fun startPolling() {
        viewModelScope.launch(Dispatchers.IO) {
            while (true) {
                try {
                    val status = apiService.getClusterStatus()
                    _uiState.value = DashboardUiState(jobs = status.jobs, isLoading = false)
                } catch (e: Exception) {
                    // In a real app, handle different network errors specifically.
                    _uiState.value = _uiState.value.copy(error = "Failed to connect to master: ${e.message}", isLoading = false)
                }
                // Poll every 3 seconds. Adjust based on desired real-time feel vs. network load.
                delay(3000)
            }
        }
    }
    
    // A function to submit a new job, called from the UI.
    fun submitNewJob(jobType: String, inputFiles: List<String>) {
        viewModelScope.launch(Dispatchers.IO) {
            try {
                // Call the API to submit the job.
                // Upon success, the next poll will automatically pick it up.
                // We could also optimistically update the UI here.
                apiService.submitJob(jobType, inputFiles)
            } catch (e: Exception) {
                _uiState.value = _uiState.value.copy(error = "Job submission failed: ${e.message}")
            }
        }
    }
}

2. The core composables: visualizing task progress

The most valuable part of the UI is the per-job details screen, which visualizes the state of every Map and Reduce task. A LazyVerticalGrid handles rendering large numbers of tasks efficiently.

// file: ui/JobDetailsScreen.kt
package com.example.mrcontroller.ui

import androidx.compose.foundation.background
import androidx.compose.foundation.border
import androidx.compose.foundation.layout.*
import androidx.compose.foundation.lazy.grid.GridCells
import androidx.compose.foundation.lazy.grid.LazyVerticalGrid
import androidx.compose.foundation.lazy.grid.items
import androidx.compose.material3.Text
import androidx.compose.runtime.Composable
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.unit.dp
import androidx.compose.ui.unit.sp
import com.example.mrcontroller.data.JobStatus
import com.example.mrcontroller.data.TaskStatus

@Composable
fun JobDetailsScreen(job: JobStatus) {
    Column(modifier = Modifier.padding(16.dp)) {
        Text("Job ID: ${job.id}", fontSize = 20.sp)
        Text("Status: ${job.status}", color = if (job.status == "completed") Color.Green else Color.Yellow)
        
        Spacer(modifier = Modifier.height(16.dp))

        // Map Tasks Visualization
        Text("Map Tasks (${job.mapTasks.count { it.status == "Completed" }} / ${job.mapTasks.size})")
        TaskGrid(tasks = job.mapTasks)
        
        Spacer(modifier = Modifier.height(16.dp))

        // Reduce Tasks Visualization
        Text("Reduce Tasks (${job.reduceTasks.count { it.status == "Completed" }} / ${job.reduceTasks.size})")
        TaskGrid(tasks = job.reduceTasks)
    }
}

@Composable
fun TaskGrid(tasks: List<TaskStatus>) {
    LazyVerticalGrid(
        columns = GridCells.Adaptive(minSize = 48.dp), // Responsive grid
        horizontalArrangement = Arrangement.spacedBy(4.dp),
        verticalArrangement = Arrangement.spacedBy(4.dp),
        modifier = Modifier.fillMaxWidth()
    ) {
        items(tasks) { task ->
            TaskCell(task = task)
        }
    }
}

@Composable
fun TaskCell(task: TaskStatus) {
    val backgroundColor = when (task.status) {
        "Idle" -> Color.Gray
        "InProgress" -> Color.Blue
        "Completed" -> Color.Green
        "Failed" -> Color.Red
        else -> Color.DarkGray
    }

    Box(
        modifier = Modifier
            .size(48.dp)
            .background(backgroundColor)
            .border(1.dp, Color.White),
        contentAlignment = Alignment.Center
    ) {
        Text(
            text = task.id.toString(),
            color = Color.White,
            fontSize = 12.sp
        )
    }
}

The power of this Compose code is that the UI is driven entirely by the JobStatus data structure. When the ViewModel fetches fresh data from the backend, the StateFlow triggers recomposition, and each TaskCell's color updates automatically with its task's state, with no manual UI manipulation. This data-driven paradigm makes building complex, dynamic interfaces remarkably clear and maintainable.

# Remaining Issues and Future Iterations

This system solved our original pain point, but it is far from perfect. In a real project you always need a clear-eyed view of where a technique's applicability ends.

  1. Master single point of failure (SPOF): The master is the system's Achilles' heel. If it goes down, every running job fails and all state is lost. A production-grade solution would typically use Raft or Paxos to elect a leader among several master nodes for high availability.
  2. State persistence: All of the master's state (job metadata, task progress) lives in memory; a restart wipes it out. A simple improvement is an embedded K/V store (such as BoltDB or LevelDB) to persist state so the master can recover jobs after a restart (see the sketch after this list).
  3. The simplified Shuffle phase: We cut Shuffle down to Reduce workers pulling data directly from Map workers. At larger scale this creates a lot of network connections and can turn the Map workers into a bottleneck. A more robust design would use a dedicated shuffle service, or write intermediate results to a distributed file system the way Hadoop does.
  4. Hard-coded computation logic: The Map and Reduce functions are compiled into the workers, which is inflexible. A future iteration could use WASM (WebAssembly) or a gRPC plugin system to let users submit computation logic dynamically, decoupling the framework from specific business logic.
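
For point 2, the change is small enough to sketch. Using bbolt (the maintained BoltDB fork), the master could write each job through to disk on every state change and reload everything at startup; a minimal sketch, with the bucket and key naming as assumptions:

// file: master/persist.go (illustrative sketch using go.etcd.io/bbolt)
package main

import (
	"encoding/json"

	bolt "go.etcd.io/bbolt"
)

var jobsBucket = []byte("jobs")

// saveJob serializes a job and writes it through to disk. It would be called
// whenever job or task state changes, while the coordinator lock is held.
func saveJob(db *bolt.DB, job *Job) error {
	data, err := json.Marshal(job)
	if err != nil {
		return err
	}
	return db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists(jobsBucket)
		if err != nil {
			return err
		}
		return b.Put([]byte(job.ID), data)
	})
}

// loadJobs restores all persisted jobs at startup. In-flight tasks would be
// re-queued as Idle, since their workers may no longer exist.
func loadJobs(db *bolt.DB) (map[string]*Job, error) {
	jobs := make(map[string]*Job)
	err := db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(jobsBucket)
		if b == nil {
			return nil
		}
		return b.ForEach(func(k, v []byte) error {
			var job Job
			if err := json.Unmarshal(v, &job); err != nil {
				return err
			}
			jobs[job.ID] = &job
			return nil
		})
	})
	return jobs, err
}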

The point of this project was never to reinvent Hadoop. It was to show how modern tools (Go, Jetpack Compose) and a pragmatic cloud platform (DigitalOcean) let you build, from first principles and at very low cost, a distributed system that does exactly what a specific need requires. It validates a simple idea: not every problem needs the heavyweight "standard answer"; sometimes a custom-ground, sharp little knife is more effective.

