更多优质内容
请关注公众号

Go并发编程系列(八)互斥锁, 读写锁, 条件变量, Waitgroup, Once, 临时对象池Pool和原子操作-阿沛IT博客

正文内容

Go并发编程系列(八)互斥锁, 读写锁, 条件变量, Waitgroup, Once, 临时对象池Pool和原子操作

栏目:Go语言 系列:Go并发编程系列 发布时间:2021-02-06 09:34 浏览量:3032

一、互斥锁和读写锁


关于锁相信大家都很熟悉了,因此这里不再对锁进行过多的描述,只是简单的带过一下。如果读者们想了解互斥锁和读写锁的基本使用和例子,可以看看这篇文章:

《Go入门系列(十七) go并发之基于共享变量的并发》


互斥锁 sync.Mutex

sync.Mutex 类型只有两个公开的指针方法——Lock Unlock 。顾名思义,前者用于锁定当前的互斥量,而后者则用于对当前的互斥量进行解锁。

sync.Mutex 类型的零值表示未被锁定的互斥量。也就是说,它是一个开箱即用的工具,我们只需对它进行简单声明,就像这样:

var mutex sync.Mutex


基本用法

var mutex sync.Mutex

func write() {
    mutex.Lock()
    defer mutex.Unlock()
    // 省略若干代码
}

使用defer解锁可以保证在函数结束时无论有没有发生异常,都能够正确的解锁以避免死锁的发生。

当对一个未锁定的互斥锁进行解锁操作时,就会引发一个panic


读写锁 sync.RWMutex

读写锁顾名思义就是读锁和写锁两把锁。读读是可以并发的,而写写和读写是会阻塞的只能串行的。

RWMutex4个常用的方法:

func (*RWMutex) Lock()
func (*RWMutex) Unlock()
func (*RWMutex) RLock()
func (*RWMutex) RUnlock()


他们分别是写锁的加锁与解锁操作,读锁的加锁和解锁操作。


写锁的解锁操作可以唤醒所有读锁和写锁的阻塞,而读锁的解锁操作只会唤醒写锁的阻塞(因为读锁不会阻塞读锁)

如果没有加任何锁的情况下调用Unlock会报错,但是调用RUnlock不会。


接下来我们看一个互斥锁和读写锁的完整示例,并发读写文件。

我们知道Go提供的文件操作对象os.File的读写操作是不会保证并发读写安全的,因此并发读写文件需要我们手动加锁。

下面我们将实现一个自动义的文件IO的类型,其规则如下:要求每次读取或写入指定长度的数据块,如果写入的数据块的大小超过了指定长度,则超出的长度不会被写入到文件。而且要求读写并发的安全。并且这个文件操作类还能返回当前读写的状况,即读取到第几个数据块和写入了多少个数据块。读和写的操作是分离的(因此应该会有读指针和写指针这两个指针)


代码如下:

// 定义一个文件类的接口进行规范
type DataFile interface {
	// rsn:当前读取到第几个数据块
	Read() (d []byte, rsn int64, err error)

	// wsn: 当前写入第几个数据块
	Write(d []byte) (wsn int64, err error)

	// 获取当前读取到第几个数据块
	Rsn() (rsn int64)

	// 同上
	Wsn() (wsn int64)

	// 获取数据块长度
	DataLen() (leng int64)

	// 关闭文件标识
	Close()
}

// Data是数据块
//type Data []byte

// 实现DataFile
type MyDataFile struct{
	f *os.File		// 文件标识
	fLock sync.RWMutex		// 读写锁用于保证文件并发读写的安全
	roffset int64
	woffset int64		// 当前读偏移量和写偏移量,相当于读指针和写指针
	rLock sync.Mutex	// 并发修改roffset时的互斥锁
	wLock sync.Mutex	// 同上
	dataLen	uint32	// 规定的数据块长度,该程序要在创建MyDataFile实例的时候就要设定好
}

func (df *MyDataFile) Read() (d []byte, rsn int64, err error){
	d = make([]byte, df.dataLen)		// 创建一个指定长度的数据块

	df.rLock.Lock()
	offset := df.roffset	// 获取当前读偏移量
	df.roffset += int64(df.dataLen)		// 修改读偏移量
	df.rLock.Unlock()

	// 开始读取文件,读取一个数据块
	rsn = offset / int64(df.dataLen)
	df.fLock.RLock()
	defer df.fLock.RUnlock()
	_, err = df.f.ReadAt(d, offset)		// ReadAt方法,指从offset位开始读,读d指定长度的数据
	return
}

