自然醒的博客

Go 并发

· shenfq

并发

前言§

在学习 Go 的并发之前,先复习一下操作系统的基础知识。

并发与并行§

先来理一理并发与并行的区别。

并行:指的是在同一时间,多个程序在不同的 CPU 上共同运行,互相之间并没有对 CPU 资源进行竞争。比如,我在看书的时候,左手用来翻书,右手做笔记,两者可以同时进行。

并发:如果系统只有一个 CPU,有多个程序要运行,系统只能将 CPU 的时间划分为多个时间片,然后分配给不同的程序。比如,我看书的时候,只能用右手翻完书之后,才能腾出手来做笔记。

可是明确的是并发≠并行,但是只要 CPU 运行足够快,每个时间片划分足够小,就会给人们造成一种假象,认为计算机在同一时刻做了多个事情。

进程、线程、协程§

进程是一个程序执行的过程,也是系统进行资源分配和调度的基本单位。简单来说,一个进程就是我们电脑上某个独立运行的程序。

线程是系统能够调度的最小单位,它被包含在进程里面,是进程中的实际运作单位,一个进程可以包含多个线程。可以将进程理解为一个工厂,而工厂里面的工人就是线程。就像工厂里面必须要有一个工人才能工作一样,每个进程里面也必须有一个线程才能工作。比如,JavaScript 就被成为单线程的语言,说明 JavaScript 工厂里面只有一个打工人,这个打工人就是工头,称为主线程。多线程的进程中也会有一个主线程,主线程一般随着进程一起创建和销毁。

🏭-👷🏻

进程与线程都是操作系统上的概念,程序中如果要进行进程或者线程的切换,在切换的过程中,需要先保存当线程的状态,然后恢复另一个线程的状态,这是需要耗费时间的,如果是进程的切换还可能跨 CPU,无法利用 CPU 缓存,导致进程比线程的切换成本更加高昂。

所以,除了系统级别的内核线程外,一些程序中创建了用户线程这一说,这么做可以减少与操作系统交互,将线程的切换控制在程序内,这种用户态的线程被称为协程。用户线程的切换完全由程序控制,实际上使用的内核线程就只存在一个,内核线程与用户线程之间的关系为一对多。虽然这样做可以减少线程上下文切换带来的开销,但是,无法避免阻塞的问题。一旦某个用户线程被阻塞会导致内核线程的阻塞,无法进行用户线程进行切换,从而整个进程都被挂起,

协程§

Go 语言中的线程模型既不是使用内核线程,也不是完全的用户线程,而是一种混合型的线程模型。用户线程与内核线程的对应关系为多对多,用户线程与内核线程动态关联,当某个线程出现阻塞的时候,可以动态切换到另外的内核线程上。

G-P-M模型§

上面只是 Go 语言中抽象层面的线程模型,具体是如何进行线程调度的,还是看看 Go 语言的代码。

func log(msg string) {
	fmt.Println(msg)
}
func main() {
	log("hello")
	go log("world")
}

之前的文章介绍过,Go 程序在运行时,默认以 main 函数为入口,main 函数中运行的代码会到一个 goroutine 中运行。如果我们在调用的函数前,加上一个 go 关键词,那么这个函数就放到另外一个 goroutine 中运行。

这里说的 goroutine 就是 Go 语言中的用户线程,也就是协程。Go 语言在运行时,会建立一个 G-P-M 模型,这个模型专门负责 goroutine 的调度。

每个 goroutine 都会放到一个 goroutine 队列中,由于是用户自主创建,上下文的切换成本极低。P(processor)的主要作用是管理用户线程,将 goroutine 合理的安排到内核线程上,也就是这个模型的 M。通常情况下,G 的数量远远多于 M。

Goroutine§

如果你有运行过上面的代码,你会发现,go 关键词后的函数并没有真正执行。

func log(msg string) {
	fmt.Println(msg)
}
func main() {
	log("hello")
	go log("world")
}

运行后,终端只输出了 hello,并没有输出 world

