Goroutine并发模式

学习

Goroutine 并发模式

1. runner

简介,使用通道监视程序执行时间,当开发需要调度后台处理任务的程序的时候,这个程序可能会座位 cron 作业执行,或者基于定时任务的云环境执行

调度运行无人值守的面向任务程序,支持以下:

  • 程序可以在分配的时间内完成工作,正常终止;
  • 程序没有及时完成工作,“自杀”
  • 接受到操作系统发送的中断事件,程序立刻试图清理状态并停止工作

调用 errors/ os/signal /time/ os 声明 interrupt,complete,timeout 三个通道

  1. interrupt chan os.Signal 从主机操作系统接收中断事件 signal 抽象了不同操作系统上捕获和报告信号事件的具体实现
  2. complete chan error,complete 收发 error 接口类型值,被执行任务的 goroutine 用来发送任务已完成信号,返回 error 接口类型值或 nil,声明 ErrTimeout 和 ErrInterrupt
  3. timeout <-chan time.Time,管理执行任务时间,收到值后会试图清理状态并停止工作
  4. tasks []func(int),tasks 是一个函数值切片,表示执行顺序

初始化过程 interrupt 缓冲区容量为 1 的通道,保证通道至少能接受一个来自语言运行时的 os.Signal 值,如果 goroutine 没有准备好接受这个值就会被丢弃,ctrl+c 程序只会在这个通道的缓冲区可用时接受事件,其余所有事件被丢弃

gotInterrupt 检查是否有要从操作系统接收的事件

没有任何要接收的数据返回 false,有中断信号接收则调用 signal.Stop()之后返回 true

主流程 Start,匿名函数启动 goroutine,将 run 的 error 接口值发到 complete 通道,如果 complete 接收到 error 表示中断或完成返回,否则超时返回

主函数 main,规定 timeout,加入执行任务,执行任务处理结果,返回一个根据 id 休眠指定秒数

主要负责处理超时/报错/加入多个任务

runner 展开代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package runner

import (
	"errors"
	"os"
	"os/signal"
	"time"
)

var ErrTimeOut = errors.New("执行者执行超时")
var ErrInterrupt = errors.New("执行者被中断")
/*
runner 可以执行任何程序,可以监控程序,可以发送信号终止程序
 */
type Runner struct {
	interrupt chan os.Signal   //发送的信号,用来终止程序
	complete  chan error       //用于通知任务全部完成
	timeout   <-chan time.Time //程序的超时时间
	tasks     []func(int)      //要执行的任务
}

//工厂函数
func New(tm time.Duration) *Runner {
	return &Runner{
		complete:  make(chan error), //无缓冲
		timeout:   time.After(tm),
		interrupt: make(chan os.Signal, 1), //有缓冲
	}
}

//将需要执行的任务,添加到Runner里
func (r *Runner) Add(tasks ...func(int)) {
	r.tasks = append(r.tasks, tasks...)
}

//执行任务,执行的过程中接收到中断信号时,返回中断错误
//如果任务全部执行完,还没有接收到中断信号,则返回nil
func (r *Runner) run() error {
	for id, task := range r.tasks {
		if r.isInterrupt() {
			return ErrInterrupt
		}
		task(id)
	}
	return nil
}

//检查是否接收到了中断信号
func (r *Runner) isInterrupt() bool {
	select {
	case <-r.interrupt:
		signal.Stop(r.interrupt)
		return true
	default:
		return false
	}
}
//开始执行所有任务,并且监视通道事件
func (r *Runner) Start() error {
	//希望接收哪些系统信号
	signal.Notify(r.interrupt, os.Interrupt)

	go func() {
		r.complete <- r.run()
	}()

	select {
	case err := <-r.complete:
		return err
	case <-r.timeout:
		return ErrTimeOut
	}
}
main 展开代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
	"log"
	"os"
	"runner" //这个是写的runner所在包
	"time"
)

