Golang 中的 ES7 风格的 Async/Await 实现

前言

我最近在学习 Golang。我对 Golang 感兴趣的是goroutine + channel并发模型。在 Golang 中,我们使用 goroutine 执行异步任务,而这些 goroutine 通过名为Channel.

作为一个前端开发者,我发现 Golang 中的 goroutine + channel 模型很像 Javascript 中的事件循环。

  • Javascript V8 引擎使用主线程执行同步代码,而 Golang 使用主 goroutine。
  • Javascript V8 引擎维护一个线程池来执行异步代码,而 Golang 使用多个 goroutine。
  • Javascript 线程通过不断轮询的事件循环与主线程通信,而 Golang 协程通过Channel.

所以我想知道是否有一种方法可以通过Async/ Await模式控制 Golang 中的异步任务,就像我们在 Javascript ES7 中所做的那样。答案是肯定的!

更多详情 → https://github.com/sun0day/async

异步/等待

在继续之前,让我们回顾一下常见的异步模型。常见的异步模型有两种:阻塞异步模型和非阻塞异步模型。阻塞异步模型和非阻塞异步模型的区别在于同步代码执行是否应该等待异步任务响应继续。

在 Javascript ES7 中,我们使用async保留字将 a 标记function为异步任务。一旦调用 a async function,它将被发送到另一个子线程执行。然后不管是否 async function完成,其余的同步代码都会立即执行。上面的async funciton调用流程是非阻塞异步的。我们还可以使用保留字将async function调用流转换为阻塞异步。await

async function nonBlock() {...}

async function block() {...}

async function main() {
  nonBlock() // will not block main 

  await block() // will block main
}

Golang 提供了一种更精细、更灵活的方式来控制异步任务的执行。但灵活性也意味着我们必须编写更多代码来控制异步任务的执行。更糟糕的是,我们还要处理 goroutine 泄漏、死锁、panic 等异常。在 golang 中引入 Async/Await 模式可以帮助 Goer 编写更整洁、可读、健壮的代码。

Golang 中的异步实现

在 Golang 中,我们可以func使用func命名将同步转换为异步AsyncAsync类型定义为:

func Async[V any](f func () V) func() *AsyncTask[V] {...}

Async接受 afunc作为参数并返回一个新的func,它将创建一个struct命名的AsyncTask. 您应该通过泛型参数告诉类型是Async什么。表示由 . 创建的异步任务。它有两个属性:& 。returnfVAsyncTaskAsyncdataerr

type AsyncTask[V any] struct {
  data chan V
  err chan any
}

我们可以通过 观察f()结果AsyncTask.data和可能的错误AsyncTask.err。那么我们来看看它的内部结构Async

func Async[V any](f func () V) func() *AsyncTask[V] {
  exec := func() *AsyncTask[V] {
    data, err := handle[V](f)
    return &AsyncTask[V]{data: data, err: err}
  }

  return exec
}

func命名将在内部exec创建一个AsyncTask指针,exec异步 handle处理同步f结果和可能的错误。data Channel和容量都err Channel设置为 1 以防止 goroutine 泄漏。一旦f()完成,由创建的子 goroutinehandle可以立即退出,而不管是否有其他 goroutine 接收dataerr

func handle[V any](f func () V) (chan V, chan any) {
  data := make(chan V, 1)
  err := make(chan any, 1)

  go func() {
    var result V
    defer func() {
      if e:= recover(); e == nil {
        data <- result
      } else {
        err <- e
      }
      close(data)
      close(err)
    }()
    
    result = f()
  }()

  return data, err
}

下面的示例显示了如何Async封装f以及如何f异步工作。

func main() {
  a := 1
  b := 2
  af := Async[int](func() int {
    c := a + b
    fmt.Println("f() result is", c)
    return c
  })

  fmt.Println("sync start, goroutine=", runtime.NumGoroutine())
  af()
  fmt.Println("sync end, goroutine=", runtime.NumGoroutine())
  
  time.Sleep(1 * time.Second)
  fmt.Println("async end, goroutine=", runtime.NumGoroutine())
}
/* stdout:
sync start, goroutine=1
sync end, goroutine=2
f() result: 3
async end, goroutine=1
*/

在 Golang 中等待实现

AwaitGolang 中的实现很简单。什么Await是等待AsyncTask.dataAsyncTask.err发送数据。

func Await[V any](t *AsyncTask[V]) (V, any) {
  var data V
  var err any

  select {
  case err := <-t.err:
    return data, err
  case data := <-t.data:
    return data, err
  }
}

下面的示例显示了如何Await等待Async响应。

func main() {
  a := 1
  b := 2
  af1 := Async[int](func() int {
    c := a + b
    return c
  })
  af2 := Async[int](func() int {
    panic("f() error")
  })
  
  fmt.Printf("sync start, goroutine=%d\n", runtime.NumGoroutine())
  data, _ := Await[int](af1())
  _, err := Await[int](af2())
  fmt.Printf("sync end, goroutine=%d, af1() result=%d, af2() result='%s'\n", runtime.NumGoroutine(), data, err)
}

/* stdout
sync start, goroutine=1
sync end, goroutine=1, af1() result=3, af2() result='f() error'
*/

结论

本文展示了如何通过 goroutine 和Channel. 除了上面的AsyncTask定义,我们还需要考虑它的执行结果和状态。更进一步,我们还可以实现其他基于Async和的异步流 API Await,例如all,race等。


完整的源代码可以在这里找到。