nezabx/runners/command.go

233 lines
5.2 KiB
Go
Raw Permalink Normal View History

2022-05-28 17:48:56 +00:00
package runners
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"os/exec"
"strings"
"time"
"nezabx/db"
"nezabx/notifications/email"
2022-05-28 17:48:56 +00:00
"github.com/google/shlex"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
)
type (
Command struct {
orig string
cmd string
args []string
}
CommandRunner struct {
Log *zap.Logger
DB *db.Bolt
MailNotificator *email.Notificator
Command *Command
Name string
Threshold uint
ThresholdSleep time.Duration
Timeout time.Duration
Interval time.Duration
CronSchedule cron.Schedule
Notifications []string
}
)
func NewCommand(path string) (*Command, error) {
command, err := shlex.Split(path)
if err != nil {
return nil, err
}
if len(command) < 1 {
return nil, errors.New("empty command")
}
return &Command{
orig: path,
cmd: command[0],
args: command[1:],
}, nil
}
func (h Command) Exec() ([]byte, error) {
cmd := exec.Command(h.cmd, h.args...)
return cmd.CombinedOutput()
}
func (h Command) String() string {
return h.orig
}
func (c CommandRunner) Run(ctx context.Context) {
h := sha256.Sum256([]byte(c.Name))
id := h[:]
go func() {
firstIter := true
for {
if !firstIter || c.CronSchedule == nil {
c.run(ctx, id)
}
firstIter = false
select {
case <-ctx.Done():
return
case <-time.After(c.untilNextIteration()):
}
}
}()
}
func (c CommandRunner) run(ctx context.Context, id []byte) {
st, err := c.DB.Status(id)
if err != nil {
c.Log.Warn("database is broken", zap.String("id", hex.EncodeToString(id)), zap.Error(err))
return
}
c.Log.Debug("starting script", zap.Stringer("cmd", c.Command))
output, err := execScript(ctx, c.Command, c.Timeout)
if err == nil {
c.processSuccessfulExecution(id, st)
return
}
if !st.Notified {
for i := 1; i < int(c.Threshold); i++ {
c.Log.Info("script run failed", zap.Stringer("cmd", c.Command), zap.Int("iteration", i))
select {
case <-ctx.Done():
return
case <-time.After(c.ThresholdSleep):
}
output, err = execScript(ctx, c.Command, c.Timeout)
if err == nil {
c.processSuccessfulExecution(id, st)
return
}
}
}
c.processFailedExecution(id, st, output, err)
}
func (c CommandRunner) processSuccessfulExecution(id []byte, st db.Status) {
if !st.Failed {
c.Log.Info("script run ok", zap.Stringer("cmd", c.Command),
zap.Time("next iteration at", c.nextIteration()))
return
}
c.Log.Info("script run ok, recovered after failure", zap.Stringer("cmd", c.Command),
zap.Time("next iteration at", c.nextIteration()))
st.Failed = false
st.Notified = false
err := c.DB.SetStatus(id, st)
if err != nil {
c.Log.Warn("database is broken", zap.String("id", hex.EncodeToString(id)), zap.Error(err))
}
}
func (c CommandRunner) processFailedExecution(id []byte, st db.Status, output []byte, err error) {
if st.Failed {
c.Log.Info("script run failed, notification has already been sent",
zap.Stringer("cmd", c.Command),
zap.Time("next iteration at", c.nextIteration()))
return
}
c.Log.Info("script run failed, sending notification",
zap.Stringer("cmd", c.Command),
zap.Time("next iteration at", c.nextIteration()))
st.Failed = true
st.Notified = true
err = c.notify(output, err)
if err != nil {
c.Log.Warn("notification was not sent", zap.Stringer("cmd", c.Command), zap.Error(err))
st.Notified = false
}
err = c.DB.SetStatus(id, st)
if err != nil {
c.Log.Warn("database is broken", zap.String("id", hex.EncodeToString(id)), zap.Error(err))
}
}
func (c CommandRunner) notify(out []byte, err error) error {
msg := fmt.Sprintf("Script runner <b>\"%s\"</b> has failed.<br>"+
"Executed command: <b>%s</b><br>"+
"Exit error: <b>%s</b><br><br>"+
"Terminal output:<pre>%s</pre>", c.Name, c.Command, err.Error(), out)
for _, target := range c.Notifications {
kv := strings.Split(target, ":")
if len(kv) != 2 {
c.Log.Warn("invalid notification target", zap.String("value", target))
continue
}
switch kv[0] {
case "email":
if c.MailNotificator == nil {
c.Log.Warn("email notifications were not configured")
continue
}
err = c.MailNotificator.Send(kv[1], "Nezabx alert message", msg)
if err != nil {
return err
}
default:
c.Log.Warn("invalid notification type", zap.String("value", target))
continue
}
}
return nil
}
func (c CommandRunner) untilNextIteration() time.Duration {
if c.CronSchedule != nil {
return time.Until(c.CronSchedule.Next(time.Now()))
}
return c.Interval
}
func (c CommandRunner) nextIteration() time.Time {
if c.CronSchedule != nil {
return c.CronSchedule.Next(time.Now()) // no truncate because cron operates at most with minute truncated values
}
return time.Now().Add(c.Interval).Truncate(time.Second)
}
func execScript(ctx context.Context, cmd *Command, timeout time.Duration) (output []byte, err error) {
type cmdOutput struct {
out []byte
err error
}
res := make(chan cmdOutput)
go func(ch chan<- cmdOutput) {
v, err := cmd.Exec()
res <- cmdOutput{
out: v,
err: err,
}
close(res)
}(res)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(timeout):
return nil, errors.New("script execution timeout")
case v := <-res:
return v.out, v.err
}
}