更多优质内容
请关注公众号

Go入门系列(十四) go并发编程之Goroutine与channel(上)-阿沛IT博客

正文内容

Go入门系列(十四) go并发编程之Goroutine与channel(上)

栏目:Go语言 系列:Go入门系列 发布时间:2021-01-10 13:44 浏览量:3473

现在终于讲到了Go中最核心也最重要的部分:Goroutine并发编程。

Go的并发有两种主要方式:基于CSP模型的并发(通过channel实现) 和 基于共享变量的并发。Go的主要并发方式是前者,当然后者也是不可或缺的处理并发的传统同步机制。本章重点介绍CSP模型的并发。

 

Goroutine

Go语言中,每一个并发的执行单元叫作一个goroutine(go中的协程)我们可以把一个goroutine理解为是一个线程(但是他和线程是不同的)。一个goroutine通常作用于一个函数,我们可以通过在函数调用前加上go关键字的方式将一个函数放在一个新创建的goroutine协程上运行这样的话这个函数无需等待且会相对于与其他函数并发的运行

主函数main是在一个单独的goroutine上运行的,我们叫它main goroutine

例如:

func main(){
	go demo()   // 会创建一个goroutine协程,并且让demo函数在这个新的协程上运行。该行代码会立刻返回,不会阻塞。
	fmt.Println("主协程运行")		
}

func demo() {
	fmt.Println("子协程运行")
	time.Sleep(1*time.Second)
	fmt.Println("子协程运行结束")
}

和多线程编程相似。当go demo() 的时候,系统会开启一个goroutine协程,使用了go关键字的语句会立刻返回,不会阻塞无需等待,之后主协程main goroutine和在main()中创建的子协程会同时并发运行。但是当main主协程与运行结束时,所有的子协程都会终止并结束。因此上面的打印结果为:

 

主协程运行

子协程运行

 

最后一句 fmt.Println("子协程运行结束")  来得及运行就已经终止。

 

除了从主函数退出或者直接终止程序之外,没有其它的编程方法能够让一个goroutine来打断另一个的执行。除非是通过协程与协程之间的通信,让一个协程通知另一个协程终止。

==============================================================

网络编程是并发大显身手的一个领域,由于服务器是最典型的需要同时处理很多连接的程序

 

下面通过两个网络编程例子来演示goroutine的更多使用。

第一个例子是一个clock服务器,客户端访问这个服务的时候会每隔一秒打印一次时间。

// 服务端代码:
func main(){
	// 创建一个tcp服务,这1句就包含了创建套接字,监听端口等操作,很方便
	server, err := net.Listen("tcp", "127.0.0.1:8000")
	if err != nil {
		log.Fatal(err)		// 创建服务失败则终止程序
	}

	fmt.Println("Clock服务开启")

	for{	// 死循环
		c, err := server.Accept()		// 接收客户端连接,这是一个阻塞的接收方法。c是接收得到的客户端连接
		if err != nil {		// 有客户端发过来建立连接的请求了,但是服务端接收连接失败
			log.Print(err)
		}

		// 为客户端提供相应的服务
		go handleConn(c)		// 使用go的话就意味着由主协程负责接收连接,由子协程负责处理每个连接的请求。而且每连进来一个连接都要创建一个goroutine协程来为其提供服务(一个连接对应一个协程)。
	}

}

// 处理连接,为客户端提供服务
func handleConn(c net.Conn) (err error) {		// c是客户端连接(客户端的socket套接字)
	defer func(){
		c.Close()
		fmt.Println("客户端关闭连接")
	}()

	for {
		// 获取当前的时间
		now := time.Now().Format("2006-01-02 15:04:05")

		_, err = io.WriteString(c, now + "\n")	// 向客户端发送 now
		if err != nil {		// 如果发送失败,说明客户端已经关闭连接(但是服务端连接还没关闭)
			return err		// 此时会执行上面 defer 中的关闭服务端连接的操作。
		}

		time.Sleep(1*time.Second)
	}
}


