Go Sync并发包之errgroup
你是否写过一个函数,它之所以很长,是因为它要完成很多任务,即使这些任务之间并不相互依赖? 你是否写过一个很长的函数,因为它要完成很多任务,即使这些任务并不相互依赖?我就遇到过这种情况。
想想看,你有一个函数可以做 3 件事:
- 按用户 ID 从数据库中获取用户详细信息。
- 调用外部服务,按用户 ID 获取用户最近的活动信息
- 访问日志服务,按用户 ID 获取用户上次登录的详细信息
func FetchUserData(userID string) error {
g := errgroup.Group{}
// 获取用户详细信息
userDetail, _ := fetchUserDetails(userID)
// 获取用户活动
userAct, _ := fetchUserActivity(userID)
// 获取用户登录详细信息
userLoginInfo, _ := fetchUserLoginDetails(userID)
// ...
}
所有这些任务都只需要用户 ID,而不使用其他任务的数据。
您可以尝试使用 goroutines 来实现这一点,但您需要自己处理细节问题。让我们来回答这些问题:
- 同步:如何确保一切完成?
- 错误处理:如果一项任务出错,你该怎么办?或者三项任务中有两项不成功?
- 取消:如果一个任务出现问题,如何停止所有其他正在运行的程序?
- 限制:你是否考虑过限制同时运行多少个 goroutines?
- 可重复使用:一旦找到解决方案,如何在类似情况下再次使用?
1.什么是 errgroup?
errgroup 软件包可让您同时处理多项任务。
通过它,可以轻松地以安全的方式一起运行,保持同步,处理错误,并控制何时停止 goroutines。下面是一个如何使用它的快速示例:
func FetchUserData() error {
g := errgroup.Group{}
// 获取用户详细信息
g.Go(func() error {
time.Sleep(1 * time.Second)
fmt.Println("Fetched user details...")
return nil
})
// 获取用户活动
g.Go(func() error {
time.Sleep(1 * time.Second)
fmt.Println("Fetched user activity...")
return nil
})
// 获取用户登录详细信息
g.Go(func() error {
time.Sleep(2 * time.Second)
fmt.Println("Fetched user login details...")
return nil
})
// 等待所有goroutines完成并返回第一个错误 (如果有)
return g.Wait()
}
因此,errgroup 的工作就是运行这些任务,并通过 g.Wait()
等待任务结束,我们需要做的就是添加任务。
当你有很多任务时,比如 10 项、20 项甚至更多,这种方法就非常有用。
但有一点需要注意,如果不加以控制,同时运行过多的程序会占用资源。我们该如何处理呢?让我们在下一节中探讨。"
2.SetLimit:限制程序运行次数
这个软件包为我们提供了一种限制同时运行的 goroutines 数量的方法,让我们看看如何使用它:
func FetchUserData() error {
g := errgroup.Group{}
// 将限制设置为2
g.SetLimit(2)
// 获取用户详细信息
g.Go(func() error {
time.Sleep(1 * time.Second)
fmt.Println("Fetched user details...")
return nil
})
// 获取用户活动
g.Go(func() error {
time.Sleep(1 * time.Second)
fmt.Println("Fetched user activity...")
return nil
})
// 获取用户登录详细信息
g.Go(func() error {
time.Sleep(2 * time.Second)
fmt.Println("Fetched user login details...")
return nil
})
// 等待所有goroutines完成并返回第一个错误 (如果有)
return g.Wait()
}
这样做可以确保只有 2 个程序同时运行。试试看,前两个任务会同时显示信息,而第三个任务会在启动 3 秒后显示信息。
“设置限值后可以更改吗?
答案是肯定的,但要小心。
如果任何 goroutine 正在运行,试图更改限制将导致 errgroup.SetLimit()
异常。
现在,让我们再想一想,如果 errgroup 中已经有 50 个 goroutines,而你不想再添加更多的 goroutines,该怎么办?
3.TryGo:添加快速程序的受控方法
TryGo 与 Go 函数有相似之处,但它提供了一种细致入微的方法来处理 errgroup 中的 goroutine。具体来说,如果当前计数达到设定的限制,它就会阻止添加新的 goroutine
TryGo 的签名有点与众不同:
// TryGo: 检查并添加 goroutine
func (g *Group) TryGo(fn func () error) bool
// Go: 只需添加一个 goroutine
func (g *Group) Go(fn func () error)
如果使用 TryGo,并成功将 goroutine 添加到 errgroup,它将通过返回 true 来传达成功。如果不成功,则返回 false。
但有趣的地方就在这里。
当一个 errgroup 已满时,errgroup.Go() 会阻塞直到一个 goroutine 结束,然后再添加一个新的,相反,errgroup.TryGo() 不会等待。如果 errgroup 已满,TryGo 会立即返回 false。
4.WithContext:处理取消
如果其中一个程序出错,如何停止所有正在运行的程序,从而避免浪费资源?
WithContext 函数用于创建新的 errgroup 和 context :
erg, ctx := errgroup.WithContext(context.Background())
该函数为您提供context,但不提供取消函数,因此您无法自行停止context。
“如果出现错误,errgroup 会为我停止所有程序吗?
不,errgroup 会取消上下文,但不会停止 goroutines。其余的 goroutines 会继续运行,直到完成为止,除非你这样做:
func main() {
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error { return fetchUserDetails(ctx) })
g.Go(func() error { return fetchUserActivity(ctx) })
g.Go(func() error { return fetchUserPaymentHistory(ctx) })
// Wait for tasks to finish and print the error
if err := g.Wait(); err != nil {
fmt.Printf("Encountered an error: %v\n", err)
}
}
让这些任务依赖于上下文非常重要。因此,当上下文取消时,所有 goroutines 也将停止。
接下来,我将谈谈 Errgroup 的功能:Errgroup Explained:了解其内部运作。
这种方法不仅可以减少额外的代码,而且还提供了一种处理错误和控制 goroutines 生命周期的有效方法。