更多优质内容
请关注公众号

Go并发编程系列(二) 多进程编程与进程同步之Signal信号量-阿沛IT博客

正文内容

Go并发编程系列(二) 多进程编程与进程同步之Signal信号量

栏目:Go语言 系列:Go并发编程系列 发布时间:2021-02-04 19:46 浏览量:4169

信号 signal

操作系统信号(signal,以下简称信号)是IPC中唯一一种异步的通信方法,它的本质是用软件来模拟硬件的中断机制。信号用来通知某个进程有某个事件发生了。例如,在命令行终端按下某些快捷键,就会挂起或停止正在运行的程序。另外,通过kill 命令杀死某个进程的操作也有信号的参与。

每一个信号都有一个以SIG”为前缀的名字,例如SIGINTSIGQUIT以及SIGKILL,等等。但是,在操作系统内部,这些信号都由正整数表示,这些正整数称为信号编号。在Linux的命令行终端下,我们可以使用kill  -l命令来查看当前系统所支持的信号



可以看到,Linux支持的信号有62种(注意,没有编号为3233的信号)。其中,编号从131的信号属于标准信号(也称为不可靠信号),而编号从3464的信号属于实时信号(也称为可靠信号)。对于同一个进程来说,每种标准信号只会被记录并处理一次。并且,如果发送给某一个进程的标准信号的种类有多个,那么它们的处理顺序也是完全不确定的。而实时信号解决了标准信号的这两个问题,即多个同种类的实时信号都可以记录在案,并且它们可以按照信号的发送顺序被处理。虽然实时信号在功能上更为强大,但是已成为事实标准的标准信号也无法被替换掉。因此,这两大类信号一直共存着。


信号的来源有键盘输入(比如按下快捷键Ctrl-c)、硬件故障、系统函数调用和软件中的非法运算。进程响应信号的方式有3种:忽略、捕捉和执行默认操作。

Linux对每一个标准信号都有默认的操作方式。针对不同种类的标准信号,其默认的操作方式一定会是以下操作之一:终止进程、忽略该信号、终止进程并保存内存信息、停止进程、恢复进程(若进程已停止)。

对于绝大多数标准信号而言,我们可以自定义程序对它的响应方式。更具体地讲,进程要告知操作系统内核:当某种信号到来时,需要执行某种操作(也就是说这个操作是在用户程序定义,由内核执行)。在程序中,这些自定义的信号响应方式往往由函数表示。


Go命令会对其中的一些以键盘输入为来源的标准信号作出响应,这是通过标准库代码包os/signal中的一些API实现的。更具体地讲,Go命令指定了需要被处理的信号并用一种很优雅的方式(用到了通道类型)来监听信号的到来。


从接口类型os.Signal 开始讲起,该类型的声明如下:

type Signal interface {
    String() string
    Signal() // to distinguish from other Stringers
}


实现了os.Signal接口类型的类型就是1种信号量,或者说只要实现了SignalString方法的类型就是1种信号量。比如像syscall.Signal类型,它的底层类型是int,但是它实现了os.Signal接口类型,因此syscall.Signal就是信号量类型(其下有多种信号量,每一个整型数字代表1中信号)。

os.SignalSignal()方法比较有意思,它的实现是一个空方法,也就是说,syscall.SignalSingal()是一个空方法,其他实现os.Signal的类型其Signal()也是空方法。Signal()方法的作用仅仅是为了标识这个实现了该方法的类型是一个信号量类型而已。


如果查看syscall.Signal 类型的String 方法的源代码,还会发现一个包级私有的、名为signals 的变量。在这个数组类型的变量中,每个索引值都代表一个标准信号的编号,而对应的元素则是针对该信号的一个简短描述,调用String()方法会将对应信号的描述打印出来。



代码包os/signal中的Notify 函数

func Notify(c chan<- os.Signal, sig ...os.Signal)

这个函数需要传入一个channelsignal.Notify方法内部做的事就是接收到来自操作系统进程的信号并将这个信号发送给这个channel 