//必须在3内完成,超时时间
const timeout = 3 * time.Second

func main() {
	log.Println("开始工作")
	r := runner.New(timeout)                        //初始化runner
	r.Add(createTask(), createTask(), createTask()) //添加任务
	//执行任务并处理结果
	if err := r.Start(); err != nil {
		switch err {
		case runner.ErrTimeOut:
			log.Println("超时错误")
			os.Exit(1)
		case runner.ErrInterrupt:
			log.Println("中断错误")
			os.Exit(2)
		}
	}
	log.Print("工作完成")
}

func createTask() func(int) {
	return func(id int) {
		log.Printf("正在执行任务#%d ", id)
		time.Sleep(time.Duration(id) * time.Second)
	}
}

2. pool

1.6 后标准库自带资源池 sync.Pool

简介,如何使用有缓冲的通道实现资源池,管理任意数量 goroutine 之间共享及独立使用的资源 作用:共享数据库连接或者内存缓冲区,申请与归还资源到池中

调用 errors/log/io/sync

  1. m sync.Mutex,保证多个 goroutine 访问资源池时值安全
  2. resources chan io.Closer,有缓冲通道保存共享资源,池可以管理任意实现了 io.Closer 接口的资源类型
  3. factory func() (io.Closer, error),任何一个没有输入参数且返回一个 io.Closer 和 error 接口值的函数,当池需要一个新资源时可以用这个函数创建
  4. closed bool,表示池是否被关闭,ErrPoolClosed

New 工厂函数对接口进行初始化,fn 声明为一个函数类型不接受任何参数,返回一个 io.Closer 和一个 error 接口值,resources 创建一个有缓冲的管道,return 可以构造并初始化任何值

Acquire 方法在还有可用资源时从资源池返回一个资源,否则为该调用创建并返回一个新的资源,select/case 检查通道是否还有资源,有则返回资源,无则 default 工厂函数创建并返回新资源,如果不需要已经获得的资源要释放回资源池里

close 会让资源池停止工作,并关闭所有现有资源,将进程池上锁,检查进程池关闭,在清空通道资源前先关闭,否则发生死锁,关闭资源

Release 传入进程池,返回资源和 io.Closer,加锁保证本操作和 Close 操作的安全,如果池被关闭则关闭资源,select/case 尝试把资源放进队列,队列满则关闭资源

初始化过程,maxGoroutines 定义要使用的 goroutine 数量,pooledResources 定义池中资源数量,idCounter 分配连接 id,

主函数 main,创建管理连接的池,使用池里的连接来完成查询,查询值是副本,不然所有的查询会共享同一个查询变量,等待 goroutine 结束关闭池

管理池里的资源,负责获得/查询/关闭/连接资源

pool 展开代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package pool

import (
	"errors"
	"io"
	"log"
	"sync"
)

//一个安全的资源池,被管理的资源必须都实现io.Close接口
type Pool struct {
	m       sync.Mutex
	res     chan io.Closer
	factory func() (io.Closer, error)
	closed  bool
}

var ErrPoolClosed = errors.New("资源池已经被关闭。")

//创建一个资源池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
	if size <= 0 {
		return nil, errors.New("size的值太小了。")
	}
	return &Pool{
		factory: fn,
		res:     make(chan io.Closer, size),
	}, nil
}
//从资源池里获取一个资源
func (p *Pool) Acquire() (io.Closer,error) {
	select {
	case r,ok := <-p.res:
		log.Println("Acquire:共享资源")
		if !ok {
			return nil,ErrPoolClosed
		}
		return r,nil
	default:
		log.Println("Acquire:新生成资源")
		return p.factory()
	}
}

//关闭资源池,释放资源
func (p *Pool) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	if p.closed {
		return
	}

	p.closed = true

	//关闭通道,不让写入了
	close(p.res)

	//关闭通道里的资源
	for r:=range p.res {
		r.Close()
	}
}