func (df *MyDataFile) Write(d []byte) (wsn int64, err error){
	df.wLock.Lock()
	offset := df.woffset	// 获取当前写偏移量
	df.woffset = offset + int64(df.dataLen)		// 修改写偏移量
	df.wLock.Unlock()

	// 开始写入数据块
	wsn = offset / int64(df.dataLen)
	if len(d) > int(df.dataLen) {		// 如果d的长度超过了规定数据块长度则截掉超出的部分
		d = d[:df.dataLen]
	}
	df.fLock.RLock()
	defer df.fLock.RUnlock()
	_, err = df.f.WriteAt(d, offset)		// WriteAt方法同ReadAt
	return
}

func (df *MyDataFile) Close() (err error){
	err = df.f.Close()
	return err
}

func (df *MyDataFile) Rsn() (rsn int64){
	df.rLock.Lock()
	defer df.rLock.Unlock()
	return df.roffset / int64(df.dataLen)
}

func (df *MyDataFile) Wsn() (wsn int64){
	df.wLock.Lock()
	defer df.wLock.Unlock()
	return df.woffset / int64(df.dataLen)
}

func (df *MyDataFile) DataLen() (leng uint32){
	return df.dataLen
}

// 初始化 MyDataFile 的函数
func NewMyDataFile(path string, dataLen uint32) (df *MyDataFile, err error){
	if dataLen <= 0 {
		err = fmt.Errorf("The length of data block must larger than zero")
		return nil, err
	}
	f, err := os.Create(path)
	if err != nil{
		return nil, err
	}

	return &MyDataFile{f:f, dataLen:dataLen}, nil
}


然而这个例子中的Read方法有问题,问题在于如果有3goroutine并发地执行某个*myDataFile 类型值的Read 方法,并有2goroutine并发地执行该值的Write 方法。通过前3goroutine的运行,数据文件中的数据块被依次读取出来。但是,由于进行写操作的goroutine比进行读操作的goroutine少,因此过不了多久,读偏移量roffset 的值就会等于甚至大于写偏移量woffset 的值。也就是说,读操作很快就会没有数据可读了。这种情况会使上面的df.f.ReadAt 方法返回的第二个结果值为io.EOF。这样的话就会造成调用方会得到读取出错的数据块的序列号,但却无法再次尝试读取这个数据块(也就是漏读了这个数据块)。然后,继续读下一个数据块。


为了避免这个问题,我们在遇到io.EOF错误时应该重复从offset读取指定数据块长度的数据,当然如果所有内容都写入完毕了,读取遇到EOF就无需重复再读了,此时写入的一方需要主动Close掉文件标识,Read方法自然会自动退出。

修改后的Read如下

func (df *myDataFile) Read() (d []byte, rsn int64, err error) {
    // 读取并更新读偏移量
    // 省略若干代码

    // 读取一个数据块
    rsn = offset / int64(df.dataLen)
    for {
        df.fLock.RLock()
        _, err = df.f.ReadAt(d, offset)
        if err != nil {
            if err == io.EOF {
                df.fLock.RUnlock()
                continue
            }
            df.fLock.RUnlock()
            return
        }
        df.fLock.RUnlock()
        return
    }
}


虽然避免了上面说的问题,但是代码变得非常的不优雅,因为代码中分别对每种if情况都重复写了RUnlock的解锁代码。


最后提一点:无论是互斥锁还是读写锁,他们都实现了 sync.Locker这个接口。



二、条件变量

有关条件变量的概念,原理和适用场景可以参考这篇文章

《Python多线程和多进程(三) 线程同步之条件变量》


这里不再赘述,而是重点介绍在go中如何适用条件变量。


Go标准库中的sync.Cond 类型代表了条件变量。与互斥锁和读写锁不同,简单的var声明无法创建的条件变量不能直接用,需要用sync.NewCond 函数对其初始化。

这里不再赘述,而是重点介绍在go中如何使用条件变量。

Go标准库中的sync.Cond 类型代表了条件变量。与互斥锁和读写锁不同,简单的var声明无法创建的条件变量不能直接用,需要用sync.NewCond 函数对其初始化。

func NewCond(l Locker) *Cond


条件变量总要与互斥量组合使用。sync.NewCond 函数的唯一参数是sync.Locker 类型的,而具体的参数值既可以是一个互斥锁,也可以是一个读写锁。sync.NewCond 函数在被调用之后,会返回一个*sync.Cond 类型的结果值,我们可以调用该值拥有的几个方法来操纵这个条件变量。主要的3个方法是Wait Signal Broadcast 。它们分别代表了等待通知、单次通知和广播通知的操作。