然后我们在用户程序中异步的从这个channel接收信号。这样我们就可以捕获来自操作系统发送的信号。


刚刚我们说过,每种信号量操作系统都有与之对应的默认操作,只要一个用户进程接收到了某个信号量,操作系统就会根据这个信号量对该进程作出默认操作(终止进程、忽略该信号、终止进程并保存内存信息、停止进程、恢复进程);用户进程也可以用自定义的处理方式来操作这些信号而不走操作系统的默认操作。

Notify的第二参sig,表示用户进程要自行处理的信号量。

只有第二参sig中规定的信号量才会发送到chan中,其他信号量不会(而是执行系统默认操作)。


看下面的例子:

func Example4(){
	fmt.Println("Start running")
	// 定义一个需要自定义处理的信号量列表
	signs_handled_by_myself := []os.Signal{syscall.SIGINT, syscall.SIGQUIT}

	// 定义一个channel接收来自操作系统的信号,容量为1
	channel := make(chan os.Signal, 1)

	// Notify监听操作系统是否有发送信号,如果有就会通过往channel发送信号量的方式异步通知当前进程(有信号量的时候才会通知)
	signal.Notify(channel, signs_handled_by_myself...)		// Notify是一个非阻塞的方法,所以即使操作系统还没有发送信号到Notify,Nofity也不会阻塞。阻塞的工作应该交给下面的for range channel循环

	// 在主进程中接收Notify的通知并处理需要自定义处理的信号
	for sign := range channel{
		fmt.Printf("Receive signal from operation system: %s\n", sign)
	}
}

在该例子中,SIGINTSIGQUIT这两种信号量会由Notify通过channel发送给主goroutine,并在main goroutine中对这两种信号量作出自定义的处理(在本例中只是对其打印,这样做就相当于是忽略了这个信号,不做任何处理)。而除了这两种信号量之外,操作系统向该进程发出的所有信号量都走的操作系统默认的操作,而不会通过channel传给main goroutine


下面我们运行这个例子,然后向这个例子发送SIGINTSIGQUIT信号(可以通过ctrl + c  ctrl + \),结果如下


当然除了使用快捷键之外,我们还可以先通过kill -l 查看信号对应的整型


再通过在另一个shell窗口用kill –{信号量的整型}  {进程id}的方式来发送信号。

例如

kill -2 进程id”和 ctrl + c等价

kill -3 进程id”和 ctrl + \ 等价


不过需要注意的是:

如果你是用go run的方式运行这个例子的话,在另一个shell执行 kill -2 进程id,这个示例是不会显示输出的,原因是执行go run的话,我们可以用ps -aux 查看一下他会生成2个进程


第一个进程是go run 生成的进程,第二个进程是 go run之后,gogo build一个临时的二进制可执行文件并执行这个文件而生成了的进程。如果 kill -2 {go run的那个进程的进程id}就会无法显示输出。


比较好的做法是,我们不使用go run执行,而是直接go build编译后再执行,然后之后的发送信号量的操作都针对这个go build生成的进程来发送。


各种信号量可以在这里查看

https://blog.csdn.net/u014470361/article/details/83591513

同时也附上一系列Linux下发送信号量对应的快捷键(在windows下这些快捷键除了ctrl+c其他都没用)

Ctrl+Z组合键将当前进程挂起

ctrl-c:终止进程

ctrl-z:发送 SIGTSTP信号给前台进程组中的所有进程,常用于挂起一个进程;

ctrl-d:不是发送信号,而是表示一个特殊的二进制值,表示 EOF,作用相当于在终端中输入exit后回车;

ctrl-\:发送 SIGQUIT 信号给前台进程组中的所有进程,终止前台进程并生成 core 文件;

ctrl-s:中断控制台输出;

ctrl-q:恢复控制台输出;

ctrl-l:清屏


试想一下,如果我们在上面的例子中不传Notify的第二参,会怎么样?

