|
|
@@ -6,6 +6,7 @@ import (
|
|
|
authpublic "github.com/OliveTin/OliveTin/internal/auth/authpublic"
|
|
|
config "github.com/OliveTin/OliveTin/internal/config"
|
|
|
"github.com/OliveTin/OliveTin/internal/entities"
|
|
|
+ "github.com/OliveTin/OliveTin/internal/logfilter"
|
|
|
"github.com/OliveTin/OliveTin/internal/tpl"
|
|
|
"github.com/google/uuid"
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
@@ -71,6 +72,9 @@ type Executor struct {
|
|
|
listeners []listener
|
|
|
|
|
|
chainOfCommand []executorStepFunc
|
|
|
+
|
|
|
+ groupQueue []*queuedExecution
|
|
|
+ groupQueueMu sync.Mutex
|
|
|
}
|
|
|
|
|
|
// ExecutionRequest is a request to execute an action. It's passed to an
|
|
|
@@ -84,11 +88,54 @@ type ExecutionRequest struct {
|
|
|
AuthenticatedUser *authpublic.AuthenticatedUser
|
|
|
TriggerDepth int
|
|
|
|
|
|
- logEntry *InternalLogEntry
|
|
|
- finalParsedCommand string
|
|
|
- execArgs []string
|
|
|
- useDirectExec bool
|
|
|
- executor *Executor
|
|
|
+ logEntry *InternalLogEntry
|
|
|
+ finalParsedCommand string
|
|
|
+ execArgs []string
|
|
|
+ useDirectExec bool
|
|
|
+ executor *Executor
|
|
|
+ skipRequestRegistration bool
|
|
|
+}
|
|
|
+
|
|
|
+func (req *ExecutionRequest) mutateLogEntry(mutator func(*InternalLogEntry)) {
|
|
|
+ if req.executor == nil {
|
|
|
+ mutator(req.logEntry)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ req.executor.logmutex.Lock()
|
|
|
+ defer req.executor.logmutex.Unlock()
|
|
|
+
|
|
|
+ mutator(req.logEntry)
|
|
|
+}
|
|
|
+
|
|
|
+// LogEntrySnapshot is a copy of selected log entry fields for race-safe reads.
|
|
|
+type LogEntrySnapshot struct {
|
|
|
+ Queued bool
|
|
|
+ Blocked bool
|
|
|
+ ExecutionStarted bool
|
|
|
+ ExecutionFinished bool
|
|
|
+ ExitCode int32
|
|
|
+ Output string
|
|
|
+}
|
|
|
+
|
|
|
+// SnapshotLog returns a copy of selected log entry fields under read lock.
|
|
|
+func (e *Executor) SnapshotLog(trackingID string) (LogEntrySnapshot, bool) {
|
|
|
+ e.logmutex.RLock()
|
|
|
+ defer e.logmutex.RUnlock()
|
|
|
+
|
|
|
+ entry, found := e.logs[trackingID]
|
|
|
+ if !found {
|
|
|
+ return LogEntrySnapshot{}, false
|
|
|
+ }
|
|
|
+
|
|
|
+ return LogEntrySnapshot{
|
|
|
+ Queued: entry.Queued,
|
|
|
+ Blocked: entry.Blocked,
|
|
|
+ ExecutionStarted: entry.ExecutionStarted,
|
|
|
+ ExecutionFinished: entry.ExecutionFinished,
|
|
|
+ ExitCode: entry.ExitCode,
|
|
|
+ Output: entry.Output,
|
|
|
+ }, true
|
|
|
}
|
|
|
|
|
|
// InternalLogEntry objects are created by an Executor, and represent the final
|
|
|
@@ -101,6 +148,8 @@ type InternalLogEntry struct {
|
|
|
Output string
|
|
|
TimedOut bool
|
|
|
Blocked bool
|
|
|
+ Queued bool
|
|
|
+ QueuedForGroup string
|
|
|
ExitCode int32
|
|
|
Tags []string
|
|
|
ExecutionStarted bool
|
|
|
@@ -338,9 +387,22 @@ func paginateFilteredLogs(filtered []*InternalLogEntry, startOffset int64, pageC
|
|
|
// GetLogTrackingIdsACL returns logs filtered by ACL visibility for the user and
|
|
|
// paginated correctly based on the filtered set.
|
|
|
// dateFilter is optional and should be in YYYY-MM-DD format. If empty, no date filtering is applied.
|
|
|
-func (e *Executor) GetLogTrackingIdsACL(cfg *config.Config, user *authpublic.AuthenticatedUser, startOffset int64, pageCount int64, dateFilter string) ([]*InternalLogEntry, *PagingResult) {
|
|
|
+// expressionFilter is an optional filter expression applied after ACL checks.
|
|
|
+func (e *Executor) GetLogTrackingIdsACL(cfg *config.Config, user *authpublic.AuthenticatedUser, startOffset int64, pageCount int64, dateFilter string, expressionFilter string) ([]*InternalLogEntry, *PagingResult, error) {
|
|
|
filtered := e.filterLogsByACL(cfg, user, dateFilter)
|
|
|
- return paginateFilteredLogs(filtered, startOffset, pageCount)
|
|
|
+
|
|
|
+ program, err := logfilter.Compile(expressionFilter)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ filtered, err = applyLogFilter(filtered, program)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ logs, paging := paginateFilteredLogs(filtered, startOffset, pageCount)
|
|
|
+ return logs, paging, nil
|
|
|
}
|
|
|
|
|
|
func (e *Executor) GetLog(trackingID string) (*InternalLogEntry, bool) {
|
|
|
@@ -369,7 +431,7 @@ func (e *Executor) GetLogsByBindingId(bindingId string) []*InternalLogEntry {
|
|
|
|
|
|
// shouldCountExecution checks if a log entry should be counted for rate limiting.
|
|
|
func shouldCountExecution(logEntry *InternalLogEntry, windowStart time.Time) bool {
|
|
|
- return !logEntry.Blocked && logEntry.DatetimeStarted.After(windowStart)
|
|
|
+ return !logEntry.Blocked && !logEntry.Queued && logEntry.DatetimeStarted.After(windowStart)
|
|
|
}
|
|
|
|
|
|
// updateOldestExecution updates the oldest execution time if this entry is older.
|
|
|
@@ -483,19 +545,45 @@ func (e *Executor) GetTimeUntilAvailable(binding *ActionBinding) int64 {
|
|
|
return maxExpiryTime.Unix()
|
|
|
}
|
|
|
|
|
|
-func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) {
|
|
|
+func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) string {
|
|
|
e.logmutex.Lock()
|
|
|
+ defer e.logmutex.Unlock()
|
|
|
+
|
|
|
+ if _, found := e.logs[trackingID]; found || !isValidTrackingID(trackingID) {
|
|
|
+ trackingID = uuid.NewString()
|
|
|
+ entry.ExecutionTrackingID = trackingID
|
|
|
+ }
|
|
|
|
|
|
entry.Index = int64(len(e.logsTrackingIdsByDate))
|
|
|
|
|
|
e.logs[trackingID] = entry
|
|
|
e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, trackingID)
|
|
|
|
|
|
- e.logmutex.Unlock()
|
|
|
+ return trackingID
|
|
|
}
|
|
|
|
|
|
// ExecRequest processes an ExecutionRequest
|
|
|
func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string) {
|
|
|
+ e.initializeExecRequest(req)
|
|
|
+
|
|
|
+ log.Tracef("executor.ExecRequest(): trackingID=%s bindingID=%s", req.TrackingID, bindingIDForTrace(req))
|
|
|
+
|
|
|
+ req.TrackingID = e.SetLog(req.TrackingID, req.logEntry)
|
|
|
+
|
|
|
+ wg := new(sync.WaitGroup)
|
|
|
+ wg.Add(1)
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ queued := e.execChain(req, wg)
|
|
|
+ if !queued {
|
|
|
+ wg.Done()
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ return wg, req.TrackingID
|
|
|
+}
|
|
|
+
|
|
|
+func (e *Executor) initializeExecRequest(req *ExecutionRequest) {
|
|
|
if req.AuthenticatedUser == nil {
|
|
|
req.AuthenticatedUser = auth.UserGuest(req.Cfg)
|
|
|
}
|
|
|
@@ -513,56 +601,84 @@ func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string)
|
|
|
ActionIcon: "💩",
|
|
|
Username: req.AuthenticatedUser.Username,
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
- _, isDuplicate := e.GetLog(req.TrackingID)
|
|
|
- if isDuplicate || !isValidTrackingID(req.TrackingID) {
|
|
|
- req.TrackingID = uuid.NewString()
|
|
|
+func bindingIDForTrace(req *ExecutionRequest) string {
|
|
|
+ if req.Binding == nil {
|
|
|
+ return ""
|
|
|
}
|
|
|
|
|
|
- // Update the log entry with the final tracking ID
|
|
|
- req.logEntry.ExecutionTrackingID = req.TrackingID
|
|
|
+ return req.Binding.ID
|
|
|
+}
|
|
|
|
|
|
- log.Tracef("executor.ExecRequest(): %v", req)
|
|
|
+func (e *Executor) execChain(req *ExecutionRequest, wg *sync.WaitGroup) bool {
|
|
|
+ if !req.skipRequestRegistration {
|
|
|
+ finished, queued := e.registerOrQueueRequest(req, wg)
|
|
|
+ if finished || queued {
|
|
|
+ return queued
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- e.SetLog(req.TrackingID, req.logEntry)
|
|
|
+ e.runExecutionSteps(req)
|
|
|
+ e.finishExecChain(req)
|
|
|
|
|
|
- wg := new(sync.WaitGroup)
|
|
|
- wg.Add(1)
|
|
|
+ return false
|
|
|
+}
|
|
|
|
|
|
- go func() {
|
|
|
- e.execChain(req)
|
|
|
- defer wg.Done()
|
|
|
- }()
|
|
|
+func (e *Executor) registerOrQueueRequest(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
|
|
|
+ if !stepRequestAction(req) {
|
|
|
+ e.finishExecChain(req)
|
|
|
+ return true, false
|
|
|
+ }
|
|
|
|
|
|
- return wg, req.TrackingID
|
|
|
+ if !actionNeedsGroupLimit(req) || e.groupsHaveCapacityForActive(req) {
|
|
|
+ return false, false
|
|
|
+ }
|
|
|
+
|
|
|
+ return e.queueRequestAfterACL(req, wg)
|
|
|
}
|
|
|
|
|
|
-func (e *Executor) execChain(req *ExecutionRequest) {
|
|
|
- for _, step := range e.chainOfCommand {
|
|
|
+func (e *Executor) queueRequestAfterACL(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
|
|
|
+ if !stepACLCheck(req) {
|
|
|
+ e.finishExecChain(req)
|
|
|
+ return true, false
|
|
|
+ }
|
|
|
+
|
|
|
+ e.queueRequest(req, wg)
|
|
|
+ notifyListenersStarted(req)
|
|
|
+
|
|
|
+ return false, true
|
|
|
+}
|
|
|
+
|
|
|
+func (e *Executor) runExecutionSteps(req *ExecutionRequest) {
|
|
|
+ for _, step := range e.chainOfCommand[1:] {
|
|
|
if !step(req) {
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
- // Ensure DatetimeFinished is set even if execution was blocked early
|
|
|
- if req.logEntry.DatetimeFinished.IsZero() {
|
|
|
- req.logEntry.DatetimeFinished = time.Now()
|
|
|
- }
|
|
|
+func (e *Executor) finishExecChain(req *ExecutionRequest) {
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ if entry.DatetimeFinished.IsZero() {
|
|
|
+ entry.DatetimeFinished = time.Now()
|
|
|
+ }
|
|
|
|
|
|
- req.logEntry.ExecutionFinished = true
|
|
|
+ entry.ExecutionFinished = true
|
|
|
+ })
|
|
|
|
|
|
- // This isn't a step, because we want to notify all listeners, irrespective
|
|
|
- // of how many steps were actually executed.
|
|
|
notifyListenersFinished(req)
|
|
|
+ e.drainGroupQueue()
|
|
|
}
|
|
|
|
|
|
func getConcurrentCount(req *ExecutionRequest) int {
|
|
|
concurrentCount := 0
|
|
|
|
|
|
req.executor.logmutex.RLock()
|
|
|
+ logs := req.executor.LogsByBindingId[req.Binding.ID]
|
|
|
|
|
|
- for _, log := range req.executor.GetLogsByBindingId(req.Binding.ID) {
|
|
|
- if !log.ExecutionFinished {
|
|
|
+ for _, logEntry := range logs {
|
|
|
+ if !logEntry.ExecutionFinished && !logEntry.Queued {
|
|
|
concurrentCount += 1
|
|
|
}
|
|
|
}
|
|
|
@@ -583,8 +699,10 @@ func stepConcurrencyCheck(req *ExecutionRequest) bool {
|
|
|
"maxConcurrent": req.Binding.Action.MaxConcurrent,
|
|
|
}).Warnf("Blocked from executing due to concurrency limit")
|
|
|
|
|
|
- req.logEntry.Output = "Blocked from executing due to concurrency limit"
|
|
|
- req.logEntry.Blocked = true
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Output = "Blocked from executing due to concurrency limit"
|
|
|
+ entry.Blocked = true
|
|
|
+ })
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
@@ -603,24 +721,35 @@ func parseDuration(rate config.RateSpec) time.Duration {
|
|
|
return duration
|
|
|
}
|
|
|
|
|
|
-//gocyclo:ignore
|
|
|
-func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
|
|
|
- executions := -1 // Because we will find ourself when checking execution logs
|
|
|
-
|
|
|
- duration := parseDuration(rate)
|
|
|
+func entityPrefixForRequest(req *ExecutionRequest) string {
|
|
|
+ if req.Binding != nil && req.Binding.Entity != nil {
|
|
|
+ return req.Binding.Entity.UniqueKey
|
|
|
+ }
|
|
|
|
|
|
- then := time.Now().Add(-duration)
|
|
|
+ return ""
|
|
|
+}
|
|
|
|
|
|
- currentEntityPrefix := ""
|
|
|
- if req.Binding != nil && req.Binding.Entity != nil {
|
|
|
- currentEntityPrefix = req.Binding.Entity.UniqueKey
|
|
|
+func rateExecutionMatchesScope(logEntry *InternalLogEntry, req *ExecutionRequest, entityPrefix string) bool {
|
|
|
+ if logEntry.EntityPrefix != entityPrefix {
|
|
|
+ return false
|
|
|
}
|
|
|
- for _, logEntry := range req.executor.GetLogsByBindingId(req.Binding.ID) {
|
|
|
- if logEntry.EntityPrefix != currentEntityPrefix {
|
|
|
- continue
|
|
|
- }
|
|
|
- if logEntry.DatetimeStarted.After(then) && !logEntry.Blocked {
|
|
|
|
|
|
+ return !logEntry.Queued && logEntry.ExecutionTrackingID != req.TrackingID
|
|
|
+}
|
|
|
+
|
|
|
+func logEntryStartedInWindow(logEntry *InternalLogEntry, windowStart time.Time) bool {
|
|
|
+ return logEntry.DatetimeStarted.After(windowStart) && !logEntry.Blocked
|
|
|
+}
|
|
|
+
|
|
|
+func rateExecutionCountsForRate(logEntry *InternalLogEntry, req *ExecutionRequest, entityPrefix string, windowStart time.Time) bool {
|
|
|
+ return rateExecutionMatchesScope(logEntry, req, entityPrefix) && logEntryStartedInWindow(logEntry, windowStart)
|
|
|
+}
|
|
|
+
|
|
|
+func countRateExecutions(logs []*InternalLogEntry, req *ExecutionRequest, entityPrefix string, windowStart time.Time) int {
|
|
|
+ executions := 0
|
|
|
+
|
|
|
+ for _, logEntry := range logs {
|
|
|
+ if rateExecutionCountsForRate(logEntry, req, entityPrefix, windowStart) {
|
|
|
executions += 1
|
|
|
}
|
|
|
}
|
|
|
@@ -628,6 +757,18 @@ func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
|
|
|
return executions
|
|
|
}
|
|
|
|
|
|
+func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
|
|
|
+ duration := parseDuration(rate)
|
|
|
+ then := time.Now().Add(-duration)
|
|
|
+
|
|
|
+ req.executor.logmutex.RLock()
|
|
|
+ logs := req.executor.LogsByBindingId[req.Binding.ID]
|
|
|
+ executions := countRateExecutions(logs, req, entityPrefixForRequest(req), then)
|
|
|
+ req.executor.logmutex.RUnlock()
|
|
|
+
|
|
|
+ return executions
|
|
|
+}
|
|
|
+
|
|
|
func stepRateCheck(req *ExecutionRequest) bool {
|
|
|
for _, rate := range req.Binding.Action.MaxRate {
|
|
|
executions := getExecutionsCount(rate, req)
|
|
|
@@ -640,8 +781,10 @@ func stepRateCheck(req *ExecutionRequest) bool {
|
|
|
"duration": rate.Duration,
|
|
|
}).Infof("Blocked from executing due to rate limit")
|
|
|
|
|
|
- req.logEntry.Output = "Blocked from executing due to rate limit"
|
|
|
- req.logEntry.Blocked = true
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Output = "Blocked from executing due to rate limit"
|
|
|
+ entry.Blocked = true
|
|
|
+ })
|
|
|
return false
|
|
|
}
|
|
|
}
|
|
|
@@ -653,8 +796,10 @@ func stepACLCheck(req *ExecutionRequest) bool {
|
|
|
canExec := acl.IsAllowedExec(req.Cfg, req.AuthenticatedUser, req.Binding.Action)
|
|
|
|
|
|
if !canExec {
|
|
|
- req.logEntry.Output = "ACL check failed. Blocked from executing."
|
|
|
- req.logEntry.Blocked = true
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Output = "ACL check failed. Blocked from executing."
|
|
|
+ entry.Blocked = true
|
|
|
+ })
|
|
|
|
|
|
log.WithFields(log.Fields{
|
|
|
"actionTitle": req.logEntry.ActionTitle,
|
|
|
@@ -792,7 +937,9 @@ func hasExec(req *ExecutionRequest) bool {
|
|
|
}
|
|
|
|
|
|
func fail(req *ExecutionRequest, err error) bool {
|
|
|
- req.logEntry.Output = err.Error()
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Output = err.Error()
|
|
|
+ })
|
|
|
log.Warn(err.Error())
|
|
|
return false
|
|
|
}
|
|
|
@@ -826,14 +973,16 @@ func stepRequestActionHasBinding(req *ExecutionRequest) bool {
|
|
|
}
|
|
|
|
|
|
func stepRequestActionPopulateLogEntry(req *ExecutionRequest) {
|
|
|
- req.logEntry.Binding = req.Binding
|
|
|
- req.logEntry.ActionConfigTitle = req.Binding.Action.Title
|
|
|
- req.logEntry.ActionTitle = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Title, req.Binding.Entity)
|
|
|
- req.logEntry.ActionIcon = req.Binding.Action.Icon
|
|
|
- req.logEntry.Tags = req.Tags
|
|
|
- if req.Binding.Entity != nil {
|
|
|
- req.logEntry.EntityPrefix = req.Binding.Entity.UniqueKey
|
|
|
- }
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Binding = req.Binding
|
|
|
+ entry.ActionConfigTitle = req.Binding.Action.Title
|
|
|
+ entry.ActionTitle = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Title, req.Binding.Entity)
|
|
|
+ entry.ActionIcon = req.Binding.Action.Icon
|
|
|
+ entry.Tags = req.Tags
|
|
|
+ if req.Binding.Entity != nil {
|
|
|
+ entry.EntityPrefix = req.Binding.Entity.UniqueKey
|
|
|
+ }
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
func stepRequestActionRegisterLog(req *ExecutionRequest) {
|
|
|
@@ -856,7 +1005,9 @@ func stepLogStart(req *ExecutionRequest) bool {
|
|
|
}
|
|
|
|
|
|
func stepLogFinish(req *ExecutionRequest) bool {
|
|
|
- req.logEntry.ExecutionFinished = true
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.ExecutionFinished = true
|
|
|
+ })
|
|
|
|
|
|
log.WithFields(log.Fields{
|
|
|
"actionTitle": req.logEntry.ActionTitle,
|
|
|
@@ -880,10 +1031,14 @@ func notifyListenersStarted(req *ExecutionRequest) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func appendErrorToStderr(err error, logEntry *InternalLogEntry) {
|
|
|
- if err != nil {
|
|
|
- logEntry.Output = err.Error() + "\n\n" + logEntry.Output
|
|
|
+func appendErrorToStderr(req *ExecutionRequest, err error) {
|
|
|
+ if err == nil {
|
|
|
+ return
|
|
|
}
|
|
|
+
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Output = err.Error() + "\n\n" + entry.Output
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
type OutputStreamer struct {
|
|
|
@@ -926,31 +1081,41 @@ func stepExec(req *ExecutionRequest) bool {
|
|
|
streamer := &OutputStreamer{Req: req}
|
|
|
cmd := buildCommand(ctx, req)
|
|
|
if cmd == nil {
|
|
|
- req.logEntry.Output = "Cannot execute: no command arguments provided"
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Output = "Cannot execute: no command arguments provided"
|
|
|
+ })
|
|
|
log.Warn("Cannot execute: no command arguments provided")
|
|
|
return false
|
|
|
}
|
|
|
prepareCommand(cmd, streamer, req)
|
|
|
runerr := cmd.Start()
|
|
|
- req.logEntry.Process = cmd.Process
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Process = cmd.Process
|
|
|
+ })
|
|
|
ctx.setProcess(cmd.Process)
|
|
|
waiterr := cmd.Wait()
|
|
|
- req.logEntry.ExitCode = int32(cmd.ProcessState.ExitCode())
|
|
|
- req.logEntry.Output = streamer.String()
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.ExitCode = int32(cmd.ProcessState.ExitCode())
|
|
|
+ entry.Output = streamer.String()
|
|
|
+ })
|
|
|
|
|
|
- appendErrorToStderr(runerr, req.logEntry)
|
|
|
- appendErrorToStderr(waiterr, req.logEntry)
|
|
|
+ appendErrorToStderr(req, runerr)
|
|
|
+ appendErrorToStderr(req, waiterr)
|
|
|
|
|
|
if ctx.Err() == context.DeadlineExceeded {
|
|
|
log.WithFields(log.Fields{
|
|
|
"actionTitle": req.logEntry.ActionTitle,
|
|
|
}).Warnf("Action timed out")
|
|
|
|
|
|
- req.logEntry.TimedOut = true
|
|
|
- req.logEntry.Output += "OliveTin::timeout - this action timed out after " + fmt.Sprintf("%v", req.Binding.Action.Timeout) + " seconds. If you need more time for this action, set a longer timeout. See https://docs.olivetin.app/action_customization/timeouts.html for more help."
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.TimedOut = true
|
|
|
+ entry.Output += "OliveTin::timeout - this action timed out after " + fmt.Sprintf("%v", req.Binding.Action.Timeout) + " seconds. If you need more time for this action, set a longer timeout. See https://docs.olivetin.app/action_customization/timeouts.html for more help."
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
- req.logEntry.DatetimeFinished = time.Now()
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.DatetimeFinished = time.Now()
|
|
|
+ })
|
|
|
|
|
|
return true
|
|
|
}
|
|
|
@@ -966,7 +1131,9 @@ func prepareCommand(cmd *exec.Cmd, streamer *OutputStreamer, req *ExecutionReque
|
|
|
cmd.Stdout = streamer
|
|
|
cmd.Stderr = streamer
|
|
|
cmd.Env = buildEnv(req.Arguments)
|
|
|
- req.logEntry.ExecutionStarted = true
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.ExecutionStarted = true
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
func stepExecAfter(req *ExecutionRequest) bool {
|
|
|
@@ -991,24 +1158,28 @@ func stepExecAfter(req *ExecutionRequest) bool {
|
|
|
|
|
|
waiterr := cmd.Wait()
|
|
|
|
|
|
- req.logEntry.Output += "\n"
|
|
|
- req.logEntry.Output += "OliveTin::shellAfterCompleted stdout\n"
|
|
|
- req.logEntry.Output += stdout.String()
|
|
|
-
|
|
|
- req.logEntry.Output += "OliveTin::shellAfterCompleted stderr\n"
|
|
|
- req.logEntry.Output += stderr.String()
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Output += "\n"
|
|
|
+ entry.Output += "OliveTin::shellAfterCompleted stdout\n"
|
|
|
+ entry.Output += stdout.String()
|
|
|
+ entry.Output += "OliveTin::shellAfterCompleted stderr\n"
|
|
|
+ entry.Output += stderr.String()
|
|
|
+ entry.Output += "OliveTin::shellAfterCompleted errors and summary\n"
|
|
|
+ })
|
|
|
|
|
|
- req.logEntry.Output += "OliveTin::shellAfterCompleted errors and summary\n"
|
|
|
- appendErrorToStderr(runerr, req.logEntry)
|
|
|
- appendErrorToStderr(waiterr, req.logEntry)
|
|
|
+ appendErrorToStderr(req, runerr)
|
|
|
+ appendErrorToStderr(req, waiterr)
|
|
|
|
|
|
if ctx.Err() == context.DeadlineExceeded {
|
|
|
- req.logEntry.Output += "Your shellAfterCompleted command timed out."
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Output += "Your shellAfterCompleted command timed out."
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
- req.logEntry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", cmd.ProcessState.ExitCode())
|
|
|
-
|
|
|
- req.logEntry.Output += "OliveTin::shellAfterCompleted output complete\n"
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", cmd.ProcessState.ExitCode())
|
|
|
+ entry.Output += "OliveTin::shellAfterCompleted output complete\n"
|
|
|
+ })
|
|
|
|
|
|
return true
|
|
|
}
|
|
|
@@ -1026,7 +1197,9 @@ func buildShellAfterCommand(ctx context.Context, req *ExecutionRequest, stdout,
|
|
|
finalParsedCommand, err := tpl.ParseTemplateWithActionContext(req.Binding.Action.ShellAfterCompleted, req.Binding.Entity, args)
|
|
|
if err != nil {
|
|
|
msg := "Could not prepare shellAfterCompleted command: " + err.Error() + "\n"
|
|
|
- req.logEntry.Output += msg
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Output += msg
|
|
|
+ })
|
|
|
log.Warn(msg)
|
|
|
return nil, nil, nil
|
|
|
}
|
|
|
@@ -1061,7 +1234,9 @@ func stepTrigger(req *ExecutionRequest) bool {
|
|
|
"actionTitle": req.logEntry.ActionTitle,
|
|
|
"depth": req.TriggerDepth,
|
|
|
}).Warnf("Trigger action reached maximum depth of %v. Not triggering further actions.", MaxTriggerDepth)
|
|
|
- req.logEntry.Output += fmt.Sprintf("OliveTin::trigger - this action reached maximum trigger depth of %v. Not triggering further actions.", MaxTriggerDepth)
|
|
|
+ req.mutateLogEntry(func(entry *InternalLogEntry) {
|
|
|
+ entry.Output += fmt.Sprintf("OliveTin::trigger - this action reached maximum trigger depth of %v. Not triggering further actions.", MaxTriggerDepth)
|
|
|
+ })
|
|
|
return true
|
|
|
}
|
|
|
|