这是因为 main 函数会在主 goroutine 中运行,类似于主线程,而每个 go 语句会启动一个新的 goroutine,启动后的 goroutine 并不会直接执行,而是会放入一个 G 队列中,等待 P 的分配。但是主 goroutine 结束后,就意味着程序结束了,G 队列中的 goroutine 还没有等到执行时间。所以,go 语句后的函数是一个异步的函数,go 语句调用后,会立即去执行后面的语句,而不会等待 go 语句后的函数执行。

如果要 world 输出,我们可以在 main 函数后面加一个休眠,延长主 goroutine 的执行时间。

import (
	"fmt"
	"time"
)
func log(msg string) {
	fmt.Println(msg)
}
func main() {
	fmt.Println()
	log("hello")
	go log("world")
	time.Sleep(time.Millisecond * 500)
}

通道§

多线程编程中,由于各个线程之间需要共享数据,一般采用的是共享内存的方案。但是这么做,势必会出现多个线程同时修改同一份数据情况,为了保证数据的安全性,需要为数据加锁,处理起来就比较麻烦。

所以在 Go 语言社区有一句名言:

不要通过共享内存来通信,而应该通过通信来共享内存。

创建通道§

这里说的通信的方式,就是 Go 语言中的通道(channel)。通道是 Go 语言中的一种特殊类型,需要通过 make 方法创建一个通道。

ch := make(chan int) // 创建一个 int 类型的通道

创建通道的时候,需要加上一个类型,表示该通道传输数据的类型。也可以通过指定一个空接口的方式,创建一个可以传送任意数据的通道。

ch := make(chan interface{})

创建的通道分为无缓存通道和有缓存通道,make 方法的第二个参数表示可缓存的数量(如果传入 0,效果和不传一样)。

ch := make(chan string, 0) // 无缓存通道,传入
ch := make(chan string, 1)

发送和接收数据§

通道创建后,通过 <- 符号来接收和发送数据。

ch := make(chan string)
ch <- "hello world" // 发送一个字符串
msg := <- ch // 接收之前发送的字符串

实际在这个代码运行的时候,会提示一个错误。

fatal error: all goroutines are asleep - deadlock!

表明当前的 goroutine 处于挂起状态,并且后续不会有响应,只能直接中断程序。因为这里创建的是无缓存通道,发送数据后通道不会将数据缓存在通道中,导致后面要找通道要数据的时候无法正常从通道中获取数据。我们可以将通道的缓存设置为 1,让通道可以缓存一个数据在里面。

ch := make(chan string, 1)
ch <- "hello world" // 发送一个字符串
msg := <- ch // 接收之前发送的字符串
fmt.Println(msg)

但是如果发送的数据超出了缓存数量,或者接受数据时,缓存里面已经没有数据了,依然会报错。

ch := make(chan string, 1)
ch <- "hello world"
ch <- "hello world"

// fatal error: all goroutines are asleep - deadlock!
ch := make(chan string, 1)
ch <- "hello world"
<- ch
<- ch

// fatal error: all goroutines are asleep - deadlock!

协程中使用通道§

那么无缓存的通道中,应该怎么发送和接收数据呢?这就需要将通道与协程进行结合,也就是 Go 语言中常用的并发的开发模式。

无缓存的通道在收发数据时,由于一次只能同步的发送一个数据,会在两个 goroutine 间反复横跳,通道在接受数据时,会阻塞当前 goroutine,直到通道在另一个 goroutine 发送了数据。

ch := make(chan string) // 创建一个无缓存通道
temp := "我在地球"
go func () {  
	// 接收一个字符串
	ch <- "hello world"
	temp = "进入了异次元"
}()
// 运行到这里会被阻塞
// 直到通道在另一个 goroutine 发送了数据
msg := <- ch
fmt.Println(msg)
fmt.Println("temp =>", temp)

为了证明通道在接收数据时会被阻塞,我们可以在前面加上一个 temp 变量,然后在另外的 goroutine 中修改这个变量,看最后输出的值是否被修改,以此证明通道在接受数据时是否发生了阻塞。

运行结果已经证明,当通道接收数据时,阻塞了主 goroutine 的执行。除了主动的从通道里面一条条的获取数据,还可以通过 range 的方式循环获取数据。

ch := make(chan string)