这样就相当于捕获了所有的信号量并且走自定义的处理(所有信号量都不走系统的默认操作,而只是打印这些信号量),这是一种很危险的行为。不过,还好在类Unix操作系统下有两种信号既不能自行处理,也不会被忽略,它们是SIGKILLSIGSTOP,对它们的响应只能是执行系统的默认操作。

即使你这样:

signal.Notify(sigRecv, syscall.SIGKILL, syscall.SIGSTOP)

也不会改变当前进程对SIGKILL信号和SIGSTOP信号的处理动作。这种保障不论对于应用程序还是操作系统来说,都是非常有必要的。

对于其他信号,除了能够自行处理它们之外,还可以在之后的任意时刻恢复对它们的系统默认操作。这需要用到os/signal包中的Stop 函数,其声明如下:

func Stop(c chan<- os.Signal)


接下来我们看一个例子,这个例子会创建两个接收系统信号量的channel,并且其中一个channel接收SIGINTSIGQUIT信号,另一个接收SIGQUIT信号。他们的接收被放在了两个goroutine中并发执行:

func Example5(){
	sigChan1 := make(chan os.Signal, 1)
	sigChan2 := make(chan os.Signal, 1)
	sigList1 := []os.Signal{syscall.SIGINT, syscall.SIGQUIT}
	sigList2 := []os.Signal{syscall.SIGINT}
	finishChan := make(chan struct{})

	// 监听系统发送过来的信号
	signal.Notify(sigChan1, sigList1...)
	signal.Notify(sigChan2, sigList2...)

	go func(){
		var cmd string
		for {
			_, err := fmt.Fscanln(os.Stdin, &cmd) // 如果有标准输出则视作为停止sigChan1和sigChan2接收自定义信号量

			if err != nil{
				log.Fatal(err)
			}
			
			if cmd == "1"{
				fmt.Println("Stop sigChan1")
				signal.Stop(sigChan1)		// signal.Stop方法只会清除掉sigChan1对应的自定义信号集合sig2,但是不会关闭sigChan1这个通道
			}else if cmd == "2"{
				fmt.Println("Stop sigChan2")
				signal.Stop(sigChan2)
			}else if cmd == "end"{
				fmt.Println("finish main goroutine")
				finishChan <- struct{}{}
			}
		}
	}()

loop:
	for{
		select {
		case sig1 := <- sigChan1:
			fmt.Printf("Sig from  sigChan1: %s\n", sig1)
		case sig2 := <- sigChan2:
			fmt.Printf("Sig from  sigChan2: %s\n", sig2)
			case <-finishChan:
				break loop
		}
	}
}


运行结果如下:

[root@VM-0-13-centos ~]# go run multiprocess_main
^CSig from  sigChan1: interrupt
Sig from  sigChan2: interrupt
^\Sig from  sigChan1: quit
^CSig from  sigChan1: interrupt
Sig from  sigChan2: interrupt
^\Sig from  sigChan1: quit
asdfadsf
2
Stop sigChan2
^CSig from  sigChan1: interrupt
^\Sig from  sigChan1: quit
2
Stop sigChan2
^CSig from  sigChan1: interrupt
1
Stop sigChan1
^Csignal: interrupt

绿色是标准输入,黑色的字是标准输出


可以看出按了12就会停止往sigChan1sigChan2发送他们对应的自定义信号量。


在很多时候,我们可能并不想完全取消掉自行处理信号的行为,而只是想取消一部分信号的自定义处理。为此,只需再次调用signal.Notify 函数,并重新设定与其参数sig 绑定的参数值即可(例如sigChan1注册了两个自定义处理的信号量SIGINTSIGQUIT,但是我希望只取消掉SIGINT而不想取消掉SIGQUIT,只需signal.Stop(sigChan1)之后再signal.Notify(sigChan1, syscall.SIGQUIT )即可)


