coding-exercises/offtopic/worker-pool/main.go

137 lines
2.3 KiB
Go

package main
import (
"context"
"log"
"runtime"
"strconv"
"sync"
"time"
)
// JobResult is the outcome a Job reports after it has been executed.
type JobResult struct {
	// Success reports whether the job finished successfully.
	Success bool
	// Output is the text the job produced.
	Output string
}
// Job is a unit of work that can be handed to a worker.
type Job interface {
	// Execute runs the job.
	Execute()
	// Result returns the job's outcome as a JobResult.
	Result() JobResult
}
// PrintJob is a Job whose output is a timestamp.
type PrintJob struct {
	// result is nil until Execute has stored an outcome.
	result *JobResult
}
// Execute records a successful result whose Output is the current time,
// formatted with time.Time.String.
func (j *PrintJob) Execute() {
	var outcome JobResult
	outcome.Success = true
	outcome.Output = time.Now().String()
	j.result = &outcome
}
// Result returns a copy of the outcome stored by Execute.
//
// If Execute has not stored a result yet, Result returns the zero JobResult
// (Success false, empty Output) rather than dereferencing a nil pointer.
func (j *PrintJob) Result() JobResult {
	if j.result == nil {
		return JobResult{}
	}
	return *j.result
}
// Worker receives jobs from a channel and executes them one at a time.
type Worker struct {
	// id labels this worker in log output.
	id string
	// incomingChan is the channel the worker receives jobs from.
	incomingChan chan Job
	// wg is marked done when Start returns.
	wg *sync.WaitGroup
}
// Start runs the worker loop: it executes each job received from the
// worker's channel and logs the job's output. The loop ends, and the
// worker's WaitGroup is marked done, when the channel is closed or when
// ctx is cancelled.
func (w *Worker) Start(ctx context.Context) {
	defer w.wg.Done()

	cancelled := ctx.Done()
	for {
		select {
		case <-cancelled:
			return
		case task, open := <-w.incomingChan:
			if !open {
				// Channel closed and fully drained: nothing left to do.
				return
			}
			task.Execute()
			outcome := task.Result()
			log.Printf("worker=%s result=%s", w.id, outcome.Output)
		}
	}
}
// NewWorker returns a Worker identified by id that receives jobs from
// incomingChan and signals wg when its Start loop returns.
func NewWorker(id string, wg *sync.WaitGroup, incomingChan chan Job) *Worker {
	w := new(Worker)
	w.id = id
	w.incomingChan = incomingChan
	w.wg = wg
	return w
}
// WorkerPool runs a group of workers that all receive from one job channel.
type WorkerPool struct {
	// size is the worker count the pool was configured with.
	size int
	// wg tracks the goroutines started by the pool.
	wg *sync.WaitGroup
	// workerJobChan is the job channel shared by all workers.
	workerJobChan chan Job
	// cancel cancels the context the workers run under; it is set by Start.
	cancel context.CancelFunc
}
// Start launches wp.size workers, each in its own goroutine and each
// receiving from the pool's shared job channel. The workers run under a
// context derived from ctx; its cancel function is stored in wp.cancel.
//
// A size of zero or less launches no workers.
func (wp *WorkerPool) Start(ctx context.Context) {
	var wpCtx context.Context
	wpCtx, wp.cancel = context.WithCancel(ctx)

	// Strictly less than size: the previous "<=" bound started one worker
	// more than the pool was configured for.
	for i := 0; i < wp.size; i++ {
		// Add before spawning so a concurrent Wait cannot miss this worker.
		wp.wg.Add(1)
		worker := NewWorker(strconv.Itoa(i), wp.wg, wp.workerJobChan)
		go worker.Start(wpCtx)
	}
}
// Stop shuts the pool down. It closes the job channel, starts an extra
// goroutine running flush to help drain jobs still queued, waits for all
// pool goroutines to finish, and then cancels the pool's context to release
// the resources associated with it.
//
// Stop must be called at most once, and AddJob must not be called after it:
// either would operate on the closed channel and panic.
func (wp *WorkerPool) Stop() {
	close(wp.workerJobChan)

	wp.wg.Add(1)
	go wp.flush()
	wp.wg.Wait()

	// Cancel only after the wait, so workers drain the queue instead of
	// returning early on ctx.Done(). cancel is nil if Start was never called.
	if wp.cancel != nil {
		wp.cancel()
	}
}
// Wait blocks until every goroutine tracked by the pool's WaitGroup has
// finished.
func (wp *WorkerPool) Wait() {
	group := wp.wg
	group.Wait()
}
// flush executes and logs jobs received from the job channel until the
// channel is closed and empty, then marks the pool's WaitGroup done. Jobs it
// handles are logged under the worker name "main".
func (wp *WorkerPool) flush() {
	defer wp.wg.Done()

	queue := wp.workerJobChan
	for {
		pending, open := <-queue
		if !open {
			return
		}
		pending.Execute()
		outcome := pending.Result()
		log.Printf("worker=%s result=%s", "main", outcome.Output)
	}
}
// AddJob sends job on the pool's job channel. The send blocks until the
// channel can accept the job.
func (wp *WorkerPool) AddJob(job Job) {
	queue := wp.workerJobChan
	queue <- job
}
// NewWorkerPool returns a WorkerPool configured for size workers, with a
// job channel that buffers up to jobBufferSize jobs. No goroutines are
// started here.
func NewWorkerPool(size, jobBufferSize int) *WorkerPool {
	pool := new(WorkerPool)
	pool.size = size
	pool.wg = new(sync.WaitGroup)
	pool.workerJobChan = make(chan Job, jobBufferSize)
	return pool
}
// main runs the worker-pool demo: it starts a pool sized from
// runtime.GOMAXPROCS(0), enqueues PrintJobs in a tight loop for 300ms, then
// stops the pool and returns.
func main() {
	ctx := context.Background()

	// Buffer 2<<8 (512) jobs. The previous 2>>8 evaluated to 0, which made
	// the job channel unbuffered.
	wp := NewWorkerPool(runtime.GOMAXPROCS(0), 2<<8)
	wp.Start(ctx)

	// Populate jobs for some time, then stop the worker pool.
	populateCtx, cancelPopulate := context.WithTimeout(ctx, time.Millisecond*300)
	defer cancelPopulate()
	for {
		select {
		case <-populateCtx.Done():
			log.Println("STOPPING!!")
			wp.Stop()
			return
		default:
			job := PrintJob{}
			wp.AddJob(&job)
		}
	}
}