package roadrunner
import (
"bytes"
"fmt"
"github.com/pkg/errors"
"github.com/spiral/goridge"
"os"
"os/exec"
"runtime"
"strconv"
"strings"
"sync"
)
// Worker - supervised process with api over goridge.Relay.
//
// A Worker wraps an exec.Cmd that has not been started yet (see newWorker),
// collects the command's stderr into an in-memory buffer and exposes
// Exec/Stop/Kill/Wait on top of the relay.
type Worker struct {
// Pid of the process, points to Pid of underlying process and
// can be nil while process is not started (it is assigned by start()).
Pid *int
// state holds information about current worker state and the
// number of worker executions. Publicly this object is exposed only
// through the State interface returned by State().
// NOTE(review): the state type is declared outside this view; the original
// comment states it is protected using a mutex and an atomic counter.
state *state
// underlying command with associated process, command must be
// provided to worker from outside in non-started form. Cmd
// stderr is redirected by newWorker to aggregate the error message.
cmd *exec.Cmd
// err aggregates stderr output from underlying process (newWorker sets
// cmd.Stderr to this buffer). Value can be read only once command is
// completed and all pipes are closed.
err *bytes.Buffer
// channel is closed by start(): immediately when the command fails to
// start, otherwise once waiting on the process has returned.
waitDone chan interface{}
// contains information about resulted process state, as returned by
// cmd.Process.Wait() in the goroutine launched by start(). It stays nil
// when the command never started.
endState *os.ProcessState
// ensures that only one execution can be run at once. It is held by
// Exec, Stop, Kill and Wait, and by start()'s goroutine while the relay
// is being closed.
mu sync.Mutex
// communication bus with underlying process. It is not assigned by
// newWorker; start() closes it only when it is non-nil.
// NOTE(review): presumably attached by code outside this view - confirm.
rl goridge.Relay
}
// newWorker wraps the given exec.Cmd into a Worker. The command must not be
// started yet: an error is returned when it already has an associated
// process. The command's Stderr is redirected into the worker's own buffer.
func newWorker(cmd *exec.Cmd) (*Worker, error) {
	if cmd.Process != nil {
		return nil, fmt.Errorf("can't attach to running process")
	}

	stderr := new(bytes.Buffer)
	worker := &Worker{
		cmd:      cmd,
		err:      stderr,
		waitDone: make(chan interface{}),
		state:    newState(StateInactive),
	}

	// everything the process writes to stderr ends up in the worker buffer
	cmd.Stderr = stderr

	return worker, nil
}
// State returns the worker state object behind the State interface. It is
// intended as a receive-only view: callers observe the state while the
// worker itself keeps using the concrete value to update it.
func (w *Worker) State() State {
	var current State = w.state
	return current
}
// String returns a human readable worker description made of the command
// line, the current state (followed by the pid once it is known) and the
// number of executions.
func (w *Worker) String() string {
	desc := w.state.String()
	if pid := w.Pid; pid != nil {
		desc += ", pid:" + strconv.Itoa(*pid)
	}

	command := strings.Join(w.cmd.Args, " ")

	return fmt.Sprintf("(`%s` [%s], numExecs: %v)", command, desc, w.state.NumExecs())
}
// Wait must be called once for each worker. The call is released once the
// worker is complete (waitDone is closed) and every in-flight operation
// holding the worker mutex has finished. It returns:
//   - nil when the process exited successfully;
//   - an error carrying the collected stderr output when the process did not
//     succeed and wrote something to stderr;
//   - *exec.ExitError with the final process state when the process did not
//     succeed and stderr is empty (for example when the php process fails to
//     find or start the script);
//   - a plain error when no process state was recorded at all, i.e. the
//     command failed to start or waiting on the process failed.
func (w *Worker) Wait() error {
	<-w.waitDone

	// ensure that all receive/send operations are complete
	w.mu.Lock()
	defer w.mu.Unlock()

	if runtime.GOOS != "windows" {
		// windows handles processes and close pipes differently.
		// The process itself has already been waited for by the goroutine
		// launched in start(), so the error returned here carries no new
		// information and is dropped on purpose.
		// NOTE(review): presumably this call is kept so os/exec finishes
		// copying stderr into w.err and releases its pipes - confirm.
		_ = w.cmd.Wait()
	}

	if w.endState == nil {
		// start() closes waitDone without recording a state when cmd.Start()
		// fails and discards the error of Process.Wait(); calling Success()
		// on a nil *os.ProcessState would panic.
		if w.err.Len() != 0 {
			return errors.New(w.err.String())
		}

		return errors.New("process state is not available")
	}

	if w.endState.Success() {
		return nil
	}

	if w.err.Len() != 0 {
		return errors.New(w.err.String())
	}

	// generic process error
	return &exec.ExitError{ProcessState: w.endState}
}
// Stop asks the worker to terminate softly by sending a stop command over
// the relay and then blocks until the process is complete. It returns nil
// right away when the worker has already completed, otherwise the error (if
// any) produced while sending the command.
func (w *Worker) Stop() error {
	// non-blocking check: nothing to stop once waitDone is closed
	select {
	case <-w.waitDone:
		return nil
	default:
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	w.state.set(StateInactive)
	sendErr := sendPayload(w.rl, &stopCommand{Stop: true})

	// the send error is reported only after the process has completed
	<-w.waitDone

	return sendErr
}
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Waits for process completion.
//
// It returns nil when the worker has already completed, an error when the
// underlying command has no process yet (it was never started), and
// otherwise the error (if any) produced while signalling the process.
func (w *Worker) Kill() error {
	// non-blocking check: nothing to kill once waitDone is closed
	select {
	case <-w.waitDone:
		return nil
	default:
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	// cmd.Process is nil until the command is started; signalling it would
	// dereference a nil pointer, and nothing would ever close waitDone.
	if w.cmd.Process == nil {
		return fmt.Errorf("can't kill worker, process is not started")
	}

	w.state.set(StateInactive)
	err := w.cmd.Process.Signal(os.Kill)

	// the signal error is reported only after the process has completed
	<-w.waitDone

	return err
}
// Exec sends payload to worker, executes it and returns result or
// error. Make sure to handle worker.Wait() to gather worker level
// errors. Method might return JobError indicating issue with payload.
//
// Only one execution runs at a time (the worker mutex is held for the whole
// call). The payload must not be nil and the worker must be in StateReady,
// otherwise an error is returned without contacting the worker. A JobError
// leaves the worker in StateReady; any other execution error moves it to
// StateErrored. Every attempted execution is registered on the state.
func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	if rqs == nil {
		return nil, fmt.Errorf("payload can not be empty")
	}

	if w.state.Value() != StateReady {
		// state.String() is the readable form already used by Worker.String;
		// formatting Value() with %s is only correct if Value() returns a
		// string-like type, which is not visible from this file.
		return nil, fmt.Errorf("worker is not ready (%s)", w.state.String())
	}

	w.state.set(StateWorking)
	defer w.state.registerExec()

	rsp, err = w.execPayload(rqs)
	if err != nil {
		// job level errors do not invalidate the worker, everything else does
		if _, ok := err.(JobError); !ok {
			w.state.set(StateErrored)
			return nil, err
		}
	}

	w.state.set(StateReady)
	return rsp, err
}
// start launches the underlying command. When the command fails to start,
// waitDone is closed right away and the error is returned. Otherwise Pid is
// pointed at the process pid and a goroutine is spawned which waits for the
// process, records its final state, marks the worker as stopped, closes
// waitDone and finally closes the relay (if one is attached).
func (w *Worker) start() error {
	startErr := w.cmd.Start()
	if startErr != nil {
		close(w.waitDone)
		return startErr
	}

	w.Pid = &w.cmd.Process.Pid

	// wait for process to complete
	go func() {
		w.endState, _ = w.cmd.Process.Wait()

		if w.waitDone == nil {
			return
		}

		w.state.set(StateStopped)
		close(w.waitDone)

		if w.rl == nil {
			return
		}

		// the relay is closed only after any operation holding the mutex is done
		w.mu.Lock()
		defer w.mu.Unlock()

		w.rl.Close()
	}()

	return nil
}
// execPayload performs a single request/response exchange over the relay.
//
// The request is sent as two frames: the context (via sendPayload) followed
// by the body. The response is read as two frames as well: the first one
// must carry the goridge.PayloadControl flag and becomes the response
// context; when it also carries goridge.PayloadError its content is returned
// as a JobError. The second frame becomes the response body.
//
// Transport failures are wrapped as "header error" (context send),
// "sender error" (body send) and "worker error" (receive).
func (w *Worker) execPayload(rqs *Payload) (rsp *Payload, err error) {
	if err = sendPayload(w.rl, rqs.Context); err != nil {
		return nil, errors.Wrap(err, "header error")
	}

	// a failed body send must not fall through to Receive: the worker would
	// never answer a request it did not fully get
	if err = w.rl.Send(rqs.Body, 0); err != nil {
		return nil, errors.Wrap(err, "sender error")
	}

	var pr goridge.Prefix
	rsp = new(Payload)

	if rsp.Context, pr, err = w.rl.Receive(); err != nil {
		return nil, errors.Wrap(err, "worker error")
	}

	if !pr.HasFlag(goridge.PayloadControl) {
		return nil, fmt.Errorf("mailformed worker response")
	}

	if pr.HasFlag(goridge.PayloadError) {
		return nil, JobError(rsp.Context)
	}

	// the prefix of the body frame is not inspected
	if rsp.Body, _, err = w.rl.Receive(); err != nil {
		return nil, errors.Wrap(err, "worker error")
	}

	return rsp, nil
}
|