go func() {
  for i := 0; i < 5; i++ {
    ch <- fmt.Sprintf("数据 %d", i)
  }
  close(ch)
}()

for data := range ch {
		fmt.Println("接收 =>", data)
}

如果使用 range 循环读取通道中的数据时,在数据发送完毕时,需要调用 close(ch) ,将通道关闭。

实战§

在了解了前面的基础知识之后,我们可以通过协程 + 通道的写一段爬虫,来实战一下 Go 语言的并发能力。

首先确定爬虫需要爬取的网站,由于个人比较喜欢看电影,所以决定爬一爬豆瓣的电影 TOP 榜单。

其域名为 https://movie.douban.com/top250,翻到第二页后,域名为 https://movie.douban.com/top250?start=25 ,第三页的域名为 https://movie.douban.com/top250?start=50,说明每次这个 TOP 榜单每页会有 25 部电影,每次翻页就给 start 参数加上 25。

const limit = 25 // 每页的数量为 25
const total = 100 // 爬取榜单的前 100 部电影
const page = total / limit // 需要爬取的页数

func main() {
	var start int
	var url string
	for i :=0; i < page; i++ {
    start := i * limit
    // 计算得到所有的域名
    url := "https://movie.douban.com/top250?start=" + strconv.Itoa(start)
	}
}

然后,我们可以构造一个 fetch 函数,用于请求对应的页面。

func fetch(url string) {
  // 构造请求体
	req, _ := http.NewRequest("GET", url, nil)
  // 由于豆瓣会校验请求的 Header
  // 如果没有 User-Agent,http code 会返回 418
	req.Header.Add("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36")

  // 发送请求
	client := &http.Client{}
	rsp, _ := client.Do(req)

  // 断开连接
	defer rsp.Body.Close()
}

func main() {
	for i :=0; i < page; i++ {
    url := ……
		go fetch(url, ch)
	}
}

然后使用 goquery 来解析 HTML,提取电影的排名以及电影名。

image-20210622210049300

// 第二个参数为与主goroutine 沟通的通道
func fetch(url string, ch chan string) {
  // 省略部分代码 ……
	rsp, _ := client.Do(req)
  // 断开连接
	defer rsp.Body.Close()
  // 解析 HTML
	doc, _ := goquery.NewDocumentFromReader(rsp.Body)
	// 提取 HTML 中的电影排行与电影名称
	doc.Find(".item").Each(func(_ int, s *goquery.Selection) {
		num := s.Find(".pic em").Text()
		title := s.Find(".title::first-child").Text()
    // 将电影排行与名称写入管道中
		ch <- fmt.Sprintf("top %s: %s\n", num, title)
	})
}

最后,在主 goroutine 中创建通道,以及接收通道中的数据。

func main() {
  ch := make(chan string)

	for i :=0; i < page; i++ {
    url := ……
		go fetch(url, ch)
	}

	for i :=0; i < total; i++ {
		top := <- ch // 接收数据
		fmt.Println(top)
	}
}

最后的执行结果如下:

可以看到由于是并发执行,输出的顺序是乱序。

完整代码§

package main

import (
	"fmt"
	"github.com/PuerkitoBio/goquery"
	"net/http"
	"strconv"
)

const limit = 25
const total = 100
const page = total / limit

func fetch(url string, ch chan string) {
	req, _ := http.NewRequest("GET", url, nil)
	req.Header.Add("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36")

	client := &http.Client{}
	rsp, _ := client.Do(req)

	defer rsp.Body.Close()

	doc, _ := goquery.NewDocumentFromReader(rsp.Body)

	doc.Find(".item").Each(func(_ int, s *goquery.Selection) {
		num := s.Find(".pic em").Text()
		title := s.Find("span.title::first-child").Text()
		ch <- fmt.Sprintf("top %s: %s\n", num, title)
	})
}

func main() {
	ch := make(chan string)

	for i :=0; i < page; i++ {
		start := i * limit
		url := "https://movie.douban.com/top250?start=" + strconv.Itoa(start)
		go fetch(url, ch)
	}

	for i :=0; i < total; i++ {
		top := <- ch
		fmt.Println(top)
	}
}