func (p *Pool) Release(r io.Closer){
	//保证该操作和Close方法的操作是安全的
	p.m.Lock()
	defer p.m.Unlock()

	//资源池都关闭了,就省这一个没有释放的资源了,释放即可
	if p.closed {
		r.Close()
		return
	}

	select {
	case p.res <- r:
		log.Println("资源释放到池子里了")
	default:
		log.Println("资源池满了,释放这个资源吧")
		r.Close()
	}
}
main 展开代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

//1.6版本之后自带pool

import (
	"log"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

const (
	//模拟的最大goroutine
	maxGoroutine = 5
)

func main() {
	//等待任务完成
	var wg sync.WaitGroup
	wg.Add(maxGoroutine)

	p:=&sync.Pool{
		New:createConnection,
	}

	//模拟好几个goroutine同时使用资源池查询数据
	for query := 0; query < maxGoroutine; query++ {
		go func(q int) {
			dbQuery(q, p)
			wg.Done()
		}(query)
	}

	wg.Wait()
}

//模拟数据库查询
func dbQuery(query int, pool *sync.Pool) {
	conn:=pool.Get().(*dbConnection)

	defer pool.Put(conn)

	//模拟查询
	time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
	log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.ID)
}
//数据库连接
type dbConnection struct {
	ID int32//连接的标志
}

//实现io.Closer接口
func (db *dbConnection) Close() error {
	log.Println("关闭连接", db.ID)
	return nil
}

var idCounter int32

//生成数据库连接的方法,以供资源池使用
func createConnection() interface{} {
	//并发安全,给数据库连接生成唯一标志
	id := atomic.AddInt32(&idCounter, 1)
	return &dbConnection{ID:id}
}

3. work

简介,使用无缓冲通道来创建一个 goroutine 池,保证两个 goroutine 间数据交换,不会卡住丢失

Pool 包含 Worker 接口和声明 sync wg New 使用固定数量的 goroutine 来新建一个工作池,创建一个 Pool 并用无缓冲通道初始化 work,创建同数 goroutine,只接收 worker 接口值并调用 Task 方法 循环先阻塞,直到将 worker 传到 work 通道中再执行 Task 方法 Run 方法向池里提交工作,调用者必须等待工作池某个 goroutine 收到这个值才会返回 最后再 shutdown 关闭 work 通道,调用 wg.wait

name 声明了名字切片,并声明 namePrinter 类,实现 Task 接口打印名字并等待一秒 调用 work 包的 New 函数创建工作池,包含两个执行任务的 goroutine,name 切片每个名字会创建 100 个 goroutine 提交任务 每次内部循环都会创建 namePrinter 类型值并提供一个打印名字,声明了一个匿名函数创建一个 goroutine 执行,这个 goroutine 调用 run 将值提交到池,run 方法再返回,wg 计数减少,最终 shutdown

worker 展开代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package work

import (
	"sync"
)

// 任务类型接口
type Worker interface {
	Task(goid int)
}

// 任务池
type Pool struct {
	work chan Worker
	wg   sync.WaitGroup
}

// 新建
func New(maxGoroutines int) *Pool {
	//任务池
	p := Pool{
		work: make(chan Worker),
	}
	p.wg.Add(maxGoroutines)
	//创建maxGoroutines个go协程
	for i := 0; i < maxGoroutines; i++ {
		go func(goid int) {
			//保证goroutine不停止执行通道中的任务
			for w := range p.work {
				w.Task(goid)
			}
			//每个goroutine不再执行work通道中任务时停止
			p.wg.Done()
		}(i)
	}
	return &p
}

// 运行
func (p *Pool) Run(r Worker) {
	p.work <- r
}

// 停止
func (p *Pool) Shutdown() {
	close(p.work)
	p.wg.Wait()
}
main 展开代码
最后更新于 05月19日 12点44分, 2026年