Wait 方法会自动地对与该条件变量关联的那个锁进行解锁,并且使它所在的goroutine阻塞。一旦接收到通知,该方法所在的goroutine就会被唤醒,并且该方法会立即尝试锁定该锁。方法Signal Broadcast 的作用都是发送通知,以唤醒正在为此阻塞的goroutine。不同的是,前者的目标只有一个,而后者的目标则是所有。


下面我们用条件变量优化一下上面的文件读写的例子,思路就是当读操作读到EOF时使用条件变量Wait方法进行等待,当有写操作发生的时候则用Signal方法对读操作唤醒。

func (df *MyDataFile) Read() (d []byte, rsn int64, err error){
	// 省略重复的代码
    
	// 开始读取文件,读取一个数据块
	rsn = offset / int64(df.dataLen)
	df.fLock.RLock()
	defer df.fLock.RUnlock()
	for{
		_, err = df.f.ReadAt(d, offset)		// ReadAt方法,指从offset位开始读,读d指定长度的数据
		if err != nil{
			if err == io.EOF{
				df.cond.Wait()
				continue			// 进入下一次循环,这是为了重新判断条件
			}
			// 如果不是EOF错误则直接返回
			return nil, 0, err
		}
		return
	}
}

func (df *MyDataFile) Write(d []byte) (wsn int64, err error){
	// 省略掉重复的代码

	// 开始写入数据块
	wsn = offset / int64(df.dataLen)
	if len(d) > int(df.dataLen) {		// 如果d的长度超过了规定数据块长度则截掉超出的部分
		d = d[:df.dataLen]
	}
	df.fLock.RLock()
	defer df.fLock.RUnlock()
	_, err = df.f.WriteAt(d, offset)		// WriteAt方法同ReadAt
	df.cond.Signal()
	return
}

func NewMyDataFile(path string, dataLen uint32) (df *MyDataFile, err error){
	if dataLen <= 0 {
		err = fmt.Errorf("The length of data block must larger than zero")
		return nil, err
	}
	f, err := os.Create(path)
	if err != nil{
		return nil, err
	}

	cond := sync.NewCond(df.fLock.RLocker())

	return &MyDataFile{f:f, dataLen:dataLen, cond:cond}, nil
}

需要注意的是,使用条件变量的时候也不能去掉Read中的for循环,因为在Wait被唤醒后需要重复校验文件是否读到尽头的条件(防止刚写入的数据块已经被其他调用Readgoroutine读走了)。整个for循环是被读写锁df.fLock.RLock()给包住的。


NewMyDataFile中,我在NewCond中传入了一个df.fLock.RLocker()而不是传入df.fLock本身,原因是df.fLock中含有读锁和写锁两把锁。如果传入的是df.fLock,条件标量调用Wait时,其内部会调用fLock.Unlock方法解锁。但是注意,fLock.Unlock是写锁的解锁操作。而在本例中我们要的是配合读锁的条件变量。

RLocker()会返回df.fLock的读锁,返回的类型是sync.Locker类型。而且返回的这个读锁调用Lock()Unlock()就相当于df.fLock调用RLockRUnlock()。因此RLocker()方法相当于是做了一次适配,把读锁的RLockRUnlock()适配成Lock()Unlock()




三、原子操作