// 客户端代码
func main(){
	// 连接服务端
	conn, err := net.Dial("tcp", "127.0.0.1:8000")
	if err != nil{
		log.Fatal(err)
	}

	defer func(){
		conn.Close()
		fmt.Println("服务端停止服务或客户端关闭连接")
	}()

	// 接收来自服务端的响应信息(由于这是一个时钟服务,所以客户端无需向服务端发送消息,服务端也无需接收客户端的消息。只需客户端接收服务端消息即可)
	for {
		_, err := io.Copy(os.Stdout, conn)		// io.Copy方法是一个阻塞方法,有2个参数:第一参是可写对象,第二参是可读对象,Copy的作用是将一个读事件就绪的可读对象的内容拷贝到一个可写对象中。这里是将服务端发送到客户端缓冲区的数据拷贝到标准输出
		if err != nil{	// 可能是客户端主动断开连接
			break
		}
	}
}

这里需要注意的是:在使用io.Copy的时候,我们不用写for死循环来接收服务端不断传输过来的数据。因为io.Copy本身内部实现了for死循环,会自己不断的接收消息。所以上面客户端的for是多余的。

 

第二个例子是一个Encho服务,这个服务也很简单,客户端可以往服务端发送一个消息,服务端需要回复这个消息3遍,客户端接收回复并打印。

这里我复用了上面第一个例子的代码框架,略微作出修改使得 handleConn 能够接收一个函数引用来决定它提供clock服务还是encho服务。

type serviceFunc func (c net.Conn, args ...interface{})(err error)

func main(){
	// 创建一个tcp服务,这1句就包含了创建套接字,监听端口等操作,很方便
	server, err := net.Listen("tcp", "127.0.0.1:8000")
	if err != nil {
		log.Fatal(err)		// 创建服务失败则终止程序
	}

	fmt.Println("服务开启")

	for{	// 死循环
		c, err := server.Accept()		// 接收客户端连接,这是一个阻塞的接收方法。c是接收得到的客户端连接
		if err != nil {		// 有客户端发过来建立连接的请求了,但是服务端接收连接失败
			log.Print(err)
		}

		// 为客户端提供相应的服务
		go handleConn(c, encho)		// 使用go的话就意味着由主协程负责接收连接,由子协程负责处理每个连接的请求。而且每连进来一个连接都要创建一个goroutine协程来为其提供服务(一个连接对应一个协程)。
	}

}

// 处理连接,为客户端提供服务
func handleConn(c net.Conn, fn serviceFunc) (err error) {		// c是客户端连接(客户端的socket套接字)
	defer func(){
		c.Close()
		fmt.Println("客户端关闭连接")
	}()

	err = fn(c)		// 调用服务函数; fn的返回值只会为error类型。
	return err
}

func clock(c net.Conn, args ...interface{}) (err error){
	for {
		// 获取当前的时间
		now := time.Now().Format("2006-01-02 15:04:05")

		_, err = io.WriteString(c, now+"\n") // 向客户端发送 now
		if err != nil {                      // 如果发送失败,说明客户端已经关闭连接(但是服务端连接还没关闭)
			return err // 此时会执行上面 defer 中的关闭服务端连接的操作。
		}

		time.Sleep(1 * time.Second)
	}
}

func encho(c net.Conn, args ...interface{}) (err error){
	scanner := bufio.NewScanner(c)		// NewScanner需要传入一个可读对象(io.Reader),返回一个Scanner对象,这是一个带io缓冲区的扫描器对象,该对象提供从缓冲区读取数据的方法如Scan()

	for scanner.Scan(){		// 不断从缓冲区读取数据(接收客户端消息),这是个阻塞的方法。
		_encho(c, scanner)
	}

	return err
}

func _encho(c net.Conn, scanner *bufio.Scanner) {
	cont := scanner.Text()		// 单纯的从scanner对象的token变量中获取数据,读数据的过程(等待的过程)其实是发生在Scan。Text()返回的是一个String
	fmt.Fprintln(c, "\t" + strings.ToUpper(cont))		// 向c(客户端连接)发送一个消息
	time.Sleep(1 * time.Second)		// 每次回复隔一秒
	fmt.Fprintln(c, "\t" + cont)
	time.Sleep(1 * time.Second)
	fmt.Fprintln(c, "\t" + strings.ToLower(cont))
}

