mirror of https://github.com/mindoc-org/mindoc.git
100 lines
1.8 KiB
Go
100 lines
1.8 KiB
Go
|
package gopool
|
||
|
|
||
|
import (
|
||
|
"sync"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
)
|
||
|
// HandlerIsExistErr is returned by ChannelPool.LoadOrStore when the given key
// is already present in the pool.
var HandlerIsExistErr = errors.New("指定的键已存在")

// WorkerChanClosedErr is returned by ChannelPool.LoadOrStore when the pool has
// been marked as closed.
var WorkerChanClosedErr = errors.New("队列已关闭")
|
||
|
// ChannelHandler is a unit of work submitted to a ChannelPool: a function that
// takes no arguments and returns nothing.
type ChannelHandler func()
|
||
|
|
||
|
type entry struct {
|
||
|
handler ChannelHandler
|
||
|
key string
|
||
|
}
|
||
|
|
||
|
type ChannelPool struct {
|
||
|
maxWorkerNum int
|
||
|
maxPoolNum int
|
||
|
wait *sync.WaitGroup
|
||
|
cache *sync.Map
|
||
|
worker chan *entry
|
||
|
limit chan bool
|
||
|
isClosed bool
|
||
|
once *sync.Once
|
||
|
}
|
||
|
|
||
|
func NewChannelPool(maxWorkerNum, maxPoolNum int) (*ChannelPool) {
|
||
|
if maxWorkerNum <= 0 {
|
||
|
maxWorkerNum = 1
|
||
|
}
|
||
|
if maxPoolNum <= 0 {
|
||
|
maxWorkerNum = 100
|
||
|
}
|
||
|
return &ChannelPool{
|
||
|
maxWorkerNum: maxWorkerNum,
|
||
|
maxPoolNum: maxPoolNum,
|
||
|
wait: &sync.WaitGroup{},
|
||
|
cache: &sync.Map{},
|
||
|
worker: make(chan *entry, maxWorkerNum),
|
||
|
limit: make(chan bool, maxWorkerNum),
|
||
|
isClosed: false,
|
||
|
once: &sync.Once{},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (pool *ChannelPool) LoadOrStore(key string,value ChannelHandler) error {
|
||
|
if pool.isClosed {
|
||
|
return WorkerChanClosedErr
|
||
|
}
|
||
|
if _,loaded := pool.cache.LoadOrStore(key,false); loaded {
|
||
|
return HandlerIsExistErr
|
||
|
}else{
|
||
|
pool.worker <- &entry{handler:value,key:key}
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (pool *ChannelPool) Start() {
|
||
|
pool.once.Do(func() {
|
||
|
go func() {
|
||
|
for i :=0; i < pool.maxWorkerNum; i ++ {
|
||
|
pool.limit <- true
|
||
|
}
|
||
|
for {
|
||
|
actual, isClosed := <-pool.worker
|
||
|
//当队列被关闭,则跳出循环
|
||
|
if actual == nil && !isClosed {
|
||
|
fmt.Println("工作队列已关闭")
|
||
|
break
|
||
|
}
|
||
|
limit := <-pool.limit
|
||
|
|
||
|
if limit {
|
||
|
pool.wait.Add(1)
|
||
|
go func(actual *entry) {
|
||
|
defer pool.wait.Done()
|
||
|
defer func() {
|
||
|
pool.cache.Delete(actual.key)
|
||
|
pool.limit <- true
|
||
|
}()
|
||
|
|
||
|
actual.handler()
|
||
|
|
||
|
}(actual)
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func (pool *ChannelPool) Wait() {
|
||
|
close(pool.worker)
|
||
|
|
||
|
pool.wait.Wait()
|
||
|
}
|
||
|
|