原子操作即执行过程不能被中断的操作。在针对某个值的原子操作执行过程当中,CPU绝不会再去执行其他针对该值的操作。Go语言提供的原子操作由标准库代码包sync/atomic提供,我可以通过调用这些函数对几种简单类型的值执行原子操作。这些类型包括6种:int32 int64 uint32 uint64 uintptr unsafe.Pointer(这个限制好像有点大,因为只能对数字进行原子操作 。这些函数提供的原子操作共有5种:增或减、比较并交换、载入、存储和交换。

增或减

增或减的原子操作(以下简称“原子增/减操作”)的函数名称都以“Add ”为前缀.。如果想原子地把一个int32 类型的变量i32 的值增大3,可以这样做。

newi32 := atomic.AddInt32(&i32, 3)

第一个参数值必须是指针类型的值,是因为该函数需要获得被操作值在内存中的存放位置,以便施加特殊的CPU指令。也就是说,对于不能被取址的数值是无法进行原子操作的。函数atomic.AddInt32 在执行结束时,会返回经过原子操作后的新值。不过,这里无需把这个新值再赋给原先的变量i32 ,因为它的值已经在atomic.AddInt32 函数返回之前被原子地修改了。与该函数类似的还有atomic.AddInt64 atomic.AddUint32 atomic.AddUint64 atomic.AddUintptr 这些函数,它们也可以用于原子地增/减对应类型的值。例如,如果要原子地将int64 类型的变量i64 的值减小3,可以这样

var i64 int64

atomic.AddInt64(&i64, -3)

相比于加锁,原子操作的性能更好,但是原子操作只能做一些简单的操作例如加减,交换之类的操作。而使用锁其临界区内可以做任何事情。


下面是一段比较互斥锁和原子增减的性能的程序:

var x int64
func Demo1(){
	defer func(st time.Time){
		fmt.Printf("使用锁花费时间:%s \n", time.Since(st))
	}(time.Now())

	x = 0		// 把x重置
	waiting := make(chan struct{})
	var lock sync.Mutex

	go func() {
		for i:=0;i<100000000;i++{
			lock.Lock()
			x--
			lock.Unlock()
		}
		waiting <- struct{}{}
	}()

	for i:=0;i<100000000;i++{
		lock.Lock()
		x++
		lock.Unlock()
	}
	<-waiting
	fmt.Printf("使用锁得到x: %d \n", x)
}

func Demo2(){
	defer func(st time.Time){
		fmt.Printf("原子操作花费时间:%s \n", time.Since(st))
	}(time.Now())

	x = 0		// 把x重置
	waiting := make(chan struct{})

	go func() {
		for i:=0;i<100000000;i++{
			// 原子操作
			atomic.AddInt64(&x, -1)
		}
		waiting <- struct{}{}
	}()

	for i:=0;i<100000000;i++{
		// 原子操作
		atomic.AddInt64(&x, 1)
	}
	<-waiting
	fmt.Printf("原子操作得到x: %d \n", x)
}


比较并交换

比较并交换即Compare And Swap”,简称CAS。在sync/atomic包中,这类原子操作由名称以“CompareAndSwap ”为前缀的若干函数代表。如

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

CompareAndSwapInt32 函数接受3个参数。第一个参数的值是指向被操作值的指针值,该值的类型为*int32 。而后两个参数的类型都是int32 ,并且它们的值分别代表被操作值的旧值和新值。CompareAndSwapInt32 函数在被调用之后,会先判断参数addr 指向的被操作值与参数old 的值是否相等。仅当此判断得到肯定的结果之后,该函数才会用参数new 代表的新值替换旧值;否则,后面的替换操作就会被忽略。

CompareAndSwapInt32 函数的结果swapped 用来表示是否成功进行了值的替换操作。失败的原因总是由于addr的值已不与old 的值相等了


相比于之前讲到的互斥锁而言,CAS更类似于一种自旋锁,CAS一般会搭配for循环使用,如果addrold不相等就意味着addr这个数已经被其他goroutine改变,因此本goroutine会放弃对addr的修改,并再次进入下一次循环重新检验条件。这一过程是不会阻塞的,但我们直观的感受就是修改操作暂停了,直到检测到addr是没有变化的(即其他并发的goroutine没有正在修改addr)才对其进行修改,这个过程是自旋的,会消耗CPU,但是这一过程又是很短暂的,因此不会消耗很多CPU

总结就是,CAS操作的优势是可以在不创建互斥量和不形成临界区的情况下,完成并发安全的值修改和替换操作。这可以大大地减少同步对程序性能的损耗。当然,CAS操作也有劣势:在被操作值被频繁变更的情况下,CAS操作并不那么容易成功。有些时候,我们可能不得不利用for 循环来进行多次尝试。

var value int32
func addValue(delta int32) {
    for {
        v := value
        if atomic.CompareAndSwapInt32(&value, v, (v + delta)) {
            break
        }
    }
}



例子

func Demo3(){
	defer func(st time.Time){
		fmt.Printf("原子操作花费时间:%s \n", time.Since(st))
	}(time.Now())

	x = 0		// 把x重置
	waiting := make(chan struct{})

	go func() {
		for i:=0;i<100000000;i++{
			swap := false
			// 原子操作
			for !swap{
				tmp := x
				swap = atomic.CompareAndSwapInt64(&x, tmp, tmp-1)
			}
		}
		waiting <- struct{}{}
	}()

	for i:=0;i<100000000;i++{
		swap := false
		// 原子操作
		for !swap{
			tmp := x
			swap = atomic.CompareAndSwapInt64(&x, tmp, tmp+1)
		}
	}
	<-waiting
	fmt.Printf("原子操作得到x: %d \n", x)
}



原子值 atomic.Value

atomic.Value相当于一个容器,我们可以往里面存入一个任意类型的值(Store方法),也可以将这个值取出来(Load方法),而且这两个方法都是原子操作。下面是atomic.Value结构的源码:

// A Value provides an atomic load and store of a consistently typed value.
// The zero value for a Value returns nil from Load.
// Once Store has been called, a Value must not be copied.
//
// A Value must not be copied after first use.
type Value struct {
	v interface{}
}

Load Store ,前者会返回一个interface{} 类型。后者接受一个interface{} 类型的参数而没有任何返回结果。在未曾通过Store 方法向原子值实例存储值之前,它的Load 方法总会返回nil 


Store 方法有两个限制。第一,传入的参数不能为nil 。第二,传入的参数值必须与之前传入的值(如果有的话)的类型相同。也就是说,一旦原子值实例存储了某一个类型的值,那么它之后存储的值就都必须是该类型的。如果违反了任意一个限制,对该方法的调用都会引发一个panic


sync/atomic.Value 类型的变量一旦声明,其值就不应该被复制到它处。比如赋给其他变量、作为参数值传入函数、作为结果值从函数返回、作为元素值通过chan传递等都会造成值的复制,所以这类变量之上不应该实施这些操作。虽然这不会造成编译错误,但Go标准工具go vet 却会报告此类不正确(或者说有安全隐患)的用法。不过,sync/atomic.Value 类型的指针类型的变量却不存在这个问题。


例如

func main() {
    var countVal atomic.Value
    countVal.Store([]int{1, 3, 5, 7})
    anotherStore(countVal)
    fmt.Printf("The count value: %+v \n", countVal.Load())
}

func anotherStore(countVal atomic.Value) {
    countVal.Store([]int{2, 4, 6, 8})
}

这段代码的结果还是[1 3 5 7],因为anotherStore中对原子值的操作不会影响main中的原子值,他们是两个独立的原子值


对于sync包中的Mutex RWMutex Cond 类型,go vet 命令同样会检查此类复制问题,其原因也是相似的。一个比较彻底的解决方案是,避免直接使用它们,而使用它们的指针值。

下面我们实现一个并发安全的整型数组:

type ConcurrentArray interface {
	Set (index int, elem int) (err error)
	Get (index int) (elem int, err error)
}

type Arr struct {
	leng int
	val atomic.Value
}

func (arr *Arr) CheckIndex(index int) bool {
	return index < arr.leng
}

func (arr *Arr) Get(index int) (elem int, err error){
	if arr.CheckIndex(index) {
		return 0, fmt.Errorf("wrong index")
	}

	return arr.val.Load().([]int)[index], nil
}

func (arr *Arr) Set(index int, elem int) (err error){
	if arr.CheckIndex(index) {
		return fmt.Errorf("wrong index")
	}
	arrList := make([]int, arr.leng)
	copy(arrList, arr.val.Load().([]int))
	arrList[index] = elem
	arr.val.Store(arrList)
	return
}

func (arr *Arr) Len() int{
	return arr.leng
}

func NewConcurrentArray(leng int) *Arr{
	var arr Arr
	arr.leng = leng
	arr.val.Store(make([]int, leng))		// 初始化原子值里面的数组
	return &arr
}


这里有几个注意点:

1. 为什么Set要使用copyatomic.Value中的切片拷贝到一个临时变量arrList再修改?因为atomic.Value中的原子值是一个切片,切片是引用类型,如果我们直接这样:

arrList := arr.val.Load()

arrList[index] = element

由于arrListarr.val中的切片指向同一个底层数组,所以在执行第二句的时候,也会同时修改arr.val内部的切片值,但是这个修改操作不在原子操作保护范围内(因为不是通过调用Store来修改的),在并发的时候会导致数据被改乱。

copy则是重新复制一份新的底层数组,arrListarr.val中的切片就不是只想两个底层数组的了。

2. 虽然arr.val中存的是一个切片,意味着它可以扩容。但是我这里是将其当做是一个长度不可变的数组(限定了leng作为其长度)

3. 这个程序的缺点在于,每次修改切片中的1个元素都要将整个切片拷贝到arr.val中(即调用arr.val.Store(arrList)中),如果切片的长度很大那么这也是很浪费性能的。



最后我们尝试用原子操作来优化一下之前的文件操作的例子:

1. 我们在对读指针和写指针偏移量的修改是包在互斥锁里面的,如:

df.wLock.Lock()
offset := df.woffset	// 获取当前写偏移量
df.woffset = offset + int64(df.dataLen)		// 修改写偏移量
df.wLock.Unlock()


这里可以使用原子操作的比较和交换来替代

var offset int64
for{
    offset = df.woffset
    if atomic.CompareAndSwapInt64(&df.woffset, offset, offset + int64(df.dataLen)){
        break
    }
}



2. 在Wsn()Rsn()方法中

df.wLock.Lock()
defer df.wLock.Unlock()
return df.woffset / int64(df.dataLen)


用原子操作替代:

woffset := atomic.LoadInt64(&df.woffset)    // 原子操作中的载入
return woffset / int64(df.dataLen)


最后要说的是,原子操作比锁更加简单和高效。不过,由于原子操作自身的限制(几乎只能作用于整数),因此锁依然比原子操作常用且重要。

而之所以原子操作比锁更高效是因为:锁、条件变量等是操作系统层面上支持的,而原子操作则是直接在硬件层面支持的。



四、初始化 sync.Once

sync.Once的基本使用和介绍在我之前的文章也有介绍,希望详细了解sync.Once的朋友可以看看。

《Go入门系列(十七) go并发之基于共享变量的并发》

Once的使用场景一般用于在并发情况下做一次性的初始化工作(例如数据库连接或连接池的创建,更新失效的redis缓存,全局变量的延迟初始化等)。它能保证多个并发的goroutine进行执行初始化工作的时候只有其中任意一个goroutine去完成初始化工作,而其他goroutine就不用再次执行初始化。

初始化工作由sync.Once对象的Do方法完成。Do方法需要传入一个用于完成初始化工作的函数,这个函数afunc不能有参数和返回值。

once := &sync.Once{}

once.Do(afunc)


在执行到Do的时候,Do内部会马上调用afunc。关于Do的内部实现在上方链接中的文章有详细介绍,这里不再赘述,我们只要知道它内部也是用互斥锁,原子操作实现的,也是一个并发安全的操作即可。

除了第一个gouroutine执行的Do方法时会触发afunc之外,之后的其他goroutine执行到Do也不会去触发这个afunc。从而达到只执行一次初始化工作的目的。


五、Waitgroup

Waitgroup是一种并发安全的计数器,它可以用于阻塞main goroutine等待多个goroutine执行完毕后才退出,保证了所有任务都能在程序结束前完成。

sync.WaitGroup 是一个结构体类型,其内部包含初始值为0的一个计数器。该类型有3指针方法,即:Add Done Wait (这些方法都是并发安全的)。

Add 方法接受一个int 类型的值,并且也可以通过该方法增加或减少计数值(可以传负数)。如果计数器小于0就会引发panicDone 方法使其中的给定计数值减一,相当于Add(-1)。当计数器大于0的时候,Wait方法的调用可以阻塞住调用该方法的goroutine,当计数器变回为0Wait会被唤醒。


Waitgroup的使用场景和用途就是在不断生成goroutine和结束goroutine的高并发程序中等待所有goroutine都结束。这一点其实使用channel也能够做到。

如下代码所示

func main(){
	n := 3
	waiting := make(chan struct{}, n)		// 用于阻塞等待所有子goroutine结束的channel

	for i:=0; i<n; i++{
		go func(){
			time.Sleep(time.Second)
			waiting<- struct{}{}
		}()
	}

	for i:=0;i<n;i++{
		<-waiting
	}
}

但是channel的主要作用是进行消息传递和通信,把channel专门用于goroutine的阻塞等待会显得有点过重。原则上说,我们不应该把通道当作互斥锁或信号量来使用而应该把通道当做消息传递和通信的工具,这才是chan的真正用途。在这种场景下使用channel并没有体现出它的优势,反而会在代码易读性和性能方面打一些折扣。


下面我们看看用Waitgroup实现这个场景:

func main(){
	n := 3
	wg := &sync.WaitGroup{}

	wg.Add(n)
	for i:=0;i<n;i++{
		go func(index int){
			defer wg.Done()
			time.Sleep(time.Second)
			fmt.Printf("第 %d 个goroutine结束\n", index)
		}(i)
	}

	fmt.Println("等待子goroutine结束")
	wg.Wait()
	fmt.Println("结束")
}

下面是Waitgroup的一些注意点:
1.Add 方法的第一次调用,必须发生在调用该值的Done 之前(否则会panic),也发生在调用该值的Wait 方法之前(否则Wait根本不会阻塞,因为一开始计数器就是0)。
2.sync.WaitGroup 类型值是可以复用的。也就是说如果计数器变回0之后,Wait会被唤醒。只要计数器再次变为大于0,我们依旧可以再次调用Wait去进行新一轮的阻塞。
3.尽量只是用WaitGroup的指针而不是其本身(或者说这是必须的强制的,否则在你没意识到的地方发生了复制的行为,这个同步工具就会失效。)
4.一般来说,wait方法都是在main goroutine调用,目的是阻塞主协程以等待所有子协程运行完毕。而Add和Done方法的调用一般不会发生在同一个goroutine中,Add一般在子goroutine外调用,Done一般是在子goroutine内在运行结束时才调用(也就是说,Done几乎都会与defer结合使用)



六、临时对象池 Pool

sync.Pool是一种可以存放复用的资源(如数据库连接,网络连接等)的一个容器,并且这个容器是一个动态的可伸缩容量的容器。Pool对象有2个重要方法Get/Put和一个重要成员New,并且New成员的类型其实也是一个用于创建资源实例的函数。


下面是Pool的源码

type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	localSize uintptr        // size of the local array

	victim     unsafe.Pointer // local from previous cycle
	victimSize uintptr        // size of victims array

	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() interface{}
}


