Goroutines:如何在 Go 中运行并发代码

Go 编程语言的最大优势之一是其内置的并发支持,基于 Tony Hoare 的“Communicating Sequential Processes”。作为一名具有 JS 和 Java 背景的开发人员,我对在 Go 中运行并发代码的轻松程度感到惊讶。

Go 和其他语言的并发区别

实际上,goroutines 不是线程。它们是绿色的线。让我们看看什么是绿线。

在 计算机编程中, 绿色线程 或 虚拟线程 是  由 运行时库 或 虚拟机(VM) 调度的线程, 而不是由底层 操作系统 (OS) 本机调度的线程。

维基百科

绿色线程模拟多线程环境,不依赖任何本机操作系统功能,它们在 用户空间 而不是 内核 空间进行管理,使它们能够在不支持本机线程的环境中工作。Go 调度器负责在 goroutine 之间切换线程。因此,在绿色线程(在我们的例子中是 goroutines)之间切换上下文比 os 线程有效地便宜。goroutine 的初始堆栈大小是 2KB(并且会缩小),而不是OS 线程堆栈的约 8MB

总结用户空间内部的go scheduler工作go runtime并使用操作系统线程。Goroutines 在操作系统线程的上下文中运行。

合作和先发制人

在 1.14 版本之前,Go 只有协作调度。这意味着 goroutine 自行决定何时出于任何原因释放资源(例如调用函数任何 IO 操作、等待互斥体、从通道读取等)。这可能会导致单个 goroutine 占用 CPU 的问题,并且不会达到上述任何原因。所以在 1.14 中引入了异步抢占。基于时间条件触发异步抢占。当一个 goroutine 运行超过 10 秒时,Go 调度程序将尝试抢占它。

让我们来看看它是如何工作的。首先,如何创建一个goroutine?

要创建一个 goroutine,我们需要使用如下关键字go

go func() {
     //logic of concurrent function
}()

我不会深入研究指针在 Go 中的工作原理,但您可以在此处阅读并观看内容。

我们的完整示例将如下所示:

package main
import (
	"fmt"
	"runtime"
)
func main() {
	runtime.GOMAXPROCS(1)
	i := 0
	go func(i *int) {
		for {
			*i++
		}
	}(&i)
	
	runtime.Gosched()
	fmt.Println(i)
}

尝试使用低于 1.14 的版本运行它,然后检查上面的版本。使用低于该版本的程序将无休止地等待无限循环完成。下面的版本将从运行无限循环的 goroutine 中抢占资源并打印 i 的值。

频道

有时我们需要一种方式在 goroutine 之间进行通信。在 Go 中有一个特殊的口号:

不要通过共享内存进行通信;相反,通过通信共享内存。

这是什么意思?使用并发程序总是不容易,因为您应该始终牢记竞争条件、死锁和其他问题。Go 引入了处理这个问题的通道。Channel 是 goroutine 之间的一种通信方式。它有一个类型 (int, string, some struct) 并由关键字创建make

make(ch chan int)

要从通道写入或读取某些内容,有一种特殊的语法:

ch <- 2 // write
v := <- ch // read and assign result to variable v

通道可以缓冲也可以不缓冲。不同的是,当 goroutine 尝试写入具有空闲空间的缓冲通道时,goroutine 不会被阻塞,而是会继续执行。

您还可以遍历通道。

for v := range ch {
   
}

正如您可能假设的那样,如果通道中没有值,则执行将被阻塞,直到某个 goroutine 将值写入线程。

您也可以关闭通道,结果 for 循环将停止在关闭通道上的迭代。

close(ch)

网络爬虫

作为示例,让我们创建一个简单的函数来检查网站的状态

package main
import (
	"fmt"
	"net/http"
)
func main() {
	websites := []string{
		"https://code8cn.com/",
		"https://github.com/",
		"https://apple.com/",
		"https://google.com/",
		"https://youtube.com/",
		"https://www.udemy.com/",
		"https://netflix.com/",
		"https://www.coursera.org/",
		"https://facebook.com/",
		"https://microsoft.com",
		"https://wikipedia.org",
		"https://educative.io",
		"https://acloudguru.com",
	}
	for _, website := range websites {
		checkResource(website)
	}
}
func checkResource(website string) {
	if res, err := http.Get(website); err != nil {
		fmt.Println(website, "is down")
	} else {
		fmt.Printf("[%d] %s is up\n", res.StatusCode, website)
	}
}

