Pārlūkot izejas kodu

chore: coderabbit suggestions

jamesread 2 nedēļas atpakaļ
vecāks
revīzija
0e8f7c7be3

+ 1 - 0
frontend/resources/vue/ActionButton.vue

@@ -297,6 +297,7 @@ function onExecutionStarted(logEntry) {
   }
   }
 
 
   isDisabled.value = true
   isDisabled.value = true
+  updateDom(null, title.value)
 }
 }
 
 
 function onExecutionFinished(logEntry) {
 function onExecutionFinished(logEntry) {

+ 21 - 8
service/internal/config/sanitize.go

@@ -25,10 +25,10 @@ func (cfg *Config) Sanitize() {
 		cfg.Actions[idx].sanitize(cfg)
 		cfg.Actions[idx].sanitize(cfg)
 	}
 	}
 
 
-	cfg.sanitizeActionGroupReferences()
-
 	cfg.sanitizeDashboardsForInlineActions()
 	cfg.sanitizeDashboardsForInlineActions()
 
 
+	cfg.sanitizeActionGroupReferences()
+
 	if err := cfg.validateReservedActionArgumentNames(); err != nil {
 	if err := cfg.validateReservedActionArgumentNames(); err != nil {
 		log.Fatalf("%v", err)
 		log.Fatalf("%v", err)
 	}
 	}
@@ -218,16 +218,29 @@ func appendUniqueString(out []string, seen map[string]struct{}, value string) []
 func (cfg *Config) sanitizeActionGroupReferences() {
 func (cfg *Config) sanitizeActionGroupReferences() {
 	for _, action := range cfg.Actions {
 	for _, action := range cfg.Actions {
 		for _, groupName := range action.Groups {
 		for _, groupName := range action.Groups {
-			if _, found := cfg.ActionGroups[groupName]; !found {
-				log.WithFields(log.Fields{
-					"actionTitle": action.Title,
-					"groupName":   groupName,
-				}).Warn("Action references unknown action group")
-			}
+			cfg.warnInvalidActionGroupReference(action, groupName)
 		}
 		}
 	}
 	}
 }
 }
 
 
+func (cfg *Config) warnInvalidActionGroupReference(action *Action, groupName string) {
+	group, found := cfg.ActionGroups[groupName]
+	if !found {
+		log.WithFields(log.Fields{
+			"actionTitle": action.Title,
+			"groupName":   groupName,
+		}).Warn("Action references unknown action group")
+		return
+	}
+
+	if group == nil || group.MaxConcurrent < 1 {
+		log.WithFields(log.Fields{
+			"actionTitle": action.Title,
+			"groupName":   groupName,
+		}).Warn("Action references action group that will not be enforced at runtime")
+	}
+}
+
 func (cfg *Config) sanitizeAuthRequireGuestsToLogin() {
 func (cfg *Config) sanitizeAuthRequireGuestsToLogin() {
 	if cfg.AuthRequireGuestsToLogin {
 	if cfg.AuthRequireGuestsToLogin {
 		log.Infof("AuthRequireGuestsToLogin is enabled. All defaultPermissions will be set to false")
 		log.Infof("AuthRequireGuestsToLogin is enabled. All defaultPermissions will be set to false")

+ 197 - 73
service/internal/executor/executor.go

@@ -95,6 +95,48 @@ type ExecutionRequest struct {
 	skipRequestRegistration bool
 	skipRequestRegistration bool
 }
 }
 
 
+func (req *ExecutionRequest) mutateLogEntry(mutator func(*InternalLogEntry)) {
+	if req.executor == nil {
+		mutator(req.logEntry)
+		return
+	}
+
+	req.executor.logmutex.Lock()
+	defer req.executor.logmutex.Unlock()
+
+	mutator(req.logEntry)
+}
+
+// LogEntrySnapshot is a copy of selected log entry fields for race-safe reads.
+type LogEntrySnapshot struct {
+	Queued            bool
+	Blocked           bool
+	ExecutionStarted  bool
+	ExecutionFinished bool
+	ExitCode          int32
+	Output            string
+}
+
+// SnapshotLog returns a copy of selected log entry fields under read lock.
+func (e *Executor) SnapshotLog(trackingID string) (LogEntrySnapshot, bool) {
+	e.logmutex.RLock()
+	defer e.logmutex.RUnlock()
+
+	entry, found := e.logs[trackingID]
+	if !found {
+		return LogEntrySnapshot{}, false
+	}
+
+	return LogEntrySnapshot{
+		Queued:            entry.Queued,
+		Blocked:           entry.Blocked,
+		ExecutionStarted:  entry.ExecutionStarted,
+		ExecutionFinished: entry.ExecutionFinished,
+		ExitCode:          entry.ExitCode,
+		Output:            entry.Output,
+	}, true
+}
+
 // InternalLogEntry objects are created by an Executor, and represent the final
 // InternalLogEntry objects are created by an Executor, and represent the final
 // state of execution (even if the command is not executed). It's designed to be
 // state of execution (even if the command is not executed). It's designed to be
 // easily serializable.
 // easily serializable.
@@ -375,7 +417,7 @@ func (e *Executor) GetLogsByBindingId(bindingId string) []*InternalLogEntry {
 
 
 // shouldCountExecution checks if a log entry should be counted for rate limiting.
 // shouldCountExecution checks if a log entry should be counted for rate limiting.
 func shouldCountExecution(logEntry *InternalLogEntry, windowStart time.Time) bool {
 func shouldCountExecution(logEntry *InternalLogEntry, windowStart time.Time) bool {
-	return !logEntry.Blocked && logEntry.DatetimeStarted.After(windowStart)
+	return !logEntry.Blocked && !logEntry.Queued && logEntry.DatetimeStarted.After(windowStart)
 }
 }
 
 
 // updateOldestExecution updates the oldest execution time if this entry is older.
 // updateOldestExecution updates the oldest execution time if this entry is older.
@@ -489,24 +531,30 @@ func (e *Executor) GetTimeUntilAvailable(binding *ActionBinding) int64 {
 	return maxExpiryTime.Unix()
 	return maxExpiryTime.Unix()
 }
 }
 
 
-func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) {
+func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) string {
 	e.logmutex.Lock()
 	e.logmutex.Lock()
+	defer e.logmutex.Unlock()
+
+	if _, found := e.logs[trackingID]; found || !isValidTrackingID(trackingID) {
+		trackingID = uuid.NewString()
+		entry.ExecutionTrackingID = trackingID
+	}
 
 
 	entry.Index = int64(len(e.logsTrackingIdsByDate))
 	entry.Index = int64(len(e.logsTrackingIdsByDate))
 
 
 	e.logs[trackingID] = entry
 	e.logs[trackingID] = entry
 	e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, trackingID)
 	e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, trackingID)
 
 
-	e.logmutex.Unlock()
+	return trackingID
 }
 }
 
 
 // ExecRequest processes an ExecutionRequest
 // ExecRequest processes an ExecutionRequest
 func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string) {
 func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string) {
 	e.initializeExecRequest(req)
 	e.initializeExecRequest(req)
 
 
-	log.Tracef("executor.ExecRequest(): %v", req)
+	log.Tracef("executor.ExecRequest(): trackingID=%s bindingID=%s", req.TrackingID, bindingIDForTrace(req))
 
 
-	e.SetLog(req.TrackingID, req.logEntry)
+	req.TrackingID = e.SetLog(req.TrackingID, req.logEntry)
 
 
 	wg := new(sync.WaitGroup)
 	wg := new(sync.WaitGroup)
 	wg.Add(1)
 	wg.Add(1)
@@ -540,12 +588,15 @@ func (e *Executor) initializeExecRequest(req *ExecutionRequest) {
 		Username:            req.AuthenticatedUser.Username,
 		Username:            req.AuthenticatedUser.Username,
 	}
 	}
 
 
