Kaynağa Gözat

feat: Action grouping for concurrency limit and queuing

jamesread 2 hafta önce
ebeveyn
işleme
92f564ab45

+ 27 - 1
docs/modules/ROOT/pages/action_customization/concurrency.adoc

@@ -3,7 +3,7 @@
 
 By default, OliveTin will allow you to run several instances of an action at the same time. For example, an action might take 20 seconds, and if you click the button 3 times, for a time there will be 3 actions running at the same time.
 
-Sometimes you don't want to allow this - an example case where it would not make sense is in the case of a backup script. To stop this, we can set `maxConcurrent` to `1`. 
+Sometimes you don't want to allow this - an example case where it would not make sense is in the case of a backup script. To stop this, we can set `maxConcurrent` to `1`.
 
 [source,yaml]
 ----
@@ -29,4 +29,30 @@ WARN Blocked from executing. This would mean this action is running 2 times conc
 
 Naturally, you can set `maxConcurrent` to `3` or some other number, to limit the amount of times the action executes at once.
 
+== Action groups
 
+Sometimes you need to limit concurrency across several different actions. For example, Unity only allows one build at a time, but you might have separate actions for different platforms.
+
+Use `actionGroups` to define a shared limit, and assign actions to a group with `groups`:
+
+[source,yaml]
+----
+actionGroups:
+  unity:
+    maxConcurrent: 1
+
+actions:
+  - title: Unity Android Build
+    shell: /opt/unity/build-android.sh
+    groups: [ unity ]
+
+  - title: Unity iOS Build
+    shell: /opt/unity/build-ios.sh
+    groups: [ unity ]
+----
+
+When the group limit is reached, additional requests are queued automatically and run in order when a slot becomes free. Queued executions appear in the logs with a queued status.
+
+Per-action `maxConcurrent` still applies separately. If the same action binding is started twice while one is already running, the second request is blocked immediately (not queued).
+
+The queue is held in memory. If OliveTin restarts while actions are queued, those queued requests are not preserved.

+ 18 - 2
frontend/js/websocket.js

@@ -194,11 +194,27 @@ async function handleConfigChangedEvent (j) {
   window.dispatchEvent(j)
 }
 
