4
0

executor.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379
  1. package executor
  2. import (
  3. acl "github.com/OliveTin/OliveTin/internal/acl"
  4. "github.com/OliveTin/OliveTin/internal/auth"
  5. authpublic "github.com/OliveTin/OliveTin/internal/auth/authpublic"
  6. config "github.com/OliveTin/OliveTin/internal/config"
  7. "github.com/OliveTin/OliveTin/internal/entities"
  8. "github.com/OliveTin/OliveTin/internal/logfilter"
  9. "github.com/OliveTin/OliveTin/internal/tpl"
  10. "github.com/google/uuid"
  11. log "github.com/sirupsen/logrus"
  12. "gopkg.in/yaml.v3"
  13. "bytes"
  14. "context"
  15. "fmt"
  16. "os"
  17. "os/exec"
  18. "path"
  19. "regexp"
  20. "strings"
  21. "sync"
  22. "time"
  23. )
  24. const (
  25. DefaultExitCodeNotExecuted = -1337
  26. MaxTriggerDepth = 10
  27. )
  28. var validTrackingIDPattern = regexp.MustCompile(`^[a-fA-F0-9\-]+$`)
  29. func isValidTrackingID(id string) bool {
  30. const MaxTrackingIDLength = 36
  31. return id != "" && len(id) <= MaxTrackingIDLength && validTrackingIDPattern.MatchString(id)
  32. }
  33. type ActionBinding struct {
  34. ID string
  35. Action *config.Action
  36. Entity *entities.Entity
  37. ConfigOrder int
  38. OnDashboards []DashboardNavigationTarget
  39. }
  40. // Executor represents a helper class for executing commands. It's main method
  41. // is ExecRequest
  42. type Executor struct {
  43. logs map[string]*InternalLogEntry
  44. logsTrackingIdsByDate []string
  45. LogsByBindingId map[string][]*InternalLogEntry
  46. logmutex sync.RWMutex
  47. MapActionBindings map[string]*ActionBinding
  48. MapActionBindingsLock sync.RWMutex
  49. Cfg *config.Config
  50. listeners []listener
  51. listenersMu sync.RWMutex
  52. chainOfCommand []executorStepFunc
  53. groupQueue []*queuedExecution
  54. groupQueueMu sync.Mutex
  55. }
  56. // ExecutionRequest is a request to execute an action. It's passed to an
  57. // Executor. They're created from the api.
  58. type ExecutionRequest struct {
  59. Binding *ActionBinding
  60. Arguments map[string]string
  61. TrackingID string
  62. Tags []string
  63. Cfg *config.Config
  64. AuthenticatedUser *authpublic.AuthenticatedUser
  65. TriggerDepth int
  66. Justification string
  67. logEntry *InternalLogEntry
  68. finalParsedCommand string
  69. execArgs []string
  70. useDirectExec bool
  71. executor *Executor
  72. skipRequestRegistration bool
  73. }
  74. func (req *ExecutionRequest) mutateLogEntry(mutator func(*InternalLogEntry)) {
  75. if req.executor == nil {
  76. mutator(req.logEntry)
  77. return
  78. }
  79. req.executor.logmutex.Lock()
  80. defer req.executor.logmutex.Unlock()
  81. mutator(req.logEntry)
  82. }
  83. // LogEntrySnapshot is a copy of selected log entry fields for race-safe reads.
  84. type LogEntrySnapshot struct {
  85. Queued bool
  86. Blocked bool
  87. ExecutionStarted bool
  88. ExecutionFinished bool
  89. ExitCode int32
  90. Output string
  91. }
  92. // SnapshotLog returns a copy of selected log entry fields under read lock.
  93. func (e *Executor) SnapshotLog(trackingID string) (LogEntrySnapshot, bool) {
  94. e.logmutex.RLock()
  95. defer e.logmutex.RUnlock()
  96. entry, found := e.logs[trackingID]
  97. if !found {
  98. return LogEntrySnapshot{}, false
  99. }
  100. return LogEntrySnapshot{
  101. Queued: entry.Queued,
  102. Blocked: entry.Blocked,
  103. ExecutionStarted: entry.ExecutionStarted,
  104. ExecutionFinished: entry.ExecutionFinished,
  105. ExitCode: entry.ExitCode,
  106. Output: entry.Output,
  107. }, true
  108. }
  109. // InternalLogEntry objects are created by an Executor, and represent the final
  110. // state of execution (even if the command is not executed). It's designed to be
  111. // easily serializable.
  112. type InternalLogEntry struct {
  113. Binding *ActionBinding
  114. DatetimeStarted time.Time
  115. DatetimeFinished time.Time
  116. Output string
  117. TimedOut bool
  118. Blocked bool
  119. Queued bool
  120. QueuedForGroup string
  121. ExitCode int32
  122. Tags []string
  123. ExecutionStarted bool
  124. ExecutionFinished bool
  125. ExecutionTrackingID string
  126. Process *os.Process
  127. Username string
  128. Index int64
  129. EntityPrefix string
  130. ActionConfigTitle string // This is the title of the action as defined in the config, not the final parsed title.
  131. /*
  132. The following 3 properties are obviously on Action normally, but it's useful
  133. that logs are lightweight (so we don't need to have an action associated to
  134. logs, etc. Therefore, we duplicate those values here.
  135. */
  136. ActionTitle string
  137. ActionIcon string
  138. Justification string
  139. Arguments map[string]string
  140. }
  141. // .Binding can be nil, so we need to handle that.
  142. func (e *InternalLogEntry) GetBindingId() string {
  143. if e.Binding == nil {
  144. return ""
  145. }
  146. return e.Binding.ID
  147. }
  148. type executorStepFunc func(*ExecutionRequest) bool
  149. // DefaultExecutor returns an Executor, with a sensible "chain of command" for
  150. // executing actions.
  151. func DefaultExecutor(cfg *config.Config) *Executor {
  152. e := Executor{}
  153. e.Cfg = cfg
  154. e.logs = make(map[string]*InternalLogEntry)
  155. e.logsTrackingIdsByDate = make([]string, 0)
  156. e.LogsByBindingId = make(map[string][]*InternalLogEntry)
  157. e.MapActionBindings = make(map[string]*ActionBinding)
  158. e.chainOfCommand = []executorStepFunc{
  159. stepRequestAction,
  160. stepConcurrencyCheck,
  161. stepRateCheck,
  162. stepACLCheck,
  163. stepParseArgs,
  164. stepLogStart,
  165. stepExec,
  166. stepExecAfter,
  167. stepLogFinish,
  168. stepSaveLog,
  169. stepTrigger,
  170. }
  171. return &e
  172. }
  173. type listener interface {
  174. OnExecutionStarted(logEntry *InternalLogEntry)
  175. OnExecutionFinished(logEntry *InternalLogEntry)
  176. OnOutputChunk(o []byte, executionTrackingId string)
  177. OnActionMapRebuilt()
  178. }
  179. func (e *Executor) AddListener(m listener) {
  180. e.listenersMu.Lock()
  181. defer e.listenersMu.Unlock()
  182. e.listeners = append(e.listeners, m)
  183. }
  184. func (e *Executor) copyListeners() []listener {
  185. e.listenersMu.RLock()
  186. defer e.listenersMu.RUnlock()
  187. out := make([]listener, len(e.listeners))
  188. copy(out, e.listeners)
  189. return out
  190. }
  191. // getPagingStartIndex calculates the starting index for log pagination.
  192. // Parameters:
  193. //
  194. // startOffset: The offset from the most recent log (0 means start from the most recent)
  195. // totalLogCount: Total number of logs available
  196. // count: Number of logs to retrieve
  197. //
  198. // Returns: The calculated starting index for pagination
  199. func getPagingStartIndex(startOffset int64, totalLogCount int64) int64 {
  200. var startIndex int64
  201. if startOffset <= 0 {
  202. startIndex = totalLogCount
  203. } else {
  204. startIndex = (totalLogCount - startOffset)
  205. if startIndex < 0 {
  206. startIndex = 1
  207. }
  208. }
  209. return startIndex - 1
  210. }
  211. type PagingResult struct {
  212. CountRemaining int64
  213. PageSize int64
  214. TotalCount int64
  215. StartOffset int64
  216. }
  217. func (e *Executor) GetLogTrackingIds(startOffset int64, pageCount int64) ([]*InternalLogEntry, *PagingResult) {
  218. pagingResult := &PagingResult{
  219. CountRemaining: 0,
  220. PageSize: pageCount,
  221. TotalCount: 0,
  222. StartOffset: startOffset,
  223. }
  224. e.logmutex.RLock()
  225. totalLogCount := int64(len(e.logsTrackingIdsByDate))
  226. pagingResult.TotalCount = totalLogCount
  227. startIndex := getPagingStartIndex(startOffset, totalLogCount)
  228. pageCount = min(totalLogCount, pageCount)
  229. endIndex := max(0, (startIndex-pageCount)+1)
  230. log.WithFields(log.Fields{
  231. "startOffset": startOffset,
  232. "pageCount": pageCount,
  233. "total": totalLogCount,
  234. "startIndex": startIndex,
  235. "endIndex": endIndex,
  236. }).Tracef("GetLogTrackingIds")
  237. trackingIds := make([]*InternalLogEntry, 0, pageCount)
  238. if totalLogCount > 0 {
  239. for i := startIndex; i >= endIndex; i-- {
  240. trackingIds = append(trackingIds, e.logs[e.logsTrackingIdsByDate[i]])
  241. }
  242. }
  243. e.logmutex.RUnlock()
  244. pagingResult.CountRemaining = endIndex
  245. return trackingIds, pagingResult
  246. }
  247. func isValidLogEntryForACL(entry *InternalLogEntry) bool {
  248. return entry != nil && entry.Binding != nil && entry.Binding.Action != nil
  249. }
  250. func isLogEntryAllowedByACL(cfg *config.Config, user *authpublic.AuthenticatedUser, entry *InternalLogEntry) bool {
  251. return acl.IsAllowedLogs(cfg, user, entry.Binding.Action)
  252. }
  253. func (e *Executor) filterLogsByACL(cfg *config.Config, user *authpublic.AuthenticatedUser, dateFilter string) []*InternalLogEntry {
  254. e.logmutex.RLock()
  255. defer e.logmutex.RUnlock()
  256. filtered := make([]*InternalLogEntry, 0, len(e.logsTrackingIdsByDate))
  257. filterDate, hasDateFilter := parseDateFilter(dateFilter)
  258. for _, trackingId := range e.logsTrackingIdsByDate {
  259. entry := e.logs[trackingId]
  260. if shouldIncludeLogEntry(cfg, user, entry, filterDate, hasDateFilter) {
  261. filtered = append(filtered, entry)
  262. }
  263. }
  264. return filtered
  265. }
  266. // parseDateFilter parses the date filter string and returns filter information.
  267. func parseDateFilter(dateFilter string) (filterDate time.Time, hasDateFilter bool) {
  268. if dateFilter == "" {
  269. return time.Time{}, false
  270. }
  271. parsedDate, err := time.Parse("2006-01-02", dateFilter)
  272. if err != nil {
  273. log.WithFields(log.Fields{
  274. "dateFilter": dateFilter,
  275. "error": err,
  276. }).Errorf("Failed to parse date filter, expected format YYYY-MM-DD")
  277. return time.Time{}, false
  278. }
  279. return parsedDate, true
  280. }
  281. // shouldIncludeLogEntry determines if a log entry should be included based on ACL and date filter.
  282. func shouldIncludeLogEntry(cfg *config.Config, user *authpublic.AuthenticatedUser, entry *InternalLogEntry, filterDate time.Time, hasDateFilter bool) bool {
  283. if !isValidLogEntryForACL(entry) {
  284. return false
  285. }
  286. if !isLogEntryAllowedByACL(cfg, user, entry) {
  287. return false
  288. }
  289. return matchesDateFilter(entry, filterDate, hasDateFilter)
  290. }
  291. // matchesDateFilter checks if the log entry matches the date filter.
  292. func matchesDateFilter(entry *InternalLogEntry, filterDate time.Time, hasDateFilter bool) bool {
  293. if !hasDateFilter {
  294. return true
  295. }
  296. entryDate := entry.DatetimeStarted.UTC().Truncate(24 * time.Hour)
  297. filterDateUTC := filterDate.UTC().Truncate(24 * time.Hour)
  298. return entryDate.Equal(filterDateUTC)
  299. }
  300. // paginateFilteredLogs applies pagination to a filtered list of logs and returns
  301. // the paginated results along with pagination metadata.
  302. func paginateFilteredLogs(filtered []*InternalLogEntry, startOffset int64, pageCount int64) ([]*InternalLogEntry, *PagingResult) {
  303. total := int64(len(filtered))
  304. paging := &PagingResult{PageSize: pageCount, TotalCount: total, StartOffset: startOffset}
  305. if total == 0 {
  306. paging.CountRemaining = 0
  307. return []*InternalLogEntry{}, paging
  308. }
  309. startIndex := getPagingStartIndex(startOffset, total)
  310. pageCount = min(total, pageCount)
  311. endIndex := max(0, (startIndex-pageCount)+1)
  312. out := make([]*InternalLogEntry, 0, pageCount)
  313. for i := startIndex; i >= endIndex && i < int64(len(filtered)); i-- {
  314. out = append(out, filtered[i])
  315. }
  316. paging.CountRemaining = endIndex
  317. return out, paging
  318. }
  319. // GetLogTrackingIdsACL returns logs filtered by ACL visibility for the user and
  320. // paginated correctly based on the filtered set.
  321. // dateFilter is optional and should be in YYYY-MM-DD format. If empty, no date filtering is applied.
  322. // expressionFilter is an optional filter expression applied after ACL checks.
  323. func (e *Executor) GetLogTrackingIdsACL(cfg *config.Config, user *authpublic.AuthenticatedUser, startOffset int64, pageCount int64, dateFilter string, expressionFilter string) ([]*InternalLogEntry, *PagingResult, error) {
  324. filtered := e.filterLogsByACL(cfg, user, dateFilter)
  325. program, err := logfilter.Compile(expressionFilter)
  326. if err != nil {
  327. return nil, nil, err
  328. }
  329. filtered, err = applyLogFilter(filtered, program)
  330. if err != nil {
  331. return nil, nil, err
  332. }
  333. logs, paging := paginateFilteredLogs(filtered, startOffset, pageCount)
  334. return logs, paging, nil
  335. }
  336. func (e *Executor) GetLog(trackingID string) (*InternalLogEntry, bool) {
  337. e.logmutex.RLock()
  338. entry, found := e.logs[trackingID]
  339. e.logmutex.RUnlock()
  340. return entry, found
  341. }
  342. func (e *Executor) GetLogsByBindingId(bindingId string) []*InternalLogEntry {
  343. e.logmutex.RLock()
  344. logs, found := e.LogsByBindingId[bindingId]
  345. e.logmutex.RUnlock()
  346. if !found {
  347. return make([]*InternalLogEntry, 0)
  348. }
  349. return logs
  350. }
  351. // shouldCountExecution checks if a log entry should be counted for rate limiting.
  352. func shouldCountExecution(logEntry *InternalLogEntry, windowStart time.Time) bool {
  353. return !logEntry.Blocked && !logEntry.Queued && logEntry.DatetimeStarted.After(windowStart)
  354. }
  355. // updateOldestExecution updates the oldest execution time if this entry is older.
  356. func updateOldestExecution(oldestExecutionTime **time.Time, logEntry *InternalLogEntry) {
  357. if *oldestExecutionTime == nil {
  358. *oldestExecutionTime = &logEntry.DatetimeStarted
  359. } else if logEntry.DatetimeStarted.Before(**oldestExecutionTime) {
  360. *oldestExecutionTime = &logEntry.DatetimeStarted
  361. }
  362. }
  363. // findOldestExecutionInWindow finds the oldest execution within the time window and counts executions.
  364. // Returns the count of executions and the oldest execution time, or nil if none found.
  365. func findOldestExecutionInWindow(logs []*InternalLogEntry, windowStart time.Time) (int, *time.Time) {
  366. executions := 0
  367. var oldestExecutionTime *time.Time
  368. for _, logEntry := range logs {
  369. if !shouldCountExecution(logEntry, windowStart) {
  370. continue
  371. }
  372. executions++
  373. updateOldestExecution(&oldestExecutionTime, logEntry)
  374. }
  375. return executions, oldestExecutionTime
  376. }
  377. // calculateExpiryTime calculates when the oldest execution will fall outside the rate limit window.
  378. func calculateExpiryTime(oldestExecutionTime time.Time, duration time.Duration, now time.Time) time.Time {
  379. expiryTime := oldestExecutionTime.Add(duration)
  380. if !expiryTime.After(now) {
  381. return time.Time{}
  382. }
  383. return expiryTime
  384. }
  385. // updateMaxExpiryTime updates maxExpiryTime if expiryTime is later.
  386. func updateMaxExpiryTime(maxExpiryTime *time.Time, expiryTime time.Time) {
  387. if expiryTime.IsZero() {
  388. return
  389. }
  390. if maxExpiryTime.IsZero() || expiryTime.After(*maxExpiryTime) {
  391. *maxExpiryTime = expiryTime
  392. }
  393. }
  394. // calculateExpiryForRate calculates the expiry time for a single rate limit rule.
  395. // Returns the expiry time if the rate limit is exceeded, or zero time if not.
  396. func calculateExpiryForRate(rate config.RateSpec, logs []*InternalLogEntry, now time.Time) time.Time {
  397. duration := parseDuration(rate)
  398. if duration <= 0 {
  399. return time.Time{}
  400. }
  401. windowStart := now.Add(-duration)
  402. executions, oldestExecutionTime := findOldestExecutionInWindow(logs, windowStart)
  403. if executions < rate.Limit || oldestExecutionTime == nil {
  404. return time.Time{}
  405. }
  406. return calculateExpiryTime(*oldestExecutionTime, duration, now)
  407. }
  408. // getLogsForBinding retrieves logs for a binding ID.
  409. func (e *Executor) getLogsForBinding(bindingId string) []*InternalLogEntry {
  410. e.logmutex.RLock()
  411. logs, found := e.LogsByBindingId[bindingId]
  412. e.logmutex.RUnlock()
  413. if !found || len(logs) == 0 {
  414. return nil
  415. }
  416. return logs
  417. }
  418. // calculateMaxExpiryTimeFromRates calculates the maximum expiry time across all rate limit rules.
  419. func calculateMaxExpiryTimeFromRates(rates []config.RateSpec, logs []*InternalLogEntry, now time.Time) time.Time {
  420. var maxExpiryTime time.Time
  421. for _, rate := range rates {
  422. expiryTime := calculateExpiryForRate(rate, logs, now)
  423. updateMaxExpiryTime(&maxExpiryTime, expiryTime)
  424. }
  425. return maxExpiryTime
  426. }
  427. // GetTimeUntilAvailable calculates when an action will be available again based on rate limits.
  428. // Returns the Unix timestamp in seconds when the rate limit expires, or 0 if the action is available now.
  429. func (e *Executor) GetTimeUntilAvailable(binding *ActionBinding) int64 {
  430. if len(binding.Action.MaxRate) == 0 {
  431. return 0
  432. }
  433. logs := e.getLogsForBinding(binding.ID)
  434. if logs == nil {
  435. return 0
  436. }
  437. maxExpiryTime := calculateMaxExpiryTimeFromRates(binding.Action.MaxRate, logs, time.Now())
  438. if maxExpiryTime.IsZero() {
  439. return 0
  440. }
  441. return maxExpiryTime.Unix()
  442. }
  443. func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) string {
  444. e.logmutex.Lock()
  445. defer e.logmutex.Unlock()
  446. if _, found := e.logs[trackingID]; found || !isValidTrackingID(trackingID) {
  447. trackingID = uuid.NewString()
  448. entry.ExecutionTrackingID = trackingID
  449. }
  450. entry.Index = int64(len(e.logsTrackingIdsByDate))
  451. e.logs[trackingID] = entry
  452. e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, trackingID)
  453. return trackingID
  454. }
  455. // ExecRequest processes an ExecutionRequest
  456. func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string) {
  457. e.initializeExecRequest(req)
  458. log.Tracef("executor.ExecRequest(): trackingID=%s bindingID=%s", req.TrackingID, bindingIDForTrace(req))
  459. req.TrackingID = e.SetLog(req.TrackingID, req.logEntry)
  460. wg := new(sync.WaitGroup)
  461. wg.Add(1)
  462. go func() {
  463. queued := e.execChain(req, wg)
  464. if !queued {
  465. wg.Done()
  466. }
  467. }()
  468. return wg, req.TrackingID
  469. }
  470. func (e *Executor) initializeExecRequest(req *ExecutionRequest) {
  471. if req.AuthenticatedUser == nil {
  472. req.AuthenticatedUser = auth.UserGuest(req.Cfg)
  473. }
  474. req.executor = e
  475. req.logEntry = &InternalLogEntry{
  476. Binding: req.Binding,
  477. DatetimeStarted: time.Now(),
  478. ExecutionTrackingID: req.TrackingID,
  479. Output: "",
  480. ExitCode: DefaultExitCodeNotExecuted,
  481. ExecutionStarted: false,
  482. ExecutionFinished: false,
  483. ActionTitle: "notfound",
  484. ActionIcon: "&#x1f4a9;",
  485. Username: req.AuthenticatedUser.Username,
  486. }
  487. }
  488. func bindingIDForTrace(req *ExecutionRequest) string {
  489. if req.Binding == nil {
  490. return ""
  491. }
  492. return req.Binding.ID
  493. }
  494. func (e *Executor) execChain(req *ExecutionRequest, wg *sync.WaitGroup) bool {
  495. if !req.skipRequestRegistration {
  496. finished, queued := e.registerOrQueueRequest(req, wg)
  497. if finished || queued {
  498. return queued
  499. }
  500. }
  501. e.runExecutionSteps(req)
  502. e.finishExecChain(req)
  503. return false
  504. }
  505. func (e *Executor) registerOrQueueRequest(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
  506. if !stepRequestAction(req) {
  507. e.finishExecChain(req)
  508. return true, false
  509. }
  510. if e.finishIfConcurrencyBlocked(req) {
  511. return true, false
  512. }
  513. return e.queueRequestIfGroupLimited(req, wg)
  514. }
  515. func (e *Executor) finishIfConcurrencyBlocked(req *ExecutionRequest) bool {
  516. if actionNeedsGroupLimit(req) {
  517. return false
  518. }
  519. if stepConcurrencyCheck(req) {
  520. return false
  521. }
  522. e.finishExecChain(req)
  523. return true
  524. }
  525. func (e *Executor) queueRequestIfGroupLimited(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
  526. if !actionNeedsGroupLimit(req) || e.groupsHaveCapacityForActive(req) {
  527. return false, false
  528. }
  529. return e.queueRequestAfterACL(req, wg)
  530. }
  531. func (e *Executor) queueRequestAfterACL(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
  532. if !stepACLCheck(req) {
  533. e.finishExecChain(req)
  534. return true, false
  535. }
  536. if e.queueRequest(req, wg) {
  537. e.finishExecChain(req)
  538. return true, false
  539. }
  540. notifyListenersStarted(req)
  541. return false, true
  542. }
  543. func (e *Executor) runExecutionSteps(req *ExecutionRequest) {
  544. for _, step := range e.chainOfCommand[1:] {
  545. if !step(req) {
  546. break
  547. }
  548. }
  549. }
  550. func (e *Executor) finishExecChain(req *ExecutionRequest) {
  551. req.mutateLogEntry(func(entry *InternalLogEntry) {
  552. if entry.DatetimeFinished.IsZero() {
  553. entry.DatetimeFinished = time.Now()
  554. }
  555. entry.ExecutionFinished = true
  556. })
  557. recordExecutionMetrics(req.logEntry)
  558. notifyListenersFinished(req)
  559. e.drainGroupQueue()
  560. }
  561. func getConcurrentCount(req *ExecutionRequest) int {
  562. concurrentCount := 0
  563. req.executor.logmutex.RLock()
  564. logs := req.executor.LogsByBindingId[req.Binding.ID]
  565. for _, logEntry := range logs {
  566. if !logEntry.ExecutionFinished && !logEntry.Queued {
  567. concurrentCount += 1
  568. }
  569. }
  570. req.executor.logmutex.RUnlock()
  571. return concurrentCount
  572. }
  573. func stepConcurrencyCheck(req *ExecutionRequest) bool {
  574. if actionNeedsGroupLimit(req) {
  575. return true
  576. }
  577. concurrentCount := getConcurrentCount(req)
  578. // Note that the current execution is counted int the logs, so when checking we +1
  579. if concurrentCount >= (req.Binding.Action.MaxConcurrent + 1) {
  580. log.WithFields(log.Fields{
  581. "actionTitle": req.logEntry.ActionTitle,
  582. "concurrentCount": concurrentCount,
  583. "maxConcurrent": req.Binding.Action.MaxConcurrent,
  584. }).Warnf("Blocked from executing due to concurrency limit")
  585. req.mutateLogEntry(func(entry *InternalLogEntry) {
  586. entry.Output = "Blocked from executing due to concurrency limit"
  587. entry.Blocked = true
  588. })
  589. return false
  590. }
  591. return true
  592. }
  593. func parseDuration(rate config.RateSpec) time.Duration {
  594. duration, err := time.ParseDuration(rate.Duration)
  595. if err != nil {
  596. log.Warnf("Could not parse duration: %v", rate.Duration)
  597. return -1 * time.Minute
  598. }
  599. return duration
  600. }
  601. func entityPrefixForRequest(req *ExecutionRequest) string {
  602. if req.Binding != nil && req.Binding.Entity != nil {
  603. return req.Binding.Entity.UniqueKey
  604. }
  605. return ""
  606. }
  607. func rateExecutionMatchesScope(logEntry *InternalLogEntry, req *ExecutionRequest, entityPrefix string) bool {
  608. if logEntry.EntityPrefix != entityPrefix {
  609. return false
  610. }
  611. return !logEntry.Queued && logEntry.ExecutionTrackingID != req.TrackingID
  612. }
  613. func logEntryStartedInWindow(logEntry *InternalLogEntry, windowStart time.Time) bool {
  614. return logEntry.DatetimeStarted.After(windowStart) && !logEntry.Blocked
  615. }
  616. func rateExecutionCountsForRate(logEntry *InternalLogEntry, req *ExecutionRequest, entityPrefix string, windowStart time.Time) bool {
  617. return rateExecutionMatchesScope(logEntry, req, entityPrefix) && logEntryStartedInWindow(logEntry, windowStart)
  618. }
  619. func countRateExecutions(logs []*InternalLogEntry, req *ExecutionRequest, entityPrefix string, windowStart time.Time) int {
  620. executions := 0
  621. for _, logEntry := range logs {
  622. if rateExecutionCountsForRate(logEntry, req, entityPrefix, windowStart) {
  623. executions += 1
  624. }
  625. }
  626. return executions
  627. }
  628. func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
  629. duration := parseDuration(rate)
  630. then := time.Now().Add(-duration)
  631. req.executor.logmutex.RLock()
  632. logs := req.executor.LogsByBindingId[req.Binding.ID]
  633. executions := countRateExecutions(logs, req, entityPrefixForRequest(req), then)
  634. req.executor.logmutex.RUnlock()
  635. return executions
  636. }
  637. func stepRateCheck(req *ExecutionRequest) bool {
  638. for _, rate := range req.Binding.Action.MaxRate {
  639. executions := getExecutionsCount(rate, req)
  640. if executions >= rate.Limit {
  641. log.WithFields(log.Fields{
  642. "actionTitle": req.logEntry.ActionTitle,
  643. "executions": executions,
  644. "limit": rate.Limit,
  645. "duration": rate.Duration,
  646. }).Infof("Blocked from executing due to rate limit")
  647. req.mutateLogEntry(func(entry *InternalLogEntry) {
  648. entry.Output = "Blocked from executing due to rate limit"
  649. entry.Blocked = true
  650. })
  651. return false
  652. }
  653. }
  654. return true
  655. }
  656. func stepACLCheck(req *ExecutionRequest) bool {
  657. canExec := acl.IsAllowedExec(req.Cfg, req.AuthenticatedUser, req.Binding.Action)
  658. if !canExec {
  659. req.mutateLogEntry(func(entry *InternalLogEntry) {
  660. entry.Output = "ACL check failed. Blocked from executing."
  661. entry.Blocked = true
  662. })
  663. log.WithFields(log.Fields{
  664. "actionTitle": req.logEntry.ActionTitle,
  665. }).Warnf("ACL check failed. Blocked from executing.")
  666. }
  667. return canExec
  668. }
  669. func stepParseArgs(req *ExecutionRequest) bool {
  670. ensureArgumentMap(req)
  671. if !hasBindingAndAction(req) {
  672. return fail(req, fmt.Errorf("cannot parse arguments: Binding or Action is nil"))
  673. }
  674. filterToDefinedArgumentsOnly(req)
  675. if err := injectSystemArgs(req); err != nil {
  676. return fail(req, err)
  677. }
  678. mangleInvalidArgumentValues(req)
  679. copyStorableArgumentsToLogEntry(req)
  680. if hasExec(req) {
  681. return handleExecBranch(req)
  682. } else {
  683. return handleShellBranch(req)
  684. }
  685. }
  686. func handleExecBranch(req *ExecutionRequest) bool {
  687. args, err := parseActionExec(req.Arguments, req.Binding.Action, req.Binding.Entity)
  688. if err != nil {
  689. return fail(req, err)
  690. }
  691. req.useDirectExec = true
  692. req.execArgs = args
  693. return true
  694. }
  695. func handleShellBranch(req *ExecutionRequest) bool {
  696. if hasWebhookTag(req) {
  697. return fail(req, fmt.Errorf("webhooks cannot use Shell execution; use exec instead. See https://docs.olivetin.app/action_execution/shellvsexec.html"))
  698. }
  699. if err := checkShellArgumentSafety(req.Binding.Action); err != nil {
  700. return fail(req, err)
  701. }
  702. cmd, err := parseActionArguments(req)
  703. if err != nil {
  704. return fail(req, err)
  705. }
  706. req.useDirectExec = false
  707. req.finalParsedCommand = cmd
  708. return true
  709. }
  710. func ensureArgumentMap(req *ExecutionRequest) {
  711. if req.Arguments == nil {
  712. req.Arguments = make(map[string]string)
  713. }
  714. }
  715. func filterToDefinedArgumentsOnly(req *ExecutionRequest) {
  716. definedNames := make(map[string]struct{})
  717. for _, arg := range req.Binding.Action.Arguments {
  718. definedNames[arg.Name] = struct{}{}
  719. }
  720. filtered := make(map[string]string)
  721. for k, v := range req.Arguments {
  722. if keepArgument(k, definedNames) {
  723. filtered[k] = v
  724. }
  725. }
  726. req.Arguments = filtered
  727. }
  // keepArgument reports whether name is present in the definedNames set.
  func keepArgument(name string, definedNames map[string]struct{}) bool {
  	if _, defined := definedNames[name]; defined {
  		return true
  	}

  	return false
  }
  732. func hasWebhookTag(req *ExecutionRequest) bool {
  733. for _, tag := range req.Tags {
  734. if tag == "webhook" {
  735. return true
  736. }
  737. }
  738. return false
  739. }
  740. var systemArgumentDefinitions = []config.ActionArgument{
  741. {Name: "ot_executionTrackingId", Type: "ascii_identifier", RejectNull: true},
  742. {Name: "ot_username", Type: "shell_safe_identifier", RejectNull: true},
  743. }
  744. func injectSystemArgs(req *ExecutionRequest) error {
  745. args, err := validatedSystemArgs(req)
  746. if err != nil {
  747. return err
  748. }
  749. for name, value := range args {
  750. req.Arguments[name] = value
  751. }
  752. return nil
  753. }
  754. func validatedSystemArgs(req *ExecutionRequest) (map[string]string, error) {
  755. values := map[string]string{
  756. "ot_executionTrackingId": req.TrackingID,
  757. "ot_username": req.AuthenticatedUser.Username,
  758. }
  759. for i := range systemArgumentDefinitions {
  760. arg := &systemArgumentDefinitions[i]
  761. if err := ValidateArgument(arg, values[arg.Name], req.Binding.Action); err != nil {
  762. return nil, fmt.Errorf("system argument %q failed validation: %w", arg.Name, err)
  763. }
  764. }
  765. return values, nil
  766. }
  767. func hasBindingAndAction(req *ExecutionRequest) bool {
  768. return !(req.Binding == nil || req.Binding.Action == nil)
  769. }
  770. func hasExec(req *ExecutionRequest) bool {
  771. return len(req.Binding.Action.Exec) > 0
  772. }
  773. func fail(req *ExecutionRequest, err error) bool {
  774. req.mutateLogEntry(func(entry *InternalLogEntry) {
  775. entry.Output = err.Error()
  776. })
  777. log.Warn(err.Error())
  778. return false
  779. }
  780. func stepRequestAction(req *ExecutionRequest) bool {
  781. metricActionsRequested.Inc()
  782. if !stepRequestActionHasBinding(req) {
  783. return false
  784. }
  785. stepRequestActionPopulateLogEntry(req)
  786. stepRequestActionRegisterLog(req)
  787. log.WithFields(log.Fields{
  788. "actionTitle": req.logEntry.ActionTitle,
  789. "tags": req.Tags,
  790. }).Infof("Action requested")
  791. notifyListenersStarted(req)
  792. return true
  793. }
  794. func stepRequestActionHasBinding(req *ExecutionRequest) bool {
  795. if req.Binding == nil || req.Binding.Action == nil {
  796. log.Warnf("Action request has no binding/action; skipping execution")
  797. return false
  798. }
  799. return true
  800. }
  801. func stepRequestActionPopulateLogEntry(req *ExecutionRequest) {
  802. req.mutateLogEntry(func(entry *InternalLogEntry) {
  803. entry.Binding = req.Binding
  804. entry.ActionConfigTitle = req.Binding.Action.Title
  805. entry.ActionTitle = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Title, req.Binding.Entity)
  806. entry.ActionIcon = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Icon, req.Binding.Entity)
  807. entry.Tags = req.Tags
  808. entry.Justification = ResolveJustification(req)
  809. if req.Binding.Entity != nil {
  810. entry.EntityPrefix = req.Binding.Entity.UniqueKey
  811. }
  812. })
  813. }
  814. func stepRequestActionRegisterLog(req *ExecutionRequest) {
  815. req.executor.logmutex.Lock()
  816. defer req.executor.logmutex.Unlock()
  817. if _, containsKey := req.executor.LogsByBindingId[req.Binding.ID]; !containsKey {
  818. req.executor.LogsByBindingId[req.Binding.ID] = make([]*InternalLogEntry, 0)
  819. }
  820. req.executor.LogsByBindingId[req.Binding.ID] = append(req.executor.LogsByBindingId[req.Binding.ID], req.logEntry)
  821. }
  822. func stepLogStart(req *ExecutionRequest) bool {
  823. log.WithFields(log.Fields{
  824. "actionTitle": req.logEntry.ActionTitle,
  825. "timeout": req.Binding.Action.Timeout,
  826. }).Infof("Action started")
  827. return true
  828. }
  829. func stepLogFinish(req *ExecutionRequest) bool {
  830. req.mutateLogEntry(func(entry *InternalLogEntry) {
  831. entry.ExecutionFinished = true
  832. })
  833. log.WithFields(log.Fields{
  834. "actionTitle": req.logEntry.ActionTitle,
  835. "outputLength": len(req.logEntry.Output),
  836. "timedOut": req.logEntry.TimedOut,
  837. "exit": req.logEntry.ExitCode,
  838. }).Infof("Action finished")
  839. return true
  840. }
  841. func notifyListenersFinished(req *ExecutionRequest) {
  842. for _, listener := range req.executor.copyListeners() {
  843. listener.OnExecutionFinished(req.logEntry)
  844. }
  845. }
  846. func notifyListenersStarted(req *ExecutionRequest) {
  847. for _, listener := range req.executor.copyListeners() {
  848. listener.OnExecutionStarted(req.logEntry)
  849. }
  850. }
  851. func appendErrorToStderr(req *ExecutionRequest, err error) {
  852. if err == nil {
  853. return
  854. }
  855. req.mutateLogEntry(func(entry *InternalLogEntry) {
  856. entry.Output = err.Error() + "\n\n" + entry.Output
  857. })
  858. }
  859. type OutputStreamer struct {
  860. Req *ExecutionRequest
  861. output bytes.Buffer
  862. }
  863. func (ost *OutputStreamer) Write(o []byte) (n int, err error) {
  864. for _, listener := range ost.Req.executor.copyListeners() {
  865. listener.OnOutputChunk(o, ost.Req.TrackingID)
  866. }
  867. return ost.output.Write(o)
  868. }
  869. func (ost *OutputStreamer) String() string {
  870. return ost.output.String()
  871. }
  // buildEnv returns the environment for a command: the current process
  // environment, followed by OLIVETIN=1, followed by one NAME=value entry per
  // argument. Argument names are upper-cased and trimmed of surrounding
  // whitespace; values are passed through unchanged.
  //
  // The order of the argument entries follows map iteration and is therefore
  // not deterministic.
  func buildEnv(args map[string]string) []string {
  	inherited := os.Environ()

  	env := make([]string, 0, len(inherited)+1+len(args))
  	env = append(env, inherited...)
  	env = append(env, "OLIVETIN=1")

  	for name, value := range args {
  		varName := strings.TrimSpace(strings.ToUpper(name))

  		// Skip arguments that might not have a name (eg, confirmation), as this causes weird bugs on Windows.
  		if varName == "" {
  			continue
  		}

  		env = append(env, varName+"="+value)
  	}

  	return env
  }
  // commandExitCode returns the exit code recorded in cmd's ProcessState, or -1
  // when cmd is nil or has no ProcessState.
  func commandExitCode(cmd *exec.Cmd) int {
  	if cmd != nil && cmd.ProcessState != nil {
  		return cmd.ProcessState.ExitCode()
  	}

  	return -1
  }
  890. func stepExec(req *ExecutionRequest) bool {
  891. ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor)
  892. defer cancel()
  893. streamer := &OutputStreamer{Req: req}
  894. cmd := buildCommand(ctx, req)
  895. if cmd == nil {
  896. req.mutateLogEntry(func(entry *InternalLogEntry) {
  897. entry.Output = "Cannot execute: no command arguments provided"
  898. })
  899. log.Warn("Cannot execute: no command arguments provided")
  900. return false
  901. }
  902. prepareCommand(cmd, streamer, req)
  903. runerr := cmd.Start()
  904. req.mutateLogEntry(func(entry *InternalLogEntry) {
  905. entry.Process = cmd.Process
  906. })
  907. ctx.setProcess(cmd.Process)
  908. waiterr := cmd.Wait()
  909. req.mutateLogEntry(func(entry *InternalLogEntry) {
  910. entry.ExitCode = int32(commandExitCode(cmd))
  911. entry.Output = streamer.String()
  912. })
  913. appendErrorToStderr(req, runerr)
  914. appendErrorToStderr(req, waiterr)
  915. if ctx.Err() == context.DeadlineExceeded {
  916. log.WithFields(log.Fields{
  917. "actionTitle": req.logEntry.ActionTitle,
  918. }).Warnf("Action timed out")
  919. req.mutateLogEntry(func(entry *InternalLogEntry) {
  920. entry.TimedOut = true
  921. entry.Output += "OliveTin::timeout - this action timed out after " + fmt.Sprintf("%v", req.Binding.Action.Timeout) + " seconds. If you need more time for this action, set a longer timeout. See https://docs.olivetin.app/action_customization/timeouts.html for more help."
  922. })
  923. }
  924. req.mutateLogEntry(func(entry *InternalLogEntry) {
  925. entry.DatetimeFinished = time.Now()
  926. })
  927. return true
  928. }
  929. func buildCommand(ctx context.Context, req *ExecutionRequest) *exec.Cmd {
  930. if req.useDirectExec {
  931. return wrapCommandDirect(ctx, req.execArgs)
  932. }
  933. return wrapCommandInShell(ctx, req.finalParsedCommand)
  934. }
  935. func prepareCommand(cmd *exec.Cmd, streamer *OutputStreamer, req *ExecutionRequest) {
  936. cmd.Stdout = streamer
  937. cmd.Stderr = streamer
  938. cmd.Env = buildEnv(req.Arguments)
  939. started := false
  940. req.mutateLogEntry(func(entry *InternalLogEntry) {
  941. if entry.ExecutionStarted {
  942. return
  943. }
  944. entry.ExecutionStarted = true
  945. started = true
  946. })
  947. if started {
  948. notifyListenersStarted(req)
  949. }
  950. }
  951. func stepExecAfter(req *ExecutionRequest) bool {
  952. ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor)
  953. defer cancel()
  954. var stdout bytes.Buffer
  955. var stderr bytes.Buffer
  956. cmd, args, err := buildShellAfterCommand(ctx, req, &stdout, &stderr)
  957. if err != nil {
  958. return fail(req, err)
  959. }
  960. if cmd == nil {
  961. return true
  962. }
  963. cmd.Env = buildEnv(args)
  964. runerr := cmd.Start()
  965. ctx.setProcess(cmd.Process)
  966. waiterr := cmd.Wait()
  967. req.mutateLogEntry(func(entry *InternalLogEntry) {
  968. entry.Output += "\n"
  969. entry.Output += "OliveTin::shellAfterCompleted stdout\n"
  970. entry.Output += stdout.String()
  971. entry.Output += "OliveTin::shellAfterCompleted stderr\n"
  972. entry.Output += stderr.String()
  973. entry.Output += "OliveTin::shellAfterCompleted errors and summary\n"
  974. })
  975. appendErrorToStderr(req, runerr)
  976. appendErrorToStderr(req, waiterr)
  977. if ctx.Err() == context.DeadlineExceeded {
  978. req.mutateLogEntry(func(entry *InternalLogEntry) {
  979. entry.Output += "Your shellAfterCompleted command timed out."
  980. })
  981. }
  982. req.mutateLogEntry(func(entry *InternalLogEntry) {
  983. entry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", commandExitCode(cmd))
  984. entry.Output += "OliveTin::shellAfterCompleted output complete\n"
  985. })
  986. return true
  987. }
  988. func buildShellAfterCommand(ctx context.Context, req *ExecutionRequest, stdout, stderr *bytes.Buffer) (*exec.Cmd, map[string]string, error) {
  989. if req.Binding.Action.ShellAfterCompleted == "" {
  990. return nil, nil, nil
  991. }
  992. args, err := buildShellAfterArgs(req)
  993. if err != nil {
  994. return nil, nil, err
  995. }
  996. finalParsedCommand, err := tpl.ParseTemplateWithActionContext(req.Binding.Action.ShellAfterCompleted, req.Binding.Entity, args)
  997. if err != nil {
  998. msg := "Could not prepare shellAfterCompleted command: " + err.Error() + "\n"
  999. req.mutateLogEntry(func(entry *InternalLogEntry) {
  1000. entry.Output += msg
  1001. })
  1002. log.Warn(msg)
  1003. return nil, nil, nil
  1004. }
  1005. cmd := wrapCommandInShell(ctx, finalParsedCommand)
  1006. cmd.Stdout = stdout
  1007. cmd.Stderr = stderr
  1008. return cmd, args, nil
  1009. }
  1010. func buildShellAfterArgs(req *ExecutionRequest) (map[string]string, error) {
  1011. args, err := validatedSystemArgs(req)
  1012. if err != nil {
  1013. return nil, err
  1014. }
  1015. args["output"] = req.logEntry.Output
  1016. args["exitCode"] = fmt.Sprintf("%v", req.logEntry.ExitCode)
  1017. return args, nil
  1018. }
  1019. //gocyclo:ignore
  1020. func stepTrigger(req *ExecutionRequest) bool {
  1021. if req.Binding.Action.Triggers == nil {
  1022. return true
  1023. }
  1024. if req.TriggerDepth >= MaxTriggerDepth {
  1025. log.WithFields(log.Fields{
  1026. "actionTitle": req.logEntry.ActionTitle,
  1027. "depth": req.TriggerDepth,
  1028. }).Warnf("Trigger action reached maximum depth of %v. Not triggering further actions.", MaxTriggerDepth)
  1029. req.mutateLogEntry(func(entry *InternalLogEntry) {
  1030. entry.Output += fmt.Sprintf("OliveTin::trigger - this action reached maximum trigger depth of %v. Not triggering further actions.", MaxTriggerDepth)
  1031. })
  1032. return true
  1033. }
  1034. if len(req.Tags) > 0 && req.Tags[0] == "trigger" {
  1035. log.Warnf("Trigger action is triggering another trigger action. This is allowed, but be careful not to create trigger loops.")
  1036. }
  1037. triggerLoop(req)
  1038. return true
  1039. }
  1040. func triggerLoop(req *ExecutionRequest) {
  1041. for _, triggerTitle := range req.Binding.Action.Triggers {
  1042. binding := req.executor.findBindingByActionTitle(triggerTitle, "")
  1043. if binding == nil {
  1044. log.WithFields(log.Fields{
  1045. "triggerTitle": triggerTitle,
  1046. "fromAction": req.logEntry.ActionTitle,
  1047. }).Warnf("Trigger references unknown action title; skipping")
  1048. continue
  1049. }
  1050. trigger := &ExecutionRequest{
  1051. Binding: binding,
  1052. TrackingID: uuid.NewString(),
  1053. Tags: []string{"trigger"},
  1054. AuthenticatedUser: req.AuthenticatedUser,
  1055. Arguments: req.Arguments,
  1056. Cfg: req.Cfg,
  1057. TriggerDepth: req.TriggerDepth + 1,
  1058. Justification: fmt.Sprintf("Triggered by action: %s", req.logEntry.ActionTitle),
  1059. }
  1060. req.executor.ExecRequest(trigger)
  1061. }
  1062. }
  1063. func stepSaveLog(req *ExecutionRequest) bool {
  1064. filename := fmt.Sprintf("%v.%v.%v", req.logEntry.ActionTitle, req.logEntry.DatetimeStarted.Unix(), req.logEntry.ExecutionTrackingID)
  1065. saveLogResults(req, filename)
  1066. saveLogOutput(req, filename)
  1067. return true
  1068. }
  // firstNonEmpty returns one unless it is the empty string, in which case it
  // returns two (which may itself be empty).
  func firstNonEmpty(one, two string) string {
  	if one == "" {
  		return two
  	}

  	return one
  }
  1075. func saveLogResults(req *ExecutionRequest, filename string) {
  1076. dir := firstNonEmpty(req.Binding.Action.SaveLogs.ResultsDirectory, req.Cfg.SaveLogs.ResultsDirectory)
  1077. if dir != "" {
  1078. data, err := yaml.Marshal(req.logEntry)
  1079. if err != nil {
  1080. log.Warnf("%v", err)
  1081. }
  1082. filepath := path.Join(dir, filename+".yaml")
  1083. err = os.WriteFile(filepath, data, 0600)
  1084. if err != nil {
  1085. log.Warnf("%v", err)
  1086. }
  1087. }
  1088. }
  1089. func saveLogOutput(req *ExecutionRequest, filename string) {
  1090. dir := firstNonEmpty(req.Binding.Action.SaveLogs.OutputDirectory, req.Cfg.SaveLogs.OutputDirectory)
  1091. if dir != "" {
  1092. data := req.logEntry.Output
  1093. filepath := path.Join(dir, filename+".log")
  1094. err := os.WriteFile(filepath, []byte(data), 0600)
  1095. if err != nil {
  1096. log.Warnf("%v", err)
  1097. }
  1098. }
  1099. }