我们主要关注其中的New成员,New成员的类型是一个函数,之所以要将一个函数作为成员而不是Pool类型的方法是为了可以让用户自定义这个New方法。New方法中要做的事情很简单,就是生成一个资源实例,并将这个资源实例返回给调用方。


Get方法会返回池里面的一个资源实例,这里又分为两种情况:
A 如果池中没有资源实例,则调用Get方法会隐式的调用New方法先生成一个实例,再返回给调用方。
B 如果池中存贮着实例,则Get会直接获取这个实例并返回,就不会调用New

另外Get方法返回的是一个interface{}类型的值,因为New创建资源后返回的也是interface{}类型的值。而Get本质上取的是New生成的资源值。


Put方法需要传入一个资源实例,它会将这个资源实例存到(或者说放回到)Pool中以供Get下一次复用。另一方面,我们通常会使用Put为一个池预先存储(初始化)一定量的资源实例,才开始让调用方去使用这些实例。



需要注意的是
1.通过New和Get方法创建出来的资源实例是不会保存到Pool中的。只有将New和Get创建出来的资源对象通过Put方法才能存入到Pool中。


2.New,Get和Put的调用都是并发安全的。其中New是我们自定义的,这也就意味着New中的整个代码块其实相当于是上了一把锁,这把锁的临界区就是New的整个代码块。多个goroutine并发调用 New时是串行的。