-	_, isDuplicate := e.GetLog(req.TrackingID)
-	if isDuplicate || !isValidTrackingID(req.TrackingID) {
-		req.TrackingID = uuid.NewString()
+	req.logEntry.ExecutionTrackingID = req.TrackingID
+}
+
+func bindingIDForTrace(req *ExecutionRequest) string {
+	if req.Binding == nil {
+		return ""
 	}
 	}
 
 
-	req.logEntry.ExecutionTrackingID = req.TrackingID
+	return req.Binding.ID
 }
 }
 
 
 func (e *Executor) execChain(req *ExecutionRequest, wg *sync.WaitGroup) bool {
 func (e *Executor) execChain(req *ExecutionRequest, wg *sync.WaitGroup) bool {
@@ -572,7 +623,17 @@ func (e *Executor) registerOrQueueRequest(req *ExecutionRequest, wg *sync.WaitGr
 		return false, false
 		return false, false
 	}
 	}
 
 
+	return e.queueRequestAfterACL(req, wg)
+}
+
+func (e *Executor) queueRequestAfterACL(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
+	if !stepACLCheck(req) {
+		e.finishExecChain(req)
+		return true, false
+	}
+
 	e.queueRequest(req, wg)
 	e.queueRequest(req, wg)
+	notifyListenersStarted(req)
 
 
 	return false, true
 	return false, true
 }
 }
