使用select处理多个channel
以一个例子开始。这个例子模拟火箭倒计时发射,并且用户可以在标准输入流终止这个倒计时功能。
// 未加终止功能的火箭发射
func main() {
ticker := time.Tick(1 * time.Second) // 创建一个计时器,该计时器会返回一个只能接受数据的channel。并且每秒钟向这个channel发送一个时间戳
countdown(ticker, 5) // 5秒倒计时
lauch()
}
// 倒计时
func countdown(ticker <-chan time.Time, s int) { // 第二参是倒计时的秒数
for ; s > 0; s-- {
fmt.Println(s)
<-ticker
}
}
func lauch() {
fmt.Println("火箭发射!")
}
接下来为其添加终止功能,思路是开多一个goroutine,这个goroutine负责接收标准输入,无论接收到用户的输入是什么,这个goroutine都会终止整个程序。
更改后的代码如下:
func main() {
ticker := time.Tick(1 * time.Second) // 创建一个计时器,该计时器会返回一个只能接受数据的channel。并且每秒钟向这个channel发送一个时间戳
go stop()
countdown(ticker, 5) // 5秒倒计时(该函数会阻塞)
lauch()
}
// 倒计时
func countdown(ticker <-chan time.Time, s int) { // 第二参是倒计时的秒数
for ; s > 0; s-- {
fmt.Println(s)
<-ticker
}
}
// 中止功能
func stop() {
// 从标准输入接受一个值,该方法会阻塞直到有用户输入
os.Stdin.Read(make([]byte, 1)) // 接受单个字节
fmt.Println("发射中止")
// 我们不关心input的值,而是只要用户一有输入,并且回车,无论输入的是什么值都终止火箭发射
os.Exit(0)
}
func lauch() {
fmt.Println("火箭发射!")
}
正常来说一个协程只能接受或者发送一个channel(无缓存),因为如果要接收多个无缓存channel的话,某个channel肯定会阻塞这个协程让这个协程无法去接收(或发送)其他channel。
此时就可以使用select ,select可以在一个goroutine中监听多个channel的状态,并且选择可读或者可写的channel(没有被阻塞的channel)优先进行处理。
例如:
select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}
这个select监听了ch1,ch2和ch3这3个channel,这3个channel中,谁最先进入可接收或可发送的非阻塞状态,select就会运行谁。假如ch2先有值可接收,那么就会先执行第二个case。
Go中select的原理其实是使用了操作系统的select多路复用器,将多个要监视的channel添加到了一个监视列表,并且通过轮询的方式获取channel的状态是否可读或者可写。操作系统会返回可读或者可写状态的channel给go。
如果在运行到select时,所有的case的channel都阻塞,则运行default的代码块。
如果在运行到select时,所有的case的channel都阻塞,而且没有default这一项,则select会阻塞直到有一个channel可读或可写。
如果运行到select时,有多个channel都处于可接收或可发送的状态,select会随机选择一个channel执行。
一个没有任何case的select语句写作select{},会永远地等待下去。
下面我们换成使用select处理多个channel的方式来实现上面火箭发射的程序。
func main() {
terminate := make(chan struct{})
ticker := time.Tick(1 * time.Second) // 创建一个计时器,该计时器会返回一个只能接受数据的channel。并且每秒钟向这个channel发送一个时间戳
go stop(terminate)
s := 5
for ; s > 0; s-- {
select {
case <-ticker:
fmt.Println(s)
case <-terminate:
fmt.Println("中止发射")
os.Exit(0)
}
}
lauch()
}
func stop(terminate chan struct{}){
// 接受标准输入(阻塞)
os.Stdin.Read(make([]byte, 1))
terminate <- struct{}{}
}
func lauch() {
fmt.Println("火箭发射!")
}
我们再看一个有趣的例子,这个例子会输出偶数
func main() {
c := make(chan int, 1)
for i:=0; i < 100; i++ {
select {
case x := <- c:
fmt.Println(x)
case c <- i:
}
}
}
产生只输出偶数是因为两个case会交替发生。
假如将c的容量改为大于1,那么输出就会是随机的。
如果c是一个无缓存的,那么就会永远阻塞(报deadlock异常)。
有时候我们不希望在接收一个无缓存的channel时发生阻塞,此时select + default也可以实现这个需求。
例如:
select {
case <-c:
// do something
default:
// do nothing
}
当执行到select的时候,如果c能接收则接收,c没有数据接收则走default从而避免阻塞。
我们还可以在这个select外加一个for,反复的从c接收,如果本次循环没有东西接收则不会阻塞,进入default,在下一次循环再尝试接收。反复地做这样的操作叫做“轮询channel”
channel的零值是nil。对一个nil的channel发送和接收操作会永远阻塞,在select语句中操作nil的channel永远都不会被select到。
这使得我们可以用nil来激活或者禁用case。
并发遍历目录
这里我们写一个遍历目录计算目录大小的程序。
先做一个普通的遍历目录计算大小的程序
func main() {
defer func(t time.Time) {
fmt.Println(float64(time.Now().UnixNano() - t.UnixNano()) / 10e9)
}(time.Now())
var totalSize int64
dirPath := os.Args[1]
walkDir(dirPath, &totalSize)
fmt.Println(totalSize)
}
func walkDir(dirPath string, size *int64) {
dirPath = strings.TrimRight(dirPath, "/") + "/"
fileInfos, err := ioutil.ReadDir(dirPath)
if err != nil{
fmt.Println(err)
return err
}
for _, fileInfo := range fileInfos {
if fileInfo.IsDir() {
walkDir(dirPath + fileInfo.Name(), size)
}else{
*size += fileInfo.Size()
}
}
}
接下来,我们希望能够每隔2秒打印当前遍历了多少个文件以及当前统计的文件大小。而且这个过程应该从walkDir函数中独立出来,否则耦合在一起会有点乱,因此我们每次统计出来一个文件的大小就通过channel传给main goroutine,由main goroutine统计总和打印。改进后的程序如下:
func main() {
curSize := new(int64)
curFileNum := new(int64)
dirPath := os.Args[1]
size := make(chan int64)
//ticker = time.Tick()
st := time.Now()
defer func(t time.Time) {
fmt.Println(float64(time.Now().UnixNano() - t.UnixNano()) / 10e9)
}(st)
go func() { // 要用go给walkDir开一个协程,因为里面用到了channel,如果不开协程就会阻塞主协程
walkDir(dirPath, size)
close(size) // 跑完walkDir后,关闭size这个channel,否则主协程会被 接收size 卡住
}()
go func(n, s *int64) { // 开一个协程定时打印当前遍历了多少文件和多少大小
for {
time.Sleep( 2 * time.Second)
fmt.Printf("当前已遍历 %d 个文件,大小为 %dKb \n", *n, *s / 1000)
}
}(curFileNum, curSize)
for s := range size{
*curFileNum += 1
*curSize += s
}
fmt.Println(*curSize)
}
func walkDir(dirPath string, size chan int64) (err error){
//defer func(dirPath string){
// fmt.Println(dirPath + "遍历完毕")
//}(dirPath)
dirPath = strings.TrimRight(dirPath, "/") + "/"
fileInfos, err := ioutil.ReadDir(dirPath)
if err != nil{
fmt.Println(err)
return err
}
for _, fileInfo := range fileInfos {
if fileInfo.IsDir() {
walkDir(dirPath + fileInfo.Name(), size)
}else{
size <- fileInfo.Size()
}
}
return err
}
这里一定要用指针传递数字。这样打印出来的才是同一个数字。
上面是我的做法,但是作者的实现更加优雅,他使用了select和定时器,这样一来就可以少开一个goroutine,做到在一个goroutine(main goroutine)中接收2个channel:
func main() {
var curSize, curFileNum int64
dirPath := os.Args[1]
size := make(chan int64)
ticker := time.Tick(2 * time.Second)
go func() { // 要用go给walkDir开一个协程,因为里面用到了channel,如果不开协程就会阻塞主协程
walkDir(dirPath, size)
close(size) // 跑完walkDir后,关闭size这个channel,否则主协程会被 接收size 卡住
}()
loop:
for {
select {
case s, ok := <-size:
if !ok {
break loop
}
curFileNum += 1
curSize += s
case <-ticker:
fmt.Printf("当前已遍历 %d 个文件,大小为 %dKb \n", curFileNum, curSize / 1000)
}
}
fmt.Println(curFileNum, curSize)
}
注意:
这里的break语句用到了标签break(可以跳出loop这个标签),这样可以同时终结select和for两个循环;如果没有用标签就break的话只会退出内层的select循环,而外层的for循环会使之进入下一轮select循环。
最后为了加快我们遍历目录的速度,我们可以在递归的时候开启新的协程来并发遍历,不过要做出一些修改,当然我们最好不要无限开启walkDir协程,而是有一定限制的开启,所以加了一个limitSpeed这个channel来限速:
package main
import (
"fmt"
"io/ioutil"
"os"
"strings"
"sync"
"time"
)
func main() {
var curSize, curFileNum int64
var wg sync.WaitGroup // 用一个WaitGroup来记录和等待所有的walkDir goroutine结束
dirPath := os.Args[1]
size := make(chan int64)
limitSpeed := make(chan struct{}, 100)
ticker := time.Tick(2 * time.Second)
wg.Add(1)
go walkDir(dirPath, size, limitSpeed, &wg)
go func(wg *sync.WaitGroup) { // 开了一个新协程等待walkDir协程结束,Wait被唤醒后关闭size这个channel从而解除main的阻塞
wg.Wait()
close(size)
}(&wg)
loop:
for {
select {
case s, ok := <-size:
if !ok {
break loop
}
curFileNum += 1
curSize += s
case <-ticker:
fmt.Printf("当前已遍历 %d 个文件,大小为 %dKb \n", curFileNum, curSize / 1000)
}
}
fmt.Println(curFileNum, curSize)
}
func walkDir(dirPath string, size chan int64, limitSpeed chan struct{}, wg *sync.WaitGroup) (err error){
defer wg.Done()
limitSpeed <- struct{}{}
dirPath = strings.TrimRight(dirPath, "/") + "/"
fileInfos, err := ioutil.ReadDir(dirPath)
<- limitSpeed
if err != nil{
fmt.Println(err)
return err
}
for _, fileInfo := range fileInfos {
if fileInfo.IsDir() {
wg.Add(1)
go walkDir(dirPath + fileInfo.Name(), size, limitSpeed, wg)
}else{
size <- fileInfo.Size()
}
}
return err
}
还是强调一点:将wg传入函数的时候一定要传指针,否则多个协程中操作的就不是用一个wg因而达不到同步的效果。
limitSpeed的限速也可以放在 go walkDir(…)的前后
还强调一点:要善用select,凡是想在一个协程中接收多个可阻塞channel的消息的时候就可以使用select来解决和优化。如果不用select的情况下想接收多个channel的值就只能开多个goroutine。
并发的退出(广播机制)
这一节我们探讨一下如果并发的退出,所谓的并发的退出就是发出一个指令就能够停止和关闭所有正在运行的某种goroutine。
Go是没有提供在一个goroutine中终止另一个goroutine的方法,但是我们可以通过在一个goroutine中发送一个信号给channel,由另一个goroutine接收这个channel消息后自动退出的方式来间接的做到。在火箭发射的例子中,我们就是这样做的。
但是如果我们是希望终止多个goroutine而不是只终止1个的话,那么这种方式就不合适,我们不可能向这些goroutine一个个的发送信号,再者我们有时候不知道到底产生了多少个goroutine,因而不知道要发送多少次信号。有没有一种类似于广播的机制可以一次性通知所有的goroutine关闭呢?答案是用关闭一个channel来进行广播。
在上一个例子中,我们实现了并发遍历目录。现在我们为它添加一个退出功能,当用户在屏幕上输入一个任意字符再回车就会退出全部goroutine再结束主函数。
我们可以封装一个cancelled函数,用来监控当前是否用户已经在屏幕上发出结束信号
// 判断用户是否发出终止指令的方法。cancelled是一个被动方法,他不会主动检测用户是否发出终止指令,而是在程序运行到cancelled的时候才会检测用户是否发出终止
// 运行到cancelled时,如果用户已经发出了终止指令(关闭了done这个channel),那么done就会接收到这个指令(不阻塞的接收到一个零值),返回true;如果运行到cancelled时,如果用户没有发出终止指令也不会被done阻塞就走default,直接返回
func cancelled(done <-chan struct{}) bool {
select {
case <-done:
return true
default:
return false
}
}
完整的代码如下
func main() {
var curSize, curFileNum int64
var wg sync.WaitGroup // 用一个WaitGroup来记录和等待所有的walkDir goroutine结束
dirPath := os.Args[1]
size := make(chan int64)
limitSpeed := make(chan struct{}, 100)
ticker := time.Tick(2 * time.Second)
done := make(chan struct{}) // 用于接收用户终止信号的channel
wg.Add(1)
go walkDir(dirPath, size, limitSpeed, done, &wg)
go func(wg *sync.WaitGroup) { // 开了一个新协程等待walkDir协程结束,Wait被唤醒后关闭size这个channel从而解除main的阻塞
wg.Wait()
close(size)
}(&wg)
go func(){ // 开一个goroutine接收用户的终止指令
os.Stdin.Read(make([]byte, 1)) // 会阻塞该协程
close(done) // 通过关闭done的方式广播给所有goroutine中的cancelled方法
}()
loop:
for {
select {
case s, ok := <-size: // 99.999%会走到这个case,因为walkDir一直在往size发送数据,所以size一直都可接收
if !ok {
break loop
}
curFileNum += 1
curSize += s
case <-ticker: // 每隔两秒会走到这个case
fmt.Printf("当前已遍历 %d 个文件,大小为 %dKb \n", curFileNum, curSize/1000)
case <-done: // 只有当用户在屏幕敲了东西,然后程序执行了close(done)才会走到这个case
for range size {} // 当用户停止程序,就从size中取出所有的数据但不作为;当所有的size都取出来之后,这里会阻塞,但是不要紧,很快所有的walkDir goroutine都退出,然后close(size)就会执行,然后这个for就会跳出执行到下面的break loop
break loop
}
}
fmt.Println(curFileNum, curSize)
}
// 判断用户是否发出终止指令的方法。cancelled是一个被动方法,他不会主动检测用户是否发出终止指令,而是在程序运行到cancelled的时候才会检测用户是否发出终止
// 运行到cancelled时,如果用户已经发出了终止指令(关闭了done这个channel),那么done就会接收到这个指令(不阻塞的接收到一个零值),返回true;如果运行到cancelled时,如果用户没有发出终止指令也不会被done阻塞而是走的default,直接返回
func cancelled(done <-chan struct{}) bool {
select {
case <-done:
return true
default:
return false
}
}
func walkDir(dirPath string, size chan int64, limitSpeed, done chan struct{}, wg *sync.WaitGroup) (err error){
defer wg.Done()
// 判断是否已经取消
//if cancelled(done){
// return nil
//}
limitSpeed <- struct{}{}
// 判断是否已经取消
if cancelled(done){
<- limitSpeed
return nil
}
dirPath = strings.TrimRight(dirPath, "/") + "/"
fileInfos, err := ioutil.ReadDir(dirPath)
<- limitSpeed
if err != nil{
fmt.Println(err)
return err
}
for _, fileInfo := range fileInfos {
if fileInfo.IsDir() {
wg.Add(1)
go walkDir(dirPath + fileInfo.Name(), size, limitSpeed, done, wg)
}else{
size <- fileInfo.Size()
}
}
return err
}
请注意,我把cancelled的调用放在了limitSpeed <- struct{}{}之后而不是注释所在的地方,原因是这个程序会创建很多个goroutine,而且由于限速,很多goroutine会卡在limitSpeed <- struct{}{}这一句。这意味着,如果我们把cancelled的调用放在limitSpeed <- struct{}{}之前,那么当用户发出终止的时候,其实cancelled返回true要发生到下一次递归的walkDir中,因此你会发现一个现象:用户发出终止指令后程序没有马上结束,而是延时了一两秒才结束,这一两秒中内之前已生成的goroutine在解除limitSpeed的阻塞后会继续遍历文件。
如果把cancelled放在limitSpeed <- struct{}{}后,那么用户发出结束指令后,之前已生成的goroutine解除limitSpeed的阻塞就执行到调用cancelled并返回true从而return结束,避免了多一轮的递归从而减少时延。
Main goroutine中的case <-done不会减少时延,但是可以避免用户终止后,剩余goroutine继续增加curSize和curFileNum。
但是,并发的退出需要侵入式的修改代码,也就是说退出并发的逻辑不能和负责任务的goroutine独立开来,必须将cancelled的逻辑写入到walkDir中,这是它的缺点,却也是不可避免的。
聊天服务
最后我们以一个聊天服务结束goroutine和channel这一章节。
这个聊天服务的设计思路如下:
服务端在main goroutine监听套接字成功后开始接受客户端的连接,对每一个客户端的到来创建一个goroutine(handleConn函数)为其处理请求。每一个客户端连接conn对对应一个属于自己的channel。此外还会有一个总的广播协程broadcast()负责接受所有客户端发送给服务端的消息并通过每个客户但的专属channel发送给每个客户端。
此外,当有客户端进来和退出时,也会向其他所有客户端广播(通过entering和leaving这两个channel)。
客户端的专属channel可以接收msg(普通消息)/entering/leaving这3中消息。
服务端代码如下:
var clients map[string]chan string // 记录连接进来的客户端连接
var msg chan string // 传输客户端消息的channel
var entering chan string // 传输客户端连接进来时服务端产生的通知消息
var leaving chan string // 传输客户端连接断开时服务端产生的通知消息(channel中放客户端的地址即可)
func main(){
msg = make(chan string)
entering = make(chan string)
leaving = make(chan string)
clients = map[string]chan string{}
// 创建广播服务
go broadcast()
// 创建网络服务(阻塞)
AcceptConn()
}
// 创建套接字监听端口,接收连接
func AcceptConn() {
listener, err := net.Listen("tcp", "127.0.0.1:8080")
if err != nil{
log.Fatal(err) // 创建服务失败
}
defer func() {
listener.Close()
fmt.Println("服务端停止服务")
}()
fmt.Println("创建服务成功!")
for {
// 接收连接
conn, err := listener.Accept()
if err != nil {
fmt.Printf("客户端连接服务器失败:%s\n", err)
continue
}
addr := conn.RemoteAddr().String()
clients[addr] = make(chan string)
fmt.Println("客户端连接成功:" + addr)
entering <- addr
// 并发的处理客户端连接之后发送过来的所有请求
go handleConn(conn)
}
}
// 处理单个客户端的所有请求
func handleConn(conn net.Conn){
defer conn.Close()
// 为每个连接开一个goroutine用于将这个消息从服务端发送给这个客户端
go sendMsg(conn)
// 不断接收客户端消息
recvMsg(conn)
}
// 发送消息给客户端
func sendMsg(conn net.Conn){
addr := conn.RemoteAddr().String()
msgChan := clients[addr] // 获取对应客户端的消息通道,该消息通道可以接收msg/entering/leaving这3种消息
for {
// 接收所有消息(msg/entering/leaving这3种消息)
cont := <-msgChan
_,err := fmt.Fprint(conn, "%s", cont)
if err != nil {
fmt.Printf("消息:%s 无法发送给客户端 %s ,原因:%s\n", cont, addr, err)
}
}
}
// 接收客户端消息并发送给其他客户端
func recvMsg(conn net.Conn){
addr := conn.RemoteAddr().String()
input := bufio.NewScanner(conn)
// 服务端接收客户端发送过来的消息
for input.Scan(){
cont := input.Bytes()
// 将消息广播给其他客户端
message := fmt.Sprintf("%s : %s\n", addr, cont)
msg <- message
}
fmt.Println(addr + "客户端断开连接")
delete(clients, addr) // 将客户端信息从map中剔除
leaving <- addr
}
// 广播消息,负责接收msg/leaving/entering这3中消息并发送给所有客户端
func broadcast() {
for{
select{
case c := <-msg:
go _broadcast(c, "msg", "")
case addr := <-entering:
c := fmt.Sprintf("客户端 %s 加入群聊~\n", addr)
go _broadcast(c, "entering", addr)
case addr := <-leaving:
c := fmt.Sprintf("客户端 %s 退出群聊\n", addr)
go _broadcast(c, "leaving", addr)
}
}
}
// 这里的message参数可能是msg或leaving或entering中的任意一种消息
func _broadcast(message, typ, sender string){
for addr, client_chan := range clients{
// 如果message是客户端退出的消息,则这个消息不发送给退出的那个客户端
if typ == "leaving" && sender == addr{
continue
}
client_chan <- message
}
}
客户端可以使用nc命令,这样就可以省下了写客户端代码的工夫。
整个程序的架构图如下:
可不可以不要broadcast这个方法也不设置客户端自己的channel,而是在recvMsg中发送消息到msg这个channel,在sendMsg中接收msg这个channel的消息而非从自己的channel中接收?
如果是这样的话,那么当广播消息A的时候,可能会出现某些客户端接收到多次A消息,而某些客户端接收不到A消息。