这个例子中,我声明了一个底层类型为函数 serviceFunc 类型,所有的服务函数(如enchoclock)都要遵循这个类型的规范,需要传入一个客户端连接c 和可以接受任意类型任意多个参数的其他参数。

 

客户端代码如下

package main

import (
	"io"
	"log"
	"net"
	"os"
)

// 该方法是对 io.Copy 的简单封装,这样的话这个方法既能用于从服务端接收消息并输出到客户端,又能用于把客户端的消息发送给服务端
// r是消息发送端,w是消息接收端,mustCopy的意思是从一个可读对象r读取数据,再发送到可写对象w中
// 该方法 内部会阻塞,会无限循环(io.Copy内部实现了死循环)接收Reader的消息
func mustCopy(w io.Writer, r io.Reader){
	if _, err := io.Copy(w, r); err != nil {
		log.Fatal(err)
	}
}

func main() {
	// 连接服务端
	conn, err := net.Dial("tcp", "127.0.0.1:8000")
	if err != nil{
		log.Fatal()
	}

	defer conn.Close()

	// 开一个子协程用于接收服务端的消息
	go mustCopy(os.Stdout, conn)	// 将服务端发过来的消息发送到客户端的标准输出(屏幕上)

	// 主协程则向服务端发送消息
	mustCopy(conn, os.Stdin)    // 将客户端标准输入拷贝(发送)到服务端
}

这里要注意,客户端代码开了两个goroutine,一个用来向服务器发送消息(main goroutine中发生),一个用来接收服务端消息并打印到屏幕(子协程中)。这样做的好处是:如果接收消息和发送消息都放在一个goroutine中,那么任何其中一个操作都可能阻塞另一个操作,造成类似于“服务端返回上一次消息的响应之前客户端无法发送下一条消息”的情况。

 

上面的程序还有一个小缺陷:虽然服务器还是对一个连接建立一个goroutine进行服务避免了多用户连接造成的服务端阻塞问题。但是某个客户端如果连续发出2条消息给服务端的话,那么在这个 goroutine内服务端必须回复完第1条消息(花3秒)才能开始回复第二条消息,因此会发生这样的情况:

hello	# 客户端发送
	HELLO		# 服务端回复
Hi		# 客户端发送

	hello
	hello
	HI
	hi
	hi

服务端必须回复完所有的Hello才能回复hi

 

但我希望的是服务端回复Hello的时候也能抽出空回复hi

hello
	HELLO
hi
	HI
	hello
	hi
	hello
	hi

其实很简单,我们只需要让服务端针对客户端每发送过来的1条消息创建一个goroutine来处理即可。这意味着多个_encho()之间可以并发运行

只需对程序作出微小的改动(红色部分):

func encho(c net.Conn, args ...interface{}) (err error){
	scanner := bufio.NewScanner(c)	
	for scanner.Scan(){		
		go _encho(c, scanner)
	}

	return err
}

 

再做一个小改进,让客户端输出end之后服务端断开连接:

func encho(c net.Conn, args ...interface{}) (err error){
	scanner := bufio.NewScanner(c)		// NewScanner需要传入一个可读对象(io.Reader),返回一个Scanner对象,这是一个带io缓冲区的扫描器对象,该对象提供从缓冲区读取数据的方法如Scan()

	for scanner.Scan(){		// 不断从缓冲区读取数据,这是个阻塞的方法。返回一个false
		cont := scanner.Text()
		if cont == "end"{	// 如果客户端输入回车表示停止服务
			break
		}
		go _encho(c, cont)
	}

	return err
}

 

最后还要说一点:

如果一个程序有多个goroutine在运行,其中任何一个goroutine发生panic异常都会终止所有的goroutine,然后程序结束。

 

Channels

Goroutinego中最常见的并发执行体,而Channel是多个goroutine之间最常见的通信机制。它类似于一个通道,用来传输数据,而且传输的形式是一个对象的形式而不是流的形式。

 

Channel有具体的类型。用来传输字符串的就是字符串类型的channel( chan string),用来传输整型的就是整型的channel(chan int)

 

1.如何创建一个channel

通过make()