我们看signal.Notify源码, signal包中定义了一个全局的map,当调用signal.Notify(chan1, sigList1…)的时候,Notify内部会将chan1作为key,自定义的信号量集合sigList1作为value存到这个map中。如果键-元素对不存在,就会向信号集合字典添加一个,否则就更新该键-元素对中的信号(追加)集合变体。当调用signal.Stop(chan1) 函数时,就会删除这个全局map中以chan1为键的键-元素对。

当操作系统或者其他进程发送一个信号量给当前进程,signal处理程序会对这个信号量进行封装(封装为os.Signal接口类型),然后遍历全局map中的所有键-元素对,并查看它们的元素中是否包含了该信号。如果包含,就立即把它发送给作为键的signal接收通道。


除了能够在当前进程接收信号量之外,也可以在当前进程向其他进程或者自己这个进程发送信号量。

下面我们再看一个例子,这个例子需要在当前进程给自己发送信号并自己捕获信号量然后将这个信号量打印出来。

为了能发信号给自己,因此要先获取到自己这个进程对象,要想获取自己这个进程对象就得拿到自己的进程id


代码如下:

var pids []int
//var waiting chan struct{}
func Example6(){
	sigs := []os.Signal{syscall.SIGINT, syscall.SIGQUIT,  syscall.SIGINT, syscall.SIGINT, syscall.SIGINT, syscall.SIGQUIT}
	sigChannel := make(chan os.Signal, 1) 		// 接收系统信号量的通道
	//waiting = make(chan struct{})
	listenSignal(sigChannel)	 // 监听通知信号量

	go sendSignal(sigs, sigChannel)	// 向当前进程发送信号量

	catchSignal(sigChannel)		//捕获信号量, 里面的for sigChannel会阻塞
}

// 准备通知信号量
func listenSignal(sigChannel chan os.Signal){
	sigs := []os.Signal{syscall.SIGINT, syscall.SIGQUIT}	// 定义需要自行处理的信号量

	signal.Notify(sigChannel, sigs...)		// 监听 sigs 列表中的信号量,非阻塞
}

// 捕获信号量
func catchSignal(sigChannel chan os.Signal){
	for sig := range sigChannel{
		//<- waiting
		fmt.Printf("Signal from os: %s\n", sig)
	}
}

// 向当前进程发送信号量,第一参是要发送的信号量
func sendSignal(sigs []os.Signal, sigChannel chan os.Signal){
	current_pids := getPids()	    // 获取当前进程相关的pid

	// 根据pid获取进程的对象 *Process, 并向这些与当前进程相关的pid发送信号
	for _, pid := range current_pids{
		process, err := os.FindProcess(pid)
		if err != nil{
			log.Fatalf("Fail to get process by pid: %d", pid)
		}
		for _, sig := range sigs{
			process.Signal(sig)		// 向这个进程发送信号量
time.Sleep(1 * time.Second)

			//waiting <- struct{}{}		// 等待catchSignal方法把信号接收到才发下一个信号(但是这么干的话最后会永久阻塞,原因是除了第一个pid是顺利被catchSignal捕获到之外,向其他的pid发送的信号都不会被catchSignal捕获到,因为那些pid不是本进程的pid。)
		}
	}

	// 发完之后通知main goroutine唤醒阻塞,结束主协程
	close(sigChannel)
}

func getPids() []int{
	if pids != nil {	// 如果pid已经获取过了,就直接返回
		return pids
	}

	// 执行命令 ps -aux | grep multiprocess_main |grep -v "go run" | grep -v "grep" | awk '{print $2}'
	// awk 后的命令必须用单引号不能用双引号
	cmds := []*exec.Cmd{
		exec.Command("ps", "-aux"),
		exec.Command("grep", "multiprocess_main"),
		exec.Command("grep", "-v", "go run"),
		exec.Command("grep", "-v", "grep"),
		exec.Command("awk", "{print $2}"),
	}

	// 运行命令得到结果res
	res, err := runCmds(cmds)
	if err != nil{
		log.Fatalf("Fail to get pids: %s", err)
	}

	// 将获取到的res结果(里面放着字符串的pid)转为[]int类型
	for _, r := range res{
		pid, err := strconv.Atoi(strings.TrimSpace(r))	// TrimSpace 可以去掉字符串的空白符如空格 \t \n
		if err != nil{
			log.Fatalf("Fail to change type from string to int: %s | err: %s", r, err)
		}
		pids = append(pids, pid)
	}

	return pids
}