@@ -586,11 +647,14 @@ func (e *Executor) runExecutionSteps(req *ExecutionRequest) {
 }
 }
 
 
 func (e *Executor) finishExecChain(req *ExecutionRequest) {
 func (e *Executor) finishExecChain(req *ExecutionRequest) {
-	if req.logEntry.DatetimeFinished.IsZero() {
-		req.logEntry.DatetimeFinished = time.Now()
-	}
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		if entry.DatetimeFinished.IsZero() {
+			entry.DatetimeFinished = time.Now()
+		}
+
+		entry.ExecutionFinished = true
+	})
 
 
-	req.logEntry.ExecutionFinished = true
 	notifyListenersFinished(req)
 	notifyListenersFinished(req)
 	e.drainGroupQueue()
 	e.drainGroupQueue()
 }
 }
@@ -599,9 +663,10 @@ func getConcurrentCount(req *ExecutionRequest) int {
 	concurrentCount := 0
 	concurrentCount := 0
 
 
 	req.executor.logmutex.RLock()
 	req.executor.logmutex.RLock()
+	logs := req.executor.LogsByBindingId[req.Binding.ID]
 
 
-	for _, log := range req.executor.GetLogsByBindingId(req.Binding.ID) {
-		if !log.ExecutionFinished {
+	for _, log := range logs {
+		if !log.ExecutionFinished && !log.Queued {
 			concurrentCount += 1
 			concurrentCount += 1
 		}
 		}
 	}
 	}
@@ -622,8 +687,10 @@ func stepConcurrencyCheck(req *ExecutionRequest) bool {
 			"maxConcurrent":   req.Binding.Action.MaxConcurrent,
 			"maxConcurrent":   req.Binding.Action.MaxConcurrent,
 		}).Warnf("Blocked from executing due to concurrency limit")
 		}).Warnf("Blocked from executing due to concurrency limit")
 
 
-		req.logEntry.Output = "Blocked from executing due to concurrency limit"
-		req.logEntry.Blocked = true
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.Output = "Blocked from executing due to concurrency limit"
+			entry.Blocked = true
+		})
 		return false
 		return false
 	}
 	}
 
 
@@ -642,24 +709,35 @@ func parseDuration(rate config.RateSpec) time.Duration {
 	return duration
 	return duration
 }
 }
 
 
-//gocyclo:ignore
-func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
-	executions := -1 // Because we will find ourself when checking execution logs
-
-	duration := parseDuration(rate)
+func entityPrefixForRequest(req *ExecutionRequest) string {
+	if req.Binding != nil && req.Binding.Entity != nil {
+		return req.Binding.Entity.UniqueKey
+	}
 
 
-	then := time.Now().Add(-duration)
+	return ""
+}
 
 
-	currentEntityPrefix := ""
-	if req.Binding != nil && req.Binding.Entity != nil {
-		currentEntityPrefix = req.Binding.Entity.UniqueKey
+func rateExecutionMatchesScope(logEntry *InternalLogEntry, req *ExecutionRequest, entityPrefix string) bool {
+	if logEntry.EntityPrefix != entityPrefix {
+		return false
 	}
 	}
-	for _, logEntry := range req.executor.GetLogsByBindingId(req.Binding.ID) {
-		if logEntry.EntityPrefix != currentEntityPrefix {
-			continue
-		}
-		if logEntry.DatetimeStarted.After(then) && !logEntry.Blocked {
 
 
+	return !logEntry.Queued && logEntry.ExecutionTrackingID != req.TrackingID
+}
+
+func logEntryStartedInWindow(logEntry *InternalLogEntry, windowStart time.Time) bool {
+	return logEntry.DatetimeStarted.After(windowStart) && !logEntry.Blocked
+}
+
+func rateExecutionCountsForRate(logEntry *InternalLogEntry, req *ExecutionRequest, entityPrefix string, windowStart time.Time) bool {
+	return rateExecutionMatchesScope(logEntry, req, entityPrefix) && logEntryStartedInWindow(logEntry, windowStart)
+}
+
+func countRateExecutions(logs []*InternalLogEntry, req *ExecutionRequest, entityPrefix string, windowStart time.Time) int {
+	executions := 0
+
+	for _, logEntry := range logs {
+		if rateExecutionCountsForRate(logEntry, req, entityPrefix, windowStart) {
 			executions += 1
 			executions += 1
 		}
 		}
 	}
 	}
@@ -667,6 +745,18 @@ func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
 	return executions
 	return executions
 }
 }
 
 
