..

微服务-分布式任务

微服务-分布式任务

前言

这篇文章应该拖了很久,今天终于抽空整理一下。 之前项目中一直使用的是 Celery,但项目业务场景复杂,需要处理大量异步任务,且对可视化运维和报警监控有强需求,语言生态无法满足现有系统架构,因此选择使用 XXL-JOB。

一、介绍

分布式任务是指将一个大型任务拆解为多个子任务,通过多台计算机/服务节点协同完成的计算模式。其核心目标是利用多机资源提升处理效率、保障系统高可用性。

二、核心对比表

维度asynqxxl-job-goCelerygocron
语言生态GoGo(XXL-JOB生态移植)PythonGo
核心定位异步任务队列分布式任务调度平台分布式任务队列 + 定时任务轻量级定时任务调度
调度类型异步任务 + 延迟任务定时任务 + API触发异步任务 + 定时任务定时任务为主
可视化支持需第三方工具(如Prometheus)自带Web控制台依赖Flower插件无原生界面
消息中间件依赖强依赖Redis无(基于数据库调度)强依赖RabbitMQ/Redis无依赖
分布式能力基于Redis的Worker集群中心化调度 + 执行器集群Worker集群 + 消息队列需自行实现分布式锁
典型场景邮件发送、实时数据处理报表生成、跨服务定时任务Django后台任务、数据分析单服务定时脚本

选型建议

场景推荐框架理由
高并发异步任务asynqRedis支撑高吞吐,Go的并发性能优势
企业级定时任务平台xxl-job-go控制台完善,任务分片机制成熟
Python项目后台任务Celery生态成熟,与Django无缝集成
轻量级单服务定时脚本gocron零依赖,API简单

扩展建议

  • 需要混合语言支持时,可组合使用(如Go用asynq处理计算密集型任务,Python用Celery处理AI推理)
  • 关键任务推荐搭配Redis Sentinel/Cluster(asynq)或MySQL高可用方案(xxl-job-go)

三、XXL-JOB

XXL-JOB 是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。 选型 xxl-job 是因为已接入的公司不限于700+相对成熟稳定且企业级。

核心优势

  1. 轻量易用

    • 核心依赖仅 DB(MySQL),5分钟快速部署
    • 提供可视化控制台,支持动态调整任务参数(CRON表达式、路由策略等)
    • RESTful API 设计,与业务系统解耦
  2. 调度中心高可用

    • 采用中心化调度(非竞争式),支持集群部署
    • 任务触发策略丰富(轮训、故障转移、分片广播等)
  3. 完备的监控能力

    • 实时查看调度日志(支持日志回溯)
    • 内置邮件报警机制
    • 运行报表统计(成功率/触发次数)
  4. 弹性扩缩容

    • 执行器自动注册发现
    • 支持动态分片(如:处理海量数据时自动拆分任务)

四、与 go-zero 框架集成

只需要将 xxl-job-executor-go 集成到 go-zero 框架中,配合 xxl-job-admin 即可完成分布式任务调度。

  1. 编写任务逻辑

    只需要在 internal/task/ 目录下创建自己任务文件即可。 如下示例 internal/task/demo1.go

    package task
    
    import (
        "context"
        "fmt"
    
        xxl "github.com/xxl-job/xxl-job-executor-go"
    )
    
    func Test1(cxt context.Context, param *xxl.RunReq) (msg string) {
        fmt.Println("test one task" + param.ExecutorHandler + " param:" + param.ExecutorParams + " log_id:" + xxl.Int64ToStr(param.LogID))
        return "test done"
    }
    
  2. 注册任务

    这里将把我们编写代码任务注册。如下示例 internal/svc/regtask.go 文件中注册。

    package svc
    
    import (
        task "github.com/ch3nnn/xxl-job-zero/internal/task"
        "github.com/ch3nnn/xxl-job-zero/pkg/xxlx"
    )
    
    // RegisterTasks 注册任务
    func RegisterTasks(exec xxlx.Executor) {
        exec.RegTasks(
            []xxlx.Task{
                {
                    Pattern: "task.demo1",
                    TaskFn:  task.Test1,
                },
                {
                    Pattern: "task.demo2",
                    TaskFn:  task.Test2,
                },
                {
                    Pattern: "task.demo3",
                    TaskFn:  task.Panic,
                },
            },
        )
    }
    
  3. 启动

    我们只需要把项目中 job.go 文件启动即可。

    register task: task.demo1 
    register task: task.demo2 
    register task: task.demo3 
    Starting server at 0.0.0.0:8888...
    {"@timestamp":"2024-10-09T18:25:59.587+08:00","caller":"xxlx/log.go:22","content":"执行器注册成功:{\"code\":200,\"msg\":null,\"content\":null}","level":"info"}
    

实战完整源代码 https://github.com/ch3nnn/xxl-job-zero

五、参考资料