3.和我们以前认知的池不一样的是,以前我们使用一个池(如线程池、连接池)都是先限定一个阈值以控制池中最多有多少个资源实例,如果池中的所有实例都被拿出池外使用的情况下还有其他调用方从池中获取资源实例时就会发生阻塞。而sync.Pool则没有限制能存储资源实例的阈值,当池中实例为空时再从这个Pool取出数据不会阻塞,而是会再生成一个新的资源实例,这也是Pool是可伸缩的含义。
当然,不论是go的sync.Pool还是以前我们使用过的池,他们的共同点是都可以复用池中的资源,节省重复创建和销毁资源带来的开销。


4.当触发GC垃圾回收的时候,池中的所有资源实例都会被清空。



下面我们看看Pool的例子:

var bufferNum int	// 缓冲区资源的个数

func main(){
	// 定义一个缓冲区池
	pool := &sync.Pool{
		New: func() interface{} {
			bufferNum++
			buffer := make([]byte, 4096)	// 每次New会创建一个4K的缓冲区
			return &buffer
		},
	}

	// 初始化4个缓冲区到池中
	pool.Put(pool.New())
	pool.Put(pool.New())
	pool.Put(pool.New())
	pool.Put(pool.New())

	// 并发的使用pool中的缓冲区做一些事情
	goro_num := 1024 * 100
	wg := &sync.WaitGroup{}
	wg.Add(goro_num)
	for i:=0; i<goro_num; i++{
		go func(i int){
			// fmt.Printf("第%d个goroutine正在运行中\n", i)
			defer wg.Done()
			buffer := pool.Get().(*[]byte)		// 这里用断言进行类型转换,原因是Get返回的资源实例是一个interface{}类型,意味着这个Get返回的资源的所有方法都被屏蔽
			defer pool.Put(buffer)		// 使用完缓冲区之后放回池中

			// 使用buffer做一些工作,这里省略
		}(i)
	}
	wg.Wait()

	fmt.Printf("共创建了 %d 个缓冲区", bufferNum)
}