+func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
+	duration := parseDuration(rate)
+	then := time.Now().Add(-duration)
+
+	req.executor.logmutex.RLock()
+	logs := req.executor.LogsByBindingId[req.Binding.ID]
+	executions := countRateExecutions(logs, req, entityPrefixForRequest(req), then)
+	req.executor.logmutex.RUnlock()
+
+	return executions
+}
+
 func stepRateCheck(req *ExecutionRequest) bool {
 func stepRateCheck(req *ExecutionRequest) bool {
 	for _, rate := range req.Binding.Action.MaxRate {
 	for _, rate := range req.Binding.Action.MaxRate {
 		executions := getExecutionsCount(rate, req)
 		executions := getExecutionsCount(rate, req)
@@ -679,8 +769,10 @@ func stepRateCheck(req *ExecutionRequest) bool {
 				"duration":    rate.Duration,
 				"duration":    rate.Duration,
 			}).Infof("Blocked from executing due to rate limit")
 			}).Infof("Blocked from executing due to rate limit")
 
 
-			req.logEntry.Output = "Blocked from executing due to rate limit"
-			req.logEntry.Blocked = true
+			req.mutateLogEntry(func(entry *InternalLogEntry) {
+				entry.Output = "Blocked from executing due to rate limit"
+				entry.Blocked = true
+			})
 			return false
 			return false
 		}
 		}
 	}
 	}
@@ -692,8 +784,10 @@ func stepACLCheck(req *ExecutionRequest) bool {
 	canExec := acl.IsAllowedExec(req.Cfg, req.AuthenticatedUser, req.Binding.Action)
 	canExec := acl.IsAllowedExec(req.Cfg, req.AuthenticatedUser, req.Binding.Action)
 
 
 	if !canExec {
 	if !canExec {
-		req.logEntry.Output = "ACL check failed. Blocked from executing."
-		req.logEntry.Blocked = true
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.Output = "ACL check failed. Blocked from executing."
+			entry.Blocked = true
+		})
 
 
 		log.WithFields(log.Fields{
 		log.WithFields(log.Fields{
 			"actionTitle": req.logEntry.ActionTitle,
 			"actionTitle": req.logEntry.ActionTitle,
@@ -831,7 +925,9 @@ func hasExec(req *ExecutionRequest) bool {
 }
 }
 
 
 func fail(req *ExecutionRequest, err error) bool {
 func fail(req *ExecutionRequest, err error) bool {
-	req.logEntry.Output = err.Error()
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Output = err.Error()
+	})
 	log.Warn(err.Error())
 	log.Warn(err.Error())
 	return false
 	return false
 }
 }
@@ -865,14 +961,16 @@ func stepRequestActionHasBinding(req *ExecutionRequest) bool {
 }
 }
 
 
 func stepRequestActionPopulateLogEntry(req *ExecutionRequest) {
 func stepRequestActionPopulateLogEntry(req *ExecutionRequest) {
-	req.logEntry.Binding = req.Binding
-	req.logEntry.ActionConfigTitle = req.Binding.Action.Title
-	req.logEntry.ActionTitle = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Title, req.Binding.Entity)
-	req.logEntry.ActionIcon = req.Binding.Action.Icon
-	req.logEntry.Tags = req.Tags
-	if req.Binding.Entity != nil {
-		req.logEntry.EntityPrefix = req.Binding.Entity.UniqueKey
-	}
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Binding = req.Binding
+		entry.ActionConfigTitle = req.Binding.Action.Title
+		entry.ActionTitle = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Title, req.Binding.Entity)
+		entry.ActionIcon = req.Binding.Action.Icon
+		entry.Tags = req.Tags
+		if req.Binding.Entity != nil {
+			entry.EntityPrefix = req.Binding.Entity.UniqueKey
+		}
+	})
 }
 }
 
 
 func stepRequestActionRegisterLog(req *ExecutionRequest) {
 func stepRequestActionRegisterLog(req *ExecutionRequest) {
@@ -895,7 +993,9 @@ func stepLogStart(req *ExecutionRequest) bool {
 }
 }
 
 
 func stepLogFinish(req *ExecutionRequest) bool {
 func stepLogFinish(req *ExecutionRequest) bool {
-	req.logEntry.ExecutionFinished = true
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.ExecutionFinished = true
+	})
 
 
 	log.WithFields(log.Fields{
 	log.WithFields(log.Fields{
 		"actionTitle":  req.logEntry.ActionTitle,
 		"actionTitle":  req.logEntry.ActionTitle,
@@ -919,10 +1019,14 @@ func notifyListenersStarted(req *ExecutionRequest) {
 	}
 	}
 }
 }
 
 
-func appendErrorToStderr(err error, logEntry *InternalLogEntry) {
-	if err != nil {
-		logEntry.Output = err.Error() + "\n\n" + logEntry.Output
+func appendErrorToStderr(req *ExecutionRequest, err error) {
+	if err == nil {
+		return
 	}
 	}
+
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Output = err.Error() + "\n\n" + entry.Output
+	})
 }
 }
 
 
 type OutputStreamer struct {
 type OutputStreamer struct {
@@ -965,31 +1069,41 @@ func stepExec(req *ExecutionRequest) bool {
 	streamer := &OutputStreamer{Req: req}
 	streamer := &OutputStreamer{Req: req}
 	cmd := buildCommand(ctx, req)
 	cmd := buildCommand(ctx, req)
 	if cmd == nil {
 	if cmd == nil {
-		req.logEntry.Output = "Cannot execute: no command arguments provided"
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.Output = "Cannot execute: no command arguments provided"
+		})
 		log.Warn("Cannot execute: no command arguments provided")
 		log.Warn("Cannot execute: no command arguments provided")
 		return false
 		return false
 	}
 	}
 	prepareCommand(cmd, streamer, req)
 	prepareCommand(cmd, streamer, req)
 	runerr := cmd.Start()
 	runerr := cmd.Start()
-	req.logEntry.Process = cmd.Process
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Process = cmd.Process
+	})
 	ctx.setProcess(cmd.Process)
 	ctx.setProcess(cmd.Process)
 	waiterr := cmd.Wait()
 	waiterr := cmd.Wait()