ch := make(chan int) // ch has type 'chan int'
ch = make(chan int)    // 创建无缓存的channel
ch = make(chan int, 0) // 创建无缓存的channel
ch = make(chan int, 3) // 创建一个容量为3的带缓存channel

channel和切片、哈希表一样是一种引用类型,它对应着底层的一种数据结构。

当我们把channel赋给一个变量的时候或者作为参数传给函数时,是仅仅复制多一份这个引用,而不会复制底层类型。Channel的零值是nil

 

两个相同类型的channel可以使用==运算符比较。如果两个channel引用的是相同的对象,那么比较的结果为真。一个channel也可以和nil进行比较。

 

一个channel有发送和接受两个主要操作,都是通信行为。例如:

 

ch <- x  // 发送x值(相当于put()或者append()操作)

x = <-ch // 从通道接受一个值并赋给x(相当于get()或者acquire()操作)

<-ch     // 这样也可以,相当于弹出一个值但不赋给任何变量

 

2.关闭一个channel

当不再需要从一个channel接收数据,也不再需要从一个channel发送数据的时候,就需要关闭这个channel。虽然如果不关闭,go的垃圾回收机制也会回收不再使用的channel,但是我们还是要遵循创建关闭原则手动的关闭一下,否则可能会发送一些不可预料的问题。

 

对一个未关闭的channel发送消息:

A 如果goroutine 1对一个不带缓存的channel发消息则会阻塞,直到有另一个goroutine 2接收消息,goroutine 1才会被唤醒;

B如果goroutine 1对一个带缓存的channelchannel未满)发消息,则不会阻塞, 消息会被暂存在channel内部的一个队列中;如果满了就阻塞。

 

对一个已经关闭的channel发送消息会导致panic异常

 

 

对一个未关闭的channel接收消息:

A 如果goroutine 1从一个不带缓存的channel接收消息则会阻塞,直到有另一个goroutine 2向这个channel发消息,goroutine 1才会被唤醒然后接收到消息。

B如果goroutine 1从一个带缓存的channelchannel有消息的话)接收消息,则不会阻塞;如果channel为空就会阻塞。

 

对一个已经关闭的channel接收消息(对带不带缓存的channel都相同):

A 如果channel中还有消息,则可以不阻塞的接收到这些消息;

B 如果channel中没有消息,则会不阻塞的不停的接收到相应类型的零值,不会引发panic

 

使用内置的close函数就可以关闭一个channel

close(ch)

 

需要注意的是:

channel的发送操作会复制一份发送的数据,在数据复制完之前,接收方的接收操作会被阻塞直到复制完成才开始接收。所以,往channel中发送的数据最好是不要太大,如果要发送一个结构体则尽量发送其指针。这样既可以减少拷贝的数据量,又节省了接收方等待发送方拷贝的时间,提高channel的传输性能(不过这样的话,接收方修改这个指针也会影响到发送方的数据)。

对于一个无缓存的channel,由于不存在缓存,所以发送方发送数据的时候,数据只会被复制1次,直接复制给接收方;

对于一个有缓存的channel,如果接收方执行接收操作的时候,这个channel是空的,那么当发送方发送数据的时候,数据会绕过channel的缓冲队列,直接发送给接收方,因此这种情况下就只会复制1次; 如果接收方接收到的数据是从缓冲队列得到的(即接收方接收的时候channel中有数据的情况下),则数据总共会被复制2次,一次是发送方将数据写入channel的缓冲队列的拷贝,一次是接收方接收数据时从缓冲队列到接收方的拷贝。也就是说,当消费者的接收操作快于发送者的发送操作时,就只会复制1次,反之则会复制2次。

 

下面将分别介绍带缓存和不带缓存的channel

关于无缓存或带缓存channels之间的选择,首先两者都能够进行数据在goroutine之间的传输以达到通信的作用。而不同点在于(适用场景的不同点):无缓存channel更强地保证了每个发送操作与相应的同步接收操作(更强调协程之间的同步和安排多个goroutine的有序执行),这是基于其无缓存而带来阻塞的特性,通过阻塞可以暂停协程的工作,让另一个协程完成了任务之后才开始或继续本协程的工作带缓存channel更适用于相互解耦操作的goroutine,意味着一个函数中有多个不同类型却相互依赖的小任务时,我们可以将多个不同类型的小任务解耦到多个函数中,用goroutine并发的跑这些任务,而此时相互依赖的任务之间可以通过带缓存channel来传输消息,最典型的应用就是生产者消费者模型的任务。

 