在这个例子中我创建了1024*1024=1Mgoroutine,并且并发的从Pool取出buffer缓冲区。

输出结果如下:

共创建了 6 个缓冲区


假设我们不使用临时对象池对buffer进行复用,那么就会同时创建1024*10244K的缓冲区,等于在一瞬间占用了4G的内存。这对于系统内存是极大的负担。

使用了Pool进行复用之后,实际上一共只创建了6个缓冲区,只消耗了4K*6 = 24K的内存。


当然这个程序有点特殊,首先每个goroutinePool中拿了buffer后什么都没干就把buffer放回了Pool。这意味着在buffer的数量为6的情况下,放回Pool的速度完全跟得上新goroutine生成的速度,所以buffer才不会继续增加。如果创建goroutine的速度大于buffer放回的速度,那么buffer的数量肯定会不断增加。


需要注意的是,New返回的是切片的指针而不是切片,这也有利于节省开销,因为切片的底层类型其实也还是个结构体。


另外,在运行这个程序的时候,我发现我电脑的CPU和内存的占用都达到了99%。这是怎么回事?不是说只消耗了24K的内存吗,为什么我8G的内存和6个核的CPU都跑满了呢?

其实这不是Pool中的buffer占用了我8G的内存,而是创建了过多的goroutine占用了我8G的内存。因为系统每创建一个goroutine都需要为其分配一个空间栈,栈的生成是要占用内存的,而且每一个goroutine空间栈至少会占8K内存,而且goroutine的栈还会随着goroutine的运行发生可能的扩容。因此 8K *1024*1024 = 8G,丫的刚好把我的电脑内存消耗完。