-	req.logEntry.ExitCode = int32(cmd.ProcessState.ExitCode())
-	req.logEntry.Output = streamer.String()
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.ExitCode = int32(cmd.ProcessState.ExitCode())
+		entry.Output = streamer.String()
+	})
 
 
-	appendErrorToStderr(runerr, req.logEntry)
-	appendErrorToStderr(waiterr, req.logEntry)
+	appendErrorToStderr(req, runerr)
+	appendErrorToStderr(req, waiterr)
 
 
 	if ctx.Err() == context.DeadlineExceeded {
 	if ctx.Err() == context.DeadlineExceeded {
 		log.WithFields(log.Fields{
 		log.WithFields(log.Fields{
 			"actionTitle": req.logEntry.ActionTitle,
 			"actionTitle": req.logEntry.ActionTitle,
 		}).Warnf("Action timed out")
 		}).Warnf("Action timed out")
 
 
-		req.logEntry.TimedOut = true
-		req.logEntry.Output += "OliveTin::timeout - this action timed out after " + fmt.Sprintf("%v", req.Binding.Action.Timeout) + " seconds. If you need more time for this action, set a longer timeout. See https://docs.olivetin.app/action_customization/timeouts.html for more help."
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.TimedOut = true
+			entry.Output += "OliveTin::timeout - this action timed out after " + fmt.Sprintf("%v", req.Binding.Action.Timeout) + " seconds. If you need more time for this action, set a longer timeout. See https://docs.olivetin.app/action_customization/timeouts.html for more help."
+		})
 	}
 	}
 
 
-	req.logEntry.DatetimeFinished = time.Now()
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.DatetimeFinished = time.Now()
+	})
 
 
 	return true
 	return true
 }
 }
@@ -1005,7 +1119,9 @@ func prepareCommand(cmd *exec.Cmd, streamer *OutputStreamer, req *ExecutionReque
 	cmd.Stdout = streamer
 	cmd.Stdout = streamer
 	cmd.Stderr = streamer
 	cmd.Stderr = streamer
 	cmd.Env = buildEnv(req.Arguments)
 	cmd.Env = buildEnv(req.Arguments)
-	req.logEntry.ExecutionStarted = true
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.ExecutionStarted = true
+	})
 }
 }
 
 
 func stepExecAfter(req *ExecutionRequest) bool {
 func stepExecAfter(req *ExecutionRequest) bool {
@@ -1030,24 +1146,28 @@ func stepExecAfter(req *ExecutionRequest) bool {
 
 
 	waiterr := cmd.Wait()
 	waiterr := cmd.Wait()
 
 
-	req.logEntry.Output += "\n"
-	req.logEntry.Output += "OliveTin::shellAfterCompleted stdout\n"
-	req.logEntry.Output += stdout.String()
-
-	req.logEntry.Output += "OliveTin::shellAfterCompleted stderr\n"
-	req.logEntry.Output += stderr.String()
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Output += "\n"
+		entry.Output += "OliveTin::shellAfterCompleted stdout\n"
+		entry.Output += stdout.String()
+		entry.Output += "OliveTin::shellAfterCompleted stderr\n"
+		entry.Output += stderr.String()
+		entry.Output += "OliveTin::shellAfterCompleted errors and summary\n"
+	})
 
 
-	req.logEntry.Output += "OliveTin::shellAfterCompleted errors and summary\n"
-	appendErrorToStderr(runerr, req.logEntry)
-	appendErrorToStderr(waiterr, req.logEntry)
+	appendErrorToStderr(req, runerr)
+	appendErrorToStderr(req, waiterr)
 
 
 	if ctx.Err() == context.DeadlineExceeded {
 	if ctx.Err() == context.DeadlineExceeded {
-		req.logEntry.Output += "Your shellAfterCompleted command timed out."
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.Output += "Your shellAfterCompleted command timed out."
+		})
 	}
 	}
 
 
