group_concurrency_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586
  1. package executor
  2. import (
  3. "sync"
  4. "testing"
  5. "time"
  6. "github.com/OliveTin/OliveTin/internal/auth"
  7. config "github.com/OliveTin/OliveTin/internal/config"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. )
  11. func testGroupExecutor(actions []*config.Action, groups map[string]*config.ActionGroup) (*Executor, *config.Config) {
  12. cfg := config.DefaultConfig()
  13. cfg.ActionGroups = groups
  14. cfg.Actions = actions
  15. cfg.Sanitize()
  16. e := DefaultExecutor(cfg)
  17. e.RebuildActionMap()
  18. return e, cfg
  19. }
  20. func TestGroupConcurrencyQueuesSecondAction(t *testing.T) {
  21. t.Parallel()
  22. slowAction := &config.Action{
  23. Title: "Unity Job 1",
  24. Shell: "sleep 2",
  25. Groups: []string{"unity"},
  26. }
  27. fastAction := &config.Action{
  28. Title: "Unity Job 2",
  29. Shell: "echo queued-run",
  30. Groups: []string{"unity"},
  31. }
  32. e, cfg := testGroupExecutor(
  33. []*config.Action{slowAction, fastAction},
  34. map[string]*config.ActionGroup{
  35. "unity": {MaxConcurrent: 1},
  36. },
  37. )
  38. binding1 := e.FindBindingWithNoEntity(slowAction)
  39. binding2 := e.FindBindingWithNoEntity(fastAction)
  40. require.NotNil(t, binding1)
  41. require.NotNil(t, binding2)
  42. wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
  43. Binding: binding1,
  44. Cfg: cfg,
  45. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  46. })
  47. waitUntilExecutionStarted(t, e, tracking1)
  48. wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
  49. Binding: binding2,
  50. Cfg: cfg,
  51. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  52. })
  53. require.Eventually(t, func() bool {
  54. snapshot, ok := e.SnapshotLog(tracking2)
  55. return ok && snapshot.Queued
  56. }, time.Second, 10*time.Millisecond)
  57. wg1.Wait()
  58. wg2.Wait()
  59. snapshot, ok := e.SnapshotLog(tracking2)
  60. require.True(t, ok)
  61. assert.False(t, snapshot.Queued)
  62. assert.False(t, snapshot.Blocked)
  63. assert.Equal(t, int32(0), snapshot.ExitCode)
  64. assert.Contains(t, snapshot.Output, "queued-run")
  65. }
  66. func TestQueuedActionNotifiesWhenExecutionBegins(t *testing.T) {
  67. t.Parallel()
  68. slowAction := &config.Action{
  69. Title: "Hold group",
  70. Shell: "sleep 1",
  71. Groups: []string{"unity"},
  72. }
  73. queuedAction := &config.Action{
  74. Title: "Queued job",
  75. Shell: "echo queued-run",
  76. Groups: []string{"unity"},
  77. }
  78. e, cfg := testGroupExecutor(
  79. []*config.Action{slowAction, queuedAction},
  80. map[string]*config.ActionGroup{
  81. "unity": {MaxConcurrent: 1},
  82. },
  83. )
  84. notifications := make(chan startedNotification, 8)
  85. e.AddListener(&executionStartedCollector{ch: notifications})
  86. wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
  87. Binding: e.FindBindingWithNoEntity(slowAction),
  88. Cfg: cfg,
  89. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  90. })
  91. waitUntilExecutionStarted(t, e, tracking1)
  92. wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
  93. Binding: e.FindBindingWithNoEntity(queuedAction),
  94. Cfg: cfg,
  95. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  96. })
  97. require.Eventually(t, func() bool {
  98. snapshot, ok := e.SnapshotLog(tracking2)
  99. return ok && snapshot.Queued
  100. }, time.Second, 10*time.Millisecond)
  101. wg1.Wait()
  102. wg2.Wait()
  103. sawQueuedStart, sawRunningStart := collectQueuedStartNotifications(notifications, tracking2)
  104. assert.True(t, sawQueuedStart, "queued action should notify when queued")
  105. assert.True(t, sawRunningStart, "queued action should notify again when execution begins")
  106. }
  107. func isQueuedStartNotification(notification startedNotification, trackingID string) bool {
  108. return notification.trackingID == trackingID && notification.queued && !notification.started
  109. }
  110. func isRunningStartNotification(notification startedNotification, trackingID string) bool {
  111. return notification.trackingID == trackingID && !notification.queued && notification.started
  112. }
  113. func collectQueuedStartNotifications(notifications <-chan startedNotification, trackingID string) (sawQueuedStart, sawRunningStart bool) {
  114. for len(notifications) > 0 {
  115. notification := <-notifications
  116. if isQueuedStartNotification(notification, trackingID) {
  117. sawQueuedStart = true
  118. }
  119. if isRunningStartNotification(notification, trackingID) {
  120. sawRunningStart = true
  121. }
  122. }
  123. return sawQueuedStart, sawRunningStart
  124. }
  125. func TestDifferentGroupsRunConcurrently(t *testing.T) {
  126. t.Parallel()
  127. actionA := &config.Action{
  128. Title: "Group A Job",
  129. Shell: "sleep 1",
  130. Groups: []string{"groupA"},
  131. }
  132. actionB := &config.Action{
  133. Title: "Group B Job",
  134. Shell: "echo group-b",
  135. Groups: []string{"groupB"},
  136. }
  137. e, cfg := testGroupExecutor(
  138. []*config.Action{actionA, actionB},
  139. map[string]*config.ActionGroup{
  140. "groupA": {MaxConcurrent: 1},
  141. "groupB": {MaxConcurrent: 1},
  142. },
  143. )
  144. wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
  145. Binding: e.FindBindingWithNoEntity(actionA),
  146. Cfg: cfg,
  147. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  148. })
  149. waitUntilExecutionStarted(t, e, tracking1)
  150. wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
  151. Binding: e.FindBindingWithNoEntity(actionB),
  152. Cfg: cfg,
  153. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  154. })
  155. require.Eventually(t, func() bool {
  156. snapshot, ok := e.SnapshotLog(tracking2)
  157. return ok && snapshot.ExecutionFinished && !snapshot.Queued
  158. }, 2*time.Second, 20*time.Millisecond)
  159. wg1.Wait()
  160. wg2.Wait()
  161. snapshot, ok := e.SnapshotLog(tracking2)
  162. require.True(t, ok)
  163. assert.Contains(t, snapshot.Output, "group-b")
  164. }
  165. func TestPerActionConcurrencyStillBlocksWithoutQueue(t *testing.T) {
  166. t.Parallel()
  167. action := &config.Action{
  168. Title: "Single binding",
  169. Shell: "sleep 1",
  170. MaxConcurrent: 1,
  171. }
  172. e, cfg := testGroupExecutor([]*config.Action{action}, nil)
  173. binding := e.FindBindingWithNoEntity(action)
  174. wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
  175. Binding: binding,
  176. Cfg: cfg,
  177. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  178. })
  179. waitUntilExecutionStarted(t, e, tracking1)
  180. wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
  181. Binding: binding,
  182. Cfg: cfg,
  183. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  184. })
  185. wg1.Wait()
  186. wg2.Wait()
  187. snapshot, ok := e.SnapshotLog(tracking2)
  188. require.True(t, ok)
  189. assert.True(t, snapshot.Blocked)
  190. assert.False(t, snapshot.Queued)
  191. }
  192. func TestGroupedSameBindingQueuesWhenGroupFull(t *testing.T) {
  193. t.Parallel()
  194. action := &config.Action{
  195. Title: "Single binding grouped",
  196. Shell: "sleep 1",
  197. Groups: []string{"unity"},
  198. }
  199. e, cfg := testGroupExecutor(
  200. []*config.Action{action},
  201. map[string]*config.ActionGroup{
  202. "unity": {MaxConcurrent: 1, QueueSize: 5},
  203. },
  204. )
  205. binding := e.FindBindingWithNoEntity(action)
  206. wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
  207. Binding: binding,
  208. Cfg: cfg,
  209. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  210. })
  211. waitUntilExecutionStarted(t, e, tracking1)
  212. wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
  213. Binding: binding,
  214. Cfg: cfg,
  215. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  216. })
  217. require.Eventually(t, func() bool {
  218. snapshot, ok := e.SnapshotLog(tracking2)
  219. return ok && snapshot.Queued
  220. }, time.Second, 10*time.Millisecond)
  221. wg1.Wait()
  222. wg2.Wait()
  223. snapshot, ok := e.SnapshotLog(tracking2)
  224. require.True(t, ok)
  225. assert.False(t, snapshot.Blocked)
  226. }
  227. func TestGroupAllowsTwoConcurrentSameBinding(t *testing.T) {
  228. t.Parallel()
  229. action := &config.Action{
  230. Title: "Long running action",
  231. Shell: "sleep 1",
  232. Groups: []string{"con2queue10"},
  233. }
  234. e, cfg := testGroupExecutor(
  235. []*config.Action{action},
  236. map[string]*config.ActionGroup{
  237. "con2queue10": {MaxConcurrent: 2, QueueSize: 10},
  238. },
  239. )
  240. binding := e.FindBindingWithNoEntity(action)
  241. wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
  242. Binding: binding,
  243. Cfg: cfg,
  244. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  245. })
  246. waitUntilExecutionStarted(t, e, tracking1)
  247. wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
  248. Binding: binding,
  249. Cfg: cfg,
  250. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  251. })
  252. require.Eventually(t, func() bool {
  253. snapshot, ok := e.SnapshotLog(tracking2)
  254. return ok && snapshot.ExecutionStarted && !snapshot.Queued && !snapshot.Blocked
  255. }, 2*time.Second, 10*time.Millisecond)
  256. wg1.Wait()
  257. wg2.Wait()
  258. snapshot, ok := e.SnapshotLog(tracking2)
  259. require.True(t, ok)
  260. assert.False(t, snapshot.Blocked)
  261. assert.False(t, snapshot.Queued)
  262. }
  263. func TestGroupQueuesThirdAndBlocksWhenQueueFull(t *testing.T) {
  264. t.Parallel()
  265. action := &config.Action{
  266. Title: "Long running action",
  267. Shell: "sleep 1",
  268. Groups: []string{"con2queue10"},
  269. }
  270. e, cfg := testGroupExecutor(
  271. []*config.Action{action},
  272. map[string]*config.ActionGroup{
  273. "con2queue10": {MaxConcurrent: 2, QueueSize: 2},
  274. },
  275. )
  276. binding := e.FindBindingWithNoEntity(action)
  277. wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
  278. Binding: binding,
  279. Cfg: cfg,
  280. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  281. })
  282. waitUntilExecutionStarted(t, e, tracking1)
  283. wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
  284. Binding: binding,
  285. Cfg: cfg,
  286. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  287. })
  288. waitUntilExecutionStarted(t, e, tracking2)
  289. trackings := []string{tracking1, tracking2}
  290. waitGroups := []*sync.WaitGroup{wg1, wg2}
  291. for idx := 0; idx < 3; idx++ {
  292. wg, tracking := e.ExecRequest(&ExecutionRequest{
  293. Binding: binding,
  294. Cfg: cfg,
  295. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  296. })
  297. trackings = append(trackings, tracking)
  298. waitGroups = append(waitGroups, wg)
  299. }
  300. require.Eventually(t, func() bool {
  301. return groupExecutionDistributionMatches(e, trackings, 2, 2, 1)
  302. }, 2*time.Second, 20*time.Millisecond)
  303. for _, wg := range waitGroups {
  304. wg.Wait()
  305. }
  306. }
  307. func waitUntilExecutionStarted(t *testing.T, e *Executor, trackingID string) {
  308. t.Helper()
  309. require.Eventually(t, func() bool {
  310. snapshot, ok := e.SnapshotLog(trackingID)
  311. return ok && snapshot.ExecutionStarted
  312. }, 2*time.Second, 10*time.Millisecond)
  313. }
  314. type executionStartedCollector struct {
  315. ch chan startedNotification
  316. }
  317. type startedNotification struct {
  318. trackingID string
  319. started bool
  320. queued bool
  321. }
  322. func (c *executionStartedCollector) OnExecutionStarted(entry *InternalLogEntry) {
  323. c.ch <- startedNotification{
  324. trackingID: entry.ExecutionTrackingID,
  325. started: entry.ExecutionStarted,
  326. queued: entry.Queued,
  327. }
  328. }
