|
|
@@ -236,20 +236,19 @@ func TestPerActionConcurrencyStillBlocksWithoutQueue(t *testing.T) {
|
|
|
assert.False(t, snapshot.Queued)
|
|
|
}
|
|
|
|
|
|
-func TestPerActionConcurrencyBlocksSameBindingBeforeGroupQueue(t *testing.T) {
|
|
|
+func TestGroupedSameBindingQueuesWhenGroupFull(t *testing.T) {
|
|
|
t.Parallel()
|
|
|
|
|
|
action := &config.Action{
|
|
|
- Title: "Single binding grouped",
|
|
|
- Shell: "sleep 1",
|
|
|
- MaxConcurrent: 1,
|
|
|
- Groups: []string{"unity"},
|
|
|
+ Title: "Single binding grouped",
|
|
|
+ Shell: "sleep 1",
|
|
|
+ Groups: []string{"unity"},
|
|
|
}
|
|
|
|
|
|
e, cfg := testGroupExecutor(
|
|
|
[]*config.Action{action},
|
|
|
map[string]*config.ActionGroup{
|
|
|
- "unity": {MaxConcurrent: 1},
|
|
|
+ "unity": {MaxConcurrent: 1, QueueSize: 5},
|
|
|
},
|
|
|
)
|
|
|
binding := e.FindBindingWithNoEntity(action)
|
|
|
@@ -268,15 +267,117 @@ func TestPerActionConcurrencyBlocksSameBindingBeforeGroupQueue(t *testing.T) {
|
|
|
AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
|
|
|
})
|
|
|
|
|
|
+ require.Eventually(t, func() bool {
|
|
|
+ snapshot, ok := e.SnapshotLog(tracking2)
|
|
|
+ return ok && snapshot.Queued
|
|
|
+ }, time.Second, 10*time.Millisecond)
|
|
|
+
|
|
|
wg1.Wait()
|
|
|
wg2.Wait()
|
|
|
|
|
|
snapshot, ok := e.SnapshotLog(tracking2)
|
|
|
require.True(t, ok)
|
|
|
- assert.True(t, snapshot.Blocked)
|
|
|
+ assert.False(t, snapshot.Blocked)
|
|
|
+}
|
|
|
+
|
|
|
+func TestGroupAllowsTwoConcurrentSameBinding(t *testing.T) {
|
|
|
+ t.Parallel()
|
|
|
+
|
|
|
+ action := &config.Action{
|
|
|
+ Title: "Long running action",
|
|
|
+ Shell: "sleep 1",
|
|
|
+ Groups: []string{"con2queue10"},
|
|
|
+ }
|
|
|
+
|
|
|
+ e, cfg := testGroupExecutor(
|
|
|
+ []*config.Action{action},
|
|
|
+ map[string]*config.ActionGroup{
|
|
|
+ "con2queue10": {MaxConcurrent: 2, QueueSize: 10},
|
|
|
+ },
|
|
|
+ )
|
|
|
+ binding := e.FindBindingWithNoEntity(action)
|
|
|
+
|
|
|
+ wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
|
|
|
+ Binding: binding,
|
|
|
+ Cfg: cfg,
|
|
|
+ AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
|
|
|
+ })
|
|
|
+
|
|
|
+ waitUntilExecutionStarted(t, e, tracking1)
|
|
|
+
|
|
|
+ wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
|
|
|
+ Binding: binding,
|
|
|
+ Cfg: cfg,
|
|
|
+ AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
|
|
|
+ })
|
|
|
+
|
|
|
+ require.Eventually(t, func() bool {
|
|
|
+ snapshot, ok := e.SnapshotLog(tracking2)
|
|
|
+ return ok && snapshot.ExecutionStarted && !snapshot.Queued && !snapshot.Blocked
|
|
|
+ }, 2*time.Second, 10*time.Millisecond)
|
|
|
+
|
|
|
+ wg1.Wait()
|
|
|
+ wg2.Wait()
|
|
|
+
|
|
|
+ snapshot, ok := e.SnapshotLog(tracking2)
|
|
|
+ require.True(t, ok)
|
|
|
+ assert.False(t, snapshot.Blocked)
|
|
|
assert.False(t, snapshot.Queued)
|
|
|
}
|
|
|
|
|
|
+func TestGroupQueuesThirdAndBlocksWhenQueueFull(t *testing.T) {
|
|
|
+ t.Parallel()
|
|
|
+
|
|
|
+ action := &config.Action{
|
|
|
+ Title: "Long running action",
|
|
|
+ Shell: "sleep 1",
|
|
|
+ Groups: []string{"con2queue10"},
|
|
|
+ }
|
|
|
+
|
|
|
+ e, cfg := testGroupExecutor(
|
|
|
+ []*config.Action{action},
|
|
|
+ map[string]*config.ActionGroup{
|
|
|
+ "con2queue10": {MaxConcurrent: 2, QueueSize: 2},
|
|
|
+ },
|
|
|
+ )
|
|
|
+ binding := e.FindBindingWithNoEntity(action)
|
|
|
+
|
|
|
+ wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
|
|
|
+ Binding: binding,
|
|
|
+ Cfg: cfg,
|
|
|
+ AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
|
|
|
+ })
|
|
|
+ waitUntilExecutionStarted(t, e, tracking1)
|
|
|
+
|
|
|
+ wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
|
|
|
+ Binding: binding,
|
|
|
+ Cfg: cfg,
|
|
|
+ AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
|
|
|
+ })
|
|
|
+ waitUntilExecutionStarted(t, e, tracking2)
|
|
|
+
|
|
|
+ trackings := []string{tracking1, tracking2}
|
|
|
+ waitGroups := []*sync.WaitGroup{wg1, wg2}
|
|
|
+
|
|
|
+ for idx := 0; idx < 3; idx++ {
|
|
|
+ wg, tracking := e.ExecRequest(&ExecutionRequest{
|
|
|
+ Binding: binding,
|
|
|
+ Cfg: cfg,
|
|
|
+ AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
|
|
|
+ })
|
|
|
+ trackings = append(trackings, tracking)
|
|
|
+ waitGroups = append(waitGroups, wg)
|
|
|
+ }
|
|
|
+
|
|
|
+ require.Eventually(t, func() bool {
|
|
|
+ return groupExecutionDistributionMatches(e, trackings, 2, 2, 1)
|
|
|
+ }, 2*time.Second, 20*time.Millisecond)
|
|
|
+
|
|
|
+ for _, wg := range waitGroups {
|
|
|
+ wg.Wait()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func waitUntilExecutionStarted(t *testing.T, e *Executor, trackingID string) {
|
|
|
t.Helper()
|
|
|
|
|
|
@@ -390,6 +491,79 @@ func TestStartActionAndWaitWaitsForQueuedExecution(t *testing.T) {
|
|
|
assert.Contains(t, snapshot.Output, "waited")
|
|
|
}
|
|
|
|
|
|
+func TestGroupQueueBlocksWhenQueueFull(t *testing.T) {
|
|
|
+ t.Parallel()
|
|
|
+
|
|
|
+ actions := []*config.Action{
|
|
|
+ {Title: "Hold 1", Shell: "sleep 1", Groups: []string{"unity"}},
|
|
|
+ {Title: "Hold 2", Shell: "sleep 1", Groups: []string{"unity"}},
|
|
|
+ {Title: "Hold 3", Shell: "sleep 1", Groups: []string{"unity"}},
|
|
|
+ {Title: "Hold 4", Shell: "sleep 1", Groups: []string{"unity"}},
|
|
|
+ }
|
|
|
+
|
|
|
+ e, cfg := testGroupExecutor(
|
|
|
+ actions,
|
|
|
+ map[string]*config.ActionGroup{
|
|
|
+ "unity": {MaxConcurrent: 1, QueueSize: 2},
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ trackings, waitGroups := execAllGroupActions(t, e, cfg, actions)
|
|
|
+
|
|
|
+ require.Eventually(t, func() bool {
|
|
|
+ return countSnapshots(e, trackings, func(snapshot LogEntrySnapshot) bool { return snapshot.Blocked }) == 1 &&
|
|
|
+ countSnapshots(e, trackings, func(snapshot LogEntrySnapshot) bool { return snapshot.Queued }) == 2 &&
|
|
|
+ countSnapshots(e, trackings, isRunningSnapshot) == 1
|
|
|
+ }, 2*time.Second, 20*time.Millisecond)
|
|
|
+
|
|
|
+ for _, wg := range waitGroups {
|
|
|
+ wg.Wait()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func execAllGroupActions(t *testing.T, e *Executor, cfg *config.Config, actions []*config.Action) ([]string, []*sync.WaitGroup) {
|
|
|
+ t.Helper()
|
|
|
+
|
|
|
+ trackings := make([]string, len(actions))
|
|
|
+ waitGroups := make([]*sync.WaitGroup, len(actions))
|
|
|
+
|
|
|
+ for idx, action := range actions {
|
|
|
+ wg, tracking := e.ExecRequest(&ExecutionRequest{
|
|
|
+ Binding: e.FindBindingWithNoEntity(action),
|
|
|
+ Cfg: cfg,
|
|
|
+ AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
|
|
|
+ })
|
|
|
+ trackings[idx] = tracking
|
|
|
+ waitGroups[idx] = wg
|
|
|
+ }
|
|
|
+
|
|
|
+ return trackings, waitGroups
|
|
|
+}
|
|
|
+
|
|
|
+func groupExecutionDistributionMatches(e *Executor, trackings []string, wantRunning, wantQueued, wantBlocked int) bool {
|
|
|
+ running := countSnapshots(e, trackings, isRunningSnapshot)
|
|
|
+ queued := countSnapshots(e, trackings, func(snapshot LogEntrySnapshot) bool { return snapshot.Queued })
|
|
|
+ blocked := countSnapshots(e, trackings, func(snapshot LogEntrySnapshot) bool { return snapshot.Blocked })
|
|
|
+ return running == wantRunning && queued == wantQueued && blocked == wantBlocked
|
|
|
+}
|
|
|
+
|
|
|
+func countSnapshots(e *Executor, trackings []string, matches func(LogEntrySnapshot) bool) int {
|
|
|
+ count := 0
|
|
|
+
|
|
|
+ for _, tracking := range trackings {
|
|
|
+ snapshot, ok := e.SnapshotLog(tracking)
|
|
|
+ if ok && matches(snapshot) {
|
|
|
+ count++
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return count
|
|
|
+}
|
|
|
+
|
|
|
+func isRunningSnapshot(snapshot LogEntrySnapshot) bool {
|
|
|
+ return snapshot.ExecutionStarted && !snapshot.ExecutionFinished
|
|
|
+}
|
|
|
+
|
|
|
func TestUnknownActionGroupReferenceWarnsAndSkipsLimit(t *testing.T) {
|
|
|
t.Parallel()
|
|
|
|