下面让我们看看带缓存和不带缓存的特性。

 

不带缓存的Channel

一个基于无缓存Channels的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的Channels上执行接收操作,当发送的值通过Channels成功传输之后,两个goroutine可以继续执行后面的语句。反之,如果接收操作先发生,那么接收者goroutine也将阻塞,直到有另一个goroutine在相同的Channels上执行发送操作。

如果发送方通过一个无缓存channel发送数据时被阻塞,接收方会先接收数据(接收方执行<-channel的时候才开始拷贝数据),接收完了之后才去唤醒发送方而不是先唤醒后接收。

如果接收方接收一个无缓存channel被阻塞,发送方会先发送数据(发送方执行channel<-时才开始拷贝数据),发送完了才去唤醒接收方而不是先唤醒后发送。

也就是说唤醒动作都是在拷贝完成之后发生的。

 

下面作者给出了一个例子来介绍不带缓存的channel的用法:

下面有3个协程,counter用于不停的生成数字,squarer负责对counter生成的数字计算其平方,printer负责打印squarer的计算结果。3个协程使用2个通道naturalssquares进行通信。

func main(){
	naturals := make(chan int)
	squares := make(chan int)

	// counter
	go func (){
		for i:=0;;i++ {
			time.Sleep(3 * time.Second / 10)		// 限一下循环的速度
			naturals<-i
		}
	}()

	// squarer
	go func (){
		for {		// 不断接收counter发送的i
			i := <-naturals
			squares <- i * i
		}
	}()

	// printer
	go func (){
		for{		// 不断接收squarer的结果
			fmt.Println(<-squares)
		}
	}()
}

这个程序有一个致命的缺点:由于用go声明运行的函数是非阻塞的,因此main goroutine会立刻结束,连带着3个子goroutine也马上结束,根本没来得及执行。

 

 

python的多线程并发编程中,主线程一般会调用子线程的join()方法等待所有子线程运行完之后才结束。但是在go中貌似没有这种类似的等待方法。不过我们可以通过channel来阻塞。

 

我只需要在main的开始多声明一个blockchannel,这个channel的作用只有1个,就是一直阻塞main goroutine,让主协程永远都不会结束。