如果您将运行它,您将在控制台中看到这样的日志:

[200] https://code8cn.com/已上线 [200] https://github.com/已上线 [200] https://apple.com/已上线 [200] https://google.com/起来 [200]

https://www.youtube.com/embed/undefined已上涨 [200] 

https://www.udemy.com/已上涨 [200] 

https://netflix.com/已上涨 [200] 

https://www.coursera.org/已上涨 [200] 

https: //facebook.com/已上线 [200] 

https://microsoft.com已上线 [200] 

https://wikipedia.org已上线 [200] 

https://education.io已上线 [200] 

https:// /acloudguru.com已上线

调用此代码大约需要 10 秒。问题当然是因为一个接一个地同时检查每个资源。现在让我们试着让它更快一点。为此,我们将使用工作池模式。您将使用一个 goroutine 池来管理正在执行的并发工作。使用 for 循环,您将创建一定数量的工作 goroutine 作为资源池。然后,在您的 main()“线程”中,您将使用一个通道来提供工作。

首先,我们需要为我们的案例定义一个工人。它看起来像:

func worker(resources, results chan string) {
	for resource := range resources {
		if res, err := http.Get(resource); err != nil {
			results <- resource + " is down"
		} else {
			results <- fmt.Sprintf("[%d] %s is up", res.StatusCode, resource)
		}
	}
}

让我们快速找出这里到底发生了什么。每个工作人员都将等待来自频道的网站资源,resources并且在有人将资源 URL 推送到频道工作人员之后,工作人员将收到此 URL 并检查它是否正常并将结果推送到另一个名为results.

现在让我们看看我们将如何运行我们的工作池:

func main() {
	websites := []string{
		//...
	}
	resources := make(chan string, 6)
	results := make(chan string)
	for i := 0; i < 6; i++ {
		go worker(resources, results)
	}
}

我们的工作池包含 6 个 goroutine,它们正在运行并等待资源检查。在这里,我们可以将其用作单独的 goroutine IIF 或立即调用函数:

go func() {
	for _, v := range websites {
		resources <- v
	}
}()

为什么我们不应该在这里使用同步内联代码?什么时候,你可能会尝试移除最后一个例子go,你会遇到一个deadlock

现在我们不仅有我们的工人池,而且还为他们提供了工作:) 我们需要做的最后一件事是从池中读取结果。为此,我们可以遍历results主 goroutine 中的通道并打印检查每个网站的所有结果:

for i := 0; i < len(websites); i++ {
	fmt.Println(<-results)
}

完整的代码如下所示:

package main
import (
	"fmt"
	"net/http"
)
func main() {
	websites := []string{
		"https://code8cn.com/",
		"https://github.com/",
		"https://apple.com/",
		"https://google.com/",
		"https://youtube.com/",
		"https://www.udemy.com/",
		"https://netflix.com/",
		"https://www.coursera.org/",
		"https://facebook.com/",
		"https://microsoft.com",
		"https://wikipedia.org",
		"https://educative.io",
		"https://acloudguru.com",
	}
	resources := make(chan string, 6)
	results := make(chan string)
	for i := 0; i < 6; i++ {
		go worker(resources, results)
	}
	go func() {
		for _, v := range websites {
			resources <- v
		}
	}()
	
	for i := 0; i < len(websites); i++ {
		fmt.Println(<-results)
	}
}
func worker(resources, results chan string) {
	for resource := range resources {
		if res, err := http.Get(resource); err != nil {
			results <- resource + " is down"
		} else {
			results <- fmt.Sprintf("[%d] %s is up", res.StatusCode, resource)
		}
	}
}

如果您运行它,您会发现它的调用速度比顺序版本快得多。您还可以使用 goroutine 的数量,看看它将如何影响执行速度。