package roadrunner
import (
"fmt"
"os/exec"
"sync"
"time"

"github.com/pkg/errors"
)
const (
// StopRequest can be sent by a worker to indicate that a restart is required.
StopRequest = "{\"stop\":true}"
)
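// A worker requests its own restart by replying with an empty body and
// StopRequest as the context. A minimal sketch of such a response payload,
// using only this package's Payload fields (illustrative, not a fixture):
//
//	stop := &Payload{
//		Body:    nil,
//		Context: []byte(StopRequest),
//	}
//	// Exec treats a response shaped like `stop` as a request to replace the worker.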
const (
// EventCreated is thrown when a new worker is spawned.
EventCreated = iota
// EventDestruct is thrown before worker destruction.
EventDestruct
// EventError is thrown on any worker related error (the error is passed as context).
EventError
)
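// A minimal Observer sketch for a pool p, reacting to these events; the log
// messages are illustrative and not part of this package:
//
//	p.Observer = func(event int, w *Worker, ctx interface{}) {
//		switch event {
//		case EventCreated:
//			fmt.Println("worker created")
//		case EventDestruct:
//			fmt.Println("worker destruct")
//		case EventError:
//			fmt.Printf("worker error: %v\n", ctx)
//		}
//	}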
// Pool controls worker creation, destruction and task routing.
type Pool struct {
// Observer is an optional callback to handle worker create/destruct/error events.
Observer func(event int, w *Worker, ctx interface{})
// pool behaviour
cfg Config
// worker command creator
cmd func() *exec.Cmd
// creates and connects to workers
factory Factory
// active task executions
tasks sync.WaitGroup
// buffered channel of free workers awaiting allocation
free chan *Worker
// protects state of worker list, does not affect allocation
muw sync.RWMutex
// all registered workers
workers []*Worker
}
// NewPool creates a new worker pool and task multiplexer. The pool spawns cfg.NumWorkers workers during construction.
func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) {
if err := cfg.Valid(); err != nil {
return nil, errors.Wrap(err, "config error")
}
p := &Pool{
cfg: cfg,
cmd: cmd,
factory: factory,
workers: make([]*Worker, 0, cfg.NumWorkers),
free: make(chan *Worker, cfg.NumWorkers),
}
// a constant number of workers simplifies the pool logic
for i := uint64(0); i < p.cfg.NumWorkers; i++ {
// spawn and connect the worker; any failure aborts pool creation
w, err := p.createWorker()
if err != nil {
p.Destroy()
return nil, err
}
p.free <- w
}
return p, nil
}
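// A usage sketch; NewPipeFactory, the worker script name and the timeout
// values are assumptions and not defined in this file:
//
//	p, err := NewPool(
//		func() *exec.Cmd { return exec.Command("php", "worker.php") },
//		NewPipeFactory(),
//		Config{
//			NumWorkers:      4,
//			AllocateTimeout: time.Minute,
//			DestroyTimeout:  time.Minute,
//		},
//	)
//	if err != nil {
//		panic(err)
//	}
//	defer p.Destroy()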
// Config returns associated pool configuration. Immutable.
func (p *Pool) Config() Config {
return p.cfg
}
// Workers returns a copy of the worker list associated with the pool.
func (p *Pool) Workers() (workers []*Worker) {
p.muw.RLock()
defer p.muw.RUnlock()
for _, w := range p.workers {
workers = append(workers, w)
}
return workers
}
// Exec executes one task with the given payload and context and returns the result or an error.
func (p *Pool) Exec(rqs *Payload) (rsp *Payload, err error) {
p.tasks.Add(1)
defer p.tasks.Done()
w, err := p.allocateWorker()
if err != nil {
return nil, errors.Wrap(err, "unable to allocate worker")
}
rsp, err = w.Exec(rqs)
if err != nil {
// soft job errors are allowed
if _, jobError := err.(JobError); jobError {
p.free <- w
return nil, err
}
go p.replaceWorker(w, err)
return nil, err
}
// worker requested to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
go p.replaceWorker(w, err)
return p.Exec(rqs)
}
if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
go p.replaceWorker(w, p.cfg.MaxExecutions)
} else {
p.free <- w
}
return rsp, nil
}
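// A call sketch; the payload body and the error handling are illustrative:
//
//	rsp, err := p.Exec(&Payload{Body: []byte(`{"uri":"/"}`)})
//	if err != nil {
//		if _, soft := err.(JobError); soft {
//			// soft job error: the worker stays in the pool
//		}
//		// hard errors cause the worker to be replaced by the pool
//		return err
//	}
//	fmt.Println(string(rsp.Body))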
// Destroy all underlying workers (but let them complete their current task).
func (p *Pool) Destroy() {
p.tasks.Wait()
var wg sync.WaitGroup
for _, w := range p.Workers() {
wg.Add(1)
go func(w *Worker) {
defer wg.Done()
p.destroyWorker(w)
}(w)
}
wg.Wait()
}
// allocateWorker returns a free worker, waiting up to cfg.AllocateTimeout for one to become available.
func (p *Pool) allocateWorker() (w *Worker, err error) {
select {
case w = <-p.free:
return w, nil
default:
// enable timeout handler
}
timeout := time.NewTimer(p.cfg.AllocateTimeout)
select {
case <-timeout.C:
return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
case w := <-p.free:
timeout.Stop()
return w, nil
}
}
// replaceWorker replaces a dead or expired worker with a new instance.
func (p *Pool) replaceWorker(w *Worker, caused interface{}) {
go p.destroyWorker(w)
if nw, err := p.createWorker(); err != nil {
p.throw(EventError, w, err)
if len(p.Workers()) == 0 {
// possible situation when a major error causes all PHP scripts to die (for example, a dead DB)
p.throw(EventError, nil, fmt.Errorf("all workers dead"))
}
} else {
p.free <- nw
}
}
// destroy and remove worker from the pool.
func (p *Pool) destroyWorker(w *Worker) {
p.throw(EventDestruct, w, nil)
// detaching
p.muw.Lock()
for i, wc := range p.workers {
if wc == w {
// remove element i, keeping the rest of the worker list intact
p.workers = append(p.workers[:i], p.workers[i+1:]...)
break
}
}
p.muw.Unlock()
go w.Stop()
select {
case <-w.waitDone:
// worker is dead
case <-time.NewTimer(p.cfg.DestroyTimeout).C:
// failed to stop process
if err := w.Kill(); err != nil {
p.throw(EventError, w, err)
}
}
}
// createWorker creates a new worker using the associated factory and adds it
// to the worker list; worker exit errors are reported in the background.
func (p *Pool) createWorker() (*Worker, error) {
w, err := p.factory.SpawnWorker(p.cmd())
if err != nil {
return nil, err
}
p.throw(EventCreated, w, nil)
go func(w *Worker) {
if err := w.Wait(); err != nil {
p.throw(EventError, w, err)
}
}(w)
p.muw.Lock()
defer p.muw.Unlock()
p.workers = append(p.workers, w)
return w, nil
}
// throw invokes event handler if any.
func (p *Pool) throw(event int, w *Worker, ctx interface{}) {
if p.Observer != nil {
p.Observer(event, w, ctx)
}
}