-	req.logEntry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", cmd.ProcessState.ExitCode())
-
-	req.logEntry.Output += "OliveTin::shellAfterCompleted output complete\n"
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", cmd.ProcessState.ExitCode())
+		entry.Output += "OliveTin::shellAfterCompleted output complete\n"
+	})
 
 
 	return true
 	return true
 }
 }
@@ -1065,7 +1185,9 @@ func buildShellAfterCommand(ctx context.Context, req *ExecutionRequest, stdout,
 	finalParsedCommand, err := tpl.ParseTemplateWithActionContext(req.Binding.Action.ShellAfterCompleted, req.Binding.Entity, args)
 	finalParsedCommand, err := tpl.ParseTemplateWithActionContext(req.Binding.Action.ShellAfterCompleted, req.Binding.Entity, args)
 	if err != nil {
 	if err != nil {
 		msg := "Could not prepare shellAfterCompleted command: " + err.Error() + "\n"
 		msg := "Could not prepare shellAfterCompleted command: " + err.Error() + "\n"
-		req.logEntry.Output += msg
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.Output += msg
+		})
 		log.Warn(msg)
 		log.Warn(msg)
 		return nil, nil, nil
 		return nil, nil, nil
 	}
 	}
@@ -1100,7 +1222,9 @@ func stepTrigger(req *ExecutionRequest) bool {
 			"actionTitle": req.logEntry.ActionTitle,
 			"actionTitle": req.logEntry.ActionTitle,
 			"depth":       req.TriggerDepth,
 			"depth":       req.TriggerDepth,
 		}).Warnf("Trigger action reached maximum depth of %v. Not triggering further actions.", MaxTriggerDepth)
 		}).Warnf("Trigger action reached maximum depth of %v. Not triggering further actions.", MaxTriggerDepth)
-		req.logEntry.Output += fmt.Sprintf("OliveTin::trigger - this action reached maximum trigger depth of %v. Not triggering further actions.", MaxTriggerDepth)
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.Output += fmt.Sprintf("OliveTin::trigger - this action reached maximum trigger depth of %v. Not triggering further actions.", MaxTriggerDepth)
+		})
 		return true
 		return true
 	}
 	}
 
 

+ 28 - 11
service/internal/executor/group_concurrency.go

