..
微服务-分布式任务
微服务-分布式任务
前言
这篇文章应该拖了很久,今天终于抽空整理一下。 之前项目中一直使用的是 Celery,但项目业务场景复杂,需要处理大量异步任务,且对可视化运维和报警监控有强需求,语言生态无法满足现有系统架构,因此选择使用 XXL-JOB。
一、介绍
分布式任务是指将一个大型任务拆解为多个子任务,通过多台计算机/服务节点协同完成的计算模式。其核心目标是利用多机资源提升处理效率、保障系统高可用性。
二、核心对比表
维度 | asynq | xxl-job-go | Celery | gocron |
---|---|---|---|---|
语言生态 | Go | Go(XXL-JOB生态移植) | Python | Go |
核心定位 | 异步任务队列 | 分布式任务调度平台 | 分布式任务队列 + 定时任务 | 轻量级定时任务调度 |
调度类型 | 异步任务 + 延迟任务 | 定时任务 + API触发 | 异步任务 + 定时任务 | 定时任务为主 |
可视化支持 | 需第三方工具(如Prometheus) | 自带Web控制台 | 依赖Flower插件 | 无原生界面 |
消息中间件依赖 | 强依赖Redis | 无(基于数据库调度) | 强依赖RabbitMQ/Redis | 无依赖 |
分布式能力 | 基于Redis的Worker集群 | 中心化调度 + 执行器集群 | Worker集群 + 消息队列 | 需自行实现分布式锁 |
典型场景 | 邮件发送、实时数据处理 | 报表生成、跨服务定时任务 | Django后台任务、数据分析 | 单服务定时脚本 |
选型建议
场景 | 推荐框架 | 理由 |
---|---|---|
高并发异步任务 | asynq | Redis支撑高吞吐,Go的并发性能优势 |
企业级定时任务平台 | xxl-job-go | 控制台完善,任务分片机制成熟 |
Python项目后台任务 | Celery | 生态成熟,与Django无缝集成 |
轻量级单服务定时脚本 | gocron | 零依赖,API简单 |
扩展建议:
- 需要混合语言支持时,可组合使用(如Go用asynq处理计算密集型任务,Python用Celery处理AI推理)
- 关键任务推荐搭配Redis Sentinel/Cluster(asynq)或MySQL高可用方案(xxl-job-go)
三、XXL-JOB
XXL-JOB 是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。 选型 xxl-job 是因为已接入的公司不限于700+相对成熟稳定且企业级。
核心优势
轻量易用
- 核心依赖仅 DB(MySQL),5分钟快速部署
- 提供可视化控制台,支持动态调整任务参数(CRON表达式、路由策略等)
- RESTful API 设计,与业务系统解耦
调度中心高可用
- 采用中心化调度(非竞争式),支持集群部署
- 任务触发策略丰富(轮训、故障转移、分片广播等)
完备的监控能力
- 实时查看调度日志(支持日志回溯)
- 内置邮件报警机制
- 运行报表统计(成功率/触发次数)
弹性扩缩容
- 执行器自动注册发现
- 支持动态分片(如:处理海量数据时自动拆分任务)
四、与 go-zero 框架集成
只需要将 xxl-job-executor-go 集成到 go-zero 框架中,配合 xxl-job-admin 即可完成分布式任务调度。
编写任务逻辑
只需要在 internal/task/ 目录下创建自己任务文件即可。 如下示例 internal/task/demo1.go
package task import ( "context" "fmt" xxl "github.com/xxl-job/xxl-job-executor-go" ) func Test1(cxt context.Context, param *xxl.RunReq) (msg string) { fmt.Println("test one task" + param.ExecutorHandler + " param:" + param.ExecutorParams + " log_id:" + xxl.Int64ToStr(param.LogID)) return "test done" }
注册任务
这里将把我们编写代码任务注册。如下示例 internal/svc/regtask.go 文件中注册。
package svc import ( task "github.com/ch3nnn/xxl-job-zero/internal/task" "github.com/ch3nnn/xxl-job-zero/pkg/xxlx" ) // RegisterTasks 注册任务 func RegisterTasks(exec xxlx.Executor) { exec.RegTasks( []xxlx.Task{ { Pattern: "task.demo1", TaskFn: task.Test1, }, { Pattern: "task.demo2", TaskFn: task.Test2, }, { Pattern: "task.demo3", TaskFn: task.Panic, }, }, ) }
启动
我们只需要把项目中 job.go 文件启动即可。
register task: task.demo1 register task: task.demo2 register task: task.demo3 Starting server at 0.0.0.0:8888... {"@timestamp":"2024-10-09T18:25:59.587+08:00","caller":"xxlx/log.go:22","content":"执行器注册成功:{\"code\":200,\"msg\":null,\"content\":null}","level":"info"}