group_concurrency.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package executor
  2. import (
  3. "fmt"
  4. "slices"
  5. "sync"
  6. config "github.com/OliveTin/OliveTin/internal/config"
  7. log "github.com/sirupsen/logrus"
  8. )
  9. type groupLimit struct {
  10. name string
  11. maxConcurrent int
  12. queueSize int
  13. }
  14. type queuedExecution struct {
  15. req *ExecutionRequest
  16. wg *sync.WaitGroup
  17. }
  18. func actionGroupLimits(req *ExecutionRequest) []groupLimit {
  19. if !hasActionGroupContext(req) {
  20. return nil
  21. }
  22. limits := make([]groupLimit, 0, len(req.Binding.Action.Groups))
  23. for _, groupName := range req.Binding.Action.Groups {
  24. if limit, ok := groupLimitFromConfig(req.Cfg, groupName); ok {
  25. limits = append(limits, limit)
  26. }
  27. }
  28. return limits
  29. }
  30. func hasActionGroupContext(req *ExecutionRequest) bool {
  31. return req != nil && req.Binding != nil && req.Binding.Action != nil && req.Cfg != nil
  32. }
  33. func groupLimitFromConfig(cfg *config.Config, groupName string) (groupLimit, bool) {
  34. group, found := cfg.ActionGroups[groupName]
  35. if !found || group == nil || group.MaxConcurrent < 1 {
  36. return groupLimit{}, false
  37. }
  38. return groupLimit{
  39. name: groupName,
  40. maxConcurrent: group.MaxConcurrent,
  41. queueSize: group.QueueSize,
  42. }, true
  43. }
  44. func actionNeedsGroupLimit(req *ExecutionRequest) bool {
  45. return len(actionGroupLimits(req)) > 0
  46. }
  47. func actionInGroup(action *config.Action, groupName string) bool {
  48. if action == nil {
  49. return false
  50. }
  51. return slices.Contains(action.Groups, groupName)
  52. }
  53. func (e *Executor) countActiveInGroup(groupName string) int {
  54. e.logmutex.RLock()
  55. defer e.logmutex.RUnlock()
  56. return e.countActiveInGroupLocked(groupName)
  57. }
  58. func (e *Executor) countActiveInGroupLocked(groupName string) int {
  59. count := 0
  60. for _, logEntry := range e.logs {
  61. if logEntryIsActiveInGroup(logEntry, groupName) {
  62. count++
  63. }
  64. }
  65. return count
  66. }
  67. func (e *Executor) countQueuedInGroupLocked(groupName string) int {
  68. count := 0
  69. for _, logEntry := range e.logs {
  70. if queuedLogEntryInGroup(logEntry, groupName) {
  71. count++
  72. }
  73. }
  74. return count
  75. }
  76. func queuedLogEntryInGroup(logEntry *InternalLogEntry, groupName string) bool {
  77. if !logEntryIsBound(logEntry) {
  78. return false
  79. }
  80. if !logEntry.Queued || logEntry.ExecutionFinished {
  81. return false
  82. }
  83. return actionInGroup(logEntry.Binding.Action, groupName)
  84. }
  85. func logEntryIsBound(logEntry *InternalLogEntry) bool {
  86. return logEntry != nil && logEntry.Binding != nil && logEntry.Binding.Action != nil
  87. }
  88. func groupIsAtActiveCapacity(activeCount int, limit groupLimit) bool {
  89. return activeCount >= (limit.maxConcurrent + 1)
  90. }
  91. func (e *Executor) fullGroupWithQueueExceededLocked(req *ExecutionRequest) string {
  92. for _, limit := range actionGroupLimits(req) {
  93. if !groupIsAtActiveCapacity(e.countActiveInGroupLocked(limit.name), limit) {
  94. continue
  95. }
  96. if e.countQueuedInGroupLocked(limit.name) >= limit.queueSize {
  97. return limit.name
  98. }
  99. }
  100. return ""
  101. }
  102. func (e *Executor) blockRequestForGroupQueue(req *ExecutionRequest, groupName string) {
  103. log.WithFields(log.Fields{
  104. "actionTitle": req.logEntry.ActionTitle,
  105. "groupName": groupName,
  106. }).Warnf("Blocked from executing due to action group queue limit")
  107. req.mutateLogEntry(func(entry *InternalLogEntry) {
  108. entry.Output = fmt.Sprintf("Blocked from executing due to action group %q queue limit", groupName)
  109. entry.Blocked = true
  110. })
  111. }
  112. func logEntryIsActiveInGroup(logEntry *InternalLogEntry, groupName string) bool {
  113. if inactiveLogEntry(logEntry) {
  114. return false
  115. }
  116. return actionInGroup(logEntry.Binding.Action, groupName)
  117. }
  118. func inactiveLogEntry(logEntry *InternalLogEntry) bool {
  119. if logEntry == nil {
  120. return true
  121. }
  122. return logEntryIsInactive(logEntry)
  123. }
  124. func logEntryIsInactive(logEntry *InternalLogEntry) bool {
  125. if logEntry.ExecutionFinished || logEntry.Queued {
  126. return true
  127. }
  128. return logEntry.Binding == nil || logEntry.Binding.Action == nil
  129. }
  130. func (e *Executor) groupsHaveCapacityForActive(req *ExecutionRequest) bool {
  131. for _, limit := range actionGroupLimits(req) {
  132. if e.countActiveInGroup(limit.name) >= (limit.maxConcurrent + 1) {
  133. return false
  134. }
  135. }
  136. return true
  137. }
  138. func (e *Executor) groupsHaveCapacityForQueued(req *ExecutionRequest) bool {
  139. for _, limit := range actionGroupLimits(req) {
  140. if e.countActiveInGroup(limit.name) >= limit.maxConcurrent {
  141. return false
  142. }
  143. }
  144. return true
  145. }
  146. func firstFullGroupName(e *Executor, req *ExecutionRequest) string {
  147. for _, limit := range actionGroupLimits(req) {
  148. if e.countActiveInGroup(limit.name) >= (limit.maxConcurrent + 1) {
  149. return limit.name
  150. }
  151. }
  152. return ""
  153. }
  154. func firstFullGroupNameLocked(e *Executor, req *ExecutionRequest) string {
  155. for _, limit := range actionGroupLimits(req) {
  156. if e.countActiveInGroupLocked(limit.name) >= (limit.maxConcurrent + 1) {
  157. return limit.name
  158. }
  159. }
  160. return ""
  161. }
  162. func (e *Executor) queueRequest(req *ExecutionRequest, wg *sync.WaitGroup) bool {
  163. e.groupQueueMu.Lock()
  164. e.logmutex.RLock()
  165. groupName := e.fullGroupWithQueueExceededLocked(req)
  166. e.logmutex.RUnlock()
  167. if groupName != "" {
  168. e.groupQueueMu.Unlock()
  169. e.blockRequestForGroupQueue(req, groupName)
  170. return true
  171. }
  172. var waitingForGroup string
  173. req.mutateLogEntry(func(entry *InternalLogEntry) {
  174. waitingForGroup = firstFullGroupNameLocked(e, req)
  175. entry.Queued = true
  176. entry.QueuedForGroup = waitingForGroup
  177. entry.Output = fmt.Sprintf("Queued waiting for action group %q", waitingForGroup)
  178. })
  179. e.groupQueue = append(e.groupQueue, &queuedExecution{req: req, wg: wg})
  180. e.groupQueueMu.Unlock()
  181. e.drainGroupQueue()
  182. log.WithFields(log.Fields{
  183. "actionTitle": req.logEntry.ActionTitle,
  184. "groupName": waitingForGroup,
  185. }).Infof("Action queued due to action group concurrency limit")
  186. return false
  187. }
  188. func (e *Executor) drainGroupQueue() {
  189. e.groupQueueMu.Lock()
  190. if len(e.groupQueue) == 0 {
  191. e.groupQueueMu.Unlock()
  192. return
  193. }
  194. next := e.groupQueue[0]
  195. if !e.groupsHaveCapacityForQueued(next.req) {
  196. e.groupQueueMu.Unlock()
  197. return
  198. }
  199. e.groupQueue = e.groupQueue[1:]
  200. next.req.mutateLogEntry(func(entry *InternalLogEntry) {
  201. entry.Queued = false
  202. entry.QueuedForGroup = ""
  203. })
  204. e.groupQueueMu.Unlock()
  205. go e.runDequeuedExecution(next)
  206. }
  207. func (e *Executor) runDequeuedExecution(queued *queuedExecution) {
  208. req := queued.req
  209. req.skipRequestRegistration = true
  210. e.runExecutionSteps(req)
  211. e.finishExecChain(req)
  212. queued.wg.Done()
  213. }