// The remaining callbacks are intentionally empty: this collector only
// records execution-started events. They are presumably required by the
// listener interface that Executor.AddListener accepts; confirm against its
// definition.

// OnExecutionFinished ignores finished executions.
func (c *executionStartedCollector) OnExecutionFinished(_ *InternalLogEntry) {}

// OnOutputChunk ignores streamed output.
func (c *executionStartedCollector) OnOutputChunk(_ []byte, _ string) {}

// OnActionMapRebuilt ignores action map rebuilds.
func (c *executionStartedCollector) OnActionMapRebuilt() {}
  332. func assertWaitGroupPending(t *testing.T, wg *sync.WaitGroup) {
  333. t.Helper()
  334. done := make(chan struct{})
  335. go func() {
  336. wg.Wait()
  337. close(done)
  338. }()
  339. select {
  340. case <-done:
  341. t.Fatal("wait group completed before queued execution finished")
  342. case <-time.After(100 * time.Millisecond):
  343. }
  344. }
  345. func assertWaitGroupCompletes(t *testing.T, wg *sync.WaitGroup) {
  346. t.Helper()
  347. done := make(chan struct{})
  348. go func() {
  349. wg.Wait()
  350. close(done)
  351. }()
  352. select {
  353. case <-done:
  354. case <-time.After(3 * time.Second):
  355. t.Fatal("wait group did not complete after queue drained")
  356. }
  357. }
  358. func TestStartActionAndWaitWaitsForQueuedExecution(t *testing.T) {
  359. t.Parallel()
  360. first := &config.Action{
  361. Title: "Hold group",
  362. Shell: "sleep 1",
  363. Groups: []string{"unity"},
  364. }
  365. second := &config.Action{
  366. Title: "Wait in queue",
  367. Shell: "echo waited",
  368. Groups: []string{"unity"},
  369. }
  370. e, cfg := testGroupExecutor(
  371. []*config.Action{first, second},
  372. map[string]*config.ActionGroup{
  373. "unity": {MaxConcurrent: 1},
  374. },
  375. )
  376. wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
  377. Binding: e.FindBindingWithNoEntity(first),
  378. Cfg: cfg,
  379. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  380. })
  381. waitUntilExecutionStarted(t, e, tracking1)
  382. wg2, tracking2 := e.ExecRequest(&ExecutionRequest{
  383. Binding: e.FindBindingWithNoEntity(second),
  384. Cfg: cfg,
  385. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  386. })
  387. assertWaitGroupPending(t, wg2)
  388. wg1.Wait()
  389. assertWaitGroupCompletes(t, wg2)
  390. snapshot, ok := e.SnapshotLog(tracking2)
  391. require.True(t, ok)
  392. assert.Contains(t, snapshot.Output, "waited")
  393. }
  394. func TestGroupQueueBlocksWhenQueueFull(t *testing.T) {
  395. t.Parallel()
  396. actions := []*config.Action{
  397. {Title: "Hold 1", Shell: "sleep 1", Groups: []string{"unity"}},
  398. {Title: "Hold 2", Shell: "sleep 1", Groups: []string{"unity"}},
  399. {Title: "Hold 3", Shell: "sleep 1", Groups: []string{"unity"}},
  400. {Title: "Hold 4", Shell: "sleep 1", Groups: []string{"unity"}},
  401. }
  402. e, cfg := testGroupExecutor(
  403. actions,
  404. map[string]*config.ActionGroup{
  405. "unity": {MaxConcurrent: 1, QueueSize: 2},
  406. },
  407. )
  408. wg1, tracking1 := e.ExecRequest(&ExecutionRequest{
  409. Binding: e.FindBindingWithNoEntity(actions[0]),
  410. Cfg: cfg,
  411. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  412. })
  413. waitUntilExecutionStarted(t, e, tracking1)
  414. trackings := []string{tracking1}
  415. waitGroups := []*sync.WaitGroup{wg1}
  416. for _, action := range actions[1:] {
  417. wg, tracking := e.ExecRequest(&ExecutionRequest{
  418. Binding: e.FindBindingWithNoEntity(action),
  419. Cfg: cfg,
  420. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  421. })
  422. trackings = append(trackings, tracking)
  423. waitGroups = append(waitGroups, wg)
  424. }
  425. require.Eventually(t, func() bool {
  426. return groupExecutionDistributionMatches(e, trackings, 1, 2, 1)
  427. }, 2*time.Second, 20*time.Millisecond)
  428. for _, wg := range waitGroups {
  429. wg.Wait()
  430. }
  431. }
  432. func groupExecutionDistributionMatches(e *Executor, trackings []string, wantRunning, wantQueued, wantBlocked int) bool {
  433. running := countSnapshots(e, trackings, isRunningSnapshot)
  434. queued := countSnapshots(e, trackings, func(snapshot LogEntrySnapshot) bool { return snapshot.Queued })
  435. blocked := countSnapshots(e, trackings, func(snapshot LogEntrySnapshot) bool { return snapshot.Blocked })
  436. return running == wantRunning && queued == wantQueued && blocked == wantBlocked
  437. }
  438. func countSnapshots(e *Executor, trackings []string, matches func(LogEntrySnapshot) bool) int {
  439. count := 0
  440. for _, tracking := range trackings {
  441. snapshot, ok := e.SnapshotLog(tracking)
  442. if ok && matches(snapshot) {
  443. count++
  444. }
  445. }
  446. return count
  447. }
  448. func isRunningSnapshot(snapshot LogEntrySnapshot) bool {
  449. return snapshot.ExecutionStarted && !snapshot.ExecutionFinished
  450. }
  451. func TestUnknownActionGroupReferenceWarnsAndSkipsLimit(t *testing.T) {
  452. t.Parallel()
  453. action := &config.Action{
  454. Title: "Unknown group action",
  455. Shell: "echo ok",
  456. Groups: []string{"missing"},
  457. }
  458. e, cfg := testGroupExecutor([]*config.Action{action}, map[string]*config.ActionGroup{})
  459. wg, tracking := e.ExecRequest(&ExecutionRequest{
  460. Binding: e.FindBindingWithNoEntity(action),
  461. Cfg: cfg,
  462. AuthenticatedUser: auth.UserFromSystem(cfg, "testuser"),
  463. })
  464. wg.Wait()
  465. snapshot, ok := e.SnapshotLog(tracking)
  466. require.True(t, ok)
  467. assert.False(t, snapshot.Queued)
  468. assert.Equal(t, int32(0), snapshot.ExitCode)
  469. }