فهرست منبع

feat: Action grouping for concurrency limit and queuing (#1048)

James Read 2 هفته پیش
والد
کامیت
f6fc775ef7

+ 27 - 1
docs/modules/ROOT/pages/action_customization/concurrency.adoc

@@ -3,7 +3,7 @@
 
 
 By default, OliveTin will allow you to run several instances of an action at the same time. For example, an action might take 20 seconds, and if you click the button 3 times, for a time there will be 3 actions running at the same time.
 By default, OliveTin will allow you to run several instances of an action at the same time. For example, an action might take 20 seconds, and if you click the button 3 times, for a time there will be 3 actions running at the same time.
 
 
-Sometimes you don't want to allow this - an example case where it would not make sense is in the case of a backup script. To stop this, we can set `maxConcurrent` to `1`. 
+Sometimes you don't want to allow this - an example case where it would not make sense is in the case of a backup script. To stop this, we can set `maxConcurrent` to `1`.
 
 
 [source,yaml]
 [source,yaml]
 ----
 ----
@@ -29,4 +29,30 @@ WARN Blocked from executing. This would mean this action is running 2 times conc
 
 
 Naturally, you can set `maxConcurrent` to `3` or some other number, to limit the amount of times the action executes at once.
 Naturally, you can set `maxConcurrent` to `3` or some other number, to limit the amount of times the action executes at once.
 
 
+== Action groups
 
 
+Sometimes you need to limit concurrency across several different actions. For example, Unity only allows one build at a time, but you might have separate actions for different platforms.
+
+Use `actionGroups` to define a shared limit, and assign actions to a group with `groups`:
+
+[source,yaml]
+----
+actionGroups:
+  unity:
+    maxConcurrent: 1
+
+actions:
+  - title: Unity Android Build
+    shell: /opt/unity/build-android.sh
+    groups: [ unity ]
+
+  - title: Unity iOS Build
+    shell: /opt/unity/build-ios.sh
+    groups: [ unity ]
+----
+
+When the group limit is reached, additional requests are queued automatically and run in order when a slot becomes free. Queued executions appear in the logs with a queued status.
+
+Per-action `maxConcurrent` still applies separately. If the same action binding is started twice while one is already running, the second request is blocked immediately (not queued).
+
+The queue is held in memory. If OliveTin restarts while actions are queued, those queued requests are not preserved.

+ 18 - 2
frontend/js/websocket.js

@@ -194,11 +194,27 @@ async function handleConfigChangedEvent (j) {
   window.dispatchEvent(j)
   window.dispatchEvent(j)
 }
 }
 
 