@@ -61,11 +61,15 @@ func actionInGroup(action *config.Action, groupName string) bool {
 }
 }
 
 
 func (e *Executor) countActiveInGroup(groupName string) int {
 func (e *Executor) countActiveInGroup(groupName string) int {
-	count := 0
-
 	e.logmutex.RLock()
 	e.logmutex.RLock()
 	defer e.logmutex.RUnlock()
 	defer e.logmutex.RUnlock()
 
 
+	return e.countActiveInGroupLocked(groupName)
+}
+
+func (e *Executor) countActiveInGroupLocked(groupName string) int {
+	count := 0
+
 	for _, logEntry := range e.logs {
 	for _, logEntry := range e.logs {
 		if logEntryIsActiveInGroup(logEntry, groupName) {
 		if logEntryIsActiveInGroup(logEntry, groupName) {
 			count++
 			count++
@@ -129,12 +133,25 @@ func firstFullGroupName(e *Executor, req *ExecutionRequest) string {
 	return ""
 	return ""
 }
 }
 
 
+func firstFullGroupNameLocked(e *Executor, req *ExecutionRequest) string {
+	for _, limit := range actionGroupLimits(req) {
+		if e.countActiveInGroupLocked(limit.name) >= (limit.maxConcurrent + 1) {
+			return limit.name
+		}
+	}
+
+	return ""
+}
+
 func (e *Executor) queueRequest(req *ExecutionRequest, wg *sync.WaitGroup) {
 func (e *Executor) queueRequest(req *ExecutionRequest, wg *sync.WaitGroup) {
-	groupName := firstFullGroupName(e, req)
+	var groupName string
 
 
-	req.logEntry.Queued = true
-	req.logEntry.QueuedForGroup = groupName
-	req.logEntry.Output = fmt.Sprintf("Queued waiting for action group %q", groupName)
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		groupName = firstFullGroupNameLocked(e, req)
+		entry.Queued = true
+		entry.QueuedForGroup = groupName
+		entry.Output = fmt.Sprintf("Queued waiting for action group %q", groupName)
+	})
 
 
 	log.WithFields(log.Fields{
 	log.WithFields(log.Fields{
 		"actionTitle": req.logEntry.ActionTitle,
 		"actionTitle": req.logEntry.ActionTitle,
@@ -163,17 +180,17 @@ func (e *Executor) drainGroupQueue() {
 	e.groupQueue = e.groupQueue[1:]
 	e.groupQueue = e.groupQueue[1:]
 	e.groupQueueMu.Unlock()
 	e.groupQueueMu.Unlock()
 
 
+	next.req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Queued = false
+		entry.QueuedForGroup = ""
+	})
+
 	go e.runDequeuedExecution(next)
 	go e.runDequeuedExecution(next)
 }
 }
 
 
 func (e *Executor) runDequeuedExecution(queued *queuedExecution) {
 func (e *Executor) runDequeuedExecution(queued *queuedExecution) {
 	req := queued.req
 	req := queued.req
 
 
-	e.logmutex.Lock()
-	req.logEntry.Queued = false
-	req.logEntry.QueuedForGroup = ""
-	e.logmutex.Unlock()
-
 	req.skipRequestRegistration = true
 	req.skipRequestRegistration = true
 
 
 	e.runExecutionSteps(req)
 	e.runExecutionSteps(req)

+ 25 - 25
service/internal/executor/group_concurrency_test.go

@@ -64,19 +64,19 @@ func TestGroupConcurrencyQueuesSecondAction(t *testing.T) {
 	})
 	})
 
 
 	require.Eventually(t, func() bool {
 	require.Eventually(t, func() bool {
-		logEntry, ok := e.GetLog(tracking2)
-		return ok && logEntry.Queued
+		snapshot, ok := e.SnapshotLog(tracking2)
+		return ok && snapshot.Queued
 	}, time.Second, 10*time.Millisecond)
 	}, time.Second, 10*time.Millisecond)
 
 
 	wg1.Wait()
 	wg1.Wait()
 	wg2.Wait()
 	wg2.Wait()
 
 
-	logEntry, ok := e.GetLog(tracking2)
+	snapshot, ok := e.SnapshotLog(tracking2)
 	require.True(t, ok)
 	require.True(t, ok)
-	assert.False(t, logEntry.Queued)
-	assert.False(t, logEntry.Blocked)
-	assert.Equal(t, int32(0), logEntry.ExitCode)
-	assert.Contains(t, logEntry.Output, "queued-run")
+	assert.False(t, snapshot.Queued)
+	assert.False(t, snapshot.Blocked)
+	assert.Equal(t, int32(0), snapshot.ExitCode)
+	assert.Contains(t, snapshot.Output, "queued-run")
 }
 }
 
 
 func TestDifferentGroupsRunConcurrently(t *testing.T) {
 func TestDifferentGroupsRunConcurrently(t *testing.T) {
@@ -101,13 +101,13 @@ func TestDifferentGroupsRunConcurrently(t *testing.T) {
 		},
 		},
 	)
 	)
 
 
-	wg1, _ := e.ExecRequest(&ExecutionRequest{
+	wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
 		Binding:           e.FindBindingWithNoEntity(actionA),
 		Binding:           e.FindBindingWithNoEntity(actionA),
 		Cfg:               cfg,
 		Cfg:               cfg,
 		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
 		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
 	})
 	})
 
 
-	time.Sleep(50 * time.Millisecond)
+	waitUntilExecutionStarted(t, e, tracking1)
 
 
 	wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
 	wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
 		Binding:           e.FindBindingWithNoEntity(actionB),
 		Binding:           e.FindBindingWithNoEntity(actionB),
