张柏沛的个人IT技术博客-专注和分享PHP建站和Python技术的学习博客

正文内容

Go入门系列(十五) go并发编程之Goroutine与channel(中)

栏目:Go语言 系列:Go入门系列 发布时间:2021-01-12 10:09:41 浏览量:397

用循环开启多个goroutine进行并发

在这一节,我们会用生成缩略图的例子来熟悉goroutinechannel的使用。

首先我们下载一个作者提供的图片压缩的包 thumbnail(这个包在go圣经作者的书中源码有,链接为:https://github.com/adonovan/gopl.io/tree/master/ch8/thumbnail) 我们不用去细读这个包的内容,只需要知道里面的ImageFile函数需要传入一个图片路径,并且会将这个图片进行压缩生成另一个图片之后返回缩略图的路径。

func main() {
	// 读取目录下的图片名称
	var inFileImgDir = "./imgs/"
	var outFileImgDir = "./thumb_imgs"
	file, err := ioutil.ReadDir(inFileImgDir)
	if err != nil{
		log.Fatal(err)
	}

	// defer用于计算压缩图片消耗的时间
	defer func (st time.Time) {
		fmt.Println(float64(time.Now().UnixNano() - st.UnixNano()) / 1e9)
	}(time.Now())

	run(file, inFileImgDir, outFileImgDir)
}

// 串行运行
func run (file []os.FileInfo, inFileImgDir, outFileImgDir string) {
	for _, f := range file {
		fp := inFileImgDir + f.Name()
fmt.Println(fp)
		ImageFile(fp, outFileImgDir)		// 不接收返回值,也就是说,忽略错误
	}
}

// 压缩一张图片,并存放到 thumbDir 目录下
func ImageFile(infile string, thumbDir string) (string, error) {
	_, err := os.Stat(thumbDir)

	// 判断目录是否存在,不存在则新建
	if err != nil && os.IsNotExist(err) {
		os.Mkdir(thumbDir, 0777)
	}

	ext := filepath.Ext(infile) // 获取文件后缀 ".jpg", ".JPEG"
	imgName := filepath.Base(infile)	// 获取文件名
	outfile := strings.TrimRight(thumbDir, "/") + "/" + strings.TrimSuffix(imgName, ext) + "_thumb" + ext		// 拼接压缩图片的路径
	return outfile, thumbnail.ImageFile2(outfile, infile)	// ImageFile2的作用就是压缩图片infile生成outfile
}

这是一个串行的程序,跑完这个程序(压缩了220张图片)大概花了12

 

我们再看看如果将run这个串行的方法换成runConcurrently 这个并发压缩图片的方法会怎样:

// 并发运行
func runConcurrently (file []os.FileInfo, inFileImgDir, outFileImgDir string) {
	waiting := make(chan struct{})

	// 并发压缩图片
	for _, f := range file {
		go func(f os.FileInfo) {
			fp := inFileImgDir + f.Name()
			ImageFile(fp, outFileImgDir)		// 不接收返回值,也就是说,忽略错误
			waiting <- struct{}{}		// 往waiting通道中发送空结构体
		}(f)
	}

	// 阻塞 main goroutine,直到所有负责压缩的goroutine都已经完成任务才结束main goroutine
	finished := 0	// 记录已经完成压缩的图片数量
	for {
		<-waiting
		finished ++
		if finished >= len(file) {		// 当所有负责压缩图片的goroutine完成任务后跳出死循环结束main goroutine
			break
		}
	}
}

最后一个for循环可以用下面这个更简洁的方式代替:

for range filenames {    // 循环图片数量的次数

      <-waiting

}

这里作者再三强调要将f作为参数传入到go声明的匿名函数中,而不能直接引用外部变量f。因为随着循环的进行,f会被循环中的赋值语句不断改变,从而影响到goroutine中的f。将f传入到goroutine可以避免这种情况。

如果希望在主协程中拿到每一个goroutine的返回值怎么弄?很简单,可以将这个返回值(如压缩后的图片路径或者压缩过程中产生的错误)发送给waiting通道(而不是发送一个空结构体)。然后在主协程中接收waiting传过来的值,然后用一个slice将这些返回值存储起来。

 

现在我们考虑一种情况:假如我们不知道我们总共要压缩的图片有多少张(譬如,我们是从网络上爬取的n张图片,而且爬取的过程在不断进行,会不断往容器切片中增加图片,而且同时程序也在压缩图片,也就是说压缩和爬取是并发的。这时我们就不知道要压缩的图片有多少张)。那么我们就无法以判断finished是否达到file切片的长度来判断所有协程是否结束,也就不直到main何时可以结束。

 

此时我们可以这样做:还是用循环来接收waiting消息的方式阻塞主协程,在所有子协程结束时,子协程关闭waiting。主协程从一个关闭的channel中取值会取到一个false(第二个返回值),然后跳出循环,结束main

 

现在的关键是,所有的子协程都可以操作waiting,到底由哪个协程去关闭waiting,而且这个协程又是怎么知道它什么时候可以关闭这个waiting(什么时候所有子协程运行结束)?

 

作者给出了这个方案:在每开启一个子协程之前,对一个计数器+1,每一个子协程结束前对计数器-1。另外再开启一个不负责压缩图片的子协程,这个子协程用来监控这个计数器是否变为0,是则关闭waiting通道。

 

这个方案涉及到多个并发的协程修改一个共享的计数器,可能会发生并发下数据不安全的问题,因此在对这个计数器修改的时候需要加锁。

 

Go提供了一个sync.WaitGroup,它其实就是充当一个并发安全的计数器,在对其进行+1-1的操作都是在加锁后进行的。下面我们看看如何实现:

func runConcurrently2 (file []os.FileInfo, inFileImgDir, outFileImgDir string) {
	waiting := make(chan struct{})
	var counter sync.WaitGroup		// 创建一个并发安全的计数器,它记录着正在运行中的协程数量

	// 并发压缩图片
	for _, f := range file {
		counter.Add(1)	// 每开启一个goroutine都会对计数器+1
		go func(f os.FileInfo) {
			defer counter.Done()		// 每次goroutine结束都会对计数器-1,这个操作放在defer中可以保证即使协程发生panic也能够对计数器-1,不会漏掉这个操作
			fp := inFileImgDir + f.Name()
			fmt.Println(fp)
			ImageFile(fp, outFileImgDir)		// 不接收返回值,也就是说,忽略错误
			waiting <- struct{}{}		// 往waiting通道中发送空结构体
		}(f)
	}

	// 单独开一个goroutine来监控计数器的状态
	go func() {
		counter.Wait()		// 如果counter计数器内的值大于0,wait方法会一直阻塞这个goroutine;如果=0则唤醒这个goroutine
		close(waiting)		// 关闭waiting通道,这样main中的for range waiting就会跳出,主协程会结束
	}()

	// 阻塞 main goroutine,直到所有负责压缩的goroutine都已经完成任务才结束main goroutine
	for range waiting {}		// 只从waiting取值,但不赋给任何变量。也不在for代码块中做任何操作,因为这里for的作用仅仅是为了阻塞main等待所有子协程完成而已。
}

这里的counter.Add()Done()对计数器的修改都是加了锁的,是并发安全的。

上面的程序代码是当我们使用并发循环,但又不知道迭代次数(不知道当前有多少个协程在运行,什么时候运行完)时很通常而且很地道的(结束main写法。

 

现在我们考虑是否能将 counter.Wait放在main中而不是放在单独开的一个goroutine中:

func runConcurrently2 (file []os.FileInfo, inFileImgDir, outFileImgDir string) {
	// ...略

	for _, f := range file {
		counter.Add(1)	// 每开启一个goroutine都会对计数器+1
		go func(f os.FileInfo) {
			// ...略
		}(f)
	}

	counter.Wait()		
	close(waiting)		
for range waiting {}
}

如果这样的话,所有子协程都会在压缩完图片后被 waiting<-struct{}{} 操作阻塞,因为主线程没有接收waiting中的信息,主协程被 counter.Wait()给阻塞住了,无法运行到 for range waiting{}。此时所有子协程都在等待主协程接收channel中的消息,主协程又在等待子协程对counter0,于是发生死锁。

 

如果是下面这样:

for range waiting {}
counter.Wait()		
close(waiting)	

那么子协程是全都可以运行完毕被释放,但是主协程会被waiting永久阻塞,因为waiting永远都无法关闭。

 

并发的Web爬虫

接下来我们做一个简单的web爬虫,这个爬虫会接收用户输入的一个或者多个初始url进行爬取,并将爬取到的内容存到本地html文件,同时会从html中解析更多url继续爬取直到爬完该域名下所有的链接。

 

接下来我们看看这段程序的具体实现:

// main.go
// 爬虫程序的可执行文件
func main() {
	defer func (st time.Time) {
		fmt.Println(float64(time.Now().UnixNano() - st.UnixNano()) / 1e9)
	}(time.Now())

	startUrls := os.Args[1:]
	domain := startUrls[0]		// 必须以根域名为第一参,如 http://www.baidu.com/
	htmlDir := "./html"
	fmt.Println(startUrls, domain, htmlDir)

	// 开始爬取
	spider.Run(startUrls, domain, htmlDir)
}
// spider/crawler.go

/* 爬虫 包含请求器 下载器 解析器 */

// 请求器
func Crawl(url string) (cont []byte, err error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, fmt.Errorf("Failed to connect: '%s' because of %s", url, err)
	}

	defer resp.Body.Close()
	cont, err = ioutil.ReadAll(resp.Body)
	if err != nil {
		err = fmt.Errorf("Failed to get url content: '%s' because of %s", url, err)
	}
	return cont, err
}

// 下载器
func Download(cont []byte, url, htmlDir string) error {
	if filepath.Ext(url) != ".html"{
		return nil
	}
fmt.Println("预备下载")
	if isDirNotExist(htmlDir) {
		os.Mkdir(htmlDir, 0777)
	}
//fn := md5_encrypt(url) + ".html"
	fn := filepath.Base(url)
	fp := strings.TrimRight(htmlDir, "/") + "/" + fn
fmt.Println("准备下载" + fp)
	return writeFile(cont, fp)
}

// 判断目录是否存在
func isDirNotExist(htmlDir string) bool {
	_, err := os.Stat(htmlDir)
	if err != nil && os.IsNotExist(err){
		return true
	}
	return false
}

// 对url加密,用于生成一个html文件名
func md5_encrypt(str string) (encrypt_str string){
	encrypt := md5.New()
	io.WriteString(encrypt, str)
	return fmt.Sprintf("%x", encrypt.Sum(nil))
}

// 将内容写入到一个文件
func writeFile(cont []byte, fp string) error {
	f, err := os.OpenFile(fp, os.O_CREATE, 0777)		// 以只写方式打开文件,如果文件不存在则创建
	if err != nil{
		return err
	}

	defer f.Close()
	_, err = f.Write(cont)
fmt.Printf("写入内容到 %s", fp)
	return err
}

// 解析器(解析url)
func ParseUrl(cont []byte, domain string) (map[string]struct{}){
	urls := map[string]struct{}{}
	reg := regexp.MustCompile("(?:<a.*?href=['\"](.*?)['\"].*?>)")
	res := reg.FindAllSubmatch(cont, -1)

	for _, r := range res {
		href := string(r[1])
		if href == "javascript:;"{
			continue
		}
//fmt.Printf("href: %s", href)
		domain = strings.TrimRight(domain, "/")
		domainWithoutScheme := strings.Replace(domain, "http://", "", -1)
		domainWithoutScheme = strings.Replace(domain, "https://", "", -1)

		// 判断是否为domain域名下的url,不是则跳过
		if strings.HasPrefix(href, "http://") || strings.HasPrefix(href, "https://") || strings.HasPrefix(href,"//"){
			if !strings.Contains(href, domainWithoutScheme){
				continue
			}
		}

		// 如果获取的url是一个相对路径就要拼接为完整路径
		if !strings.Contains(href, domainWithoutScheme) {
			href = domain + "/" + strings.TrimLeft(href, "/")
		}
		urls[href] = struct{}{}		// 选用map而不是slice来存储一个页面所有的url时为了去掉这 一 个页面的重复url,这点很重要而且并不是多余的操作,它和正式爬取的去重(seen)缺一不可
	}
//fmt.Println(urls)
	return urls
}

 

接下来是调度器,调度器实现了如何并发的调度爬虫爬取,我们下面会围绕着这个scheduler.go调度器来进行讨论和改进。

先是第一个版本的调度器

type Scheduler struct{
	crawl_tasks chan string			// 存放要爬取的url的channel
	seen map[string]bool		// 记录爬取过的url
	seen_lock sync.Mutex		// 修改seen时加锁,防止seen在并发过程中被改乱
}

// 开始爬取
func (s *Scheduler) Start(startUrls []string) {
	for _, startUrl := range startUrls {
		s.crawl_tasks <- startUrl
	}
}

// 调度器
func (s *Scheduler) Schedule(domain, htmlDir string) {
	for url := range s.crawl_tasks {
		go func(url string) {
			// 校验url是否已经爬取过
			s.seen_lock.Lock()
			_, ok := s.seen[url]
			s.seen_lock.Unlock()
			if ok {
				return
			}

			// 爬取url
			s.seen_lock.Lock()
			s.seen[url] = true		// 添加已爬取url要放在真正爬之前
			s.seen_lock.Unlock()
			cont, err := Crawl(url)
			if err != nil {
				fmt.Println(err)
			}

			func() {
				if err := Download(cont, url, htmlDir); err != nil {
					fmt.Println(err)
				}
			}()

			// 解析新url,将新生成的url发送到channel
			for new_url := range ParseUrl(cont, domain){
				go func() {s.crawl_tasks <- new_url}()
			}
		}(url)
	}
}

// 运行爬虫
func Run(startUrls []string, domain, htmlDir string){
	scheduler := Scheduler{
		crawl_tasks: make(chan string),
		seen: map[string]bool{},
		seen_lock: sync.Mutex{},
	}
	go scheduler.Schedule(domain, htmlDir)	// 调度爬虫爬取
	scheduler.Start(startUrls)		// 开始爬取
}

这个调度器的思路很简单:先将初始url加入到任务channel中,与此同时并发的scheduler.Schedule方法中会不断channel取出url,对每一个url开启一个爬虫goroutine进行爬取,这些爬虫goroutine是一次性的不会复用的,也就是说,爬完一个url后一个爬虫goroutine就会被销毁,对于爬取新的url会由调度器生成新的goroutine去做(这在多线程编程中是不能接受的,因为频繁的创建线程和销毁线程是对资源的一种浪费,但是goroutine协程则不一样,因为一个goroutine占的内存很小,而且创建和销毁也比线程的创建和销毁快,所以可以这样做)。

可是这个程序的一个问题是:main goroutine调用Run开始爬的时候没有等待爬虫爬取完url就马上结束了。我们缺少了一个类似于wait的方法来让main goroutine等待所有子协程的运行结束。

为此,我们可以用一个变量来记录子协程的数量。当子协程的数量大于0则阻塞main goroutine,直到达到0而且是一段时间内多次检验达到0(因为这个例子中,子协程会一边随着爬取结束被销毁一边又随着新url的加入而创建,所以goroutine的数量可能会多次达到0又多次增加为大于0),如果一段时间内多次检验达到0说明整个域名下的url都爬取完毕,可以结束goroutine了。

但是这个变量会在多个goroutine中被修改,这意味着不对其加锁的话会造成这个变量被改乱,而且这个变量会作为一个阻塞main routine的判断条件,需要在这个变量大于0时阻塞主协程,因此我们可以用条件变量来保护这个变量。

 

下面这是第二版的调度器

type GoroutineCounter int		// 新建类型goroutine计数器
type Scheduler struct{
	crawl_tasks chan string			// 存放要爬取的url的channel
	seen map[string]bool		// 记录爬取过的url
	seen_lock sync.Mutex		// 修改seen时加锁,防止seen在并发过程中被改乱
	counter GoroutineCounter		// 记录调度器中子协程的数量
	counter_cond *sync.Cond    // 条件变量
}

func (gc *GoroutineCounter) Add(cond *sync.Cond) {
	cond.L.Lock()		// 修改gc前要加锁(使用的是条件变量内含的锁)
	*gc++
	cond.L.Unlock()
}

func (gc *GoroutineCounter) Done(cond *sync.Cond) {
	cond.L.Lock()		// 修改gc前要加锁
	*gc--
	if *gc == 0 {	// 如果此时所有工作的子协程为0,则唤醒条件变量(Add中不用检查gc是否为0,因为gc增加后不可能变为0,只有gc减少后才可能变为0)
		cond.Signal()
	}
	cond.L.Unlock()
}

// 开始爬取   !!!!!!!! 一定要用 *Scheduler 而非Scheduler作为接收器,否则就给自己挖了一个巨坑了
func (s *Scheduler) Start(startUrls []string) {
	s.counter.Add(s.counter_cond)
	for _, startUrl := range startUrls {
		s.crawl_tasks <- startUrl
	}
}

// 调度器
func (s *Scheduler) Schedule(domain, htmlDir string) {
	// [1] 请不要在这里对s.counter加一,而是在下面的for中加一。目的是不让s.counter的加和减对称的进行,否则可能会大大增加程序结束前s.counter为0的次数让main goroutine在不该结束的时候提早结束
	for url := range s.crawl_tasks {
		go func(url string) {
			defer func(url string) {
				s.counter.Done(s.counter_cond)
			}(url)

			// 校验url是否已经爬取过
			s.seen_lock.Lock()
			_, ok := s.seen[url]
			s.seen_lock.Unlock()
			if ok {
				return
			}

			// 爬取url
			s.seen_lock.Lock()
			s.seen[url] = true		// 添加已爬取url要放在真正爬之前
			s.seen_lock.Unlock()
			cont, err := Crawl(url)
			if err != nil {
				fmt.Println(err)
			}

			// 保存html文件
			func() {
				if err := Download(cont, url, htmlDir); err != nil {
					fmt.Println(err)
				}
			}()

			// 解析新url,将新生成的url发送到channel
			for new_url := range ParseUrl(cont, domain){
				s.counter.Add(s.counter_cond)
				go func() {s.crawl_tasks <- new_url}()
			}
		}(url)
	}
}

// 阻塞等待Schedule方法执行完毕
func (s *Scheduler) Wait(maxZeroTimes int){
	zeroTimes := 0
	s.counter_cond.L.Lock()		// 判断(读取)条件变量要加锁(而且调用条件变量对象的Wait方法前必须加锁,否则会报错)
	for {
		if s.counter > 0 {		// 如果当前子协程数大于0,说明爬虫还没有爬完所有url,所以阻塞
			zeroTimes = 0
			s.counter_cond.Wait()
		}else {		// 如果当前协程等于0, 可能爬虫之后还会生成新的url,所以保险起见先不结束主协程,而是多验证验证几次,看看s.counter之后会不会又从0变为大于0
			if zeroTimes >= maxZeroTimes{	// 如果连续 maxZeroTimes 次出现 s.counter 为0,我们就相信爬虫真的已经爬完了,不会再生成新的子协程了
				break
			}
			zeroTimes++
			s.counter_cond.L.Unlock()	// 在Sleep之前要释放锁,否则其他子协程无法对s.counter进行修改
			time.Sleep(3 * time.Second / 10)
			s.counter_cond.L.Lock()
		}
	}
	s.counter_cond.L.Unlock()
	close(s.crawl_tasks)		// 记得关闭一下channel,不过不关也没关系,程序能走到这里说明main goroutine要结束了,channel会随着程序结束而回收
}

// 运行爬虫
func Run(startUrls []string, domain, htmlDir string){
	scheduler := Scheduler{
		crawl_tasks: make(chan string),
		seen: map[string]bool{},
		seen_lock: sync.Mutex{},
		counter_cond: sync.NewCond(&sync.Mutex{}),
	}
	go scheduler.Schedule(domain, htmlDir)	// 调度爬虫爬取
	scheduler.Start(startUrls)		// 开始爬取
	scheduler.Wait(10)	// 等待所有Schedule内的爬取子协程结束,当连续10次(而不是总共10次)检测到子协程的个数为0时就会被唤醒,结束main goroutine
}

 

有一个很重要的点:Scheduler类型的所有方法都要用指针*Scheduler作为接收器而不能用一个值作为接收器,他们的区别是:(其实接收器这个形参也是方法的一个参数,传入一个参数到函数中函数就会拷贝这个变量)前者会拷贝指针,然后这个指针和Run中的scheduler指针共享一个Scheduler类型结构体;后者则会拷贝一个完全一样的结构体,Run中的scheduler变量和方法中的s变量是两个独立个体。这意味着,后者在方法中对s的修改无法影响到Run中的scheduler,因此你即使在Schedule()方法中调用 s.counter.Add 方法对counter属性+1,你在Run中检测 s.counter 得到的值还是0,因为Run()中的s变量和Schedule()中的s变量是两个不同的Scheduler变量,他们在两块内存中。所以Run检测到s.counter永远都是0,程序在3秒内(0.3 * 10)就结束了。

同样的道理,GroutineCounter 中的方法接收器也要是指针,否则也会有这样的问题。

用指针还是用变量值作为接收器的区别在于:前者在方法中对对象作出的改变会影响到方法外的该对象,后者则不会影响。

还有就是将一个条件变量对象传给Add() Done() 方法的时候必须要传入指针,否则会拷贝一个条件变量,多个Add之间和Done之间都是用一个独立的条件变量,那就无法达到同步的作用了。还有就是不建议将同步机制的相关变量(如锁,条件变量)作为新类型的成员,而是单独提出来作为包级变量,而且操作这些同步机制变量的时候我们尽可能去操作其指针而不是其结构体本身,这些都是为了防止在操作中(无意识的)拷贝了一份新的锁和条件变量导致同步失败。

现在基本上没有问题了,但是我们可以稍微优化一下:下载html文件是一个磁盘io的过程,也会发生阻塞,为了提高效率我们也可以将这个过程与爬虫goroutine的请求过程并发进行(Schedule()方法中下载html的那段代码改为)

// 单独开一个协程保存html文件
s.counter.Add(s.counter_cond)
go func() {
    defer s.counter.Done(s.counter_cond)
    if err := Download(cont, url, htmlDir); err != nil {
        fmt.Println(err)
    }
}()

最后,我们的这个并发爬取是没有限速的,假如一个页面中有100url,我瞬间解析了100个页面,就意味着解析出来1万条url,这1万条url会一瞬间加入到crawl_tasks这个channel中,并且被Schedule()方法瞬间生成1万个爬虫goroutine。这会导致很多问题,无论是对 客户端还是服务端。对客户端而言:一次性创建了太多网络连接,超过了每一个进程的打开文件数限制;CPU核心数会限制你的计算负载;硬盘转轴和磁头数限制了你的本地磁盘IO操作频率;网络带宽限制了你的下载速度上限。

对于服务端而已就更不用说了,过多的并发连接和请求会让服务端的访问卡顿甚至崩溃。

因此,我们可以用一个有容量限制的buffered channel来控制并发请求的数量,这里我们只需要控制并发请求的个数而不是协程的个数。但是前者会间接的限制后者让goroutine协程的个数也得到控制。

改进的代码如下:

type Scheduler struct{
	// ...
	limitSpeed chan struct{}        // 添加一个用于限速的limitSpeed成员channel
}

然后初始化调度器对象的时候:

scheduler := Scheduler{
    // ...
    limitSpeed: make(chan struct{}, 20),	// 让limitSpeed的容量为20
}

 

并且,在爬虫协程中用这个channel进行限速:

s.limitSpeed <- struct{}{}
cont, err := Crawl(url)
<- s.limitSpeed

现在这个程序的并发请求量从无限大变成了20

 

这里要注意的是,我们平时接收channel和发送信号到channel都是在不同的goroutine进行的。但是这个程序中 limitSpeed的接收和发送都在同一个goroutine中,这种情况下channel其实充当的是一个类似于pythonsemphore信号量的作用,可以限制一种资源的并发数量。所以gochannel很灵活,几乎可以替代所有常规并发同步机制。

 

很可惜的是,这个程序在逻辑上没有发现什么错误,我在3台机器上跑了这段程序爬取我自己的博客网站,一共207个详情页html文件。

在家里的机器(4 核),花了23~25秒的时间,成功下载到207html文件。

在笔记本电脑( 6核),花了17~19秒的时间,却只成功爬取和下载到160~180html文件。

在自己的腾讯云服务器上(1核),只能爬到56html(很明显,这是失败的)

虽然上面的代码运行上有些问题,但是整体的思路是没错的。

 

最后经过重新整理,修改为最终完美版本的调度器代码,这次就没有出现上面的无法爬取完所有url的问题。

package main

import (
	"fmt"
	"os"
	"spider"
	"sync"
	"time"
)

type GoroutineCounter struct{
    number int
    cond *sync.Cond
}

var crawl_tasks chan string
var limitSpeed chan struct{}
var seen map[string]bool
var seen_lock sync.Mutex
var counter *GoroutineCounter

func (gc *GoroutineCounter) Add(n int) {
    gc.cond.L.Lock()
    gc.number += n
    gc.cond.L.Unlock()
}

func (gc *GoroutineCounter) Done() {
    gc.cond.L.Lock()
    gc.number--
    if gc.number <=0 {
        gc.cond.Signal()
    }
    gc.cond.L.Unlock()
}

func (gc *GoroutineCounter) Wait(maxZeroTimes int) {
    time.Sleep(1 * time.Second)
    curZeroTimes := 0
    gc.cond.L.Lock()
    defer func () {
        gc.cond.L.Unlock()
    }()
    for {
        if gc.number > 0 {
            curZeroTimes = 0
            gc.cond.Wait()
        }else{
            if curZeroTimes >= maxZeroTimes{
                break
            }
            curZeroTimes++
            time.Sleep(3 * time.Second / 10)
        }
    }
}

func main() {
	defer func (st time.Time) {
		fmt.Println(float64(time.Now().UnixNano() - st.UnixNano()) / 1e9)
	}(time.Now())

	startUrls := os.Args[1:]
	domain := startUrls[0]		// 必须以根域名为第一参,如 http://www.baidu.com/
	htmlDir := "./html"

	crawl_tasks = make(chan string, 1000)
	seen = map[string]bool{}
		counter = &GoroutineCounter{number: 0, cond: sync.NewCond(&sync.Mutex{}),}

	go func() {
		for _, startUrl := range startUrls {
			// 将初始url假如到channel
			counter.Add(1)
			crawl_tasks <- startUrl
		}
	}()

	// 控制main goroutine的结束(如果没有这个goroutine的话,main goroutine在爬取完成后是会被crawl_tasks的接收一直阻塞的)
	go func(c *GoroutineCounter) {
		c.Wait(10)

		// 关闭 crawl_tasks 就可以终止crawl_tasks的接收了
		close(crawl_tasks)
	}(counter)

	// 开始爬取
	for url := range crawl_tasks {
		go func(url string) {
			defer counter.Done()
			// 判断是否已经爬取过url
			seen_lock.Lock()
			crawled := seen[url]
			seen_lock.Unlock()
			if crawled {
				return
			}

			// 没有爬过
			seen_lock.Lock()
			seen[url] = true
			seen_lock.Unlock()

			limitSpeed<- struct{}{}
			cont,err := spider.Crawl(url)
			if err != nil {
				fmt.Println(err)
			}
			<-limitSpeed

			// 解析url
			new_urls := spider.ParseUrl(cont,domain)

			// 生成新任务。生成新的goroutine做这件事是因为防止添加添加新任务时阻塞住爬虫goroutine
			go func(new_urls map[string]struct{}){
				counter.Add(len(new_urls))
				for new_url := range new_urls {
					crawl_tasks <- new_url
				}
			}(new_urls)

			// 下载cont内容
			spider.Download(cont, url, htmlDir)
		}(url)
	}
}

最后再强调一次:所有的结构体类型,锁,条件变量,都要用指针操作和传递!

如果您需要转载,可以点击下方按钮可以进行复制粘贴;本站博客文章为原创,请转载时注明以下信息

张柏沛IT技术博客 > Go入门系列(十五) go并发编程之Goroutine与channel(中)

热门推荐