package main import ( "context" "log" "runtime" "strconv" "sync" "time" ) type JobResult struct { Success bool Output string } type Job interface { Execute() Result() JobResult } type PrintJob struct { result *JobResult } func (j *PrintJob) Execute() { j.result = &JobResult{ Success: true, Output: time.Now().String(), } } func (j *PrintJob) Result() JobResult { return *j.result } type Worker struct { id string incomingChan chan Job wg *sync.WaitGroup } func (w *Worker) Start(ctx context.Context) { defer w.wg.Done() for { select { case job, ok := <-w.incomingChan: if !ok { return } job.Execute() log.Printf("worker=%s result=%s", w.id, job.Result().Output) case <-ctx.Done(): return } } } func NewWorker(id string, wg *sync.WaitGroup, incomingChan chan Job) *Worker { return &Worker{ id: id, wg: wg, incomingChan: incomingChan, } } type WorkerPool struct { size int wg *sync.WaitGroup workerJobChan chan Job cancel context.CancelFunc } func (wp *WorkerPool) Start(ctx context.Context) { var wpCtx context.Context wpCtx, wp.cancel = context.WithCancel(ctx) for i := 0; i <= wp.size; i++ { wp.wg.Add(1) worker := NewWorker(strconv.Itoa(i), wp.wg, wp.workerJobChan) go worker.Start(wpCtx) } } func (wp *WorkerPool) Stop() { close(wp.workerJobChan) wp.wg.Add(1) go wp.flush() wp.wg.Wait() } func (wp *WorkerPool) Wait() { wp.wg.Wait() } func (wp *WorkerPool) flush() { defer wp.wg.Done() for job := range wp.workerJobChan { job.Execute() log.Printf("worker=%s result=%s", "main", job.Result().Output) } } func (wp *WorkerPool) AddJob(job Job) { wp.workerJobChan <- job } func NewWorkerPool(size, jobBufferSize int) *WorkerPool { return &WorkerPool{ size: size, wg: &sync.WaitGroup{}, workerJobChan: make(chan Job, jobBufferSize), } } func main() { ctx := context.Background() wp := NewWorkerPool(runtime.GOMAXPROCS(0), 2>>8) wp.Start(ctx) // Populate jobs for some time, then stop workerpool populateCtx, cancelPopulate := context.WithTimeout(ctx, time.Millisecond*300) defer cancelPopulate() for { select { case <-populateCtx.Done(): log.Println("STOPPING!!") wp.Stop() return default: job := PrintJob{} wp.AddJob(&job) } } }