本章主要内容
在第6章中,我们学习了什么是并发,通道是如何工作的,并学习了可以实际工作的并发代码。本章将通过学习更多代码来扩展这些知识。我们会学习3个可以在实际工程里使用的包,这3个包分别实现了不同的并发模式。每个包从一个实用的视角来讲解如何使用并发和通道。我们会学习如何用这个包简化并发程序的编写,以及为什么能简化的原因。
runner包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以用runner包来终止程序。当开发需要调度后台处理任务的程序的时候,这种模式会很有用。这个程序可能会作为cron作业执行,或者在基于定时任务的云环境(如iron.io)里执行。
让我们来看一下runner包里的runner.go代码文件,如代码清单7-1所示。
代码清单7-1 runner/runner.go
01 // Gabriel Aszalos协助完成了这个示例
02 // runner包管理处理任务的运行和生命周期
03 package runner
04
05 import (
06 "errors"
07 "os"
08 "os/signal"
09 "time"
10 )
11
12 // Runner在给定的超时时间内执行一组任务,
13 // 并且在操作系统发送中断信号时结束这些任务
14 type Runner struct {
15 // interrupt通道报告从操作系统
16 // 发送的信号
17 interrupt chan os.Signal
18
19 // complete通道报告处理任务已经完成
20 complete chan error
21
22 // timeout报告处理任务已经超时
23 timeout <-chan time.Time
24
25 // tasks持有一组以索引顺序依次执行的
26 // 函数
27 tasks []func(int)
28 }
29
30 // ErrTimeout会在任务执行超时时返回
31 var ErrTimeout = errors.New("received timeout")
32
33 // ErrInterrupt会在接收到操作系统的事件时返回
34 var ErrInterrupt = errors.New("received interrupt")
35
36 // New返回一个新的准备使用的Runner
37 func New(d time.Duration) *Runner {
38 return &Runner{
39 interrupt: make(chan os.Signal, 1),
40 complete: make(chan error),
41 timeout: time.After(d),
42 }
43 }
44
45 // Add将一个任务附加到Runner上。这个任务是一个
46 // 接收一个int类型的ID作为参数的函数
47 func (r *Runner) Add(tasks ...func(int)) {
48 r.tasks = append(r.tasks, tasks...)
49 }
50
51 // Start执行所有任务,并监视通道事件
52 func (r *Runner) Start() error {
53 // 我们希望接收所有中断信号
54 signal.Notify(r.interrupt, os.Interrupt)
55
56 // 用不同的goroutine执行不同的任务
57 go func() {
58 r.complete <- r.run()
59 }()
60
61 select {
62 // 当任务处理完成时发出的信号
63 case err := <-r.complete:
64 return err
65
66 // 当任务处理程序运行超时时发出的信号
67 case <-r.timeout:
68 return ErrTimeout
69 }
70 }
71
72 // run执行每一个已注册的任务
73 func (r *Runner) run() error {
74 for id, task := range r.tasks {
75 // 检测操作系统的中断信号
76 if r.gotInterrupt() {
77 return ErrInterrupt
78 }
79
80 // 执行已注册的任务
81 task(id)
82 }
83
84 return nil
85 }
86
87 // gotInterrupt验证是否接收到了中断信号
88 func (r *Runner) gotInterrupt() bool {
89 select {
90 // 当中断事件被触发时发出的信号
91 case <-r.interrupt:
92 // 停止接收后续的任何信号
93 signal.Stop(r.interrupt)
95 return true
96
97 // 继续正常运行
98 default:
99 return false
100 }
101 }代码清单7-1 中的程序展示了依据调度运行的无人值守的面向任务的程序,及其所使用的并发模式。在设计上,可支持以下终止点:
让我们走查一遍代码,看看每个终止点是如何实现的,如代码清单7-2所示。
代码清单7-2 runner/runner.go:第12行到第28行
12 // Runner在给定的超时时间内执行一组任务,
13 // 并且在操作系统发送中断信号时结束这些任务
14 type Runner struct {
15 // interrupt通道报告从操作系统
16 // 发送的信号
17 interrupt chan os.Signal
18
19 // complete通道报告处理任务已经完成
20 complete chan error
21
22 // timeout报告处理任务已经超时
23 timeout <-chan time.Time
24
25 // tasks持有一组以索引顺序依次执行的
26 // 函数
27 tasks []func(int)
28 }代码清单7-2从第14行声明Runner结构开始。这个类型声明了3个通道,用来辅助管理程序的生命周期,以及用来表示顺序执行的不同任务的函数切片。
第17行的interrupt通道收发os.Signal接口类型的值,用来从主机操作系统接收中断事件。os.Signal接口的声明如代码清单7-3所示。
代码清单7-3 golang.org/pkg/os/#Signal
// Signal用来描述操作系统发送的信号。其底层实现通常会
// 依赖操作系统的具体实现:在UNIX系统上是
// syscall.Signal
type Signal interface {
String() string
Signal()//用来区分其他Stringer
}代码清单7-3展示了os.Signal接口的声明。这个接口抽象了不同操作系统上捕获和报告信号事件的具体实现。
第二个字段被命名为complete,是一个收发error接口类型值的通道,如代码清单7-4所示。
代码清单7-4 runner/runner.go:第19行到第20行
19 // complete通道报告处理任务已经完成
20 complete chan error这个通道被命名为complete,因为它被执行任务的goroutine用来发送任务已经完成的信号。如果执行任务时发生了错误,会通过这个通道发回一个error接口类型的值。如果没有发生错误,会通过这个通道发回一个nil值作为error接口值。
第三个字段被命名为timeout,接收time.Time值,如代码清单7-5所示。
代码清单7-5 runner/runner.go:第22行到第23行
22 // timeout报告处理任务已经超时
23 timeout <-chan time.Time这个通道用来管理执行任务的时间。如果从这个通道接收到一个time.Time的值,这个程序就会试图清理状态并停止工作。
最后一个字段被命名为tasks,是一个函数值的切片,如代码清单7-6所示。
代码清单7-6 runner/runner.go:第25行到第27行
25 // tasks持有一组以索引顺序依次执行的
26 // 函数
27 tasks []func(int)这些函数值代表一个接一个顺序执行的函数。会有一个与main函数分离的goroutine来执行这些函数。
现在已经声明了Runner类型,接下来看一下两个error接口变量,这两个变量分别代表不同的错误值,如代码清单7-7所示。
代码清单7-7 runner/runner.go:第30行到第34行
30 // ErrTimeout会在任务执行超时时返回
31 var ErrTimeout = errors.New("received timeout")
32
33 // ErrInterrupt会在接收到操作系统的事件时返回
34 var ErrInterrupt = errors.New("received interrupt")第一个error接口变量名为ErrTimeout。这个错误值会在收到超时事件时,由Start方法返回。第二个error接口变量名为ErrInterrupt。这个错误值会在收到操作系统的中断事件时,由Start方法返回。
现在我们来看一下用户如何创建一个Runner类型的值,如代码清单7-8所示。
代码清单7-8 runner/runner.go:第36行到第43行
36 // New返回一个新的准备使用的Runner
37 func New(d time.Duration) *Runner {
38 return &Runner{
39 interrupt: make(chan os.Signal, 1),
40 complete: make(chan error),
41 timeout: time.After(d),
42 }
43 }代码清单7-8 展示了名为New的工厂函数。这个函数接收一个time.Duration类型的值,并返回Runner类型的指针。这个函数会创建一个Runner类型的值,并初始化每个通道字段。因为task字段的零值是nil,已经满足初始化的要求,所以没有被明确初始化。每个通道字段都有独立的初始化过程,让我们探究一下每个字段的初始化细节。
通道interrupt被初始化为缓冲区容量为1的通道。这可以保证通道至少能接收一个来自语言运行时的os.Signal值,确保语言运行时发送这个事件的时候不会被阻塞。如果goroutine没有准备好接收这个值,这个值就会被丢弃。例如,如果用户反复敲 Ctrl+C组合键,程序只会在这个通道的缓冲区可用的时候接收事件,其余的所有事件都会被丢弃。
通道complete被初始化为无缓冲的通道。当执行任务的goroutine完成时,会向这个通道发送一个error类型的值或者nil值。之后就会等待main函数接收这个值。一旦main接收了这个error值,goroutine就可以安全地终止了。
最后一个通道timeout是用time包的After函数初始化的。After函数返回一个time.Time类型的通道。语言运行时会在指定的duration时间到期之后,向这个通道发送一个time.Time的值。
现在知道了如何创建并初始化一个Runner值,我们再来看一下与Runner类型关联的方法。第一个方法Add用来增加一个要执行的任务函数,如代码清单7-9所示。
代码清单7-9 runner/runner.go:第45行到第49行
45 // Add将一个任务附加到Runner上。这个任务是一个
46 // 接收一个int类型的ID作为参数的函数
47 func (r *Runner) Add(tasks ...func(int)) {
48 r.tasks = append(r.tasks, tasks...)
49 }代码清单7-9展示了Add方法,这个方法接收一个名为tasks的可变参数。可变参数可以接受任意数量的值作为传入参数。这个例子里,这些传入的值必须是一个接收一个整数且什么都不返回的函数。函数执行时的参数tasks是一个存储所有这些传入函数值的切片。
现在让我们来看一下run方法,如代码清单7-10所示。
代码清单7-10 runner/runner.go:第72行到第85行
72 // run执行每一个已注册的任务
73 func (r *Runner) run() error {
74 for id, task := range r.tasks {
75 // 检测操作系统的中断信号
76 if r.gotInterrupt() {
77 return ErrInterrupt
78 }
79
80 // 执行已注册的任务
81 task(id)
82 }
83
84 return nil
85 }代码清单7-10的第73行的run方法会迭代tasks切片,并按顺序执行每个函数。函数会在第81行被执行。在执行之前,会在第76行调用gotInterrupt方法来检查是否有要从操作系统接收的事件。
代码清单7-11中的方法gotInterrupt展示了带default分支的select语句的经典用法。
代码清单7-11 runner/runner.go:第87行到第101行
87 // gotInterrupt验证是否接收到了中断信号
88 func (r *Runner) gotInterrupt() bool {
89 select {
90 // 当中断事件被触发时发出的信号
91 case <-r.interrupt:
92 // 停止接收后续的任何信号
93 signal.Stop(r.interrupt)
95 return true
96
97 // 继续正常运行
98 default:
99 return false
100 }
101 }在第91行,代码试图从interrupt通道去接收信号。一般来说,select语句在没有任何要接收的数据时会阻塞,不过有了第98行的default分支就不会阻塞了。default分支会将接收interrupt通道的阻塞调用转变为非阻塞的。如果interrupt通道有中断信号需要接收,就会接收并处理这个中断。如果没有需要接收的信号,就会执行default分支。
当收到中断信号后,代码会通过在第93行调用Stop方法来停止接收之后的所有事件。之后函数返回true。如果没有收到中断信号,在第99行该方法会返回false。本质上,gotInterrupt方法会让goroutine检查中断信号,如果没有发出中断信号,就继续处理工作。
这个包里的最后一个方法名为Start,如代码清单7-12所示。
代码清单7-12 runner/runner.go:第51行到第70行
51 // Start执行所有任务,并监视通道事件
52 func (r *Runner) Start() error {
53 // 我们希望接收所有中断信号
54 signal.Notify(r.interrupt, os.Interrupt)
55
56 // 用不同的goroutine执行不同的任务
57 go func() {
58 r.complete <- r.run()
59 }()
60
61 select {
62 // 当任务处理完成时发出的信号
63 case err := <-r.complete:
64 return err
65
66 // 当任务处理程序运行超时时发出的信号
67 case <-r.timeout:
68 return ErrTimeout
69 }
70 }方法Start实现了程序的主流程。在代码清单7-12的第52行,Start设置了gotInterrupt方法要从操作系统接收的中断信号。在第56行到第59行,声明了一个匿名函数,并单独启动goroutine来执行。这个goroutine会执行一系列被赋予的任务。在第58行,在goroutine的内部调用了run方法,并将这个方法返回的error接口值发送到complete通道。一旦error接口的值被接收,该goroutine就会通过通道将这个值返回给调用者。
创建goroutine后,Start进入一个select语句,阻塞等待两个事件中的任意一个。如果从complete通道接收到error接口值,那么该goroutine要么在规定的时间内完成了分配的工作,要么收到了操作系统的中断信号。无论哪种情况,收到的error接口值都会被返回,随后方法终止。如果从timeout通道接收到time.Time值,就表示goroutine没有在规定的时间内完成工作。这种情况下,程序会返回ErrTimeout变量。
现在看过了runner包的代码,并了解了代码是如何工作的,让我们看一下main.go代码文件中的测试程序,如代码清单7-13所示。
代码清单7-13 runner/main/main.go
01 // 这个示例程序演示如何使用通道来监视
02 // 程序运行的时间,以在程序运行时间过长
03 // 时如何终止程序
03 package main
04
05 import (
06 "log"
07 "time"
08
09 "github.com/goinaction/code/chapter7/patterns/runner"
10 )
11
12 // timeout规定了必须在多少秒内处理完成
13 const timeout = 3 * time.Second
14
15 // main是程序的入口
16 func main() {
17 log.Println("Starting work.")
18
19 // 为本次执行分配超时时间
20 r := runner.New(timeout)
21
22 // 加入要执行的任务
23 r.Add(createTask(), createTask(), createTask())
24
25 // 执行任务并处理结果
26 if err := r.Start(); err != nil {
27 switch err {
28 case runner.ErrTimeout:
29 log.Println("Terminating due to timeout.")
30 os.Exit(1)
31 case runner.ErrInterrupt:
32 log.Println("Terminating due to interrupt.")
33 os.Exit(2)
34 }
35 }
36
37 log.Println("Process ended.")
38 }
39
40 // createTask返回一个根据id
41 // 休眠指定秒数的示例任务
42 func createTask() func(int) {
43 return func(id int) {
44 log.Printf("Processor - Task #%d.", id)
45 time.Sleep(time.Duration(id) * time.Second)
46 }
47 }代码清单7-13的第16行是main函数。在第20行,使用timeout作为超时时间传给New函数,并返回了一个指向Runner类型的指针。之后在第23行,使用createTask函数创建了几个任务,并被加入Runner里。在第42行声明了createTask函数。这个函数创建的任务只是休眠了一段时间,用来模拟正在进行工作。增加完任务后,在第26行调用了Start方法,main函数会等待Start方法的返回。
当Start返回时,会检查其返回的error接口值,并存入err变量。如果确实发生了错误,代码会根据err变量的值来判断方法是由于超时终止的,还是由于收到了中断信号终止。如果没有错误,任务就是按时执行完成的。如果执行超时,程序就会用错误码1终止。如果接收到中断信号,程序就会用错误码2终止。其他情况下,程序会使用错误码0正常终止。
本章会介绍pool包
。这个包用于展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的goroutine之间共享及独立使用的资源。这种模式在需要共享一组静态资源的情况(如共享数据库连接或者内存缓冲区)下非常有用。如果goroutine需要从池里得到这些资源中的一个,它可以从池里申请,使用完后归还到资源池里。
让我们看一下pool包里的 pool.go 代码文件,如代码清单7-14所示。
代码清单7-14 pool/pool.go
01 // Fatih Arslan和Gabriel Aszalos协助完成了这个示例
02 // 包pool管理用户定义的一组资源
03 package pool
04
05 import (
06 "errors"
07 "log"
08 "io"
09 "sync"
10 )
11
12 // Pool管理一组可以安全地在多个goroutine间
13 // 共享的资源。被管理的资源必须
14 // 实现io.Closer接口
15 type Pool struct {
16 m sync.Mutex
17 resources chan io.Closer
18 factory func() (io.Closer, error)
19 closed bool
20 }
21
22 // ErrPoolClosed表示请求(Acquire)了一个
23 // 已经关闭的池
24 var ErrPoolClosed = errors.New("Pool has been closed.")
25
26 // New创建一个用来管理资源的池。
27 // 这个池需要一个可以分配新资源的函数,
28 // 并规定池的大小
29 func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
30 if size <= 0 {
31 return nil, errors.New("Size value too small.")
32 }
33
34 return &Pool{
35 factory: fn,
36 resources: make(chan io.Closer, size),
37 }, nil
38 }
39
40 // Acquire从池中获取一个资源
41 func (p *Pool) Acquire() (io.Closer, error) {
42 select {
43 // 检查是否有空闲的资源
44 case r, ok := <-p.resources:
45 log.Println("Acquire:", "Shared Resource")
46 if !ok {
47 return nil, ErrPoolClosed
48 }
49 return r, nil
50
51 // 因为没有空闲资源可用,所以提供一个新资源
52 default:
53 log.Println("Acquire:", "New Resource")
54 return p.factory()
55 }
56 }
57
58 // Release将一个使用后的资源放回池里
59 func (p *Pool) Release(r io.Closer) {
60 // 保证本操作和Close操作的安全
61 p.m.Lock()
62 defer p.m.Unlock()
63
64 // 如果池已经被关闭,销毁这个资源
65 if p.closed {
66 r.Close()
67 return
68 }
69
70 select {
71 // 试图将这个资源放入队列
72 case p.resources <- r:
73 log.Println("Release:", "In Queue")
74
75 // 如果队列已满,则关闭这个资源
76 default:
77 log.Println("Release:", "Closing")
78 r.Close()
79 }
80 }
81
82 // Close会让资源池停止工作,并关闭所有现有的资源
83 func (p *Pool) Close() {
84 // 保证本操作与Release操作的安全
85 p.m.Lock()
86 defer p.m.Unlock()
87
88 // 如果pool已经被关闭,什么也不做
89 if p.closed {
90 return
91 }
92
93 // 将池关闭
94 p.closed = true
95
96 // 在清空通道里的资源之前,将通道关闭
97 // 如果不这样做,会发生死锁
98 close(p.resources)
99
100 // 关闭资源
101 for r := range p.resources {
102 r.Close()
103 }
104 }代码清单7-14中的pool包的代码声明了一个名为Pool的结构,该结构允许调用者根据所需数量创建不同的资源池。只要某类资源实现了io.Closer接口,就可以用这个资源池来管理。让我们看一下Pool结构的声明,如代码清单7-15所示。
代码清单7-15 pool/pool.go:第12行到第20行
12 // Pool管理一组可以安全地在多个goroutine间
13 // 共享的资源。被管理的资源必须
14 // 实现io.Closer接口
15 type Pool struct {
16 m sync.Mutex
17 resources chan io.Closer
18 factory func() (io.Closer, error)
19 closed bool
20 }Pool结构声明了4个字段,每个字段都用来辅助以goroutine安全的方式来管理资源池。在第16行,结构以一个sync.Mutex类型的字段开始。这个互斥锁用来保证在多个goroutine访问资源池时,池内的值是安全的。第二个字段名为resources,被声明为io.Closer接口类型的通道。这个通道是作为一个有缓冲的通道创建的,用来保存共享的资源。由于通道的类型是一个接口,所以池可以管理任意实现了io.Closer接口的资源类型。
factory字段是一个函数类型。任何一个没有输入参数且返回一个io.Closer和一个error接口值的函数,都可以赋值给这个字段。这个函数的目的是,当池需要一个新资源时,可以用这个函数创建。这个函数的实现细节超出了pool包的范围,并且需要由包的使用者实现并提供。
第19行中的最后一个字段是closed字段。这个字段是一个标志,表示Pool是否已经被关闭。现在已经了解了Pool结构的声明,让我们看一下第24行声明的error接口变量,如代码清单7-16所示。
代码清单7-16 pool/pool.go:第22行到第24行
22 // ErrPoolClosed表示请求(Acquire)了一个
23 // 已经关闭的池
24 var ErrPoolClosed = errors.New("Pool has been closed.")Go语言里会经常创建error接口变量。这可以让调用者来判断某个包里的函数或者方法返回的具体的错误值。当调用者对一个已经关闭的池调用Acquire方法时,会返回代码清单7-16里的error接口变量。因为Acquire方法可能返回多个不同类型的错误,所以Pool已经关闭时会关闭时返回这个错误变量可以让调用者从其他错误中识别出这个特定的错误。
既然已经声明了Pool类型和error接口值,我们就可以开始看一下pool包里声明的函数和方法了。让我们从池的工厂函数开始,这个函数名为New,如代码清单7-17所示。
代码清单7-17 pool/pool.go:第26行到第38行
26 // New创建一个用来管理资源的池。
27 // 这个池需要一个可以分配新资源的函数,
28 // 并规定池的大小
29 func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
30 if size <= 0 {
31 return nil, errors.New("Size value too small.")
32 }
33
34 return &Pool{
35 factory: fn,
36 resources: make(chan io.Closer, size),
37 }, nil
38 }代码清单7-17中的New函数接受两个参数,并返回两个值。第一个参数fn声明为一个函数类型,这个函数不接受任何参数,返回一个io.Closer和一个error接口值。这个作为参数的函数是一个工厂函数,用来创建由池管理的资源的值。第二个参数size表示为了保存资源而创建的有缓冲的通道的缓冲区大小。
第30行检查了size的值,保证这个值不小于等于0。如果这个值小于等于0,就会使用nil值作为返回的pool指针值,然后为该错误创建一个error接口值。因为这是这个函数唯一可能返回的错误值,所以不需要为这个错误单独创建和使用一个error接口变量。如果能够接受传入的size,就会创建并初始化一个新的Pool值。在第35行,函数参数fn被赋值给factory字段,并且在第36行,使用size值创建有缓冲的通道。在return语句里,可以构造并初始化任何值。因此,第34行的return语句用指向新创建的Pool类型值的指针和nil值作为error接口值,返回给函数的调用者。
在创建并初始化Pool类型的值之后,接下来让我们来看一下Acquire方法,如代码清单7-18所示。这个方法可以让调用者从池里获得资源。
代码清单7-18 pool/pool.go:第40行到第56行
40 // Acquire从池中获取一个资源
41 func (p *Pool) Acquire() (io.Closer, error) {
42 select {
43 // 检查是否有空闲的资源
44 case r, ok := <-p.resources:
45 log.Println("Acquire:", "Shared Resource")
46 if !ok {
47 return nil, ErrPoolClosed
48 }
49 return r, nil
50
51 // 因为没有空闲资源可用,所以提供一个新资源
52 default:
53 log.Println("Acquire:", "New Resource")
54 return p.factory()
55 }
56 }代码清单7-18包含了Acquire方法的代码。这个方法在还有可用资源时会从资源池里返回一个资源,否则会为该调用创建并返回一个新的资源。这个实现是通过select/case语句来检查有缓冲的通道里是否还有资源来完成的。如果通道里还有资源,如第44行到第49行所写,就取出这个资源,并返回给调用者。如果该通道里没有资源可取,就会执行default分支。在这个示例中,在第54行执行用户提供的工厂函数,并且创建并返回一个新资源。
如果不再需要已经获得的资源,必须将这个资源释放回资源池里。这是Release方法的任务。不过在理解Release方法的代码背后的机制之前,我们需要先看一下Close方法,如代码清单7-19所示。
代码清单7-19 pool/pool.go:第82行到第104行
82 // Close会让资源池停止工作,并关闭所有现有的资源
83 func (p *Pool) Close() {
84 // 保证本操作与Release操作的安全
85 p.m.Lock()
86 defer p.m.Unlock()
87
88 // 如果pool已经被关闭,什么也不做
89 if p.closed {
90 return
91 }
92
93 // 将池关闭
94 p.closed = true
95
96 // 在清空通道里的资源之前,将通道关闭
97 // 如果不这样做,会发生死锁
98 close(p.resources)
99
100 // 关闭资源
101 for r := range p.resources {
102 r.Close()
103 }
104 }一旦程序不再使用资源池,需要调用这个资源池的Close方法。代码清单7-19中展示了Close方法的代码。在第98行到第101行,这个方法关闭并清空了有缓冲的通道,并将缓冲的空闲资源关闭。需要注意的是,在同一时刻只能有一个goroutine执行这段代码。事实上,当这段代码被执行时,必须保证其他goroutine中没有同时执行Release方法。你一会儿就会理解为什么这很重要。
在第85行到第86行,互斥量被加锁,并在函数返回时解锁。在第89行,检查closed标志,判断池是不是已经关闭。如果已经关闭,该方法会直接返回,并释放锁。如果这个方法第一次被调用,就会将这个标志设置为true,并关闭且清空resources通道。
现在我们可以看一下Release方法,看看这个方法是如何和Close方法配合的,如代码清单7-20所示。
代码清单7-20 pool/pool.go:第58行到第80行
58 // Release将一个使用后的资源放回池里
59 func (p *Pool) Release(r io.Closer) {
60 // 保证本操作和Close操作的安全
61 p.m.Lock()
62 defer p.m.Unlock()
63
64 // 如果池已经被关闭,销毁这个资源
65 if p.closed {
66 r.Close()
67 return
68 }
69
70 select {
71 // 试图将这个资源放入队列
72 case p.resources <- r:
73 log.Println("Release:", "In Queue")
74
75 // 如果队列已满,则关闭这个资源
76 default:
77 log.Println("Release:", "Closing")
78 r.Close()
79 }
80 }在代码清单7-20中可以找到Release方法的实现。该方法一开始在第61行和第62行对互斥量进行加锁和解锁。这和Close方法中的互斥量是同一个互斥量。这样可以阻止这两个方法在不同goroutine里同时运行。使用互斥量有两个目的。第一,可以保护第65行中读取closed标志的行为,保证同一时刻不会有其他goroutine调用Close方法写同一个标志。第二,我们不想往一个已经关闭的通道里发送数据,因为那样会引起崩溃。如果closed标志是true,我们就知道resources通道已经被关闭。
在第66行,如果池已经被关闭,会直接调用资源值r的Close方法。因为这时已经清空并关闭了池,所以无法将资源重新放回到该资源池里。对closed标志的读写必须进行同步,否则可能误导其他goroutine,让其认为该资源池依旧是打开的,并试图对通道进行无效的操作。
现在看过了池的代码,了解了池是如何工作的,让我们看一下main.go代码文件里的测试程序,如代码清单7-21所示。
代码清单7-21 pool/main/main.go
01 // 这个示例程序展示如何使用pool包
02 // 来共享一组模拟的数据库连接
03 package main
04
05 import (
06 "log"
07 "io"
08 "math/rand"
09 "sync"
10 "sync/atomic"
11 "time"
12
13 "github.com/goinaction/code/chapter7/patterns/pool"
14 )
15
16 const (
17 maxGoroutines = 25 // 要使用的goroutine的数量
18 pooledResources = 2 // 池中的资源的数量
19 )
20
21 // dbConnection模拟要共享的资源
22 type dbConnection struct {
23 ID int32
24 }
25
26 // Close实现了io.Closer接口,以便dbConnection
27 // 可以被池管理。Close用来完成任意资源的
28 // 释放管理
29 func (dbConn *dbConnection) Close() error {
30 log.Println("Close: Connection", dbConn.ID)
31 return nil
32 }
33
34 // idCounter用来给每个连接分配一个独一无二的id
35 var idCounter int32
36
37 // createConnection是一个工厂函数,
38 // 当需要一个新连接时,资源池会调用这个函数
39 func createConnection() (io.Closer, error) {
40 id := atomic.AddInt32(&idCounter, 1)
41 log.Println("Create: New Connection", id)
42
43 return &dbConnection{id}, nil
44 }
45
46 // main是所有Go程序的入口
47 func main() {
48 var wg sync.WaitGroup
49 wg.Add(maxGoroutines)
50
51 // 创建用来管理连接的池
52 p, err := pool.New(createConnection, pooledResources)
53 if err != nil {
54 log.Println(err)
55 }
56
57 // 使用池里的连接来完成查询
58 for query := 0; query < maxGoroutines; query++ {
59 // 每个goroutine需要自己复制一份要
60 // 查询值的副本,不然所有的查询会共享
61 // 同一个查询变量
62 go func(q int) {
63 performQueries(q, p)
64 wg.Done()
65 }(query)
66 }
67
68 // 等待goroutine结束
69 wg.Wait()
70
71 // 关闭池
72 log.Println("Shutdown Program.")
73 p.Close()
74 }
75
76 // performQueries用来测试连接的资源池
77 func performQueries(query int, p *pool.Pool) {
78 // 从池里请求一个连接
79 conn, err := p.Acquire()
80 if err != nil {
81 log.Println(err)
82 return
83 }
84
85 // 将该连接释放回池里
86 defer p.Release(conn)
87
88 // 用等待来模拟查询响应
89 time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
90 log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
91 }代码清单7-21展示的main.go中的代码使用pool包来管理一组模拟数据库连接的连接池。代码一开始声明了两个常量maxGoroutines和pooledResource,用来设置goroutine的数量以及程序将要使用资源的数量。资源的声明以及io.Closer接口的实现如代码清单7-22所示。
代码清单7-22 pool/main/main.go:第21行到第32行
21 // dbConnection模拟要共享的资源
22 type dbConnection struct {
23 ID int32
24 }
25
26 // Close实现了io.Closer接口,以便dbConnection
27 // 可以被池管理。Close用来完成任意资源的
28 // 释放管理
29 func (dbConn *dbConnection) Close() error {
30 log.Println("Close: Connection", dbConn.ID)
31 return nil
32 }代码清单7-22展示了dbConnection结构的声明以及io.Closer接口的实现。dbConnection类型模拟了管理数据库连接的结构,当前版本只包含一个字段ID,用来保存每个连接的唯一标识。Close方法只是报告了连接正在被关闭,并显示出要关闭连接的标识。
接下来我们来看一下创建dbConnection值的工厂函数,如代码清单7-23所示。
代码清单7-23 pool/main/main.go:第34行到第44行
34 // idCounter用来给每个连接分配一个独一无二的id
35 var idCounter int32
36
37 // createConnection是一个工厂函数,
38 // 当需要一个新连接时,资源池会调用这个函数
39 func createConnection() (io.Closer, error) {
40 id := atomic.AddInt32(&idCounter, 1)
41 log.Println("Create: New Connection", id)
42
43 return &dbConnection{id}, nil
44 }代码清单7-23展示了createConnection函数的实现。这个函数给连接生成了一个唯一标识,显示连接正在被创建,并返回指向带有唯一标识的dbConnection类型值的指针。唯一标识是通过atomic.AddInt32函数生成的。这个函数可以安全地增加包级变量idCounter的值。现在有了资源以及工厂函数,我们可以配合使用pool包了。
接下来让我们看一下main函数的代码,如代码清单7-24所示。
代码清单7-24 pool/main/main.go:第48行到第55行
48 var wg sync.WaitGroup
49 wg.Add(maxGoroutines)
50
51 // 创建用来管理连接的池
52 p, err := pool.New(createConnection, pooledResources)
53 if err != nil {
54 log.Println(err)
55 }在第48行,main函数一开始就声明了一个WaitGroup值,并将WaitGroup的值设置为要创建的goroutine的数量。之后使用pool包里的New函数创建了一个新的Pool类型。工厂函数和要管理的资源的数量会传入New函数。这个函数会返回一个指向Pool值的指针,并检查可能的错误。现在我们有了一个Pool类型的资源池实例,就可以创建goroutine,并使用这个资源池在goroutine之间共享资源,如代码清单7-25所示。
代码清单7-25 pool/main/main.go:第57行到第66行
57 // 使用池里的连接来完成查询
58 for query := 0; query < maxGoroutines; query++ {
59 // 每个goroutine需要自己复制一份要
60 // 查询值的副本,不然所有的查询会共享
61 // 同一个查询变量
62 go func(q int) {
63 performQueries(q, p)
64 wg.Done()
65 }(query)
66 }代码清单7-25中用一个for循环创建要使用池的goroutine。每个goroutine调用一次performQueries函数然后退出。performQueries函数需要传入一个唯一的ID值用于做日志以及一个指向Pool的指针。一旦所有的goroutine都创建完成,main函数就等待所有goroutine执行完毕,如代码清单7-26所示。
代码清单7-26 pool/main/main.go:第68行到第73行
68 // 等待goroutine结束
69 wg.Wait()
70
71 // 关闭池
72 log.Println("Shutdown Program.")
73 p.Close()在代码清单7-26中,main函数等待WaitGroup实例的Wait方法执行完成。一旦所有goroutine都报告其执行完成,就关闭Pool,并且终止程序。接下来,让我们看一下performQueries函数。这个函数使用了池的Acquire方法和Release方法,如代码清单7-27所示。
代码清单7-27 pool/main/main.go:第76行到第91行
76 // performQueries用来测试连接的资源池
77 func performQueries(query int, p *pool.Pool) {
78 // 从池里请求一个连接
79 conn, err := p.Acquire()
80 if err != nil {
81 log.Println(err)
82 return
83 }
84
85 // 将该连接释放回池里
86 defer p.Release(conn)
87
88 // 用等待来模拟查询响应
89 time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
90 log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
91 }代码清单7-27展示了performQueries的实现。这个实现使用了pool的Acquire方法和Release方法。这个函数首先调用了Acquire方法,从池里获得dbConnection。之后会检查返回的error接口值,在第86行,再使用defer语句在函数退出时将dbConnection释放回池里。在第89行和第90行,随机休眠一段时间,以此来模拟使用dbConnection工作时间。
work包的目的是展示如何使用无缓冲的通道来创建一个goroutine池,这些goroutine执行并控制一组工作,让其并发执行。在这种情况下,使用无缓冲的通道要比随意指定一个缓冲区大小的有缓冲的通道好,因为这个情况下既不需要一个工作队列,也不需要一组goroutine配合执行。无缓冲的通道保证两个goroutine之间的数据交换。这种使用无缓冲的通道的方法允许使用者知道什么时候goroutine池正在执行工作,而且如果池里的所有goroutine都忙,无法接受新的工作的时候,也能及时通过通道来通知调用者。使用无缓冲的通道不会有工作在队列里丢失或者卡住,所有工作都会被处理。
让我们来看一下work包里的work.go代码文件,如代码清单7-28所示。
代码清单7-28 work/work.go
01 // Jason Waldrip协助完成了这个示例
02 // work包管理一个goroutine池来完成工作
03 package work
04
05 import "sync"
06
07 // Worker必须满足接口类型,
08 // 才能使用工作池
09 type Worker interface {
10 Task()
11 }
12
13 // Pool提供一个goroutine池,这个池可以完成
14 // 任何已提交的Worker任务
15 type Pool struct {
16 work chan Worker
17 wg sync.WaitGroup
18 }
19
20 // New创建一个新工作池
21 func New(maxGoroutines int) *Pool {
22 p := Pool{
23 work: make(chan Worker),
24 }
25
26 p.wg.Add(maxGoroutines)
27 for i := 0; i < maxGoroutines; i++ {
28 go func() {
29 for w := range p.work {
30 w.Task()
31 }
32 p.wg.Done()
33 }()
34 }
35
36 return &p
37 }
38
39 // Run提交工作到工作池
40 func (p *Pool) Run(w Worker) {
41 p.work <- w
42 }
43
44 // Shutdown等待所有goroutine停止工作
45 func (p *Pool) Shutdown() {
46 close(p.work)
47 p.wg.Wait()
48 }代码清单7-28中展示的work包一开始声明了名为Worker的接口和名为Pool的结构,如代码清单7-29所示。
代码清单7-29 work/work.go:第07行到第18行
07 // Worker必须满足接口类型,
08 // 才能使用工作池
09 type Worker interface {
10 Task()
11 }
12
13 // Pool提供一个goroutine池,这个池可以完成
14 // 任何已提交的Worker任务
15 type Pool struct {
16 work chan Worker
17 wg sync.WaitGroup
18 }代码清单7-29的第09行中的Worker接口声明了一个名为Task的方法。在第15行,声明了名为Pool的结构,这个结构类型实现了goroutine池,并实现了一些处理工作的方法。这个结构类型声明了两个字段,一个名为work(一个Worker接口类型的通道),另一个名为wg的sync.WaitGroup类型。
接下来,让我们来看一下work包的工厂函数,如代码清单7-30所示。
代码清单7-30 work/work.go:第20行到第37行
20 // New创建一个新工作池
21 func New(maxGoroutines int) *Pool {
22 p := Pool{
23 work: make(chan Worker),
24 }
25
26 p.wg.Add(maxGoroutines)
27 for i := 0; i < maxGoroutines; i++ {
28 go func() {
29 for w := range p.work {
30 w.Task()
31 }
32 p.wg.Done()
33 }()
34 }
35
36 return &p
37 }代码清单7-30展示了New函数,这个函数使用固定数量的goroutine来创建一个工作池。goroutine的数量作为参数传给New函数。在第22行,创建了一个Pool类型的值,并使用无缓冲的通道来初始化work字段。
之后,在第26行,初始化WaitGroup需要等待的数量,并在第27行到第34行,创建了同样数量的goroutine。这些goroutine只接收Worker类型的接口值,并调用这个值的Task方法,如代码清单7-31所示。
代码清单7-31 work/work.go:第28行到第33行
28 go func() {
29 for w := range p.work {
30 w.Task()
31 }
32 p.wg.Done()
33 }()代码清单7-31里的for range循环会一直阻塞,直到从work通道收到一个Worker接口值。如果收到一个值,就会执行这个值的Task方法。一旦work通道被关闭,for range循环就会结束,并调用WaitGroup的Done方法。然后goroutine终止。
现在我们可以创建一个等待并执行工作的goroutine池了。让我们看一下如何向池里提交工作,如代码清单7-32所示。
代码清单7-32 work/work.go:第39行到第42行
39 // Run提交工作到工作池
40 func (p *Pool) Run(w Worker) {
41 p.work <- w
42 }代码清单7-32展示了Run方法。这个方法可以向池里提交工作。该方法接受一个Worker类型的接口值作为参数,并将这个值通过work通道发送。由于work通道是一个无缓冲的通道,调用者必须等待工作池里的某个goroutine接收到这个值才会返回。这正是我们想要的,这样可以保证调用的Run返回时,提交的工作已经开始执行。
在某个时间点,需要关闭工作池。这是Shutdown方法所做的事情,如代码清单7-33所示。
代码清单7-33 work/work.go:第44行到第48行
44 // Shutdown等待所有goroutine停止工作
45 func (p *Pool) Shutdown() {
46 close(p.work)
47 p.wg.Wait()
48 }代码清单7-33中的Shutdown方法做了两件事,首先,它关闭了work通道,这会导致所有池里的goroutine停止工作,并调用WaitGroup的Done方法;然后,Shutdown方法调用WaitGroup的Wait方法,这会让Shutdown方法等待所有goroutine终止。
我们看了work包的代码,并了解了它是如何工作的,接下来让我们看一下main.go源代码文件中的测试程序,如代码清单7-34所示。
代码清单7-34 work/main/main.go
01 // 这个示例程序展示如何使用work包
02 // 创建一个goroutine池并完成工作
03 package main
04
05 import (
06 "log"
07 "sync"
08 "time"
09
10 "github.com/goinaction/code/chapter7/patterns/work"
11 )
12
13 // names提供了一组用来显示的名字
14 var names = []string{
15 "steve",
16 "bob",
17 "mary",
18 "therese",
19 "jason",
20 }
21
22 // namePrinter使用特定方式打印名字
23 type namePrinter struct {
24 name string
25 }
26
27 // Task实现Worker接口
28 func (m *namePrinter) Task() {
29 log.Println(m.name)
30 time.Sleep(time.Second)
31 }
32
33 // main是所有Go程序的入口
34 func main() {
35 // 使用两个goroutine来创建工作池
36 p := work.New(2)
37
38 var wg sync.WaitGroup
39 wg.Add(100 * len(names))
40
41 for i := 0; i < 100; i++ {
42 // 迭代names切片
43 for _, name := range names {
44 // 创建一个namePrinter并提供
45 // 指定的名字
46 np := namePrinter{
47 name: name,
48 }
49
50 go func() {
51 // 将任务提交执行。当Run返回时
52 // 我们就知道任务已经处理完成
53 p.Run(&np)
54 wg.Done()
55 }()
56 }
57 }
58
59 wg.Wait()
60
61 // 让工作池停止工作,等待所有现有的
62 // 工作完成
63 p.Shutdown()
64 }代码清单7-34展示了使用work包来完成名字显示工作的测试程序。这段代码一开始在第14行声明了名为names的包级的变量,这个变量被声明为一个字符串切片。这个切片使用5个名字进行了初始化。然后声明了名为namePrinter的类型,如代码清单7-35所示。
代码清单7-35 work/main/main.go:第22行到第31行
22 // namePrinter使用特定方式打印名字
23 type namePrinter struct {
24 name string
25 }
26
27 // Task实现Worker接口
28 func (m *namePrinter) Task() {
29 log.Println(m.name)
30 time.Sleep(time.Second)
31 }在代码清单7-35的第23行,声明了namePrinter类型,接着是这个类型对Worker接口的实现。这个类型的工作任务是在显示器上显示名字。这个类型只包含一个字段,即name,它包含要显示的名字。Worker接口的实现Task函数用log.Println函数来显示名字,之后等待1 秒再退出。等待这1秒只是为了让测试程序运行的速度慢一些,以便看到并发的效果。
有了Worker接口的实现,我们就可以看一下main函数内部的代码了,如代码清单7-36所示。
代码清单7-36 work/main/main.go:第33行到第64行
33 // main是所有Go程序的入口
34 func main() {
35 // 使用两个goroutine来创建工作池
36 p := work.New(2)
37
38 var wg sync.WaitGroup
39 wg.Add(100 * len(names))
40
41 for i := 0; i < 100; i++ {
42 // 迭代names切片
43 for _, name := range names {
44 // 创建一个namePrinter并提供
45 // 指定的名字
46 np := namePrinter{
47 name: name,
48 }
49
50 go func() {
51 // 将任务提交执行。当Run返回时
52 // 我们就知道任务已经处理完成
53 p.Run(&np)
54 wg.Done()
55 }()
56 }
57 }
58
59 wg.Wait()
60
61 // 让工作池停止工作,等待所有现有的
62 // 工作完成
63 p.Shutdown()
64 }在代码清单7-36第36行,调用work包里的New函数创建一个工作池。这个调用传入的参数是2,表示这个工作池只会包含两个执行任务的goroutine。在第38行和第39行,声明了一个WaitGroup,并初始化为要执行任务的goroutine数。在这个例子里,names切片里的每个名字都会创建100个goroutine来提交任务。这样就会有一堆goroutine互相竞争,将任务提交到池里。
在第41行到第43行,内部和外部的for循环用来声明并创建所有的goroutine。每次内部循环都会创建一个namePrinter类型的值,并提供一个用来打印的名字。之后,在第50行,声明了一个匿名函数,并创建一个goroutine执行这个函数。这个goroutine会调用工作池的Run方法,将namePrinter的值提交到池里。一旦工作池里的goroutine接收到这个值,Run方法就会返回。这也会导致goroutine将WaitGroup的计数递减,并终止goroutine。
一旦所有的goroutine都创建完成,main函数就会调用WaitGroup的Wait方法。这个调用会等待所有创建的goroutine提交它们的工作。一旦Wait返回,就会调用工作池的Shutdown方法来关闭工作池。Shutdown方法直到所有的工作都做完才会返回。在这个例子里,最多只会等待两个工作的完成。
default分支的select语句可以用来尝试向通道发送或者接收数据,而不会阻塞。本书是以Go1.5版本为基础写作而成的。在Go1.6及之后的版本中,标准库里自带了资源池的实现(sync.Pool)。推荐使用。——译者注