func main(){
	naturals := make(chan int)
	squares := make(chan int)
	block := make(chan struct{})

	// counter
	go func ()go func (){//...}()

	// squarer
	go func ()go func (){//...}()

	// printer
	go func (){//...}()

	// 往block发送一个值,由于没有任何协程接收这个值,因此这里会永久阻塞
	block<- struct{}{}
}

 

除此之外我们还可以这样做,将printer的工作直接放在主协程中做,而无需创建一个子协程来打印平方值,这样<-squares就会阻塞主协程而无需用block这个channel来阻塞

func main(){
	naturals := make(chan int)
	squares := make(chan int)

	// counter
	go func ()go func (){//...}()

	// squarer
	go func ()go func (){//...}()

	// printer
	for{	// 不断接收squarer的结果
		fmt.Println(<-squares)
	}

}

 

counter naturals这个通道发出消息后squarernaturals接收消息前的这段时间(虽然这段时间很短很短,短的可以忽略),counter是被阻塞的,squarerprinter之间同理。也就是说,虽然使用了3个子goroutine,但是整个程序还是一个串行的程序,因为这个例子中一个协程的下一步操作的开始要依赖于另一个协程的某个操作完成。

 

现在我们要考虑更加多的细节问题,如果我希望计算有限多个(比如100个)i的值而不是无限个,此时我们要考虑数据传输完之后channel要怎么被处理。

// counter
go func (){
    for i:=0;i <= 100;i++ {		// 让生产者只生产100个i
        time.Sleep(3 * time.Second / 10)		// 限一下速度
        naturals<-i
    }
}()

i全部被发送出去之后,counter跳出循环结束goroutine,而printersquarer2个协程由于接收不到消息而被阻塞,而本身main goroutine也是被block这个channel阻塞的。因此,程序中所有的goroutine都被阻塞住了,而且是永远的被阻塞住了,永远都不会被唤醒。

 

这种情况被称之为 goroutine死锁,go是不会允许这种情况出现的,因此会报一个错误:

fatal error: all goroutines are asleep - deadlock!

(如果所有gouroutine都被阻塞了,但是之后可能被唤醒的话,就不会报goroutine死锁的错误,例如,所有的goroutine由于都调用了sleep暂时都被阻塞住)。

 

但是其实像python等语言中,发生所有线程或协程被阻塞也是常有的事,这些语言在这种情况下并不会报错。

 

那怎么解决这个问题?我们只需要在counter发送完所有的i时给printer发送一个信号,让printer不要再从naturals中取信息即可。最直接的方式就是关闭naturalssquarer就会在读取naturals时读到一堆int的零值。

 

当一个channel被关闭后,再向该channel发送数据将导致panic异常

当一个channel被关闭后,如果channel中的数据还没被接收完(带缓存的channel)依然可以从该channel接收数据。如果channel没有数据了,再从里面接收数据可以不阻塞的接收到一个零值。

 

但是当goroutinechannel中读到一个零值的话,它无法确定这个channel是关闭了还是发送者确实发送了一个对应类型的零值给它。此时我们可以用第二个结果接收,他是一个bool ture表示成功从channels接收到值,false表示channels已经被关闭并且里面没有值可接收。(如 i, ok := <-naturals

我们甚至可以使用for range来接收,channel被关闭并且没有值可接收时会自动跳出循环。(如果channel没有被关闭,但是channel也没有值可以接收时,for range不会结束,而是会阻塞)。

 

忘记说很重要的一点:

channel的关闭一般都是在发送消息的goroutine,而不是在接收消息的goroutine。原因很简单,如果在接收消息的goroutine关闭channel,发送消息的goroutine继续往channel放数据会引发panic从而结束整个进程。

改进后的代码如下:

package main

import (
	"fmt"
	"time"
)

func main(){
	naturals := make(chan int)
	squares := make(chan int)
	block := make(chan struct{})

	// counter
	go func (){
		for i:=0;i <= 10;i++ {
			time.Sleep(3 * time.Second / 10)		// 限一下速度
			naturals<-i
		}
		close(naturals)
	}()

	// squarer
	go func (){
		for i := range naturals{	// 不断接收counter发送的i
			squares <- i * i
		}
		close(squares)
	}()

	// printer
	go func (){
		for res := range squares{	// 不断接收squarer的结果
			fmt.Println(res)
		}
		block <- struct{}{}
	}()

	<-block
}

不管一个channel是否被关闭,当它没有被引用时将会被Go语言的垃圾自动回收器回收。(不要将关闭一个打开文件的操作和关闭一个channel操作相提并论。对于每个打开的文件,都需要在不使用的时候调用对应的Close方法来关闭文件。)

试图重复关闭一个channel将导致panic异常,试图关闭一个nil值的channel也将导致panic异常。关闭一个channels还会触发一个广播机制这个我们在后面再说.

 

带缓存的channel

带缓存的channel内部持有一个队列

向缓存Channel的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部弹出元素。如果内部缓存队列是满的,那么发送操作将阻塞直到因另一个goroutine执行接收操作而释放了新的队列空间。反之亦然。

 

可以使用内置的cap和len函数获取一个带缓存的channel的容量和长度。

 

下面我们看一个比较典型的例子:

这个程序会向3个镜像站点发出请求,三个镜像站点分散在不同的地理位置。这个程序的作用是返回响应最快的镜像的响应给用户。

func main() {
	response := make(chan string, 3)

	// 同时向3个站点发起请求,request是一个阻塞的方法,需要等待服务的响应
	go func () { response <- request("mirror1")}()
	go func () { response <- request("mirror2")}()
	go func () { response <- request("mirror3")}()

	// main goroutine则接受最快的响应,其他响应不接收
	fmt.Println(<-response)
}


func request(mirror string) string{
	if mirror == "mirror1"{
		time.Sleep(5 * time.Second / 10)    // 通过sleep模拟请求和响应在网络中传输的时间和服务器处理请求的时间
	}else if mirror == "mirror2"{
		time.Sleep(3 * time.Second / 10)
	}else if mirror == "mirror3"{
		time.Sleep(7 * time.Second / 10)
	}

	return "response from " + mirror
}

注意,如果上面换成使用无缓存的channel,那么两个慢的goroutines将会因为没有人接收而被永远卡住。这种情况,称为goroutines泄漏,这将是一个BUG。和垃圾变量不同,泄漏的goroutines并不会被自动回收(因为这两个goroutine永远都不会运行结束,所以永远都不会被回收,除非main goroutine结束,整个程序结束)。因此确保每个不再需要的goroutine能正常退出是重要的(也就是说不再使用的goroutine要确保不被channel给卡死而无法退出)

 

Channel的一种错误用法是在一个goroutine中对同一个channel既发送又接收除非是特殊场景,例如使用channel作为信号量限制最大的并发goroutine数量。这么一来channel的通信作用就失去意义,单纯的变为一个切片,而且还有发生死锁的风险。

 

只发送的channel和只接收的channel

作者在这一节取的标题是“单方向的channel,但我个人觉得这样是有歧义的,因为单方向是指从一个通道的首端入尾端出,而不能从尾端入首端出。而channel本身确实遵循这个原则,也就是说channel本身就是一个单向的通道而不是双向的。

 

作者所说的“单向的channel”其实是想表达只能发送的channel或只能接收的channel,但是这应该叫做“半开闭的channel”,即通道的一端是开放的一端是封闭的,因此我这里把标题做了修改以避免歧义。

在很多情况下,一个goroutine只负责对一个channel发送或只接收,而不会既对一个同一个channel又发送又接收。

 

因此我们可以通过声明的时候限制一个channel是只发送或只接收的。

类型chan<- int表示一个只发送intchannel,只能发送不能接收。相反,类型<-chan int表示一个只接收intchannel,只能接收不能发送。这种限制将在编译期检测

 

对一个只接收的channel调用close将是一个编译错误(因为从逻辑上说,一个只接收信息的goroutine无法知道channel什么时候会发送完毕消息,这是由发送端的goroutine决定的,所以接收者goroutine和只接收的channel应该无权调用close)。

 

现在,我们对上面的counter/squarer/printer例子进行最后的改进,将channel在特定的goroutine中变为半开闭的channel

func main(){
	naturals := make(chan int)
	squares := make(chan int)

	// 已知数据都是从channel的右边进左边出(单向)
	go counter(naturals)	// naturals在counter是右开左闭的
	go squarer(squares, naturals)	// naturals在squarer中是左开右闭,squares在squarer是右开左闭
	printer(squares)	// printer直接在main goroutine中跑,而不另开协程
}

// counter
func counter(in chan<-int){
	for i:=0;i <= 10;i++ {
		time.Sleep(3 * time.Second / 10)		// 限一下速度
		in<-i
	}
	close(in)
}

// squarer
func squarer(in chan<-int, out <-chan int){
	for i := range out{	// 不断接收counter发送的i
		in <- i * i
	}
	close(in)
}

// printer
func printer(in <-chan int){
	for res := range in{	// 不断接收squarer的结果
		fmt.Println(res)
	}
}

这里涉及到 channel类型的转换。我们在main中声明naturals的时候是声明的一个右开右闭的双开口channel。但是当将它传给counter的时候,函数会做一次 in := naturals的隐式赋值,而in又是 chan<-int 类型的单开口channel,所以在counter函数中naturals的类型将隐式地从chan int转换成chan<- int

 

开口channel向单开口channel变量的赋值操作都将导致该隐式转换。双开口的channel可以转为单开口,但是反之单开口的channel不能转双开口的。

 

 

 




更多内容请关注微信公众号
zbpblog微信公众号

如果您需要转载,可以点击下方按钮可以进行复制粘贴;本站博客文章为原创,请转载时注明以下信息

张柏沛IT技术博客 > Go入门系列(十四) go并发编程之Goroutine与channel(上)

热门推荐
推荐新闻