4
0

api_queue.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package api
  2. import (
  3. ctx "context"
  4. "sort"
  5. "connectrpc.com/connect"
  6. apiv1 "github.com/OliveTin/OliveTin/gen/olivetin/api/v1"
  7. "github.com/OliveTin/OliveTin/internal/auth"
  8. authpublic "github.com/OliveTin/OliveTin/internal/auth/authpublic"
  9. config "github.com/OliveTin/OliveTin/internal/config"
  10. "github.com/OliveTin/OliveTin/internal/executor"
  11. )
  12. const defaultActionGroupName = "default"
  13. type executionQueueBucketKey struct {
  14. groupName string
  15. bindingID string
  16. }
  17. func (api *oliveTinAPI) GetExecutionQueue(ctx ctx.Context, req *connect.Request[apiv1.GetExecutionQueueRequest]) (*connect.Response[apiv1.GetExecutionQueueResponse], error) {
  18. user := auth.UserFromApiCall(ctx, req, api.cfg)
  19. if err := api.checkDashboardAccess(user); err != nil {
  20. return nil, err
  21. }
  22. active := api.executor.GetActiveExecutionsACL(api.cfg, user)
  23. groups := buildExecutionQueueGroups(active, user, api)
  24. return connect.NewResponse(&apiv1.GetExecutionQueueResponse{
  25. Groups: groups,
  26. TotalActive: int32(len(active)),
  27. }), nil
  28. }
  29. func buildExecutionQueueGroups(active []*executor.InternalLogEntry, user *authpublic.AuthenticatedUser, api *oliveTinAPI) []*apiv1.ExecutionQueueGroup {
  30. actionBuckets := make(map[executionQueueBucketKey]*apiv1.ExecutionQueueAction)
  31. for _, entry := range active {
  32. addActiveEntryToActionBuckets(actionBuckets, entry, api.cfg, user, api)
  33. }
  34. return buildExecutionQueueGroupsFromBuckets(actionBuckets, api.cfg)
  35. }
  36. func addActiveEntryToActionBuckets(
  37. buckets map[executionQueueBucketKey]*apiv1.ExecutionQueueAction,
  38. entry *executor.InternalLogEntry,
  39. cfg *config.Config,
  40. user *authpublic.AuthenticatedUser,
  41. api *oliveTinAPI,
  42. ) {
  43. for _, groupName := range enforcedActionGroupNames(entry, cfg) {
  44. key := executionQueueBucketKey{
  45. groupName: groupName,
  46. bindingID: entry.GetBindingId(),
  47. }
  48. action := buckets[key]
  49. if action == nil {
  50. action = newExecutionQueueAction(entry)
  51. buckets[key] = action
  52. }
  53. action.Entries = append(action.Entries, api.internalLogEntryToPb(entry, user))
  54. }
  55. }
  56. func finalizeExecutionQueueGroup(group *apiv1.ExecutionQueueGroup) {
  57. sortExecutionQueueActions(group.Actions)
  58. group.ActiveCount = sumExecutionQueueActionEntries(group.Actions)
  59. group.QueuedCount = countQueuedGroupEntries(group.Actions)
  60. }
  61. func buildExecutionQueueGroupsFromBuckets(
  62. buckets map[executionQueueBucketKey]*apiv1.ExecutionQueueAction,
  63. cfg *config.Config,
  64. ) []*apiv1.ExecutionQueueGroup {
  65. grouped := make(map[string]*apiv1.ExecutionQueueGroup)
  66. for key, action := range buckets {
  67. sortQueueEntries(action.Entries)
  68. action.ActiveCount = int32(len(action.Entries))
  69. group := grouped[key.groupName]
  70. if group == nil {
  71. group = newExecutionQueueGroup(key.groupName, cfg)
  72. grouped[key.groupName] = group
  73. }
  74. group.Actions = append(group.Actions, action)
  75. }
  76. groups := make([]*apiv1.ExecutionQueueGroup, 0, len(grouped))
  77. for _, group := range grouped {
  78. finalizeExecutionQueueGroup(group)
  79. groups = append(groups, group)
  80. }
  81. sortExecutionQueueGroups(groups)
  82. return groups
  83. }
  84. func hasExecutionQueueBinding(entry *executor.InternalLogEntry, cfg *config.Config) bool {
  85. return entry != nil && entry.Binding != nil && entry.Binding.Action != nil && cfg != nil
  86. }
  87. func collectEnforcedActionGroupNames(groups []string, cfg *config.Config) []string {
  88. names := make([]string, 0, len(groups))
  89. for _, groupName := range groups {
  90. if isEnforcedActionGroup(cfg, groupName) {
  91. names = append(names, groupName)
  92. }
  93. }
  94. return names
  95. }
  96. func enforcedActionGroupNames(entry *executor.InternalLogEntry, cfg *config.Config) []string {
  97. if !hasExecutionQueueBinding(entry, cfg) {
  98. return []string{defaultActionGroupName}
  99. }
  100. names := collectEnforcedActionGroupNames(entry.Binding.Action.Groups, cfg)
  101. if len(names) == 0 {
  102. return []string{defaultActionGroupName}
  103. }
  104. return names
  105. }
  106. func isEnforcedActionGroup(cfg *config.Config, groupName string) bool {
  107. group, found := cfg.ActionGroups[groupName]
  108. return found && group != nil && group.MaxConcurrent >= 1
  109. }
  110. func newExecutionQueueGroup(name string, cfg *config.Config) *apiv1.ExecutionQueueGroup {
  111. group := &apiv1.ExecutionQueueGroup{Name: name}
  112. if name == defaultActionGroupName {
  113. return group
  114. }
  115. actionGroup, found := cfg.ActionGroups[name]
  116. if !found || actionGroup == nil {
  117. return group
  118. }
  119. group.Icon = actionGroup.Icon
  120. group.MaxConcurrent = int32(actionGroup.MaxConcurrent)
  121. group.QueueSize = int32(actionGroup.QueueSize)
  122. return group
  123. }
  124. func newExecutionQueueAction(entry *executor.InternalLogEntry) *apiv1.ExecutionQueueAction {
  125. action := &apiv1.ExecutionQueueAction{
  126. BindingId: entry.GetBindingId(),
  127. ActionTitle: entry.ActionTitle,
  128. ActionIcon: entry.ActionIcon,
  129. EntityPrefix: entry.EntityPrefix,
  130. }
  131. if entry.Binding != nil && entry.Binding.Action != nil {
  132. action.MaxConcurrent = int32(entry.Binding.Action.MaxConcurrent)
  133. }
  134. return action
  135. }
  136. func sumExecutionQueueActionEntries(actions []*apiv1.ExecutionQueueAction) int32 {
  137. var total int32
  138. for _, action := range actions {
  139. total += int32(len(action.Entries))
  140. }
  141. return total
  142. }
  143. func countQueuedGroupEntries(actions []*apiv1.ExecutionQueueAction) int32 {
  144. var total int32
  145. for _, action := range actions {
  146. for _, entry := range action.Entries {
  147. if entry.Queued {
  148. total++
  149. }
  150. }
  151. }
  152. return total
  153. }
  154. func sortQueueEntries(entries []*apiv1.LogEntry) {
  155. sort.Slice(entries, func(i, j int) bool {
  156. return entries[i].DatetimeStarted < entries[j].DatetimeStarted
  157. })
  158. }
  159. func sortExecutionQueueActions(actions []*apiv1.ExecutionQueueAction) {
  160. sort.Slice(actions, func(i, j int) bool {
  161. left := actions[i].ActionTitle
  162. right := actions[j].ActionTitle
  163. if left == right {
  164. return actions[i].EntityPrefix < actions[j].EntityPrefix
  165. }
  166. return left < right
  167. })
  168. }
  169. func sortExecutionQueueGroups(groups []*apiv1.ExecutionQueueGroup) {
  170. sort.Slice(groups, func(i, j int) bool {
  171. left := groups[i].Name
  172. right := groups[j].Name
  173. if left == defaultActionGroupName {
  174. return false
  175. }
  176. if right == defaultActionGroupName {
  177. return true
  178. }
  179. return left < right
  180. })
  181. }