上一章介绍了基于channel的并发,这一章介绍基于共享变量的并发。
简介基于共享变量的并发
我们知道如果多个线程或者协程并发的对一个变量进行修改,会出现数据不一致的问题,这是因为多个并发体竞争同一个资源导致的。以最经典的存钱取钱的例子,下面的例子有2个goroutine并发的多次对存款变量进行修改:
func main() {
waiting := make(chan struct{})
go func(){
for i:=0; i < 100000000; i++{
deposit++
}
waiting <- struct{}{}
}()
for i:=0; i < 100000000; i++{
deposit--
}
<-waiting // 等待子goroutine结束才结束打印余额,否则可能子goroutine都还没执行完1亿次循环就打印余额了,这样的话余额肯定不准。
fmt.Println(deposit)
}
所以,解决的方法也很简单:
1.让资源的修改发生在一个goroutine而非多个goroutine中,这样就不存在说多个goroutine竞争一个资源。用channel就可以解决这个问题。
// bank包下的deposit.go文件
package bank
var Deposit chan int // 存钱或取钱用到的channel
var Search chan int // 查看月用到的channel
var amount int // 余额
func init() {
Deposit = make(chan int)
Search = make(chan int)
go func(){ // 必须开一个goroutine跑否则会阻塞init方法,也会阻塞main可执行文件
for { // 银行会不停的提供存钱和取钱的业务服务
select {
case money := <-Deposit: // 当客户端存钱或取钱时才会走到这个case
amount += money
case Search <- amount: // 将余额告诉客户(当客户端要查看余额时才会走到这个case)
}
// 如果没有客户存钱或者取钱,那么这个goroutine就会被select阻塞,银行就闲下来没事干了
}
}()
}
// main.go
package main
import (
"bank" // 引入bank包,引入时会执行其init函数
"fmt"
)
func main(){
waiting := make(chan struct{})
go func(){
for i:=0; i < 100000000; i++{
bank.Deposit <- 1 // 每次存1块钱
}
waiting <- struct{}{}
}()
for i:=0; i < 100000000; i++{
bank.Deposit <- -1 // 每次取1块钱
}
<-waiting
fmt.Println(<-bank.Search) // 最后查看余额
}
该例子将资源的修改(存款的修改)放在了 init 函数中的一个goroutine中进行,只有一个goroutine修改这个资源从而保证了数据的安全。
2.可以让资源的修改发生在多个goroutine,但是同一时刻只允许1个goroutine改这个资源,使得资源的访问和修改不再是竞争的而是有序的。可以使用互斥锁做到。
var amount int
var mutex *sync.Mutex // 互斥锁
func main(){
defer func(t time.Time) {
fmt.Println(float64(time.Now().UnixNano() - t.UnixNano()) / 10e9)
}(time.Now())
waiting := make(chan struct{})
mutex = &sync.Mutex{}
go func(){
for i:=0; i < 100000000; i++{
mutex.Lock()
amount++
mutex.Unlock()
}
waiting <- struct{}{}
}()
for i:=0; i < 100000000; i++{
mutex.Lock()
amount--
mutex.Unlock()
}
<-waiting
fmt.Println(amount) // 最后查看余额
}
无论用channel还是用互斥锁,都需要损耗更多性能用于同步和保证并发安全,因为原本数据的修改由并发变成时串行。
不用互斥锁和channel的程序耗时0.04秒。
使用互斥锁的程序耗时0.45秒。
使用channel的程序耗时7.5秒。因为数据在多个goroutine协程中的传递代价要远大于简单的上一个锁,而且这个差距在放大1亿倍之后会尤为明显。也就是说,通道相比于基于共享变量的同步原语来说量级是比较重的。
当然这不意味着channel就比锁要差,这要看应用场景,channel几乎可以替代所有的并发同步机制,而如果用传统的同步机制我们就不仅要用到锁,还要用到条件变量/信号量/队列等等以应付多种情况。
Go语言是更倾向于用CSP模式的并发(也就是使用channel)来替代传统的基于共享变量的并发,Go(和Go的发明者)认为“不要使用共享数据来通信;而是使用通信来共享数据”,这句话进一步体现了channel在go中的重要性。
当然这不意味这我们不需要学习传统的并发机制,还是那句话不同的技术适用于不同的应用场景,在一些场景下用传统的并发机制比channel更合适或者逻辑的实现更简单。就像上面的例子,用channel也能做到数据安全,但是程序的设计就因为用了channel而复杂了很多。
不知道大家是否有这样的一个疑问:我们经常会在多个goroutine中对channel变量进行访问和修改(发送和接收数据),这其实也会产生竞争,但是却没有发现channel数据不一致(例如多个goroutine都获取到channel的某个相同元素)。这是因为channel内部的实现是并发安全的,它的接收和发送数据肯定也是使用加锁或者条件变量或其他之类的机制。
并发机制之互斥锁
上一节中我们用两个例子简单的介绍了基于CSP(使用channel的例子)和基于共享内存(使用互斥锁的例子)的并发。
从这一节开始往后,我们开始正式介绍基于共享变量的并发机制有哪些。在众多的传统并发机制中,互斥锁是最基础也最常用的一个。
调用mutex的Lock方法会获取一个互斥锁并对要保护的对象(就是Lock()到UnLock()之间的代码)上锁。如果其它的goroutine已经获得了这个锁的话,这个操作会被阻塞直到其它goroutine调用了Unlock释放了该锁后这个goroutine才能拿到锁。mutex会保护共享变量。
在Lock和Unlock之间的代码段中的内容goroutine可以随便读取或者修改,这个代码段叫做临界区。锁的持有者在其他goroutine获取该锁之前需要调用Unlock解锁。
互斥锁的作用就是保证共享变量的修改和访问不再是并发的而是串行的从而保证数据的并发安全。
还有就是,上锁后一定要释放锁,go的defer可以保证无论如何都能够释放到锁,不会因为一些错误导致函数直接返回而跳过了释放锁的步骤。
其实,互斥锁可以用一个长度和容量为1的channel来实现和替代。上面使用互斥锁的例子改写如下:
func main() {
fmt.Println("start")
defer func(t time.Time) {
fmt.Println(float64(time.Now().UnixNano() - t.UnixNano()) / 10e9)
}(time.Now())
waiting := make(chan struct{})
deposit_chan := make(chan struct {}, 1)
deposit := 0
go func(){
for i:=0; i < 100000000; i++{
deposit_chan <- struct {}{} // 上锁,这段期间其他goroutine无法再从这个chan接收值因而被阻塞
deposit++
<- deposit_chan // 释放锁
}
waiting <- struct{}{}
}()
for i:=0; i < 100000000; i++{
deposit_chan <- struct {}{}
deposit--
<- deposit_chan
}
<-waiting
fmt.Println(deposit)
}
然后我们再将存取款的代码用互斥锁写一遍,这次我们将存取款和读取操作封装起来,让其更清晰可读。
var lock *sync.Mutex
var waiting chan struct{}
var deposition int
func main() {
fmt.Println("start")
defer func(t time.Time) {
fmt.Println(float64(time.Now().UnixNano() - t.UnixNano()) / 10e9)
}(time.Now())
lock = &sync.Mutex{}
waiting = make(chan struct{})
go func(){
for i:=0; i < 100000000; i++{
Deposit(1)
}
waiting <- struct{}{}
}()
for i:=0; i < 100000000; i++{
Deposit(-1)
}
<-waiting
fmt.Println(Search())
}
func Deposit(amount int) {
lock.Lock()
deposition += amount
lock.Unlock()
}
func Search() int{
lock.Lock()
defer lock.Unlock()
return deposition
}
并发机制之读写锁
读写锁是互斥锁的一种优化,它适用于存在频繁读取的场景。读写锁顾名思义,包含读锁和写锁,我们一般会对读取数据的代码上读锁,修改数据的代码上写锁。使得多个goroutine的读写操作是串行的;写写操作也是串行的;唯有读读操作是并行的。
下面还是银行存取款的例子,假如这个账户的用户是一对夫妇A和B,这对夫妇都是急性子,在1亿次存入的过程中,他们不停的查看存款的变化。此时我们可以让A的读取操作和B的读取操作之间不会互相阻塞从而提高银行存款的效率。
var lock *sync.RWMutex
var deposition int
var waiting chan struct{}
func main() {
fmt.Println("start")
defer func(t time.Time) {
fmt.Println(float64(time.Now().UnixNano() - t.UnixNano()) / 10e9)
}(time.Now())
lock = &sync.RWMutex{}
// 客户A查看存款
go func() {
getDeposition("A")
waiting <- struct {}{}
}()
// 客户B查看存款
go func() {
getDeposition("B")
waiting <- struct {}{}
}()
for i:=0; i < 100000000; i++{
Deposit(1)
}
<-waiting // 等待A和B两个goroutine结束
<-waiting
}
func Deposit(amount int) {
lock.Lock()
deposition += amount
lock.Unlock()
}
func Search() int{
lock.RLock()
defer lock.RUnlock()
return deposition
}
// 查询10万次存款
func getDeposition(user string) {
for i := 0; i < 100000 ; i++ {
fmt.Printf("%s : %d : %d\n", user, i, Search())
}
}
需要注意2点:
1.一共有3个goroutine,存款发生于main goroutine,查询存款发生在A和B两个goroutine(你可以理解为A和B这对夫妇在地球上的两个不同地区国家同时查询存款)。由于使用了读写锁,因此A和main之间的deposition操作是串行的,B和main也是串行的,但A和B(读读)是并发的。
2.存款操作循环了1亿次操作,而A和B的查询循环了10万次,但是你会发现main中的1亿次循环完毕之后,A和B中的循环还只循环了不到1万次。这是因为A和B中的循环是做的打印操作,输出到屏幕是一个硬件IO操作,因此速度会远远慢于单纯的加减运算(内存操作)。所以不要觉得发生了什么异常现象。
而且这个现象也告诉我们,在生产环境中尽量少打印,这样能很大程度提升运行效率(有一次我写了一个并发程序为了调试在很多地方做了打印了,之后又把打印都注释掉了,发现前者要运行几分钟,后者只要几秒就运行完了)。
Sync.Once懒加载
考虑一种情况,在高并发的情况下(例如秒杀),获取一个商品的列表页,而商品的信息是放在redis缓存中的。但是当缓存中的商品信息有效时间失效的时候,就会又大量高并发请求打在db上,造成磁盘io压力过大,数据库可能会崩。我们可以用下面的代码模拟这一过程:
var goods map[string]float32
func main() {
for i :=0 ; i < 1000; i++ { // 1000个客户并发请求商品
go func(){fmt.Printf("%v\n", getGoods())}()
}
time.Sleep(10 * time.Second)
}
func getGoodsFromDb() map[string]float32{
fmt.Println("初次加载商品")
time.Sleep(1 * time.Second / 10)
g := map[string]float32{
"goods1": 10,
"goods2": 20,
"goods3":30,
}
return g
}
func getGoods() map[string]float32{
if goods == nil{
goods = getGoodsFromDb()
}
return goods
}
这段程序的运行结果是打印了998次“初次加载商品”,说明缓存中数据为空时,1000次请求又998次都查询mysql了。
这个时候我们想出来的办法是给这个从数据库取数据的过程加一个锁,仅让一个客户端从数据库中取数据并将数据缓存到redis,而这个过程中其他客户端都无法获取锁因而被阻塞。当redis数据生成之后他们才能获取锁,此时他们发现redis里已经有数据了,就会去查redis而不会去查mysql。如下所示:
var goods map[string]float32
var lock *sync.Mutex = &sync.Mutex{}
// ...略过重复代码
func getGoods() map[string]float32{
lock.Lock()
defer lock.Unlock()
if goods == nil{
goods = getGoodsFromDb()
}
return goods
}
然而go为这种情景提供了一种方便Once方法,我们可以写为这样:
var goods map[string]float32
var once *sync.Once = &sync.Once{}
// 省略重复代码
func getGoodsFromDb(){ // 此时getGoodsFromDb要变成一个没有返回值的函数,在内部就要为goods赋值而不是使用一个临时变量再返回
fmt.Println("初次加载商品")
time.Sleep(1 * time.Second / 10)
goods = map[string]float32{
"goods1": 10,
"goods2": 20,
"goods3":30,
}
}
func getGoods() map[string]float32{
once.Do(getGoodsFromDb)
return goods
}
Sync.Once的作用就是保证初始化操作在并发下是串行的安全的。
Do方法需要传入一个函数这个函数的作用一般是初始化,在并发多个goroutine执行到Do的时候,只有第一个goroutine会执行getGoodsFromDb方法,而其他goroutine会被Do阻塞,当第一个协程执行完Do中的回调函数之后,其他协程就会被唤醒并且不再去执行Do中的回调函数,直接往下走执行。
Once方法的原理:其内部使用一个bool值表示是否进行过初始化,在执行初始化操作时用一个锁(默认是互斥锁,当然你也可以用别的锁)来保护这个操作的全过程。并且在初始化完成后将bool变为true。
源码如下:
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 { // 判断这个bool是否为0,如果为0则调用f这个回调,也就是初始化函数
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1) // 初始化操作f()完成后,将bool修改为1
f()
}
}
源码的做法其性能是优于我上面自己使用互斥锁保护初始化操作的例子的,因为源码用了两层if判断o.done是否为0,好处是当从redis中有数据之后,后面的客户端获取数据时都无需经过mutex的加锁和释放锁的过程。而我自己的例子则仅仅只用了1层判断,即使redis已经有了数据,之后每次从redis中取数据时还是得先经过加锁判断goods是否为nil的过程,这样就会损耗一部分性能。
示例:并发的非阻塞缓存
最后我们编写一个并发的不重复非阻塞缓存来结合使用基于channel和共享变量的并发。
这个并发的缓存我们可以将其理解为是一个简化CDN服务器,其作用是当多个用户并发的请求一系列url的时候,如果这个url不存在于缓存中则先原服务器的资源并缓存,如果url存在则直接从缓存中返回,而无需重复的请求目标网址,通过这种方式即减轻了目标服务器的负担,也加速了用户得到响应的速度。
这是第一版的缓存系统代码:
// request/request.go
package request
import (
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"strings"
)
func GetUrl(url string) (cont interface{}, err error){
resp, err := http.Get(url)
if err != nil{
return nil, err
}
defer resp.Body.Close()
cont, err = ioutil.ReadAll(resp.Body)
return cont, err
}
// 写入文件
func Save(url, dir string, cont []byte) {
if strings.Contains(url, "php"){
return
}
fn := strings.TrimRight(strings.TrimRight(url, "/"), ".html")
fn = filepath.Base(fn) + ".html"
dir = strings.TrimRight(dir, "/") + "/"
fp := dir + fn
_,err := os.Stat(dir)
if err != nil && os.IsNotExist(err){
os.Mkdir(dir, 0777)
}
f, err := os.OpenFile(fp, os.O_CREATE|os.O_WRITE), 0777)
if err != nil {
fmt.Printf("写入文件 %s 失败, 原因: %s \n", fp, err)
}
defer f.Close()
f.Write([]byte(cont))
}
// memo/memory.go
package memo
type Memory struct { // 缓存类
f Func // 当缓存仓库没有key对应的数据时就会调用这个f方法从数据源获取数据
results map[string]result // 缓存仓库
}
// Func类型是一个函数类型,这种函数是一种能从数据源获取数据并存到Memory的函数
type Func func(key string) (value interface{}, err error)
type result struct{ // 某个key对应的value
value interface{} // value可以是任何类型
err error
}
// 实例化一个Memory对象
func New(getFromSource Func) *Memory{
return &Memory{
f: getFromSource,
results: map[string]result{},
}
}
// 根据一个key, 返回一个result
func (m *Memory) Get(key string) (value interface{}, err error){
result, ok := m.results[key]
if !ok { // 如果缓存仓库中不存在key数据,则从数据源获取
result.value, result.err = m.f(key)
m.results[key] = result // 将内容写入缓存
}
return result.value, result.err
}
// main.go
var waiting *sync.WaitGroup = &sync.WaitGroup{}
func main(){
st := time.Now()
urlsToRequest := []string{"http://www.fx112.com/", "http://www.fx112.com/article", "http://www.fx112.com/rili"} // 一共两百多个url(有部分url重复),这里省略掉了
memo := memo2.New(request.GetUrl)
dir := "./html"
for _, url := range(urlsToRequest){
waiting.Add(1)
go func(url string) {
defer waiting.Done()
cont, err := memo.Get(url)
if err != nil {
log.Printf("爬取url %s 失败:", url, err)
return
}
// 单独开新的goroutine完成文件的写入
waiting.Add(1)
go func(url string, cont interface{}){
request.Save(url, dir, cont.([]byte)) // 通过断言将 interface{}转为[]byte类型
waiting.Done()
}(url, cont)
}(url)
}
waiting.Wait()
fmt.Printf("耗时:%s", time.Since(st))
}
这个例子的关键时memory.go,这是缓存系统的核心代码,而main.go从缓存系统中获取url的html内容。
这个例子还有很多要改进的地方,首先main.go并发的获取了两百多个url的html内容,会先从缓存 m.result 中取,如果没有才发网络请求获取。
上面的代码在单个goroutine跑的时候是没问题的。但是并发的情况下就会出现我们在介绍sync.One这一章节提出过的问题:由于在判断缓存是否为空的时候没有加锁,如果这并发的goroutine A和goroutine B他们请求的是相同的url时,就会发生重复请求网页的情况。
为此我们可以通过上锁来解决:
下面是第一种锁法:
func (m *Memory) Get(key string) (value interface{}, err error){
m.lock.Lock()
defer m.lock.Unlock()
result, ok := m.results[key]
if !ok {
result.value, result.err = m.f(key)
m.results[key] = result
}
return result.value, result.err
}
我们不可以这样锁,这样把判断和爬取的过程都锁住的话,那么并发的goroutine请求url操作就变成了串行的了。
我们看看第二种锁法:
func (m *Memory) Get(key string) (value interface{}, err error){
m.lock.Lock()
result, ok := m.results[key]
m.lock.Unlock()
if !ok {
result.value, result.err = m.f(key)
m.lock.Lock()
m.results[key] = result
m.lock.Unlock()
}
return result.value, result.err
}
这种锁法让网络请求是并发的而多个goroutine间查看缓存和写入缓存的行为是串行的。但是还是没有解决最初的重复请求的问题。
我们希望的是在某个url的缓存为空的时候,多个请求该相同url的goroutine是串行的,第一个拿到锁的url会发起网络请求而后面其他的(请求相同的url的)goroutine会阻塞,并在第一个先拿到锁的goroutine请求完毕后解除他们的阻塞然后直接从缓存中拿。但是在此过程中,爬取其他url的goroutine不能被阻塞,否则所有goroutine就是串行的了。
为此,我们可以设计一个类似于python中future对象的东西,这个对象可以让协程不受阻塞的拿到想要的结果对象,但是这个对象中没有返回值,协程需要阻塞到future对象中设置了返回值才被唤醒。在这个例子中,思路是让第一个拿到锁的goroutine去请求url并在请求之前将result的状态设为有缓存只不过缓存的内容为零值,其他请求相同url的goroutine就能不受阻塞的直接拿到result,但是这个result还没有他们想要的html,因此需要等待第一个goroutine请求完毕并发送一个信号给这些其他的goroutine才能唤醒他们。
所以第三种上锁的方式如下,需要配合一个channel,而且每个url都有一个这样专属的channel才能保证请求url A的channel不会阻塞到请求url B的goroutine:
type Memory struct { // 缓存类
f Func
cache map[string]*entry // 缓存仓库cache,这个仓库中存放多个entry对象,entry包含我们想获取的结果res和一个res对应的channel
repeat_lock *sync.Mutex // 用于并发时,防止重复请求相同url的锁
}
type Func func(key string) (value interface{}, err error)
type entry struct{
res *result
waiting_req chan struct{}
}
type result struct{ // 某个key对应的value
value interface{} // value可以是任何类型
err error
}
// 实例化一个Memory对象
func New(getFromSource Func) *Memory{
return &Memory{
f: getFromSource,
cache: map[string]*entry{},
repeat_lock: &sync.Mutex{},
}
}
// 根据一个key, 返回一个result
func (m *Memory) Get(key string) (value interface{}, err error){
m.repeat_lock.Lock()
ety, ok := m.cache[key]
if !ok { // 如果缓存仓库中不存在key数据,则从数据源获取
ety = &entry{
res: &result{},
waiting_req: make(chan struct{}), // 必须有这句哦,否则会隐式的为waiting_req成员设置一个零值channel。对零值channel接收会马上返回不会阻塞的
}
m.cache[key] = ety // 第一个拿到锁并且检测到没有缓存的goroutine会在请求url前设置一个零值缓存,这样后面请求相同url的goroutine就会走到else的代码块中
m.repeat_lock.Unlock()
// 请求的过程会阻塞几百毫秒,此时其他请求相同的url的协程也会被else代码块的channel阻塞
m.cache[key].res.value, m.cache[key].res.err = m.f(key)
close(ety.waiting_req) // 广播给其他请求相同url的goroutine已结束请求
}else{
m.repeat_lock.Unlock()
<-ety.waiting_req // 走到这里会分两种情况:1.url已经请求过了,此时waiting_req在很早之前就已经被关闭了,因此不会阻塞;2.url正在被请求,waiting_req阻塞,直到请求结束,waiting_req被关闭,阻塞解除
}
return ety.res.value, ety.res.err
}
学过python的并发机制的同学就知道这里的entry类型很像python的future对象,它能够延迟结果res的设置。并发的goroutine能够马上拿到entry,但是想要拿到entry中的res就必须等待res的值被任务完成的goroutine填充才行,而且close广播在唤醒等待res被填充的goroutine中起到重要作用。
除了这个方法,作者还提供了单纯使用channel而没有用锁的方式来做:
// Func、result和entry的声明和之前保持一致:
// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)
// A result is the result of calling a Func.
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} // closed when res is ready
}
// request类型包含一个response的channel用来接收请求的响应
type request struct {
key string
response chan<- result // the client wants a single result
}
// 此外,Memo(就是原来的Memory类型)只有一个requests成员channel,这个requests channel用来呈放request类型的请求对象
type Memo struct{ requests chan request }
// 初始化Memo的时候会调用server方法,这个方法会不停的从request获取请求的url和本次请求相关的response channel,然后发起网络请求(或者缓存中已经有值就不发起请求而是直接返回缓存值)
func New(f Func) *Memo {
memo := &Memo{requests: make(chan request)}
go memo.server(f)
return memo
}
// Get方法会向server方法(或者说server goroutine)发送一个request请求,并等待server返回response响应。
func (memo *Memo) Get(key string) (interface{}, error) {
response := make(chan result)
memo.requests <- request{key, response}
res := <-response
return res.value, res.err
}
func (memo *Memo) Close() { close(memo.requests) }
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
for req := range memo.requests {
e := cache[req.key]
if e == nil {
// This is the first request for this key.
e = &entry{ready: make(chan struct{})}
cache[req.key] = e
go e.call(f, req.key) // call f(key)
}
go e.deliver(req.response)
}
}
func (e *entry) call(f Func, key string) {
// Evaluate the function.
e.res.value, e.res.err = f(key)
// Broadcast the ready condition.
close(e.ready)
}
func (e *entry) deliver(response chan<- result) {
// Wait for the ready condition.
<-e.ready
// Send the result to the client.
response <- e.res
}
相比于第一种完美解决方案,这种纯用channel的解决方案显得逻辑太过复杂,但是这个例子告诉了我们channel可以替代几乎所有的并发同步机制(尽管在这种场景下用channel是不合适的,太复杂)。
那么到此,关于go并发的入门知识已经节本介绍完毕,但是go的并发还有很多奥秘和技巧以及原理性的内容诸如goroutine协程与线程之间的区别与联系等,这些更深的知识本人会在写完“Go入门系列”的文章之后再出一个“深入go并发机制”系列文章来说明。