@@ -116,16 +116,16 @@ func TestDifferentGroupsRunConcurrently(t *testing.T) {
 	})
 	})
 
 
 	require.Eventually(t, func() bool {
 	require.Eventually(t, func() bool {
-		logEntry, ok := e.GetLog(tracking2)
-		return ok && logEntry.ExecutionFinished && !logEntry.Queued
+		snapshot, ok := e.SnapshotLog(tracking2)
+		return ok && snapshot.ExecutionFinished && !snapshot.Queued
 	}, 2*time.Second, 20*time.Millisecond)
 	}, 2*time.Second, 20*time.Millisecond)
 
 
 	wg1.Wait()
 	wg1.Wait()
 	wg2.Wait()
 	wg2.Wait()
 
 
-	logEntry, ok := e.GetLog(tracking2)
+	snapshot, ok := e.SnapshotLog(tracking2)
 	require.True(t, ok)
 	require.True(t, ok)
-	assert.Contains(t, logEntry.Output, "group-b")
+	assert.Contains(t, snapshot.Output, "group-b")
 }
 }
 
 
 func TestPerActionConcurrencyStillBlocksWithoutQueue(t *testing.T) {
 func TestPerActionConcurrencyStillBlocksWithoutQueue(t *testing.T) {
@@ -140,13 +140,13 @@ func TestPerActionConcurrencyStillBlocksWithoutQueue(t *testing.T) {
 	e, cfg := testGroupExecutor([]*config.Action{action}, nil)
 	e, cfg := testGroupExecutor([]*config.Action{action}, nil)
 	binding := e.FindBindingWithNoEntity(action)
 	binding := e.FindBindingWithNoEntity(action)
 
 
-	wg1, _ := e.ExecRequest(&ExecutionRequest{
+	wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
 		Binding:           binding,
 		Binding:           binding,
 		Cfg:               cfg,
 		Cfg:               cfg,
 		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
 		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
 	})
 	})
 
 
-	time.Sleep(50 * time.Millisecond)
+	waitUntilExecutionStarted(t, e, tracking1)
 
 
 	wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
 	wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
 		Binding:           binding,
 		Binding:           binding,
@@ -157,18 +157,18 @@ func TestPerActionConcurrencyStillBlocksWithoutQueue(t *testing.T) {
 	wg1.Wait()
 	wg1.Wait()
 	wg2.Wait()
 	wg2.Wait()
 
 
-	logEntry, ok := e.GetLog(tracking2)
+	snapshot, ok := e.SnapshotLog(tracking2)
 	require.True(t, ok)
 	require.True(t, ok)
-	assert.True(t, logEntry.Blocked)
-	assert.False(t, logEntry.Queued)
+	assert.True(t, snapshot.Blocked)
+	assert.False(t, snapshot.Queued)
 }
 }
 
 
 func waitUntilExecutionStarted(t *testing.T, e *Executor, trackingID string) {
 func waitUntilExecutionStarted(t *testing.T, e *Executor, trackingID string) {
 	t.Helper()
 	t.Helper()
 
 
 	require.Eventually(t, func() bool {
 	require.Eventually(t, func() bool {
-		logEntry, ok := e.GetLog(trackingID)
-		return ok && logEntry.ExecutionStarted
+		snapshot, ok := e.SnapshotLog(trackingID)
+		return ok && snapshot.ExecutionStarted
 	}, 2*time.Second, 10*time.Millisecond)
 	}, 2*time.Second, 10*time.Millisecond)
 }
 }
 
 
@@ -247,9 +247,9 @@ func TestStartActionAndWaitWaitsForQueuedExecution(t *testing.T) {
 
 
 	assertWaitGroupCompletes(t, wg2)
 	assertWaitGroupCompletes(t, wg2)
 
 
-	logEntry, ok := e.GetLog(tracking2)
+	snapshot, ok := e.SnapshotLog(tracking2)
 	require.True(t, ok)
 	require.True(t, ok)
-	assert.Contains(t, logEntry.Output, "waited")
+	assert.Contains(t, snapshot.Output, "waited")
 }
 }
 
 
 func TestUnknownActionGroupReferenceWarnsAndSkipsLimit(t *testing.T) {
 func TestUnknownActionGroupReferenceWarnsAndSkipsLimit(t *testing.T) {
@@ -270,8 +270,8 @@ func TestUnknownActionGroupReferenceWarnsAndSkipsLimit(t *testing.T) {
 
 
 	wg.Wait()
 	wg.Wait()
 
 
-	logEntry, ok := e.GetLog(tracking)
+	snapshot, ok := e.SnapshotLog(tracking)
 	require.True(t, ok)
 	require.True(t, ok)
-	assert.False(t, logEntry.Queued)
-	assert.Equal(t, int32(0), logEntry.ExitCode)
+	assert.False(t, snapshot.Queued)
+	assert.Equal(t, int32(0), snapshot.ExitCode)
 }
 }