+const eventCaseToTypeName = {
+  entityChanged: 'EventEntityChanged',
+  configChanged: 'EventConfigChanged',
+  executionFinished: 'EventExecutionFinished',
+  executionStarted: 'EventExecutionStarted',
+  outputChunk: 'EventOutputChunk',
+  heartbeat: 'EventHeartbeat'
+}
+
 function handleEvent (msg) {
-  const typeName = msg.event.value.$typeName.replace('olivetin.api.v1.', '')
+  const eventCase = msg?.event?.case
+  const eventValue = msg?.event?.value
+  const typeName = eventCaseToTypeName[eventCase]
+
+  if (!typeName || !eventValue) {
+    console.warn('Skipping websocket event with no payload:', msg)
+    return
+  }
 
   const j = new Event(typeName)
-  j.payload = msg.event.value
+  j.payload = eventValue
 
   switch (typeName) {
     case 'EventConfigChanged':

+ 10 - 0
frontend/resources/scripts/gen/olivetin/api/v1/olivetin_pb.d.ts

@@ -673,6 +673,16 @@ export declare type LogEntry = Message<"olivetin.api.v1.LogEntry"> & {
    * @generated from field: string binding_id = 20;
    */
   bindingId: string;
+
+  /**
+   * @generated from field: bool queued = 21;
+   */
+  queued: boolean;
+
+  /**
+   * @generated from field: string queued_for_group = 22;
+   */
+  queuedForGroup: string;
 };
 
 /**

Dosya farkı çok büyük olduğundan ihmal edildi
+ 0 - 0
frontend/resources/scripts/gen/olivetin/api/v1/olivetin_pb.js


+ 13 - 0
frontend/resources/vue/ActionButton.vue

@@ -279,11 +279,18 @@ async function startAction(actionArgs) {
 function onLogEntryChanged(logEntry) {
   if (logEntry.executionFinished) {
 	onExecutionFinished(logEntry)
+  } else if (logEntry.queued && !logEntry.executionStarted) {
+	onExecutionQueued(logEntry)
   } else {
 	onExecutionStarted(logEntry)
   }
 }
 
+function onExecutionQueued(_logEntry) {
+  isDisabled.value = true
+  updateDom('action-queued', '[Queued]')
+}
+
 function onExecutionStarted(logEntry) {
   if (popupOnStart.value && popupOnStart.value.includes('execution-dialog')) {
 	router.push(`/logs/${logEntry.executionTrackingId}`)
@@ -440,6 +447,12 @@ defineExpose({
 		color: #721c24;
 	}
 
+	.action-button button.action-queued {
+		background: #fff3cd !important;
+		border-color: #ffeaa7;
+		color: #856404;
+	}
+
 	.action-button button.action-nonzero-exit {
 		background: #f8d7da !important;
 		border-color: #f5c6cb;

+ 2 - 0
proto/olivetin/api/v1/olivetin.proto

@@ -157,6 +157,8 @@ message LogEntry {
 	bool can_kill = 18;
 	string datetime_rate_limit_expires = 19; // Datetime when rate limit expires (empty string if not rate limited), format: "2006-01-02 15:04:05"
 	string binding_id = 20; // Binding ID for matching rate limits to action buttons
+	bool queued = 21;
+	string queued_for_group = 22;
 }
 
 message GetLogsResponse {

+ 20 - 2
service/gen/olivetin/api/v1/olivetin.pb.go

@@ -1305,6 +1305,8 @@ type LogEntry struct {
 	CanKill                  bool                   `protobuf:"varint,18,opt,name=can_kill,json=canKill,proto3" json:"can_kill,omitempty"`
 	DatetimeRateLimitExpires string                 `protobuf:"bytes,19,opt,name=datetime_rate_limit_expires,json=datetimeRateLimitExpires,proto3" json:"datetime_rate_limit_expires,omitempty"` // Datetime when rate limit expires (empty string if not rate limited), format: "2006-01-02 15:04:05"
 	BindingId                string                 `protobuf:"bytes,20,opt,name=binding_id,json=bindingId,proto3" json:"binding_id,omitempty"`                                                  // Binding ID for matching rate limits to action buttons
+	Queued                   bool                   `protobuf:"varint,21,opt,name=queued,proto3" json:"queued,omitempty"`
+	QueuedForGroup           string                 `protobuf:"bytes,22,opt,name=queued_for_group,json=queuedForGroup,proto3" json:"queued_for_group,omitempty"`
 	unknownFields            protoimpl.UnknownFields
 	sizeCache                protoimpl.SizeCache
 }
@@ -1465,6 +1467,20 @@ func (x *LogEntry) GetBindingId() string {
 	return ""
 }
 
+func (x *LogEntry) GetQueued() bool {
+	if x != nil {
+		return x.Queued
+	}
+	return false
+}
+
+func (x *LogEntry) GetQueuedForGroup() string {
+	if x != nil {
+		return x.QueuedForGroup
+	}
+	return ""
+}
+
 type GetLogsResponse struct {
 	state          protoimpl.MessageState `protogen:"open.v1"`
 	Logs           []*LogEntry            `protobuf:"bytes,1,rep,name=logs,proto3" json:"logs,omitempty"`
@@ -4183,7 +4199,7 @@ const file_olivetin_api_v1_olivetin_proto_rawDesc = "" +
 	"\fstart_offset\x18\x01 \x01(\x03R\vstartOffset\x12\x1f\n" +
 	"\vdate_filter\x18\x02 \x01(\tR\n" +
 	"dateFilter\x12\x1b\n" +
-	"\tpage_size\x18\x03 \x01(\x03R\bpageSize\"\x89\x05\n" +
+	"\tpage_size\x18\x03 \x01(\x03R\bpageSize\"\xcb\x05\n" +
 	"\bLogEntry\x12)\n" +
 	"\x10datetime_started\x18\x01 \x01(\tR\x0fdatetimeStarted\x12!\n" +
 	"\faction_title\x18\x02 \x01(\tR\vactionTitle\x12\x16\n" +
@@ -4206,7 +4222,9 @@ const file_olivetin_api_v1_olivetin_proto_rawDesc = "" +
 	"\bcan_kill\x18\x12 \x01(\bR\acanKill\x12=\n" +
 	"\x1bdatetime_rate_limit_expires\x18\x13 \x01(\tR\x18datetimeRateLimitExpires\x12\x1d\n" +
 	"\n" +
-	"binding_id\x18\x14 \x01(\tR\tbindingId\"\xca\x01\n" +
+	"binding_id\x18\x14 \x01(\tR\tbindingId\x12\x16\n" +
+	"\x06queued\x18\x15 \x01(\bR\x06queued\x12(\n" +
+	"\x10queued_for_group\x18\x16 \x01(\tR\x0equeuedForGroup\"\xca\x01\n" +
 	"\x0fGetLogsResponse\x12-\n" +
 	"\x04logs\x18\x01 \x03(\v2\x19.olivetin.api.v1.LogEntryR\x04logs\x12'\n" +
 	"\x0fcount_remaining\x18\x02 \x01(\x03R\x0ecountRemaining\x12\x1b\n" +

+ 2 - 0
service/internal/api/api.go

@@ -378,6 +378,8 @@ func (api *oliveTinAPI) internalLogEntryToPb(logEntry *executor.InternalLogEntry
 		Output:                   logEntry.Output,
 		TimedOut:                 logEntry.TimedOut,
 		Blocked:                  logEntry.Blocked,
+		Queued:                   logEntry.Queued,
+		QueuedForGroup:           logEntry.QueuedForGroup,
 		ExitCode:                 logEntry.ExitCode,
 		Tags:                     logEntry.Tags,
 		ExecutionTrackingId:      logEntry.ExecutionTrackingID,

+ 8 - 0
service/internal/config/config.go

@@ -33,6 +33,13 @@ type Action struct {
 	PopupOnStart           string           `koanf:"popupOnStart"`
 	SaveLogs               SaveLogsConfig   `koanf:"saveLogs"`
 	EnabledExpression      string           `koanf:"enabledExpression"`
+	Groups                 []string         `koanf:"groups"`
+}
+
+// ActionGroup defines shared limits and metadata for a set of actions.
+type ActionGroup struct {
+	MaxConcurrent int    `koanf:"maxConcurrent"`
+	Icon          string `koanf:"icon"`
 }
 
 // ActionArgument objects appear on Actions.
@@ -134,6 +141,7 @@ type Config struct {
 	LogLevel                        string                     `koanf:"logLevel"`
 	LogDebugOptions                 LogDebugOptions            `koanf:"logDebugOptions"`
 	LogHistoryPageSize              int64                      `koanf:"logHistoryPageSize"`
+	ActionGroups                    map[string]*ActionGroup    `koanf:"actionGroups"`
 	Actions                         []*Action                  `koanf:"actions"`
 	Entities                        []*EntityFile              `koanf:"entities"`
 	Dashboards                      []*DashboardComponent      `koanf:"dashboards"`

+ 42 - 0
service/internal/config/sanitize.go

@@ -25,6 +25,8 @@ func (cfg *Config) Sanitize() {
 		cfg.Actions[idx].sanitize(cfg)
 	}
 
+	cfg.sanitizeActionGroupReferences()
+
 	cfg.sanitizeDashboardsForInlineActions()
 
 	if err := cfg.validateReservedActionArgumentNames(); err != nil {
@@ -181,11 +183,51 @@ func (action *Action) sanitize(cfg *Config) {
 		action.MaxConcurrent = 1
 	}
 
+	action.Groups = dedupeStrings(action.Groups)
+
 	for idx := range action.Arguments {
 		action.Arguments[idx].sanitize()
 	}
 }
 
+func dedupeStrings(values []string) []string {
+	seen := make(map[string]struct{}, len(values))
+	out := make([]string, 0, len(values))
+
+	for _, value := range values {
+		out = appendUniqueString(out, seen, value)
+	}
+
+	return out
+}
+
+func appendUniqueString(out []string, seen map[string]struct{}, value string) []string {
+	if value == "" {
+		return out
+	}
+
+	if _, found := seen[value]; found {
+		return out
+	}
+
+	seen[value] = struct{}{}
+
+	return append(out, value)
+}
+
+func (cfg *Config) sanitizeActionGroupReferences() {
+	for _, action := range cfg.Actions {
+		for _, groupName := range action.Groups {
+			if _, found := cfg.ActionGroups[groupName]; !found {
+				log.WithFields(log.Fields{
+					"actionTitle": action.Title,
+					"groupName":   groupName,
+				}).Warn("Action references unknown action group")
+			}
+		}
+	}
+}
+
 func (cfg *Config) sanitizeAuthRequireGuestsToLogin() {
 	if cfg.AuthRequireGuestsToLogin {
 		log.Infof("AuthRequireGuestsToLogin is enabled. All defaultPermissions will be set to false")

+ 18 - 0
service/internal/config/sanitize_test.go

@@ -107,6 +107,24 @@ func TestValidateReservedActionArgumentNames(t *testing.T) {
 	assert.Contains(t, err.Error(), `action "Reserved arg" argument "ot_custom" uses reserved prefix "ot_"`)
 }
 
+func TestSanitizeActionGroupsDedupesGroupNames(t *testing.T) {
+	c := DefaultConfig()
+	c.ActionGroups = map[string]*ActionGroup{
+		"unity": {MaxConcurrent: 1},
+	}
+	c.Actions = append(c.Actions, &Action{
+		Title:  "Build",
+		Shell:  "true",
+		Groups: []string{"unity", "unity", ""},
+	})
+
+	c.Sanitize()
+
+	action := c.findAction("Build")
+	require.NotNil(t, action)
+	assert.Equal(t, []string{"unity"}, action.Groups)
+}
+
 func TestValidateReservedActionArgumentNamesAllowsNonReserved(t *testing.T) {
 	c := DefaultConfig()
 	c.Actions = append(c.Actions, &Action{

+ 60 - 21
service/internal/executor/executor.go

@@ -71,6 +71,9 @@ type Executor struct {
 	listeners []listener
 
 	chainOfCommand []executorStepFunc
+
+	groupQueue   []*queuedExecution
+	groupQueueMu sync.Mutex
 }
 
 // ExecutionRequest is a request to execute an action. It's passed to an
@@ -84,11 +87,12 @@ type ExecutionRequest struct {
 	AuthenticatedUser *authpublic.AuthenticatedUser
 	TriggerDepth      int
 
-	logEntry           *InternalLogEntry
-	finalParsedCommand string
-	execArgs           []string
-	useDirectExec      bool
-	executor           *Executor
+	logEntry                *InternalLogEntry
+	finalParsedCommand      string
+	execArgs                []string
+	useDirectExec           bool
+	executor                *Executor
+	skipRequestRegistration bool
 }
 
 // InternalLogEntry objects are created by an Executor, and represent the final
@@ -101,6 +105,8 @@ type InternalLogEntry struct {
 	Output              string
 	TimedOut            bool
 	Blocked             bool
+	Queued              bool
+	QueuedForGroup      string
 	ExitCode            int32
 	Tags                []string
 	ExecutionStarted    bool
@@ -496,6 +502,26 @@ func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) {
 
 // ExecRequest processes an ExecutionRequest
 func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string) {
+	e.initializeExecRequest(req)
+
+	log.Tracef("executor.ExecRequest(): %v", req)
+
+	e.SetLog(req.TrackingID, req.logEntry)
+
+	wg := new(sync.WaitGroup)
+	wg.Add(1)
+
+	go func() {
+		queued := e.execChain(req, wg)
+		if !queued {
+			wg.Done()
+		}
+	}()
+
+	return wg, req.TrackingID
+}
+
+func (e *Executor) initializeExecRequest(req *ExecutionRequest) {
 	if req.AuthenticatedUser == nil {
 		req.AuthenticatedUser = auth.UserGuest(req.Cfg)
 	}
@@ -519,41 +545,54 @@ func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string)
 		req.TrackingID = uuid.NewString()
 	}
 
-	// Update the log entry with the final tracking ID
 	req.logEntry.ExecutionTrackingID = req.TrackingID
+}
 
-	log.Tracef("executor.ExecRequest(): %v", req)
+func (e *Executor) execChain(req *ExecutionRequest, wg *sync.WaitGroup) bool {
+	if !req.skipRequestRegistration {
+		finished, queued := e.registerOrQueueRequest(req, wg)
+		if finished || queued {
+			return queued
+		}
+	}
 
-	e.SetLog(req.TrackingID, req.logEntry)
+	e.runExecutionSteps(req)
+	e.finishExecChain(req)
 
-	wg := new(sync.WaitGroup)
-	wg.Add(1)
+	return false
+}
 
-	go func() {
-		e.execChain(req)
-		defer wg.Done()
-	}()
+func (e *Executor) registerOrQueueRequest(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
+	if !stepRequestAction(req) {
+		e.finishExecChain(req)
+		return true, false
+	}
 
-	return wg, req.TrackingID
+	if !actionNeedsGroupLimit(req) || e.groupsHaveCapacityForActive(req) {
+		return false, false
+	}
+
+	e.queueRequest(req, wg)
+
+	return false, true
 }
 
-func (e *Executor) execChain(req *ExecutionRequest) {
-	for _, step := range e.chainOfCommand {
+func (e *Executor) runExecutionSteps(req *ExecutionRequest) {
+	for _, step := range e.chainOfCommand[1:] {
 		if !step(req) {
 			break
 		}
 	}
+}
 
-	// Ensure DatetimeFinished is set even if execution was blocked early
+func (e *Executor) finishExecChain(req *ExecutionRequest) {
 	if req.logEntry.DatetimeFinished.IsZero() {
 		req.logEntry.DatetimeFinished = time.Now()
 	}
 
 	req.logEntry.ExecutionFinished = true
-
-	// This isn't a step, because we want to notify all listeners, irrespective
-	// of how many steps were actually executed.
 	notifyListenersFinished(req)
+	e.drainGroupQueue()
 }
 
 func getConcurrentCount(req *ExecutionRequest) int {

+ 182 - 0
service/internal/executor/group_concurrency.go

@@ -0,0 +1,182 @@
+package executor
+
+import (
+	"fmt"
+	"slices"
+	"sync"
+
+	config "github.com/OliveTin/OliveTin/internal/config"
+	log "github.com/sirupsen/logrus"
+)
+
+type groupLimit struct {
+	name          string
+	maxConcurrent int
+}
+
+type queuedExecution struct {
+	req *ExecutionRequest
+	wg  *sync.WaitGroup
+}
+
+func actionGroupLimits(req *ExecutionRequest) []groupLimit {
+	if !hasActionGroupContext(req) {
+		return nil
+	}
+
+	limits := make([]groupLimit, 0, len(req.Binding.Action.Groups))
+
+	for _, groupName := range req.Binding.Action.Groups {
+		if limit, ok := groupLimitFromConfig(req.Cfg, groupName); ok {
+			limits = append(limits, limit)
+		}
+	}
+
+	return limits
+}
+
+func hasActionGroupContext(req *ExecutionRequest) bool {
+	return req != nil && req.Binding != nil && req.Binding.Action != nil && req.Cfg != nil
+}
+
+func groupLimitFromConfig(cfg *config.Config, groupName string) (groupLimit, bool) {
+	group, found := cfg.ActionGroups[groupName]
+	if !found || group == nil || group.MaxConcurrent < 1 {
+		return groupLimit{}, false
+	}
+
+	return groupLimit{name: groupName, maxConcurrent: group.MaxConcurrent}, true
+}
+
+func actionNeedsGroupLimit(req *ExecutionRequest) bool {
+	return len(actionGroupLimits(req)) > 0
+}
+
+func actionInGroup(action *config.Action, groupName string) bool {
+	if action == nil {
+		return false
+	}
+
+	return slices.Contains(action.Groups, groupName)
+}
+
+func (e *Executor) countActiveInGroup(groupName string) int {
+	count := 0
+
+	e.logmutex.RLock()
+	defer e.logmutex.RUnlock()
+
+	for _, logEntry := range e.logs {
+		if logEntryIsActiveInGroup(logEntry, groupName) {
+			count++
+		}
+	}
+
+	return count
+}
+
+func logEntryIsActiveInGroup(logEntry *InternalLogEntry, groupName string) bool {
+	if inactiveLogEntry(logEntry) {
+		return false
+	}
+
+	return actionInGroup(logEntry.Binding.Action, groupName)
+}
+
+func inactiveLogEntry(logEntry *InternalLogEntry) bool {
+	if logEntry == nil {
+		return true
+	}
+
+	return logEntryIsInactive(logEntry)
+}
+
+func logEntryIsInactive(logEntry *InternalLogEntry) bool {
+	if logEntry.ExecutionFinished || logEntry.Queued {
+		return true
+	}
+
+	return logEntry.Binding == nil || logEntry.Binding.Action == nil
+}
+
+func (e *Executor) groupsHaveCapacityForActive(req *ExecutionRequest) bool {
+	for _, limit := range actionGroupLimits(req) {
+		if e.countActiveInGroup(limit.name) >= (limit.maxConcurrent + 1) {
+			return false
+		}
+	}
+
+	return true
+}
+
+func (e *Executor) groupsHaveCapacityForQueued(req *ExecutionRequest) bool {
+	for _, limit := range actionGroupLimits(req) {
+		if e.countActiveInGroup(limit.name) >= limit.maxConcurrent {
+			return false
+		}
+	}
+
+	return true
+}
+
+func firstFullGroupName(e *Executor, req *ExecutionRequest) string {
+	for _, limit := range actionGroupLimits(req) {
+		if e.countActiveInGroup(limit.name) >= (limit.maxConcurrent + 1) {
+			return limit.name
+		}
+	}
+
+	return ""
+}
+
+func (e *Executor) queueRequest(req *ExecutionRequest, wg *sync.WaitGroup) {
+	groupName := firstFullGroupName(e, req)
+
+	req.logEntry.Queued = true
+	req.logEntry.QueuedForGroup = groupName
+	req.logEntry.Output = fmt.Sprintf("Queued waiting for action group %q", groupName)
+
+	log.WithFields(log.Fields{
+		"actionTitle": req.logEntry.ActionTitle,
+		"groupName":   groupName,
+	}).Infof("Action queued due to action group concurrency limit")
+
+	e.groupQueueMu.Lock()
+	e.groupQueue = append(e.groupQueue, &queuedExecution{req: req, wg: wg})
+	e.groupQueueMu.Unlock()
+}
+
+func (e *Executor) drainGroupQueue() {
+	e.groupQueueMu.Lock()
+
+	if len(e.groupQueue) == 0 {
+		e.groupQueueMu.Unlock()
+		return
+	}
+
+	next := e.groupQueue[0]
+	if !e.groupsHaveCapacityForQueued(next.req) {
+		e.groupQueueMu.Unlock()
+		return
+	}
+
+	e.groupQueue = e.groupQueue[1:]
+	e.groupQueueMu.Unlock()
+
+	go e.runDequeuedExecution(next)
+}
+
+func (e *Executor) runDequeuedExecution(queued *queuedExecution) {
+	req := queued.req
+
+	e.logmutex.Lock()
+	req.logEntry.Queued = false
+	req.logEntry.QueuedForGroup = ""
+	e.logmutex.Unlock()
+
+	req.skipRequestRegistration = true
+
+	e.runExecutionSteps(req)
+	e.finishExecChain(req)
+	queued.wg.Done()
+}

+ 277 - 0
service/internal/executor/group_concurrency_test.go

@@ -0,0 +1,277 @@
+package executor
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/OliveTin/OliveTin/internal/auth"
+	config "github.com/OliveTin/OliveTin/internal/config"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func testGroupExecutor(actions []*config.Action, groups map[string]*config.ActionGroup) (*Executor, *config.Config) {
+	cfg := config.DefaultConfig()
+	cfg.ActionGroups = groups
+	cfg.Actions = actions
+	cfg.Sanitize()
+
+	e := DefaultExecutor(cfg)
+	e.RebuildActionMap()
+
+	return e, cfg
+}
+
+func TestGroupConcurrencyQueuesSecondAction(t *testing.T) {
+	t.Parallel()
+
+	slowAction := &config.Action{
+		Title:  "Unity Job 1",
+		Shell:  "sleep 2",
+		Groups: []string{"unity"},
+	}
+	fastAction := &config.Action{
+		Title:  "Unity Job 2",
+		Shell:  "echo queued-run",
+		Groups: []string{"unity"},
+	}
+
+	e, cfg := testGroupExecutor(
+		[]*config.Action{slowAction, fastAction},
+		map[string]*config.ActionGroup{
+			"unity": {MaxConcurrent: 1},
+		},
+	)
+
+	binding1 := e.FindBindingWithNoEntity(slowAction)
+	binding2 := e.FindBindingWithNoEntity(fastAction)
+	require.NotNil(t, binding1)
+	require.NotNil(t, binding2)
+
+	wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
+		Binding:           binding1,
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	waitUntilExecutionStarted(t, e, tracking1)
+
+	wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
+		Binding:           binding2,
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	require.Eventually(t, func() bool {
+		logEntry, ok := e.GetLog(tracking2)
+		return ok && logEntry.Queued
+	}, time.Second, 10*time.Millisecond)
+
+	wg1.Wait()
+	wg2.Wait()
+
+	logEntry, ok := e.GetLog(tracking2)
+	require.True(t, ok)
+	assert.False(t, logEntry.Queued)
+	assert.False(t, logEntry.Blocked)
+	assert.Equal(t, int32(0), logEntry.ExitCode)
+	assert.Contains(t, logEntry.Output, "queued-run")
+}
+
+func TestDifferentGroupsRunConcurrently(t *testing.T) {
+	t.Parallel()
+
+	actionA := &config.Action{
+		Title:  "Group A Job",
+		Shell:  "sleep 1",
+		Groups: []string{"groupA"},
+	}
+	actionB := &config.Action{
+		Title:  "Group B Job",
+		Shell:  "echo group-b",
+		Groups: []string{"groupB"},
+	}
+
+	e, cfg := testGroupExecutor(
+		[]*config.Action{actionA, actionB},
+		map[string]*config.ActionGroup{
+			"groupA": {MaxConcurrent: 1},
+			"groupB": {MaxConcurrent: 1},
+		},
+	)
+
+	wg1, _ := e.ExecRequest(&ExecutionRequest{
+		Binding:           e.FindBindingWithNoEntity(actionA),
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	time.Sleep(50 * time.Millisecond)
+
+	wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
+		Binding:           e.FindBindingWithNoEntity(actionB),
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	require.Eventually(t, func() bool {
+		logEntry, ok := e.GetLog(tracking2)
+		return ok && logEntry.ExecutionFinished && !logEntry.Queued
+	}, 2*time.Second, 20*time.Millisecond)
+
+	wg1.Wait()
+	wg2.Wait()
+
+	logEntry, ok := e.GetLog(tracking2)
+	require.True(t, ok)
+	assert.Contains(t, logEntry.Output, "group-b")
+}
+
+func TestPerActionConcurrencyStillBlocksWithoutQueue(t *testing.T) {
+	t.Parallel()
+
+	action := &config.Action{
+		Title:         "Single binding",
+		Shell:         "sleep 1",
+		MaxConcurrent: 1,
+	}
+
+	e, cfg := testGroupExecutor([]*config.Action{action}, nil)
+	binding := e.FindBindingWithNoEntity(action)
+
+	wg1, _ := e.ExecRequest(&ExecutionRequest{
+		Binding:           binding,
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	time.Sleep(50 * time.Millisecond)
+
+	wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
+		Binding:           binding,
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	wg1.Wait()
+	wg2.Wait()
+
+	logEntry, ok := e.GetLog(tracking2)
+	require.True(t, ok)
+	assert.True(t, logEntry.Blocked)
+	assert.False(t, logEntry.Queued)
+}
+
+func waitUntilExecutionStarted(t *testing.T, e *Executor, trackingID string) {
+	t.Helper()
+
+	require.Eventually(t, func() bool {
+		logEntry, ok := e.GetLog(trackingID)
+		return ok && logEntry.ExecutionStarted
+	}, 2*time.Second, 10*time.Millisecond)
+}
+
+func assertWaitGroupPending(t *testing.T, wg *sync.WaitGroup) {
+	t.Helper()
+
+	done := make(chan struct{})
+
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		t.Fatal("wait group completed before queued execution finished")
+	case <-time.After(100 * time.Millisecond):
+	}
+}
+
+func assertWaitGroupCompletes(t *testing.T, wg *sync.WaitGroup) {
+	t.Helper()
+
+	done := make(chan struct{})
+
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(3 * time.Second):
+		t.Fatal("wait group did not complete after queue drained")
+	}
+}
+
+func TestStartActionAndWaitWaitsForQueuedExecution(t *testing.T) {
+	t.Parallel()
+
+	first := &config.Action{
+		Title:  "Hold group",
+		Shell:  "sleep 1",
+		Groups: []string{"unity"},
+	}
+	second := &config.Action{
+		Title:  "Wait in queue",
+		Shell:  "echo waited",
+		Groups: []string{"unity"},
+	}
+
+	e, cfg := testGroupExecutor(
+		[]*config.Action{first, second},
+		map[string]*config.ActionGroup{
+			"unity": {MaxConcurrent: 1},
+		},
+	)
+
+	wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
+		Binding:           e.FindBindingWithNoEntity(first),
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	waitUntilExecutionStarted(t, e, tracking1)
+
+	wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
+		Binding:           e.FindBindingWithNoEntity(second),
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	assertWaitGroupPending(t, wg2)
+
+	wg1.Wait()
+
+	assertWaitGroupCompletes(t, wg2)
+
+	logEntry, ok := e.GetLog(tracking2)
+	require.True(t, ok)
+	assert.Contains(t, logEntry.Output, "waited")
+}
+
+func TestUnknownActionGroupReferenceWarnsAndSkipsLimit(t *testing.T) {
+	t.Parallel()
+
+	action := &config.Action{
+		Title:  "Unknown group action",
+		Shell:  "echo ok",
+		Groups: []string{"missing"},
+	}
+
+	e, cfg := testGroupExecutor([]*config.Action{action}, map[string]*config.ActionGroup{})
+	wg, tracking := e.ExecRequest(&ExecutionRequest{
+		Binding:           e.FindBindingWithNoEntity(action),
+		Cfg:               cfg,
+		AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
+	})
+
+	wg.Wait()
+
+	logEntry, ok := e.GetLog(tracking)
+	require.True(t, ok)
+	assert.False(t, logEntry.Queued)
+	assert.Equal(t, int32(0), logEntry.ExitCode)
+}

+ 19 - 0
specs/action-group-concurrency.md

@@ -0,0 +1,19 @@
+# Action group concurrency
+
+Actions may belong to one or more named groups. Each group may define a maximum number of concurrent executions shared across all actions in that group.
+
+When a user or trigger starts an action that belongs to a group, OliveTin counts how many executions for that group are currently active. Active means the execution has been requested but not yet finished, and is not waiting in a queue.
+
+If every configured group for that action has spare capacity, the execution proceeds through the normal execution pipeline.
+
+If any configured group is at capacity, the new execution is queued instead of rejected. The request receives a tracking identifier immediately. The log entry shows a queued status until the execution actually starts.
+
+Queued executions run in first-in-first-out order per OliveTin instance. When an active execution in a group finishes, OliveTin attempts to start the oldest queued execution that belongs to that group, provided all groups for that queued action now have spare capacity.
+
+An action may belong to multiple groups. In that case, all group limits must be satisfied before the action starts or leaves the queue.
+
+Per-action concurrency limits apply only to executions of the same action binding. When a per-action limit is exceeded, the request is blocked immediately and is not queued.
+
+Action group concurrency limits do not survive a process restart. Queued executions that have not started are discarded when OliveTin stops.
+
+If an action references a group name that is not defined in configuration, OliveTin logs a warning and does not apply a group limit for that name.

Bu fark içinde çok fazla dosya değişikliği olduğu için bazı dosyalar gösterilmiyor