+const eventCaseToTypeName = {
+  entityChanged: 'EventEntityChanged',
+  configChanged: 'EventConfigChanged',
+  executionFinished: 'EventExecutionFinished',
+  executionStarted: 'EventExecutionStarted',
+  outputChunk: 'EventOutputChunk',
+  heartbeat: 'EventHeartbeat'
+}
+
 function handleEvent (msg) {
 function handleEvent (msg) {
-  const typeName = msg.event.value.$typeName.replace('olivetin.api.v1.', '')
+  const eventCase = msg?.event?.case
+  const eventValue = msg?.event?.value
+  const typeName = eventCaseToTypeName[eventCase]
+
+  if (!typeName || !eventValue) {
+    console.warn('Skipping websocket event with no payload:', msg)
+    return
+  }
 
 
   const j = new Event(typeName)
   const j = new Event(typeName)
-  j.payload = msg.event.value
+  j.payload = eventValue
 
 
   switch (typeName) {
   switch (typeName) {
     case 'EventConfigChanged':
     case 'EventConfigChanged':

+ 10 - 0
frontend/resources/scripts/gen/olivetin/api/v1/olivetin_pb.d.ts

@@ -673,6 +673,16 @@ export declare type LogEntry = Message<"olivetin.api.v1.LogEntry"> & {
    * @generated from field: string binding_id = 20;
    * @generated from field: string binding_id = 20;
    */
    */
   bindingId: string;
   bindingId: string;
+
+  /**
+   * @generated from field: bool queued = 21;
+   */
+  queued: boolean;
+
+  /**
+   * @generated from field: string queued_for_group = 22;
+   */
+  queuedForGroup: string;
 };
 };
 
 
 /**
 /**

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 0 - 0
frontend/resources/scripts/gen/olivetin/api/v1/olivetin_pb.js


+ 14 - 0
frontend/resources/vue/ActionButton.vue

@@ -287,17 +287,25 @@ async function startAction(actionArgs) {
 function onLogEntryChanged(logEntry) {
 function onLogEntryChanged(logEntry) {
   if (logEntry.executionFinished) {
   if (logEntry.executionFinished) {
 	onExecutionFinished(logEntry)
 	onExecutionFinished(logEntry)
+  } else if (logEntry.queued && !logEntry.executionStarted) {
+	onExecutionQueued(logEntry)
   } else {
   } else {
 	onExecutionStarted(logEntry)
 	onExecutionStarted(logEntry)
   }
   }
 }
 }
 
 
+function onExecutionQueued(_logEntry) {
+  isDisabled.value = true
+  updateDom('action-queued', '[Queued]')
+}
+
 function onExecutionStarted(logEntry) {
 function onExecutionStarted(logEntry) {
   if (popupOnStart.value && popupOnStart.value.includes('execution-dialog')) {
   if (popupOnStart.value && popupOnStart.value.includes('execution-dialog')) {
 	router.push(`/logs/${logEntry.executionTrackingId}`)
 	router.push(`/logs/${logEntry.executionTrackingId}`)
   }
   }
 
 
   isDisabled.value = true
   isDisabled.value = true
+  updateDom(null, title.value)
 }
 }
 
 
 function onExecutionFinished(logEntry) {
 function onExecutionFinished(logEntry) {
@@ -449,6 +457,12 @@ defineExpose({
 		color: #721c24;
 		color: #721c24;
 	}
 	}
 
 
+	.action-button button.action-queued {
+		background: #fff3cd !important;
+		border-color: #ffeaa7;
+		color: #856404;
+	}
+
 	.action-button button.action-nonzero-exit {
 	.action-button button.action-nonzero-exit {
 		background: #f8d7da !important;
 		background: #f8d7da !important;
 		border-color: #f5c6cb;
 		border-color: #f5c6cb;

+ 2 - 0
proto/olivetin/api/v1/olivetin.proto

@@ -157,6 +157,8 @@ message LogEntry {
 	bool can_kill = 18;
 	bool can_kill = 18;
 	string datetime_rate_limit_expires = 19; // Datetime when rate limit expires (empty string if not rate limited), format: "2006-01-02 15:04:05"
 	string datetime_rate_limit_expires = 19; // Datetime when rate limit expires (empty string if not rate limited), format: "2006-01-02 15:04:05"
 	string binding_id = 20; // Binding ID for matching rate limits to action buttons
 	string binding_id = 20; // Binding ID for matching rate limits to action buttons
+	bool queued = 21;
+	string queued_for_group = 22;
 }
 }
 
 
 message GetLogsResponse {
 message GetLogsResponse {

+ 20 - 2
service/gen/olivetin/api/v1/olivetin.pb.go

@@ -1305,6 +1305,8 @@ type LogEntry struct {
 	CanKill                  bool                   `protobuf:"varint,18,opt,name=can_kill,json=canKill,proto3" json:"can_kill,omitempty"`
 	CanKill                  bool                   `protobuf:"varint,18,opt,name=can_kill,json=canKill,proto3" json:"can_kill,omitempty"`
 	DatetimeRateLimitExpires string                 `protobuf:"bytes,19,opt,name=datetime_rate_limit_expires,json=datetimeRateLimitExpires,proto3" json:"datetime_rate_limit_expires,omitempty"` // Datetime when rate limit expires (empty string if not rate limited), format: "2006-01-02 15:04:05"
 	DatetimeRateLimitExpires string                 `protobuf:"bytes,19,opt,name=datetime_rate_limit_expires,json=datetimeRateLimitExpires,proto3" json:"datetime_rate_limit_expires,omitempty"` // Datetime when rate limit expires (empty string if not rate limited), format: "2006-01-02 15:04:05"
 	BindingId                string                 `protobuf:"bytes,20,opt,name=binding_id,json=bindingId,proto3" json:"binding_id,omitempty"`                                                  // Binding ID for matching rate limits to action buttons
 	BindingId                string                 `protobuf:"bytes,20,opt,name=binding_id,json=bindingId,proto3" json:"binding_id,omitempty"`                                                  // Binding ID for matching rate limits to action buttons
+	Queued                   bool                   `protobuf:"varint,21,opt,name=queued,proto3" json:"queued,omitempty"`
+	QueuedForGroup           string                 `protobuf:"bytes,22,opt,name=queued_for_group,json=queuedForGroup,proto3" json:"queued_for_group,omitempty"`
 	unknownFields            protoimpl.UnknownFields
 	unknownFields            protoimpl.UnknownFields
 	sizeCache                protoimpl.SizeCache
 	sizeCache                protoimpl.SizeCache
 }
 }
@@ -1465,6 +1467,20 @@ func (x *LogEntry) GetBindingId() string {
 	return ""
 	return ""
 }
 }
 
 
+func (x *LogEntry) GetQueued() bool {
+	if x != nil {
+		return x.Queued
+	}
+	return false
+}
+
+func (x *LogEntry) GetQueuedForGroup() string {
+	if x != nil {
+		return x.QueuedForGroup
+	}
+	return ""
+}
+
 type GetLogsResponse struct {
 type GetLogsResponse struct {
 	state          protoimpl.MessageState `protogen:"open.v1"`
 	state          protoimpl.MessageState `protogen:"open.v1"`
 	Logs           []*LogEntry            `protobuf:"bytes,1,rep,name=logs,proto3" json:"logs,omitempty"`
 	Logs           []*LogEntry            `protobuf:"bytes,1,rep,name=logs,proto3" json:"logs,omitempty"`
@@ -4183,7 +4199,7 @@ const file_olivetin_api_v1_olivetin_proto_rawDesc = "" +
 	"\fstart_offset\x18\x01 \x01(\x03R\vstartOffset\x12\x1f\n" +
 	"\fstart_offset\x18\x01 \x01(\x03R\vstartOffset\x12\x1f\n" +
 	"\vdate_filter\x18\x02 \x01(\tR\n" +
 	"\vdate_filter\x18\x02 \x01(\tR\n" +
 	"dateFilter\x12\x1b\n" +
 	"dateFilter\x12\x1b\n" +
-	"\tpage_size\x18\x03 \x01(\x03R\bpageSize\"\x89\x05\n" +
+	"\tpage_size\x18\x03 \x01(\x03R\bpageSize\"\xcb\x05\n" +
 	"\bLogEntry\x12)\n" +
 	"\bLogEntry\x12)\n" +
 	"\x10datetime_started\x18\x01 \x01(\tR\x0fdatetimeStarted\x12!\n" +
 	"\x10datetime_started\x18\x01 \x01(\tR\x0fdatetimeStarted\x12!\n" +
 	"\faction_title\x18\x02 \x01(\tR\vactionTitle\x12\x16\n" +
 	"\faction_title\x18\x02 \x01(\tR\vactionTitle\x12\x16\n" +
@@ -4206,7 +4222,9 @@ const file_olivetin_api_v1_olivetin_proto_rawDesc = "" +
 	"\bcan_kill\x18\x12 \x01(\bR\acanKill\x12=\n" +
 	"\bcan_kill\x18\x12 \x01(\bR\acanKill\x12=\n" +
 	"\x1bdatetime_rate_limit_expires\x18\x13 \x01(\tR\x18datetimeRateLimitExpires\x12\x1d\n" +
 	"\x1bdatetime_rate_limit_expires\x18\x13 \x01(\tR\x18datetimeRateLimitExpires\x12\x1d\n" +
 	"\n" +
 	"\n" +
-	"binding_id\x18\x14 \x01(\tR\tbindingId\"\xca\x01\n" +
+	"binding_id\x18\x14 \x01(\tR\tbindingId\x12\x16\n" +
+	"\x06queued\x18\x15 \x01(\bR\x06queued\x12(\n" +
+	"\x10queued_for_group\x18\x16 \x01(\tR\x0equeuedForGroup\"\xca\x01\n" +
 	"\x0fGetLogsResponse\x12-\n" +
 	"\x0fGetLogsResponse\x12-\n" +
 	"\x04logs\x18\x01 \x03(\v2\x19.olivetin.api.v1.LogEntryR\x04logs\x12'\n" +
 	"\x04logs\x18\x01 \x03(\v2\x19.olivetin.api.v1.LogEntryR\x04logs\x12'\n" +
 	"\x0fcount_remaining\x18\x02 \x01(\x03R\x0ecountRemaining\x12\x1b\n" +
 	"\x0fcount_remaining\x18\x02 \x01(\x03R\x0ecountRemaining\x12\x1b\n" +

+ 2 - 0
service/internal/api/api.go

@@ -378,6 +378,8 @@ func (api *oliveTinAPI) internalLogEntryToPb(logEntry *executor.InternalLogEntry
 		Output:                   logEntry.Output,
 		Output:                   logEntry.Output,
 		TimedOut:                 logEntry.TimedOut,
 		TimedOut:                 logEntry.TimedOut,
 		Blocked:                  logEntry.Blocked,
 		Blocked:                  logEntry.Blocked,
+		Queued:                   logEntry.Queued,
+		QueuedForGroup:           logEntry.QueuedForGroup,
 		ExitCode:                 logEntry.ExitCode,
 		ExitCode:                 logEntry.ExitCode,
 		Tags:                     logEntry.Tags,
 		Tags:                     logEntry.Tags,
 		ExecutionTrackingId:      logEntry.ExecutionTrackingID,
 		ExecutionTrackingId:      logEntry.ExecutionTrackingID,

+ 8 - 0
service/internal/config/config.go

@@ -33,6 +33,13 @@ type Action struct {
 	PopupOnStart           string           `koanf:"popupOnStart"`
 	PopupOnStart           string           `koanf:"popupOnStart"`
 	SaveLogs               SaveLogsConfig   `koanf:"saveLogs"`
 	SaveLogs               SaveLogsConfig   `koanf:"saveLogs"`
 	EnabledExpression      string           `koanf:"enabledExpression"`
 	EnabledExpression      string           `koanf:"enabledExpression"`
+	Groups                 []string         `koanf:"groups"`
+}
+
+// ActionGroup defines shared limits and metadata for a set of actions.
+type ActionGroup struct {
+	MaxConcurrent int    `koanf:"maxConcurrent"`
+	Icon          string `koanf:"icon"`
 }
 }
 
 
 // ActionArgument objects appear on Actions.
 // ActionArgument objects appear on Actions.
@@ -134,6 +141,7 @@ type Config struct {
 	LogLevel                        string                     `koanf:"logLevel"`
 	LogLevel                        string                     `koanf:"logLevel"`
 	LogDebugOptions                 LogDebugOptions            `koanf:"logDebugOptions"`
 	LogDebugOptions                 LogDebugOptions            `koanf:"logDebugOptions"`
 	LogHistoryPageSize              int64                      `koanf:"logHistoryPageSize"`
 	LogHistoryPageSize              int64                      `koanf:"logHistoryPageSize"`
+	ActionGroups                    map[string]*ActionGroup    `koanf:"actionGroups"`
 	Actions                         []*Action                  `koanf:"actions"`
 	Actions                         []*Action                  `koanf:"actions"`
 	Entities                        []*EntityFile              `koanf:"entities"`
 	Entities                        []*EntityFile              `koanf:"entities"`
 	Dashboards                      []*DashboardComponent      `koanf:"dashboards"`
 	Dashboards                      []*DashboardComponent      `koanf:"dashboards"`

+ 55 - 0
service/internal/config/sanitize.go

@@ -29,6 +29,8 @@ func (cfg *Config) Sanitize() {
 
 
 	cfg.sanitizeDashboardsForInlineActions()
 	cfg.sanitizeDashboardsForInlineActions()
 
 
+	cfg.sanitizeActionGroupReferences()
+
 	if err := cfg.validateReservedActionArgumentNames(); err != nil {
 	if err := cfg.validateReservedActionArgumentNames(); err != nil {
 		log.Fatalf("%v", err)
 		log.Fatalf("%v", err)
 	}
 	}
@@ -193,11 +195,64 @@ func (action *Action) sanitize(cfg *Config) {
 		action.MaxConcurrent = 1
 		action.MaxConcurrent = 1
 	}
 	}
 
 
+	action.Groups = dedupeStrings(action.Groups)
+
 	for idx := range action.Arguments {
 	for idx := range action.Arguments {
 		action.Arguments[idx].sanitize()
 		action.Arguments[idx].sanitize()
 	}
 	}
 }
 }
 
 
+func dedupeStrings(values []string) []string {
+	seen := make(map[string]struct{}, len(values))
+	out := make([]string, 0, len(values))
+
+	for _, value := range values {
+		out = appendUniqueString(out, seen, value)
+	}
+
+	return out
+}
+
+func appendUniqueString(out []string, seen map[string]struct{}, value string) []string {
+	if value == "" {
+		return out
+	}
+
+	if _, found := seen[value]; found {
+		return out
+	}
+
+	seen[value] = struct{}{}
+
+	return append(out, value)
+}
+
+func (cfg *Config) sanitizeActionGroupReferences() {
+	for _, action := range cfg.Actions {
+		for _, groupName := range action.Groups {
+			cfg.warnInvalidActionGroupReference(action, groupName)
+		}
+	}
+}
+
+func (cfg *Config) warnInvalidActionGroupReference(action *Action, groupName string) {
+	group, found := cfg.ActionGroups[groupName]
+	if !found {
+		log.WithFields(log.Fields{
+			"actionTitle": action.Title,
+			"groupName":   groupName,
+		}).Warn("Action references unknown action group")
+		return
+	}
+
+	if group == nil || group.MaxConcurrent < 1 {
+		log.WithFields(log.Fields{
+			"actionTitle": action.Title,
+			"groupName":   groupName,
+		}).Warn("Action references action group that will not be enforced at runtime")
+	}
+}
+
 func (cfg *Config) sanitizeAuthRequireGuestsToLogin() {
 func (cfg *Config) sanitizeAuthRequireGuestsToLogin() {
 	if cfg.AuthRequireGuestsToLogin {
 	if cfg.AuthRequireGuestsToLogin {
 		log.Infof("AuthRequireGuestsToLogin is enabled. All defaultPermissions will be set to false")
 		log.Infof("AuthRequireGuestsToLogin is enabled. All defaultPermissions will be set to false")

+ 18 - 0
service/internal/config/sanitize_test.go

@@ -110,6 +110,24 @@ func TestValidateReservedActionArgumentNames(t *testing.T) {
 	assert.Contains(t, err.Error(), `action "Reserved arg" argument "ot_custom" uses reserved prefix "ot_"`)
 	assert.Contains(t, err.Error(), `action "Reserved arg" argument "ot_custom" uses reserved prefix "ot_"`)
 }
 }
 
 
+func TestSanitizeActionGroupsDedupesGroupNames(t *testing.T) {
+	c := DefaultConfig()
+	c.ActionGroups = map[string]*ActionGroup{
+		"unity": {MaxConcurrent: 1},
+	}
+	c.Actions = append(c.Actions, &Action{
+		Title:  "Build",
+		Shell:  "true",
+		Groups: []string{"unity", "unity", ""},
+	})
+
+	c.Sanitize()
+
+	action := c.findAction("Build")
+	require.NotNil(t, action)
+	assert.Equal(t, []string{"unity"}, action.Groups)
+}
+
 func TestValidateReservedActionArgumentNamesAllowsNonReserved(t *testing.T) {
 func TestValidateReservedActionArgumentNamesAllowsNonReserved(t *testing.T) {
 	c := DefaultConfig()
 	c := DefaultConfig()
 	c.Actions = append(c.Actions, &Action{
 	c.Actions = append(c.Actions, &Action{

+ 252 - 91
service/internal/executor/executor.go

@@ -71,6 +71,9 @@ type Executor struct {
 	listeners []listener
 	listeners []listener
 
 
 	chainOfCommand []executorStepFunc
 	chainOfCommand []executorStepFunc
+
+	groupQueue   []*queuedExecution
+	groupQueueMu sync.Mutex
 }
 }
 
 
 // ExecutionRequest is a request to execute an action. It's passed to an
 // ExecutionRequest is a request to execute an action. It's passed to an
@@ -84,11 +87,54 @@ type ExecutionRequest struct {
 	AuthenticatedUser *authpublic.AuthenticatedUser
 	AuthenticatedUser *authpublic.AuthenticatedUser
 	TriggerDepth      int
 	TriggerDepth      int
 
 
-	logEntry           *InternalLogEntry
-	finalParsedCommand string
-	execArgs           []string
-	useDirectExec      bool
-	executor           *Executor
+	logEntry                *InternalLogEntry
+	finalParsedCommand      string
+	execArgs                []string
+	useDirectExec           bool
+	executor                *Executor
+	skipRequestRegistration bool
+}
+
+func (req *ExecutionRequest) mutateLogEntry(mutator func(*InternalLogEntry)) {
+	if req.executor == nil {
+		mutator(req.logEntry)
+		return
+	}
+
+	req.executor.logmutex.Lock()
+	defer req.executor.logmutex.Unlock()
+
+	mutator(req.logEntry)
+}
+
+// LogEntrySnapshot is a copy of selected log entry fields for race-safe reads.
+type LogEntrySnapshot struct {
+	Queued            bool
+	Blocked           bool
+	ExecutionStarted  bool
+	ExecutionFinished bool
+	ExitCode          int32
+	Output            string
+}
+
+// SnapshotLog returns a copy of selected log entry fields under read lock.
+func (e *Executor) SnapshotLog(trackingID string) (LogEntrySnapshot, bool) {
+	e.logmutex.RLock()
+	defer e.logmutex.RUnlock()
+
+	entry, found := e.logs[trackingID]
+	if !found {
+		return LogEntrySnapshot{}, false
+	}
+
+	return LogEntrySnapshot{
+		Queued:            entry.Queued,
+		Blocked:           entry.Blocked,
+		ExecutionStarted:  entry.ExecutionStarted,
+		ExecutionFinished: entry.ExecutionFinished,
+		ExitCode:          entry.ExitCode,
+		Output:            entry.Output,
+	}, true
 }
 }
 
 
 // InternalLogEntry objects are created by an Executor, and represent the final
 // InternalLogEntry objects are created by an Executor, and represent the final
@@ -101,6 +147,8 @@ type InternalLogEntry struct {
 	Output              string
 	Output              string
 	TimedOut            bool
 	TimedOut            bool
 	Blocked             bool
 	Blocked             bool
+	Queued              bool
+	QueuedForGroup      string
 	ExitCode            int32
 	ExitCode            int32
 	Tags                []string
 	Tags                []string
 	ExecutionStarted    bool
 	ExecutionStarted    bool
@@ -369,7 +417,7 @@ func (e *Executor) GetLogsByBindingId(bindingId string) []*InternalLogEntry {
 
 
 // shouldCountExecution checks if a log entry should be counted for rate limiting.
 // shouldCountExecution checks if a log entry should be counted for rate limiting.
 func shouldCountExecution(logEntry *InternalLogEntry, windowStart time.Time) bool {
 func shouldCountExecution(logEntry *InternalLogEntry, windowStart time.Time) bool {
-	return !logEntry.Blocked && logEntry.DatetimeStarted.After(windowStart)
+	return !logEntry.Blocked && !logEntry.Queued && logEntry.DatetimeStarted.After(windowStart)
 }
 }
 
 
 // updateOldestExecution updates the oldest execution time if this entry is older.
 // updateOldestExecution updates the oldest execution time if this entry is older.
@@ -483,19 +531,45 @@ func (e *Executor) GetTimeUntilAvailable(binding *ActionBinding) int64 {
 	return maxExpiryTime.Unix()
 	return maxExpiryTime.Unix()
 }
 }
 
 
-func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) {
+func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) string {
 	e.logmutex.Lock()
 	e.logmutex.Lock()
+	defer e.logmutex.Unlock()
+
+	if _, found := e.logs[trackingID]; found || !isValidTrackingID(trackingID) {
+		trackingID = uuid.NewString()
+		entry.ExecutionTrackingID = trackingID
+	}
 
 
 	entry.Index = int64(len(e.logsTrackingIdsByDate))
 	entry.Index = int64(len(e.logsTrackingIdsByDate))
 
 
 	e.logs[trackingID] = entry
 	e.logs[trackingID] = entry
 	e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, trackingID)
 	e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, trackingID)
 
 
-	e.logmutex.Unlock()
+	return trackingID
 }
 }
 
 
 // ExecRequest processes an ExecutionRequest
 // ExecRequest processes an ExecutionRequest
 func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string) {
 func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string) {
+	e.initializeExecRequest(req)
+
+	log.Tracef("executor.ExecRequest(): trackingID=%s bindingID=%s", req.TrackingID, bindingIDForTrace(req))
+
+	req.TrackingID = e.SetLog(req.TrackingID, req.logEntry)
+
+	wg := new(sync.WaitGroup)
+	wg.Add(1)
+
+	go func() {
+		queued := e.execChain(req, wg)
+		if !queued {
+			wg.Done()
+		}
+	}()
+
+	return wg, req.TrackingID
+}
+
+func (e *Executor) initializeExecRequest(req *ExecutionRequest) {
 	if req.AuthenticatedUser == nil {
 	if req.AuthenticatedUser == nil {
 		req.AuthenticatedUser = auth.UserGuest(req.Cfg)
 		req.AuthenticatedUser = auth.UserGuest(req.Cfg)
 	}
 	}
@@ -513,56 +587,84 @@ func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string)
 		ActionIcon:          "&#x1f4a9;",
 		ActionIcon:          "&#x1f4a9;",
 		Username:            req.AuthenticatedUser.Username,
 		Username:            req.AuthenticatedUser.Username,
 	}
 	}
+}
 
 
-	_, isDuplicate := e.GetLog(req.TrackingID)
-	if isDuplicate || !isValidTrackingID(req.TrackingID) {
-		req.TrackingID = uuid.NewString()
+func bindingIDForTrace(req *ExecutionRequest) string {
+	if req.Binding == nil {
+		return ""
 	}
 	}
 
 
-	// Update the log entry with the final tracking ID
-	req.logEntry.ExecutionTrackingID = req.TrackingID
+	return req.Binding.ID
+}
 
 
-	log.Tracef("executor.ExecRequest(): %v", req)
+func (e *Executor) execChain(req *ExecutionRequest, wg *sync.WaitGroup) bool {
+	if !req.skipRequestRegistration {
+		finished, queued := e.registerOrQueueRequest(req, wg)
+		if finished || queued {
+			return queued
+		}
+	}
 
 
-	e.SetLog(req.TrackingID, req.logEntry)
+	e.runExecutionSteps(req)
+	e.finishExecChain(req)
 
 
-	wg := new(sync.WaitGroup)
-	wg.Add(1)
+	return false
+}
 
 
-	go func() {
-		e.execChain(req)
-		defer wg.Done()
-	}()
+func (e *Executor) registerOrQueueRequest(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
+	if !stepRequestAction(req) {
+		e.finishExecChain(req)
+		return true, false
+	}
 
 
-	return wg, req.TrackingID
+	if !actionNeedsGroupLimit(req) || e.groupsHaveCapacityForActive(req) {
+		return false, false
+	}
+
+	return e.queueRequestAfterACL(req, wg)
 }
 }
 
 
-func (e *Executor) execChain(req *ExecutionRequest) {
-	for _, step := range e.chainOfCommand {
+func (e *Executor) queueRequestAfterACL(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
+	if !stepACLCheck(req) {
+		e.finishExecChain(req)
+		return true, false
+	}
+
+	e.queueRequest(req, wg)
+	notifyListenersStarted(req)
+
+	return false, true
+}
+
+func (e *Executor) runExecutionSteps(req *ExecutionRequest) {
+	for _, step := range e.chainOfCommand[1:] {
 		if !step(req) {
 		if !step(req) {
 			break
 			break
 		}
 		}
 	}
 	}
+}
 
 
-	// Ensure DatetimeFinished is set even if execution was blocked early
-	if req.logEntry.DatetimeFinished.IsZero() {
-		req.logEntry.DatetimeFinished = time.Now()
-	}
+func (e *Executor) finishExecChain(req *ExecutionRequest) {
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		if entry.DatetimeFinished.IsZero() {
+			entry.DatetimeFinished = time.Now()
+		}
 
 
-	req.logEntry.ExecutionFinished = true
+		entry.ExecutionFinished = true
+	})
 
 
-	// This isn't a step, because we want to notify all listeners, irrespective
-	// of how many steps were actually executed.
 	notifyListenersFinished(req)
 	notifyListenersFinished(req)
+	e.drainGroupQueue()
 }
 }
 
 
 func getConcurrentCount(req *ExecutionRequest) int {
 func getConcurrentCount(req *ExecutionRequest) int {
 	concurrentCount := 0
 	concurrentCount := 0
 
 
 	req.executor.logmutex.RLock()
 	req.executor.logmutex.RLock()
+	logs := req.executor.LogsByBindingId[req.Binding.ID]
 
 
-	for _, log := range req.executor.GetLogsByBindingId(req.Binding.ID) {
-		if !log.ExecutionFinished {
+	for _, logEntry := range logs {
+		if !logEntry.ExecutionFinished && !logEntry.Queued {
 			concurrentCount += 1
 			concurrentCount += 1
 		}
 		}
 	}
 	}
@@ -583,8 +685,10 @@ func stepConcurrencyCheck(req *ExecutionRequest) bool {
 			"maxConcurrent":   req.Binding.Action.MaxConcurrent,
 			"maxConcurrent":   req.Binding.Action.MaxConcurrent,
 		}).Warnf("Blocked from executing due to concurrency limit")
 		}).Warnf("Blocked from executing due to concurrency limit")
 
 
-		req.logEntry.Output = "Blocked from executing due to concurrency limit"
-		req.logEntry.Blocked = true
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.Output = "Blocked from executing due to concurrency limit"
+			entry.Blocked = true
+		})
 		return false
 		return false
 	}
 	}
 
 
@@ -603,24 +707,35 @@ func parseDuration(rate config.RateSpec) time.Duration {
 	return duration
 	return duration
 }
 }
 
 
-//gocyclo:ignore
-func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
-	executions := -1 // Because we will find ourself when checking execution logs
-
-	duration := parseDuration(rate)
+func entityPrefixForRequest(req *ExecutionRequest) string {
+	if req.Binding != nil && req.Binding.Entity != nil {
+		return req.Binding.Entity.UniqueKey
+	}
 
 
-	then := time.Now().Add(-duration)
+	return ""
+}
 
 
-	currentEntityPrefix := ""
-	if req.Binding != nil && req.Binding.Entity != nil {
-		currentEntityPrefix = req.Binding.Entity.UniqueKey
+func rateExecutionMatchesScope(logEntry *InternalLogEntry, req *ExecutionRequest, entityPrefix string) bool {
+	if logEntry.EntityPrefix != entityPrefix {
+		return false
 	}
 	}
-	for _, logEntry := range req.executor.GetLogsByBindingId(req.Binding.ID) {
-		if logEntry.EntityPrefix != currentEntityPrefix {
-			continue
-		}
-		if logEntry.DatetimeStarted.After(then) && !logEntry.Blocked {
 
 
+	return !logEntry.Queued && logEntry.ExecutionTrackingID != req.TrackingID
+}
+
+func logEntryStartedInWindow(logEntry *InternalLogEntry, windowStart time.Time) bool {
+	return logEntry.DatetimeStarted.After(windowStart) && !logEntry.Blocked
+}
+
+func rateExecutionCountsForRate(logEntry *InternalLogEntry, req *ExecutionRequest, entityPrefix string, windowStart time.Time) bool {
+	return rateExecutionMatchesScope(logEntry, req, entityPrefix) && logEntryStartedInWindow(logEntry, windowStart)
+}
+
+func countRateExecutions(logs []*InternalLogEntry, req *ExecutionRequest, entityPrefix string, windowStart time.Time) int {
+	executions := 0
+
+	for _, logEntry := range logs {
+		if rateExecutionCountsForRate(logEntry, req, entityPrefix, windowStart) {
 			executions += 1
 			executions += 1
 		}
 		}
 	}
 	}
@@ -628,6 +743,18 @@ func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
 	return executions
 	return executions
 }
 }
 
 
+func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
+	duration := parseDuration(rate)
+	then := time.Now().Add(-duration)
+
+	req.executor.logmutex.RLock()
+	logs := req.executor.LogsByBindingId[req.Binding.ID]
+	executions := countRateExecutions(logs, req, entityPrefixForRequest(req), then)
+	req.executor.logmutex.RUnlock()
+
+	return executions
+}
+
 func stepRateCheck(req *ExecutionRequest) bool {
 func stepRateCheck(req *ExecutionRequest) bool {
 	for _, rate := range req.Binding.Action.MaxRate {
 	for _, rate := range req.Binding.Action.MaxRate {
 		executions := getExecutionsCount(rate, req)
 		executions := getExecutionsCount(rate, req)
@@ -640,8 +767,10 @@ func stepRateCheck(req *ExecutionRequest) bool {
 				"duration":    rate.Duration,
 				"duration":    rate.Duration,
 			}).Infof("Blocked from executing due to rate limit")
 			}).Infof("Blocked from executing due to rate limit")
 
 
-			req.logEntry.Output = "Blocked from executing due to rate limit"
-			req.logEntry.Blocked = true
+			req.mutateLogEntry(func(entry *InternalLogEntry) {
+				entry.Output = "Blocked from executing due to rate limit"
+				entry.Blocked = true
+			})
 			return false
 			return false
 		}
 		}
 	}
 	}
@@ -653,8 +782,10 @@ func stepACLCheck(req *ExecutionRequest) bool {
 	canExec := acl.IsAllowedExec(req.Cfg, req.AuthenticatedUser, req.Binding.Action)
 	canExec := acl.IsAllowedExec(req.Cfg, req.AuthenticatedUser, req.Binding.Action)
 
 
 	if !canExec {
 	if !canExec {
-		req.logEntry.Output = "ACL check failed. Blocked from executing."
-		req.logEntry.Blocked = true
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.Output = "ACL check failed. Blocked from executing."
+			entry.Blocked = true
+		})
 
 
 		log.WithFields(log.Fields{
 		log.WithFields(log.Fields{
 			"actionTitle": req.logEntry.ActionTitle,
 			"actionTitle": req.logEntry.ActionTitle,
@@ -792,7 +923,9 @@ func hasExec(req *ExecutionRequest) bool {
 }
 }
 
 
 func fail(req *ExecutionRequest, err error) bool {
 func fail(req *ExecutionRequest, err error) bool {
-	req.logEntry.Output = err.Error()
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Output = err.Error()
+	})
 	log.Warn(err.Error())
 	log.Warn(err.Error())
 	return false
 	return false
 }
 }
@@ -826,14 +959,16 @@ func stepRequestActionHasBinding(req *ExecutionRequest) bool {
 }
 }
 
 
 func stepRequestActionPopulateLogEntry(req *ExecutionRequest) {
 func stepRequestActionPopulateLogEntry(req *ExecutionRequest) {
-	req.logEntry.Binding = req.Binding
-	req.logEntry.ActionConfigTitle = req.Binding.Action.Title
-	req.logEntry.ActionTitle = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Title, req.Binding.Entity)
-	req.logEntry.ActionIcon = req.Binding.Action.Icon
-	req.logEntry.Tags = req.Tags
-	if req.Binding.Entity != nil {
-		req.logEntry.EntityPrefix = req.Binding.Entity.UniqueKey
-	}
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Binding = req.Binding
+		entry.ActionConfigTitle = req.Binding.Action.Title
+		entry.ActionTitle = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Title, req.Binding.Entity)
+		entry.ActionIcon = req.Binding.Action.Icon
+		entry.Tags = req.Tags
+		if req.Binding.Entity != nil {
+			entry.EntityPrefix = req.Binding.Entity.UniqueKey
+		}
+	})
 }
 }
 
 
 func stepRequestActionRegisterLog(req *ExecutionRequest) {
 func stepRequestActionRegisterLog(req *ExecutionRequest) {
@@ -856,7 +991,9 @@ func stepLogStart(req *ExecutionRequest) bool {
 }
 }
 
 
 func stepLogFinish(req *ExecutionRequest) bool {
 func stepLogFinish(req *ExecutionRequest) bool {
-	req.logEntry.ExecutionFinished = true
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.ExecutionFinished = true
+	})
 
 
 	log.WithFields(log.Fields{
 	log.WithFields(log.Fields{
 		"actionTitle":  req.logEntry.ActionTitle,
 		"actionTitle":  req.logEntry.ActionTitle,
@@ -880,10 +1017,14 @@ func notifyListenersStarted(req *ExecutionRequest) {
 	}
 	}
 }
 }
 
 
-func appendErrorToStderr(err error, logEntry *InternalLogEntry) {
-	if err != nil {
-		logEntry.Output = err.Error() + "\n\n" + logEntry.Output
+func appendErrorToStderr(req *ExecutionRequest, err error) {
+	if err == nil {
+		return
 	}
 	}
+
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Output = err.Error() + "\n\n" + entry.Output
+	})
 }
 }
 
 
 type OutputStreamer struct {
 type OutputStreamer struct {
@@ -926,31 +1067,41 @@ func stepExec(req *ExecutionRequest) bool {
 	streamer := &OutputStreamer{Req: req}
 	streamer := &OutputStreamer{Req: req}
 	cmd := buildCommand(ctx, req)
 	cmd := buildCommand(ctx, req)
 	if cmd == nil {
 	if cmd == nil {
-		req.logEntry.Output = "Cannot execute: no command arguments provided"
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.Output = "Cannot execute: no command arguments provided"
+		})
 		log.Warn("Cannot execute: no command arguments provided")
 		log.Warn("Cannot execute: no command arguments provided")
 		return false
 		return false
 	}
 	}
 	prepareCommand(cmd, streamer, req)
 	prepareCommand(cmd, streamer, req)
 	runerr := cmd.Start()
 	runerr := cmd.Start()
-	req.logEntry.Process = cmd.Process
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Process = cmd.Process
+	})
 	ctx.setProcess(cmd.Process)
 	ctx.setProcess(cmd.Process)
 	waiterr := cmd.Wait()
 	waiterr := cmd.Wait()
-	req.logEntry.ExitCode = int32(cmd.ProcessState.ExitCode())
-	req.logEntry.Output = streamer.String()
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.ExitCode = int32(cmd.ProcessState.ExitCode())
+		entry.Output = streamer.String()
+	})
 
 
-	appendErrorToStderr(runerr, req.logEntry)
-	appendErrorToStderr(waiterr, req.logEntry)
+	appendErrorToStderr(req, runerr)
+	appendErrorToStderr(req, waiterr)
 
 
 	if ctx.Err() == context.DeadlineExceeded {
 	if ctx.Err() == context.DeadlineExceeded {
 		log.WithFields(log.Fields{
 		log.WithFields(log.Fields{
 			"actionTitle": req.logEntry.ActionTitle,
 			"actionTitle": req.logEntry.ActionTitle,
 		}).Warnf("Action timed out")
 		}).Warnf("Action timed out")
 
 
-		req.logEntry.TimedOut = true
-		req.logEntry.Output += "OliveTin::timeout - this action timed out after " + fmt.Sprintf("%v", req.Binding.Action.Timeout) + " seconds. If you need more time for this action, set a longer timeout. See https://docs.olivetin.app/action_customization/timeouts.html for more help."
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.TimedOut = true
+			entry.Output += "OliveTin::timeout - this action timed out after " + fmt.Sprintf("%v", req.Binding.Action.Timeout) + " seconds. If you need more time for this action, set a longer timeout. See https://docs.olivetin.app/action_customization/timeouts.html for more help."
+		})
 	}
 	}
 
 
-	req.logEntry.DatetimeFinished = time.Now()
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.DatetimeFinished = time.Now()
+	})
 
 
 	return true
 	return true
 }
 }
@@ -966,7 +1117,9 @@ func prepareCommand(cmd *exec.Cmd, streamer *OutputStreamer, req *ExecutionReque
 	cmd.Stdout = streamer
 	cmd.Stdout = streamer
 	cmd.Stderr = streamer
 	cmd.Stderr = streamer
 	cmd.Env = buildEnv(req.Arguments)
 	cmd.Env = buildEnv(req.Arguments)
-	req.logEntry.ExecutionStarted = true
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.ExecutionStarted = true
+	})
 }
 }
 
 
 func stepExecAfter(req *ExecutionRequest) bool {
 func stepExecAfter(req *ExecutionRequest) bool {
@@ -991,24 +1144,28 @@ func stepExecAfter(req *ExecutionRequest) bool {
 
 
 	waiterr := cmd.Wait()
 	waiterr := cmd.Wait()
 
 
-	req.logEntry.Output += "\n"
-	req.logEntry.Output += "OliveTin::shellAfterCompleted stdout\n"
-	req.logEntry.Output += stdout.String()
-
-	req.logEntry.Output += "OliveTin::shellAfterCompleted stderr\n"
-	req.logEntry.Output += stderr.String()
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Output += "\n"
+		entry.Output += "OliveTin::shellAfterCompleted stdout\n"
+		entry.Output += stdout.String()
+		entry.Output += "OliveTin::shellAfterCompleted stderr\n"
+		entry.Output += stderr.String()
+		entry.Output += "OliveTin::shellAfterCompleted errors and summary\n"
+	})
 
 
-	req.logEntry.Output += "OliveTin::shellAfterCompleted errors and summary\n"
-	appendErrorToStderr(runerr, req.logEntry)
-	appendErrorToStderr(waiterr, req.logEntry)
+	appendErrorToStderr(req, runerr)
+	appendErrorToStderr(req, waiterr)
 
 
 	if ctx.Err() == context.DeadlineExceeded {
 	if ctx.Err() == context.DeadlineExceeded {
-		req.logEntry.Output += "Your shellAfterCompleted command timed out."
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.Output += "Your shellAfterCompleted command timed out."
+		})
 	}
 	}
 
 
-	req.logEntry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", cmd.ProcessState.ExitCode())
-
-	req.logEntry.Output += "OliveTin::shellAfterCompleted output complete\n"
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", cmd.ProcessState.ExitCode())
+		entry.Output += "OliveTin::shellAfterCompleted output complete\n"
+	})
 
 
 	return true
 	return true
 }
 }
@@ -1026,7 +1183,9 @@ func buildShellAfterCommand(ctx context.Context, req *ExecutionRequest, stdout,
 	finalParsedCommand, err := tpl.ParseTemplateWithActionContext(req.Binding.Action.ShellAfterCompleted, req.Binding.Entity, args)
 	finalParsedCommand, err := tpl.ParseTemplateWithActionContext(req.Binding.Action.ShellAfterCompleted, req.Binding.Entity, args)
 	if err != nil {
 	if err != nil {
 		msg := "Could not prepare shellAfterCompleted command: " + err.Error() + "\n"
 		msg := "Could not prepare shellAfterCompleted command: " + err.Error() + "\n"
-		req.logEntry.Output += msg
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.Output += msg
+		})
 		log.Warn(msg)
 		log.Warn(msg)
 		return nil, nil, nil
 		return nil, nil, nil
 	}
 	}
@@ -1061,7 +1220,9 @@ func stepTrigger(req *ExecutionRequest) bool {
 			"actionTitle": req.logEntry.ActionTitle,
 			"actionTitle": req.logEntry.ActionTitle,
 			"depth":       req.TriggerDepth,
 			"depth":       req.TriggerDepth,
 		}).Warnf("Trigger action reached maximum depth of %v. Not triggering further actions.", MaxTriggerDepth)
 		}).Warnf("Trigger action reached maximum depth of %v. Not triggering further actions.", MaxTriggerDepth)
-		req.logEntry.Output += fmt.Sprintf("OliveTin::trigger - this action reached maximum trigger depth of %v. Not triggering further actions.", MaxTriggerDepth)
+		req.mutateLogEntry(func(entry *InternalLogEntry) {
+			entry.Output += fmt.Sprintf("OliveTin::trigger - this action reached maximum trigger depth of %v. Not triggering further actions.", MaxTriggerDepth)
+		})
 		return true
 		return true
 	}
 	}
 
 

+ 203 - 0
service/internal/executor/group_concurrency.go

@@ -0,0 +1,203 @@
+package executor
+
+import (
+	"fmt"
+	"slices"
+	"sync"
+
+	config "github.com/OliveTin/OliveTin/internal/config"
+	log "github.com/sirupsen/logrus"
+)
+
+type groupLimit struct {
+	name          string
+	maxConcurrent int
+}
+
+type queuedExecution struct {
+	req *ExecutionRequest
+	wg  *sync.WaitGroup
+}
+
+func actionGroupLimits(req *ExecutionRequest) []groupLimit {
+	if !hasActionGroupContext(req) {
+		return nil
+	}
+
+	limits := make([]groupLimit, 0, len(req.Binding.Action.Groups))
+
+	for _, groupName := range req.Binding.Action.Groups {
+		if limit, ok := groupLimitFromConfig(req.Cfg, groupName); ok {
+			limits = append(limits, limit)
+		}
+	}
+
+	return limits
+}
+
+func hasActionGroupContext(req *ExecutionRequest) bool {
+	return req != nil && req.Binding != nil && req.Binding.Action != nil && req.Cfg != nil
+}
+
+func groupLimitFromConfig(cfg *config.Config, groupName string) (groupLimit, bool) {
+	group, found := cfg.ActionGroups[groupName]
+	if !found || group == nil || group.MaxConcurrent < 1 {
+		return groupLimit{}, false
+	}
+
+	return groupLimit{name: groupName, maxConcurrent: group.MaxConcurrent}, true
+}
+
+func actionNeedsGroupLimit(req *ExecutionRequest) bool {
+	return len(actionGroupLimits(req)) > 0
+}
+
+func actionInGroup(action *config.Action, groupName string) bool {
+	if action == nil {
+		return false
+	}
+
+	return slices.Contains(action.Groups, groupName)
+}
+
+func (e *Executor) countActiveInGroup(groupName string) int {
+	e.logmutex.RLock()
+	defer e.logmutex.RUnlock()
+
+	return e.countActiveInGroupLocked(groupName)
+}
+
+func (e *Executor) countActiveInGroupLocked(groupName string) int {
+	count := 0
+
+	for _, logEntry := range e.logs {
+		if logEntryIsActiveInGroup(logEntry, groupName) {
+			count++
+		}
+	}
+
+	return count
+}
+
+func logEntryIsActiveInGroup(logEntry *InternalLogEntry, groupName string) bool {
+	if inactiveLogEntry(logEntry) {
+		return false
+	}
+
+	return actionInGroup(logEntry.Binding.Action, groupName)
+}
+
+func inactiveLogEntry(logEntry *InternalLogEntry) bool {
+	if logEntry == nil {
+		return true
+	}
+
+	return logEntryIsInactive(logEntry)
+}
+
+func logEntryIsInactive(logEntry *InternalLogEntry) bool {
+	if logEntry.ExecutionFinished || logEntry.Queued {
+		return true
+	}
+
+	return logEntry.Binding == nil || logEntry.Binding.Action == nil
+}
+
+func (e *Executor) groupsHaveCapacityForActive(req *ExecutionRequest) bool {
+	for _, limit := range actionGroupLimits(req) {
+		if e.countActiveInGroup(limit.name) >= (limit.maxConcurrent + 1) {
+			return false
+		}
+	}
+
+	return true
+}
+
+func (e *Executor) groupsHaveCapacityForQueued(req *ExecutionRequest) bool {
+	for _, limit := range actionGroupLimits(req) {
+		if e.countActiveInGroup(limit.name) >= limit.maxConcurrent {
+			return false
+		}
+	}
+
+	return true
+}
+
+func firstFullGroupName(e *Executor, req *ExecutionRequest) string {
+	for _, limit := range actionGroupLimits(req) {
+		if e.countActiveInGroup(limit.name) >= (limit.maxConcurrent + 1) {
+			return limit.name
+		}
+	}
+
+	return ""
+}
+
+func firstFullGroupNameLocked(e *Executor, req *ExecutionRequest) string {
+	for _, limit := range actionGroupLimits(req) {
+		if e.countActiveInGroupLocked(limit.name) >= (limit.maxConcurrent + 1) {
+			return limit.name
+		}
+	}
+
+	return ""
+}
+
+func (e *Executor) queueRequest(req *ExecutionRequest, wg *sync.WaitGroup) {
+	e.groupQueueMu.Lock()
+
+	var groupName string
+
+	req.mutateLogEntry(func(entry *InternalLogEntry) {
+		groupName = firstFullGroupNameLocked(e, req)
+		entry.Queued = true
+		entry.QueuedForGroup = groupName
+		entry.Output = fmt.Sprintf("Queued waiting for action group %q", groupName)
+	})
+
+	e.groupQueue = append(e.groupQueue, &queuedExecution{req: req, wg: wg})
+	e.groupQueueMu.Unlock()
+
+	e.drainGroupQueue()
+
+	log.WithFields(log.Fields{
+		"actionTitle": req.logEntry.ActionTitle,
+		"groupName":   groupName,
+	}).Infof("Action queued due to action group concurrency limit")
+}
+
+func (e *Executor) drainGroupQueue() {
+	e.groupQueueMu.Lock()
+
+	if len(e.groupQueue) == 0 {
+		e.groupQueueMu.Unlock()
+		return
+	}
+
+	next := e.groupQueue[0]
+	if !e.groupsHaveCapacityForQueued(next.req) {
+		e.groupQueueMu.Unlock()
+		return
+	}
+
+	e.groupQueue = e.groupQueue[1:]
+
+	next.req.mutateLogEntry(func(entry *InternalLogEntry) {
+		entry.Queued = false
+		entry.QueuedForGroup = ""
+	})
+
+	e.groupQueueMu.Unlock()
+
+	go e.runDequeuedExecution(next)
+}
+
+func (e *Executor) runDequeuedExecution(queued *queuedExecution) {
+	req := queued.req
+
+	req.skipRequestRegistration = true
+
+	e.runExecutionSteps(req)
+	e.finishExecChain(req)
+	queued.wg.Done()
+}

+ 277 - 0
service/internal/executor/group_concurrency_test.go

@@ -0,0 +1,277 @@
+package executor
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/OliveTin/OliveTin/internal/auth"
+	config "github.com/OliveTin/OliveTin/internal/config"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func testGroupExecutor(actions []*config.Action, groups map[string]*config.ActionGroup) (*Executor, *config.Config) {
+	cfg := config.DefaultConfig()
+	cfg.ActionGroups = groups
+	cfg.Actions = actions
+	cfg.Sanitize()
+
+	e := DefaultExecutor(cfg)
+	e.RebuildActionMap()
+
+	return e, cfg
+}
+
+func TestGroupConcurrencyQueuesSecondAction(t *testing.T) {
+	t.Parallel()
+
+	slowAction := &config.Action{
+		Title:  "Unity Job 1",
+		Shell:  "sleep 2",
+		Groups: []string{"unity"},
+	}
+	fastAction := &config.Action{
+		Title:  "Unity Job 2",
+		Shell:  "echo queued-run",
+		Groups: []string{"unity"},
+	}
+
+	e, cfg := testGroupExecutor(
+		[]*config.Action{slowAction, fastAction},
+		map[string]*config.ActionGroup{
+			"unity": {MaxConcurrent: 1},
+		},
+	)
+
+	binding1 := e.FindBindingWithNoEntity(slowAction)
+	binding2 := e.FindBindingWithNoEntity(fastAction)
+	require.NotNil(t, binding1)
+	require.NotNil(t, binding2)
+
+	wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
+		Binding:           binding1,
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	waitUntilExecutionStarted(t, e, tracking1)
+
+	wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
+		Binding:           binding2,
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	require.Eventually(t, func() bool {
+		snapshot, ok := e.SnapshotLog(tracking2)
+		return ok && snapshot.Queued
+	}, time.Second, 10*time.Millisecond)
+
+	wg1.Wait()
+	wg2.Wait()
+
+	snapshot, ok := e.SnapshotLog(tracking2)
+	require.True(t, ok)
+	assert.False(t, snapshot.Queued)
+	assert.False(t, snapshot.Blocked)
+	assert.Equal(t, int32(0), snapshot.ExitCode)
+	assert.Contains(t, snapshot.Output, "queued-run")
+}
+
+func TestDifferentGroupsRunConcurrently(t *testing.T) {
+	t.Parallel()
+
+	actionA := &config.Action{
+		Title:  "Group A Job",
+		Shell:  "sleep 1",
+		Groups: []string{"groupA"},
+	}
+	actionB := &config.Action{
+		Title:  "Group B Job",
+		Shell:  "echo group-b",
+		Groups: []string{"groupB"},
+	}
+
+	e, cfg := testGroupExecutor(
+		[]*config.Action{actionA, actionB},
+		map[string]*config.ActionGroup{
+			"groupA": {MaxConcurrent: 1},
+			"groupB": {MaxConcurrent: 1},
+		},
+	)
+
+	wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
+		Binding:           e.FindBindingWithNoEntity(actionA),
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	waitUntilExecutionStarted(t, e, tracking1)
+
+	wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
+		Binding:           e.FindBindingWithNoEntity(actionB),
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	require.Eventually(t, func() bool {
+		snapshot, ok := e.SnapshotLog(tracking2)
+		return ok && snapshot.ExecutionFinished && !snapshot.Queued
+	}, 2*time.Second, 20*time.Millisecond)
+
+	wg1.Wait()
+	wg2.Wait()
+
+	snapshot, ok := e.SnapshotLog(tracking2)
+	require.True(t, ok)
+	assert.Contains(t, snapshot.Output, "group-b")
+}
+
+func TestPerActionConcurrencyStillBlocksWithoutQueue(t *testing.T) {
+	t.Parallel()
+
+	action := &config.Action{
+		Title:         "Single binding",
+		Shell:         "sleep 1",
+		MaxConcurrent: 1,
+	}
+
+	e, cfg := testGroupExecutor([]*config.Action{action}, nil)
+	binding := e.FindBindingWithNoEntity(action)
+
+	wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
+		Binding:           binding,
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	waitUntilExecutionStarted(t, e, tracking1)
+
+	wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
+		Binding:           binding,
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	wg1.Wait()
+	wg2.Wait()
+
+	snapshot, ok := e.SnapshotLog(tracking2)
+	require.True(t, ok)
+	assert.True(t, snapshot.Blocked)
+	assert.False(t, snapshot.Queued)
+}
+
+func waitUntilExecutionStarted(t *testing.T, e *Executor, trackingID string) {
+	t.Helper()
+
+	require.Eventually(t, func() bool {
+		snapshot, ok := e.SnapshotLog(trackingID)
+		return ok && snapshot.ExecutionStarted
+	}, 2*time.Second, 10*time.Millisecond)
+}
+
+func assertWaitGroupPending(t *testing.T, wg *sync.WaitGroup) {
+	t.Helper()
+
+	done := make(chan struct{})
+
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		t.Fatal("wait group completed before queued execution finished")
+	case <-time.After(100 * time.Millisecond):
+	}
+}
+
+func assertWaitGroupCompletes(t *testing.T, wg *sync.WaitGroup) {
+	t.Helper()
+
+	done := make(chan struct{})
+
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(3 * time.Second):
+		t.Fatal("wait group did not complete after queue drained")
+	}
+}
+
+func TestStartActionAndWaitWaitsForQueuedExecution(t *testing.T) {
+	t.Parallel()
+
+	first := &config.Action{
+		Title:  "Hold group",
+		Shell:  "sleep 1",
+		Groups: []string{"unity"},
+	}
+	second := &config.Action{
+		Title:  "Wait in queue",
+		Shell:  "echo waited",
+		Groups: []string{"unity"},
+	}
+
+	e, cfg := testGroupExecutor(
+		[]*config.Action{first, second},
+		map[string]*config.ActionGroup{
+			"unity": {MaxConcurrent: 1},
+		},
+	)
+
+	wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
+		Binding:           e.FindBindingWithNoEntity(first),
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	waitUntilExecutionStarted(t, e, tracking1)
+
+	wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
+		Binding:           e.FindBindingWithNoEntity(second),
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	assertWaitGroupPending(t, wg2)
+
+	wg1.Wait()
+
+	assertWaitGroupCompletes(t, wg2)
+
+	snapshot, ok := e.SnapshotLog(tracking2)
+	require.True(t, ok)
+	assert.Contains(t, snapshot.Output, "waited")
+}
+
+func TestUnknownActionGroupReferenceWarnsAndSkipsLimit(t *testing.T) {
+	t.Parallel()
+
+	action := &config.Action{
+		Title:  "Unknown group action",
+		Shell:  "echo ok",
+		Groups: []string{"missing"},
+	}
+
+	e, cfg := testGroupExecutor([]*config.Action{action}, map[string]*config.ActionGroup{})
+	wg, tracking := e.ExecRequest(&ExecutionRequest{
+		Binding:           e.FindBindingWithNoEntity(action),
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	wg.Wait()
+
+	snapshot, ok := e.SnapshotLog(tracking)
+	require.True(t, ok)
+	assert.False(t, snapshot.Queued)
+	assert.Equal(t, int32(0), snapshot.ExitCode)
+}

+ 19 - 0
specs/action-group-concurrency.md

@@ -0,0 +1,19 @@
+# Action group concurrency
+
+Actions may belong to one or more named groups. Each group may define a maximum number of concurrent executions shared across all actions in that group.
+
+When a user or trigger starts an action that belongs to a group, OliveTin counts how many executions for that group are currently active. Active means the execution has been requested but not yet finished, and is not waiting in a queue.
+
+If every configured group for that action has spare capacity, the execution proceeds through the normal execution pipeline.
+
+If any configured group is at capacity, the new execution is queued instead of rejected. The request receives a tracking identifier immediately. The log entry shows a queued status until the execution actually starts.
+
+Queued executions run in first-in-first-out order per OliveTin instance. When an active execution in a group finishes, OliveTin attempts to start the oldest queued execution that belongs to that group, provided all groups for that queued action now have spare capacity.
+
+An action may belong to multiple groups. In that case, all group limits must be satisfied before the action starts or leaves the queue.
+
+Per-action concurrency limits apply only to executions of the same action binding. When a per-action limit is exceeded, the request is blocked immediately and is not queued.
+
+Action group concurrency limits do not survive a process restart. Queued executions that have not started are discarded when OliveTin stops.
+
+If an action references a group name that is not defined in configuration, OliveTin logs a warning and does not apply a group limit for that name.

برخی فایل ها در این مقایسه diff نمایش داده نمی شوند زیرا تعداد فایل ها بسیار زیاد است