func runCmds(cmds []*exec.Cmd) (lines []string, err error){
	var output []byte
	firstCmd := true
	for _, cmd := range cmds {
		if !firstCmd{
			// 将上一个cmd的输出作为本次cmd的输入
			var cmdInput bytes.Buffer
			cmd.Stdin = &cmdInput
			cmdInput.Write(output)
		}

		var cmdOutput bytes.Buffer
		cmd.Stdout = &cmdOutput
		if err := cmd.Start(); err != nil{
			log.Fatalf("running cmd '%s' failed, reason: %s", cmd.Path, err)
		}
		cmd.Wait()

		// 将命令的输出从动态缓冲cmdOutput保存到output中
		output = cmdOutput.Bytes()
		firstCmd = false
	}

	// 此时output保存着命令的运行结果,但是output此时是一堆字节流,每一个元素都是单字节,而我们想要的是一个int类型的切片,因此这里需要做一下转换
	// 转换的思路就是以\n作为分隔符,将output中每2个\n之间的字节流单独提出来转为string再转为int即可
	var outputBuffer bytes.Buffer
	outputBuffer.Write(output)		// 这两句可以将一个[]byte类型的数据包装为bytes.Buffer的动态缓冲区数据

	for{
		eachLine, err := outputBuffer.ReadBytes('\n')	    // 循环获取以\n为分隔符的内容
		res := strings.TrimSpace(string(eachLine))
		if res != "" {
			lines = append(lines, res)
		}
		if err != nil {
			if err == io.EOF{
				break
			}else{
				return nil, err
			}
		}
	}

	return lines, nil
}


上面的例子中,我在sendSignal方法中,每发送一个信号量都会睡1秒。如果不这样做的话,会发现程序没有任何输出就结束了。原因是:信号量的发送需要在进程与进程之间的传递,因此信号的发送会比较慢,很可能sendSignal把所有信号发送完毕后,catchSignal方法都还没接收到1个信号,然后sendSignal就结束了,main goroutine就结束了,然后catchSignal也随之结束,因此没有任何输出。



在操作系统中,信号量的本质就是一个数字。


创建子进程

如果我们希望在本进程fork出一个子进程,可以使用os.StartProcess方法,它会fork出一个子进程后返回这个子进程的pid。但是我们一般不这么做,因为 os.StartProcess 是一个比较底层的接口,一般是提供给go内部其他的方法去使用的,我们可以看一下os.StartProcess

func StartProcess(name string, argv []string, attr *ProcAttr) (*Process, error) {
	testlog.Open(name)
	return startProcess(name, argv, attr)
}

如果我们直接调用StartProcess方法,我们可能不知道该怎么去传第3个参数,所以这个方法其实是给os/exec包中的其他方法使用的。


如果我们真的想在go程序中运行多进程,可以使用os.Command方法创建一个命令,并调用cmd.Start()方法运行命令,Start()方法内部其实就是调用的os.StartProcess()方法去fork一个子进程。

如果我们想在 main.go中运行fork出一个子进程,并且让父子进程同时运行,并且相互通信的话,可以用os.Command(cmd_string)创建一个命令,然后通过在cmd_string设置不同的参数avg去区分父子进程。


或者说我们不要转牛角尖,非得对一份go文件代码开启父子进程做不同的事,而是可以把不同的逻辑写成多个go文件,然后手动同时运行这多个go文件即可。





更多内容请关注微信公众号
zbpblog微信公众号

如果您需要转载,可以点击下方按钮可以进行复制粘贴;本站博客文章为原创,请转载时注明以下信息

张柏沛IT技术博客 > Go并发编程系列(二) 多进程编程与进程同步之Signal信号量

热门推荐
推荐新闻