而且这些goroutine是与多个内核线程关联的,而这些内核线程是平均分配给每个CPU去运行和调度的,一次性运行1Mgoroutine意味着每个CPU要对大量的goroutine微线程进行切换和操作,而且Pool的Put和Get操作还不是IO操作而是纯运算的操作。因此CPU也被跑满。


结论就是,虽然goroutine的创建和切换的成本远低于线程,但是无限制的创建也会造成比很大的开销。


接下来我们考虑一个问题,如果上面的例子中goroutine生成的速度大于buffer归还到池中的速度,那么buffer创建的数量会不断增大从而消耗大量内存。例如:

for i:=0; i<goro_num; i++{
    go func(i int){
        fmt.Printf("第%d个goroutine正在运行中\n", i)
        defer wg.Done()
        buffer := pool.Get().(*[]byte)		// 这里用断言进行类型转换,原因是Get返回的资源实例是一个interface{}类型,意味着这个Get返回的资源的所有方法都被屏蔽
        defer pool.Put(buffer)		// 使用完缓冲区之后放回池中

        // 使用buffer做一些工作,这里用Sleep模拟
        time.Sleep(time.Millisecond)
    }(i)
}

通过Sleep阻塞goroutine就可以减慢buffer归还的速度,因为此时buffer还在被goroutine使用中。此时我们运行的结果是:

共创建了 1943 个缓冲区


为了解决这个问题,我们有2种方案:

A 可以在做一个阈值限制,当生成的总buffer数量达到这个阈值的时候,就不在增加buffer。当Pool里面的buffer数量为0的时候,Get就会阻塞goroutine,直到Pool中的buffer数量大于0。但是如果真的要实现这样的需求,我们其实完全可以使用chan来代替Pool


B 限制goroutine的并发量(从而间接限制goroutine的创建速度)

goro_num := 1024 * 100
wg := &sync.WaitGroup{}
limit := make(chan struct{}, 50)
wg.Add(goro_num)
for i:=0; i<goro_num; i++{
    limit <- struct{}{}
    go func(i int){
        fmt.Printf("第%d个goroutine正在运行中\n", i)
        defer func(){
            wg.Done()
            <-limit
        }()
        buffer := pool.Get().(*[]byte)		// 这里用断言进行类型转换,原因是Get返回的资源实例是一个interface{}类型,意味着这个Get返回的资源的所有方法都被屏蔽
        defer pool.Put(buffer)		// 使用完缓冲区之后放回池中

        // 使用buffer做一些工作,这里省略
        time.Sleep(time.Millisecond)
    }(i)
}

这里我使用了一个含有50个缓冲空间的channel来限制goroutine的并发数量,这样一来同一时刻并发的goroutine就只有50个。结果就是Pool中从程序开始到结束大概总共创建了40~60个buffer切片,和limit的缓冲空间个数大致相同。





更多内容请关注微信公众号
zbpblog微信公众号

如果您需要转载,可以点击下方按钮可以进行复制粘贴;本站博客文章为原创,请转载时注明以下信息

张柏沛IT技术博客 > Go并发编程系列(八)互斥锁, 读写锁, 条件变量, Waitgroup, Once, 临时对象池Pool和原子操作

热门推荐
推荐新闻