..

Go Sync并发包之errgroup

你是否写过一个函数,它之所以很长,是因为它要完成很多任务,即使这些任务之间并不相互依赖? 你是否写过一个很长的函数,因为它要完成很多任务,即使这些任务并不相互依赖?我就遇到过这种情况。

想想看,你有一个函数可以做 3 件事:

  • 按用户 ID 从数据库中获取用户详细信息。
  • 调用外部服务,按用户 ID 获取用户最近的活动信息
  • 访问日志服务,按用户 ID 获取用户上次登录的详细信息
func FetchUserData(userID string) error {
    g := errgroup.Group{}
    
	// 获取用户详细信息
    userDetail, _ := fetchUserDetails(userID)
	// 获取用户活动
    userAct, _ := fetchUserActivity(userID)
    // 获取用户登录详细信息
    userLoginInfo, _ := fetchUserLoginDetails(userID)

    // ...
}

所有这些任务都只需要用户 ID,而不使用其他任务的数据。

您可以尝试使用 goroutines 来实现这一点,但您需要自己处理细节问题。让我们来回答这些问题:

  • 同步:如何确保一切完成?
  • 错误处理:如果一项任务出错,你该怎么办?或者三项任务中有两项不成功?
  • 取消:如果一个任务出现问题,如何停止所有其他正在运行的程序?
  • 限制:你是否考虑过限制同时运行多少个 goroutines?
  • 可重复使用:一旦找到解决方案,如何在类似情况下再次使用?

1.什么是 errgroup?

errgroup 软件包可让您同时处理多项任务。

通过它,可以轻松地以安全的方式一起运行,保持同步,处理错误,并控制何时停止 goroutines。下面是一个如何使用它的快速示例:

func FetchUserData() error {
	g := errgroup.Group{}

	// 获取用户详细信息
	g.Go(func() error {
		time.Sleep(1 * time.Second)
		fmt.Println("Fetched user details...")
		return nil
	})

    // 获取用户活动
    g.Go(func() error {
		time.Sleep(1 * time.Second)
		fmt.Println("Fetched user activity...")
		return nil
	})

	// 获取用户登录详细信息
	g.Go(func() error {
		time.Sleep(2 * time.Second)
		fmt.Println("Fetched user login details...")
		return nil
	})

	// 等待所有goroutines完成并返回第一个错误 (如果有)
	return g.Wait()
}

因此,errgroup 的工作就是运行这些任务,并通过 g.Wait() 等待任务结束,我们需要做的就是添加任务。 什么是 errgroup?

当你有很多任务时,比如 10 项、20 项甚至更多,这种方法就非常有用。

但有一点需要注意,如果不加以控制,同时运行过多的程序会占用资源。我们该如何处理呢?让我们在下一节中探讨。"

2.SetLimit:限制程序运行次数

这个软件包为我们提供了一种限制同时运行的 goroutines 数量的方法,让我们看看如何使用它:

func FetchUserData() error {
    g := errgroup.Group{}

    // 将限制设置为2
    g.SetLimit(2)

    // 获取用户详细信息
    g.Go(func() error {
        time.Sleep(1 * time.Second)
        fmt.Println("Fetched user details...")
        return nil
    })

    // 获取用户活动
    g.Go(func() error {
        time.Sleep(1 * time.Second)
        fmt.Println("Fetched user activity...")
        return nil
    })

    // 获取用户登录详细信息
    g.Go(func() error {
        time.Sleep(2 * time.Second)
        fmt.Println("Fetched user login details...")
        return nil
    })

    // 等待所有goroutines完成并返回第一个错误 (如果有)
    return g.Wait()
}

这样做可以确保只有 2 个程序同时运行。试试看,前两个任务会同时显示信息,而第三个任务会在启动 3 秒后显示信息。 SetLimit:限制程序数目

“设置限值后可以更改吗?

答案是肯定的,但要小心。

如果任何 goroutine 正在运行,试图更改限制将导致 errgroup.SetLimit() 异常。

现在,让我们再想一想,如果 errgroup 中已经有 50 个 goroutines,而你不想再添加更多的 goroutines,该怎么办?

3.TryGo:添加快速程序的受控方法

TryGo 与 Go 函数有相似之处,但它提供了一种细致入微的方法来处理 errgroup 中的 goroutine。具体来说,如果当前计数达到设定的限制,它就会阻止添加新的 goroutine

TryGo 的签名有点与众不同:

// TryGo: 检查并添加 goroutine
func (g *Group) TryGo(fn func () error) bool

// Go: 只需添加一个 goroutine
func (g *Group) Go(fn func () error)

如果使用 TryGo,并成功将 goroutine 添加到 errgroup,它将通过返回 true 来传达成功。如果不成功,则返回 false。

但有趣的地方就在这里。

当一个 errgroup 已满时,errgroup.Go() 会阻塞直到一个 goroutine 结束,然后再添加一个新的,相反,errgroup.TryGo() 不会等待。如果 errgroup 已满,TryGo 会立即返回 false。

4.WithContext:处理取消

如果其中一个程序出错,如何停止所有正在运行的程序,从而避免浪费资源?

WithContext 函数用于创建新的 errgroup 和 context :

erg, ctx := errgroup.WithContext(context.Background())

该函数为您提供context,但不提供取消函数,因此您无法自行停止context。

“如果出现错误,errgroup 会为我停止所有程序吗?

不,errgroup 会取消上下文,但不会停止 goroutines。其余的 goroutines 会继续运行,直到完成为止,除非你这样做:

func main() {
  g, ctx := errgroup.WithContext(context.Background())
  
  g.Go(func() error { return fetchUserDetails(ctx) })
  g.Go(func() error { return fetchUserActivity(ctx) })
  g.Go(func() error { return fetchUserPaymentHistory(ctx) })
  
  // Wait for tasks to finish and print the error
  if err := g.Wait(); err != nil {
    fmt.Printf("Encountered an error: %v\n", err)
  }
}

让这些任务依赖于上下文非常重要。因此,当上下文取消时,所有 goroutines 也将停止。

接下来,我将谈谈 Errgroup 的功能:Errgroup Explained:了解其内部运作。

这种方法不仅可以减少额外的代码,而且还提供了一种处理错误和控制 goroutines 生命周期的有效方法。