executor.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384
  1. package executor
  2. import (
  3. acl "github.com/OliveTin/OliveTin/internal/acl"
  4. "github.com/OliveTin/OliveTin/internal/auth"
  5. authpublic "github.com/OliveTin/OliveTin/internal/auth/authpublic"
  6. config "github.com/OliveTin/OliveTin/internal/config"
  7. "github.com/OliveTin/OliveTin/internal/entities"
  8. "github.com/OliveTin/OliveTin/internal/logfilter"
  9. "github.com/OliveTin/OliveTin/internal/tpl"
  10. "github.com/google/uuid"
  11. log "github.com/sirupsen/logrus"
  12. "github.com/prometheus/client_golang/prometheus"
  13. "github.com/prometheus/client_golang/prometheus/promauto"
  14. "gopkg.in/yaml.v3"
  15. "bytes"
  16. "context"
  17. "fmt"
  18. "os"
  19. "os/exec"
  20. "path"
  21. "regexp"
  22. "strings"
  23. "sync"
  24. "time"
  25. )
  // DefaultExitCodeNotExecuted is the exit code a log entry starts out with,
  // before (or without) the command ever being executed.
  const DefaultExitCodeNotExecuted = -1337

  // MaxTriggerDepth is the upper bound used for ExecutionRequest.TriggerDepth.
  // NOTE(review): the enforcement site is outside this part of the file.
  const MaxTriggerDepth = 10
  // validTrackingIDPattern accepts only hexadecimal digits and dashes, which
  // covers the textual form of a UUID.
  var validTrackingIDPattern = regexp.MustCompile(`^[a-fA-F0-9\-]+$`)

  // isValidTrackingID reports whether id is acceptable as a caller-supplied
  // tracking ID: non-empty, at most 36 bytes long, and made up solely of
  // hexadecimal digits and dashes.
  func isValidTrackingID(id string) bool {
  	const maxLen = 36

  	if id == "" || len(id) > maxLen {
  		return false
  	}
  	return validTrackingIDPattern.MatchString(id)
  }
  35. var (
  36. metricActionsRequested = promauto.NewCounter(prometheus.CounterOpts{
  37. Name: "olivetin_actions_requested_count",
  38. Help: "The actions requested count",
  39. })
  40. )
  41. type ActionBinding struct {
  42. ID string
  43. Action *config.Action
  44. Entity *entities.Entity
  45. ConfigOrder int
  46. OnDashboards []DashboardNavigationTarget
  47. }
  48. // Executor represents a helper class for executing commands. It's main method
  49. // is ExecRequest
  50. type Executor struct {
  51. logs map[string]*InternalLogEntry
  52. logsTrackingIdsByDate []string
  53. LogsByBindingId map[string][]*InternalLogEntry
  54. logmutex sync.RWMutex
  55. MapActionBindings map[string]*ActionBinding
  56. MapActionBindingsLock sync.RWMutex
  57. Cfg *config.Config
  58. listeners []listener
  59. listenersMu sync.RWMutex
  60. chainOfCommand []executorStepFunc
  61. groupQueue []*queuedExecution
  62. groupQueueMu sync.Mutex
  63. }
  64. // ExecutionRequest is a request to execute an action. It's passed to an
  65. // Executor. They're created from the api.
  66. type ExecutionRequest struct {
  67. Binding *ActionBinding
  68. Arguments map[string]string
  69. TrackingID string
  70. Tags []string
  71. Cfg *config.Config
  72. AuthenticatedUser *authpublic.AuthenticatedUser
  73. TriggerDepth int
  74. Justification string
  75. logEntry *InternalLogEntry
  76. finalParsedCommand string
  77. execArgs []string
  78. useDirectExec bool
  79. executor *Executor
  80. skipRequestRegistration bool
  81. }
  82. func (req *ExecutionRequest) mutateLogEntry(mutator func(*InternalLogEntry)) {
  83. if req.executor == nil {
  84. mutator(req.logEntry)
  85. return
  86. }
  87. req.executor.logmutex.Lock()
  88. defer req.executor.logmutex.Unlock()
  89. mutator(req.logEntry)
  90. }
  // LogEntrySnapshot is a value copy of selected InternalLogEntry fields, so
  // that callers can read them without racing against concurrent mutation.
  type LogEntrySnapshot struct {
  	// Queued mirrors InternalLogEntry.Queued.
  	Queued bool
  	// Blocked mirrors InternalLogEntry.Blocked.
  	Blocked bool
  	// ExecutionStarted mirrors InternalLogEntry.ExecutionStarted.
  	ExecutionStarted bool
  	// ExecutionFinished mirrors InternalLogEntry.ExecutionFinished.
  	ExecutionFinished bool
  	// ExitCode mirrors InternalLogEntry.ExitCode.
  	ExitCode int32
  	// Output mirrors InternalLogEntry.Output.
  	Output string
  }
  100. // SnapshotLog returns a copy of selected log entry fields under read lock.
  101. func (e *Executor) SnapshotLog(trackingID string) (LogEntrySnapshot, bool) {
  102. e.logmutex.RLock()
  103. defer e.logmutex.RUnlock()
  104. entry, found := e.logs[trackingID]
  105. if !found {
  106. return LogEntrySnapshot{}, false
  107. }
  108. return LogEntrySnapshot{
  109. Queued: entry.Queued,
  110. Blocked: entry.Blocked,
  111. ExecutionStarted: entry.ExecutionStarted,
  112. ExecutionFinished: entry.ExecutionFinished,
  113. ExitCode: entry.ExitCode,
  114. Output: entry.Output,
  115. }, true
  116. }
  117. // InternalLogEntry objects are created by an Executor, and represent the final
  118. // state of execution (even if the command is not executed). It's designed to be
  119. // easily serializable.
  120. type InternalLogEntry struct {
  121. Binding *ActionBinding
  122. DatetimeStarted time.Time
  123. DatetimeFinished time.Time
  124. Output string
  125. TimedOut bool
  126. Blocked bool
  127. Queued bool
  128. QueuedForGroup string
  129. ExitCode int32
  130. Tags []string
  131. ExecutionStarted bool
  132. ExecutionFinished bool
  133. ExecutionTrackingID string
  134. Process *os.Process
  135. Username string
  136. Index int64
  137. EntityPrefix string
  138. ActionConfigTitle string // This is the title of the action as defined in the config, not the final parsed title.
  139. /*
  140. The following 3 properties are obviously on Action normally, but it's useful
  141. that logs are lightweight (so we don't need to have an action associated to
  142. logs, etc. Therefore, we duplicate those values here.
  143. */
  144. ActionTitle string
  145. ActionIcon string
  146. Justification string
  147. }
  148. // .Binding can be nil, so we need to handle that.
  149. func (e *InternalLogEntry) GetBindingId() string {
  150. if e.Binding == nil {
  151. return ""
  152. }
  153. return e.Binding.ID
  154. }
  155. type executorStepFunc func(*ExecutionRequest) bool
  156. // DefaultExecutor returns an Executor, with a sensible "chain of command" for
  157. // executing actions.
  158. func DefaultExecutor(cfg *config.Config) *Executor {
  159. e := Executor{}
  160. e.Cfg = cfg
  161. e.logs = make(map[string]*InternalLogEntry)
  162. e.logsTrackingIdsByDate = make([]string, 0)
  163. e.LogsByBindingId = make(map[string][]*InternalLogEntry)
  164. e.MapActionBindings = make(map[string]*ActionBinding)
  165. e.chainOfCommand = []executorStepFunc{
  166. stepRequestAction,
  167. stepConcurrencyCheck,
  168. stepRateCheck,
  169. stepACLCheck,
  170. stepParseArgs,
  171. stepLogStart,
  172. stepExec,
  173. stepExecAfter,
  174. stepLogFinish,
  175. stepSaveLog,
  176. stepTrigger,
  177. }
  178. return &e
  179. }
  180. type listener interface {
  181. OnExecutionStarted(logEntry *InternalLogEntry)
  182. OnExecutionFinished(logEntry *InternalLogEntry)
  183. OnOutputChunk(o []byte, executionTrackingId string)
  184. OnActionMapRebuilt()
  185. }
  186. func (e *Executor) AddListener(m listener) {
  187. e.listenersMu.Lock()
  188. defer e.listenersMu.Unlock()
  189. e.listeners = append(e.listeners, m)
  190. }
  191. func (e *Executor) copyListeners() []listener {
  192. e.listenersMu.RLock()
  193. defer e.listenersMu.RUnlock()
  194. out := make([]listener, len(e.listeners))
  195. copy(out, e.listeners)
  196. return out
  197. }
  // getPagingStartIndex calculates the starting index for log pagination.
  // Pages are walked from newer entries (high indexes) to older ones.
  //
  // Parameters:
  //
  //	startOffset: offset from the most recent log (0 or less means start
  //	  from the most recent)
  //	totalLogCount: total number of logs available
  //
  // Returns the index to start from: totalLogCount-1 when startOffset is zero
  // or negative, 0 when startOffset exceeds totalLogCount, and
  // totalLogCount-startOffset-1 otherwise (which is -1 when the offset equals
  // the total).
  func getPagingStartIndex(startOffset int64, totalLogCount int64) int64 {
  	if startOffset <= 0 {
  		return totalLogCount - 1
  	}

  	remaining := totalLogCount - startOffset
  	if remaining < 0 {
  		// Offset beyond the oldest entry: clamp to the first index.
  		return 0
  	}
  	return remaining - 1
  }
  // PagingResult carries pagination metadata alongside a page of logs.
  type PagingResult struct {
  	// CountRemaining is the index of the oldest entry on the page, i.e. the
  	// number of older entries that come after this page.
  	CountRemaining int64
  	// PageSize is the page size that was requested.
  	PageSize int64
  	// TotalCount is the size of the set that was paginated.
  	TotalCount int64
  	// StartOffset is the offset that was requested.
  	StartOffset int64
  }
  224. func (e *Executor) GetLogTrackingIds(startOffset int64, pageCount int64) ([]*InternalLogEntry, *PagingResult) {
  225. pagingResult := &PagingResult{
  226. CountRemaining: 0,
  227. PageSize: pageCount,
  228. TotalCount: 0,
  229. StartOffset: startOffset,
  230. }
  231. e.logmutex.RLock()
  232. totalLogCount := int64(len(e.logsTrackingIdsByDate))
  233. pagingResult.TotalCount = totalLogCount
  234. startIndex := getPagingStartIndex(startOffset, totalLogCount)
  235. pageCount = min(totalLogCount, pageCount)
  236. endIndex := max(0, (startIndex-pageCount)+1)
  237. log.WithFields(log.Fields{
  238. "startOffset": startOffset,
  239. "pageCount": pageCount,
  240. "total": totalLogCount,
  241. "startIndex": startIndex,
  242. "endIndex": endIndex,
  243. }).Tracef("GetLogTrackingIds")
  244. trackingIds := make([]*InternalLogEntry, 0, pageCount)
  245. if totalLogCount > 0 {
  246. for i := startIndex; i >= endIndex; i-- {
  247. trackingIds = append(trackingIds, e.logs[e.logsTrackingIdsByDate[i]])
  248. }
  249. }
  250. e.logmutex.RUnlock()
  251. pagingResult.CountRemaining = endIndex
  252. return trackingIds, pagingResult
  253. }
  254. func isValidLogEntryForACL(entry *InternalLogEntry) bool {
  255. return entry != nil && entry.Binding != nil && entry.Binding.Action != nil
  256. }
  257. func isLogEntryAllowedByACL(cfg *config.Config, user *authpublic.AuthenticatedUser, entry *InternalLogEntry) bool {
  258. return acl.IsAllowedLogs(cfg, user, entry.Binding.Action)
  259. }
  260. func (e *Executor) filterLogsByACL(cfg *config.Config, user *authpublic.AuthenticatedUser, dateFilter string) []*InternalLogEntry {
  261. e.logmutex.RLock()
  262. defer e.logmutex.RUnlock()
  263. filtered := make([]*InternalLogEntry, 0, len(e.logsTrackingIdsByDate))
  264. filterDate, hasDateFilter := parseDateFilter(dateFilter)
  265. for _, trackingId := range e.logsTrackingIdsByDate {
  266. entry := e.logs[trackingId]
  267. if shouldIncludeLogEntry(cfg, user, entry, filterDate, hasDateFilter) {
  268. filtered = append(filtered, entry)
  269. }
  270. }
  271. return filtered
  272. }
  273. // parseDateFilter parses the date filter string and returns filter information.
  274. func parseDateFilter(dateFilter string) (filterDate time.Time, hasDateFilter bool) {
  275. if dateFilter == "" {
  276. return time.Time{}, false
  277. }
  278. parsedDate, err := time.Parse("2006-01-02", dateFilter)
  279. if err != nil {
  280. log.WithFields(log.Fields{
  281. "dateFilter": dateFilter,
  282. "error": err,
  283. }).Errorf("Failed to parse date filter, expected format YYYY-MM-DD")
  284. return time.Time{}, false
  285. }
  286. return parsedDate, true
  287. }
  288. // shouldIncludeLogEntry determines if a log entry should be included based on ACL and date filter.
  289. func shouldIncludeLogEntry(cfg *config.Config, user *authpublic.AuthenticatedUser, entry *InternalLogEntry, filterDate time.Time, hasDateFilter bool) bool {
  290. if !isValidLogEntryForACL(entry) {
  291. return false
  292. }
  293. if !isLogEntryAllowedByACL(cfg, user, entry) {
  294. return false
  295. }
  296. return matchesDateFilter(entry, filterDate, hasDateFilter)
  297. }
  298. // matchesDateFilter checks if the log entry matches the date filter.
  299. func matchesDateFilter(entry *InternalLogEntry, filterDate time.Time, hasDateFilter bool) bool {
  300. if !hasDateFilter {
  301. return true
  302. }
  303. entryDate := entry.DatetimeStarted.UTC().Truncate(24 * time.Hour)
  304. filterDateUTC := filterDate.UTC().Truncate(24 * time.Hour)
  305. return entryDate.Equal(filterDateUTC)
  306. }
  307. // paginateFilteredLogs applies pagination to a filtered list of logs and returns
  308. // the paginated results along with pagination metadata.
  309. func paginateFilteredLogs(filtered []*InternalLogEntry, startOffset int64, pageCount int64) ([]*InternalLogEntry, *PagingResult) {
  310. total := int64(len(filtered))
  311. paging := &PagingResult{PageSize: pageCount, TotalCount: total, StartOffset: startOffset}
  312. if total == 0 {
  313. paging.CountRemaining = 0
  314. return []*InternalLogEntry{}, paging
  315. }
  316. startIndex := getPagingStartIndex(startOffset, total)
  317. pageCount = min(total, pageCount)
  318. endIndex := max(0, (startIndex-pageCount)+1)
  319. out := make([]*InternalLogEntry, 0, pageCount)
  320. for i := startIndex; i >= endIndex && i < int64(len(filtered)); i-- {
  321. out = append(out, filtered[i])
  322. }
  323. paging.CountRemaining = endIndex
  324. return out, paging
  325. }
  326. // GetLogTrackingIdsACL returns logs filtered by ACL visibility for the user and
  327. // paginated correctly based on the filtered set.
  328. // dateFilter is optional and should be in YYYY-MM-DD format. If empty, no date filtering is applied.
  329. // expressionFilter is an optional filter expression applied after ACL checks.
  330. func (e *Executor) GetLogTrackingIdsACL(cfg *config.Config, user *authpublic.AuthenticatedUser, startOffset int64, pageCount int64, dateFilter string, expressionFilter string) ([]*InternalLogEntry, *PagingResult, error) {
  331. filtered := e.filterLogsByACL(cfg, user, dateFilter)
  332. program, err := logfilter.Compile(expressionFilter)
  333. if err != nil {
  334. return nil, nil, err
  335. }
  336. filtered, err = applyLogFilter(filtered, program)
  337. if err != nil {
  338. return nil, nil, err
  339. }
  340. logs, paging := paginateFilteredLogs(filtered, startOffset, pageCount)
  341. return logs, paging, nil
  342. }
  343. func (e *Executor) GetLog(trackingID string) (*InternalLogEntry, bool) {
  344. e.logmutex.RLock()
  345. entry, found := e.logs[trackingID]
  346. e.logmutex.RUnlock()
  347. return entry, found
  348. }
  349. func (e *Executor) GetLogsByBindingId(bindingId string) []*InternalLogEntry {
  350. e.logmutex.RLock()
  351. logs, found := e.LogsByBindingId[bindingId]
  352. e.logmutex.RUnlock()
  353. if !found {
  354. return make([]*InternalLogEntry, 0)
  355. }
  356. return logs
  357. }
  358. // shouldCountExecution checks if a log entry should be counted for rate limiting.
  359. func shouldCountExecution(logEntry *InternalLogEntry, windowStart time.Time) bool {
  360. return !logEntry.Blocked && !logEntry.Queued && logEntry.DatetimeStarted.After(windowStart)
  361. }
  362. // updateOldestExecution updates the oldest execution time if this entry is older.
  363. func updateOldestExecution(oldestExecutionTime **time.Time, logEntry *InternalLogEntry) {
  364. if *oldestExecutionTime == nil {
  365. *oldestExecutionTime = &logEntry.DatetimeStarted
  366. } else if logEntry.DatetimeStarted.Before(**oldestExecutionTime) {
  367. *oldestExecutionTime = &logEntry.DatetimeStarted
  368. }
  369. }
  370. // findOldestExecutionInWindow finds the oldest execution within the time window and counts executions.
  371. // Returns the count of executions and the oldest execution time, or nil if none found.
  372. func findOldestExecutionInWindow(logs []*InternalLogEntry, windowStart time.Time) (int, *time.Time) {
  373. executions := 0
  374. var oldestExecutionTime *time.Time
  375. for _, logEntry := range logs {
  376. if !shouldCountExecution(logEntry, windowStart) {
  377. continue
  378. }
  379. executions++
  380. updateOldestExecution(&oldestExecutionTime, logEntry)
  381. }
  382. return executions, oldestExecutionTime
  383. }
  // calculateExpiryTime calculates when the oldest execution will fall outside
  // the rate limit window, i.e. oldestExecutionTime + duration. It returns the
  // zero time when that moment is not strictly after now.
  func calculateExpiryTime(oldestExecutionTime time.Time, duration time.Duration, now time.Time) time.Time {
  	if expiry := oldestExecutionTime.Add(duration); expiry.After(now) {
  		return expiry
  	}
  	return time.Time{}
  }
  // updateMaxExpiryTime replaces *maxExpiryTime with expiryTime when
  // expiryTime is non-zero and either nothing has been recorded yet or
  // expiryTime is later than the recorded value.
  func updateMaxExpiryTime(maxExpiryTime *time.Time, expiryTime time.Time) {
  	switch {
  	case expiryTime.IsZero():
  		// A zero expiry means "no limit hit"; nothing to record.
  		return
  	case maxExpiryTime.IsZero(), expiryTime.After(*maxExpiryTime):
  		*maxExpiryTime = expiryTime
  	}
  }
  401. // calculateExpiryForRate calculates the expiry time for a single rate limit rule.
  402. // Returns the expiry time if the rate limit is exceeded, or zero time if not.
  403. func calculateExpiryForRate(rate config.RateSpec, logs []*InternalLogEntry, now time.Time) time.Time {
  404. duration := parseDuration(rate)
  405. if duration <= 0 {
  406. return time.Time{}
  407. }
  408. windowStart := now.Add(-duration)
  409. executions, oldestExecutionTime := findOldestExecutionInWindow(logs, windowStart)
  410. if executions < rate.Limit || oldestExecutionTime == nil {
  411. return time.Time{}
  412. }
  413. return calculateExpiryTime(*oldestExecutionTime, duration, now)
  414. }
  415. // getLogsForBinding retrieves logs for a binding ID.
  416. func (e *Executor) getLogsForBinding(bindingId string) []*InternalLogEntry {
  417. e.logmutex.RLock()
  418. logs, found := e.LogsByBindingId[bindingId]
  419. e.logmutex.RUnlock()
  420. if !found || len(logs) == 0 {
  421. return nil
  422. }
  423. return logs
  424. }
  425. // calculateMaxExpiryTimeFromRates calculates the maximum expiry time across all rate limit rules.
  426. func calculateMaxExpiryTimeFromRates(rates []config.RateSpec, logs []*InternalLogEntry, now time.Time) time.Time {
  427. var maxExpiryTime time.Time
  428. for _, rate := range rates {
  429. expiryTime := calculateExpiryForRate(rate, logs, now)
  430. updateMaxExpiryTime(&maxExpiryTime, expiryTime)
  431. }
  432. return maxExpiryTime
  433. }
  434. // GetTimeUntilAvailable calculates when an action will be available again based on rate limits.
  435. // Returns the Unix timestamp in seconds when the rate limit expires, or 0 if the action is available now.
  436. func (e *Executor) GetTimeUntilAvailable(binding *ActionBinding) int64 {
  437. if len(binding.Action.MaxRate) == 0 {
  438. return 0
  439. }
  440. logs := e.getLogsForBinding(binding.ID)
  441. if logs == nil {
  442. return 0
  443. }
  444. maxExpiryTime := calculateMaxExpiryTimeFromRates(binding.Action.MaxRate, logs, time.Now())
  445. if maxExpiryTime.IsZero() {
  446. return 0
  447. }
  448. return maxExpiryTime.Unix()
  449. }
  450. func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) string {
  451. e.logmutex.Lock()
  452. defer e.logmutex.Unlock()
  453. if _, found := e.logs[trackingID]; found || !isValidTrackingID(trackingID) {
  454. trackingID = uuid.NewString()
  455. entry.ExecutionTrackingID = trackingID
  456. }
  457. entry.Index = int64(len(e.logsTrackingIdsByDate))
  458. e.logs[trackingID] = entry
  459. e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, trackingID)
  460. return trackingID
  461. }
  462. // ExecRequest processes an ExecutionRequest
  463. func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string) {
  464. e.initializeExecRequest(req)
  465. log.Tracef("executor.ExecRequest(): trackingID=%s bindingID=%s", req.TrackingID, bindingIDForTrace(req))
  466. req.TrackingID = e.SetLog(req.TrackingID, req.logEntry)
  467. wg := new(sync.WaitGroup)
  468. wg.Add(1)
  469. go func() {
  470. queued := e.execChain(req, wg)
  471. if !queued {
  472. wg.Done()
  473. }
  474. }()
  475. return wg, req.TrackingID
  476. }
  477. func (e *Executor) initializeExecRequest(req *ExecutionRequest) {
  478. if req.AuthenticatedUser == nil {
  479. req.AuthenticatedUser = auth.UserGuest(req.Cfg)
  480. }
  481. req.executor = e
  482. req.logEntry = &InternalLogEntry{
  483. Binding: req.Binding,
  484. DatetimeStarted: time.Now(),
  485. ExecutionTrackingID: req.TrackingID,
  486. Output: "",
  487. ExitCode: DefaultExitCodeNotExecuted,
  488. ExecutionStarted: false,
  489. ExecutionFinished: false,
  490. ActionTitle: "notfound",
  491. ActionIcon: "&#x1f4a9;",
  492. Username: req.AuthenticatedUser.Username,
  493. }
  494. }
  495. func bindingIDForTrace(req *ExecutionRequest) string {
  496. if req.Binding == nil {
  497. return ""
  498. }
  499. return req.Binding.ID
  500. }
  501. func (e *Executor) execChain(req *ExecutionRequest, wg *sync.WaitGroup) bool {
  502. if !req.skipRequestRegistration {
  503. finished, queued := e.registerOrQueueRequest(req, wg)
  504. if finished || queued {
  505. return queued
  506. }
  507. }
  508. e.runExecutionSteps(req)
  509. e.finishExecChain(req)
  510. return false
  511. }
  512. func (e *Executor) registerOrQueueRequest(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
  513. if !stepRequestAction(req) {
  514. e.finishExecChain(req)
  515. return true, false
  516. }
  517. if e.finishIfConcurrencyBlocked(req) {
  518. return true, false
  519. }
  520. return e.queueRequestIfGroupLimited(req, wg)
  521. }
  522. func (e *Executor) finishIfConcurrencyBlocked(req *ExecutionRequest) bool {
  523. if actionNeedsGroupLimit(req) {
  524. return false
  525. }
  526. if stepConcurrencyCheck(req) {
  527. return false
  528. }
  529. e.finishExecChain(req)
  530. return true
  531. }
  532. func (e *Executor) queueRequestIfGroupLimited(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
  533. if !actionNeedsGroupLimit(req) || e.groupsHaveCapacityForActive(req) {
  534. return false, false
  535. }
  536. return e.queueRequestAfterACL(req, wg)
  537. }
  538. func (e *Executor) queueRequestAfterACL(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
  539. if !stepACLCheck(req) {
  540. e.finishExecChain(req)
  541. return true, false
  542. }
  543. if e.queueRequest(req, wg) {
  544. e.finishExecChain(req)
  545. return true, false
  546. }
  547. notifyListenersStarted(req)
  548. return false, true
  549. }
  550. func (e *Executor) runExecutionSteps(req *ExecutionRequest) {
  551. for _, step := range e.chainOfCommand[1:] {
  552. if !step(req) {
  553. break
  554. }
  555. }
  556. }
  557. func (e *Executor) finishExecChain(req *ExecutionRequest) {
  558. req.mutateLogEntry(func(entry *InternalLogEntry) {
  559. if entry.DatetimeFinished.IsZero() {
  560. entry.DatetimeFinished = time.Now()
  561. }
  562. entry.ExecutionFinished = true
  563. })
  564. notifyListenersFinished(req)
  565. e.drainGroupQueue()
  566. }
  567. func getConcurrentCount(req *ExecutionRequest) int {
  568. concurrentCount := 0
  569. req.executor.logmutex.RLock()
  570. logs := req.executor.LogsByBindingId[req.Binding.ID]
  571. for _, logEntry := range logs {
  572. if !logEntry.ExecutionFinished && !logEntry.Queued {
  573. concurrentCount += 1
  574. }
  575. }
  576. req.executor.logmutex.RUnlock()
  577. return concurrentCount
  578. }
  579. func stepConcurrencyCheck(req *ExecutionRequest) bool {
  580. if actionNeedsGroupLimit(req) {
  581. return true
  582. }
  583. concurrentCount := getConcurrentCount(req)
  584. // Note that the current execution is counted int the logs, so when checking we +1
  585. if concurrentCount >= (req.Binding.Action.MaxConcurrent + 1) {
  586. log.WithFields(log.Fields{
  587. "actionTitle": req.logEntry.ActionTitle,
  588. "concurrentCount": concurrentCount,
  589. "maxConcurrent": req.Binding.Action.MaxConcurrent,
  590. }).Warnf("Blocked from executing due to concurrency limit")
  591. req.mutateLogEntry(func(entry *InternalLogEntry) {
  592. entry.Output = "Blocked from executing due to concurrency limit"
  593. entry.Blocked = true
  594. })
  595. return false
  596. }
  597. return true
  598. }
  599. func parseDuration(rate config.RateSpec) time.Duration {
  600. duration, err := time.ParseDuration(rate.Duration)
  601. if err != nil {
  602. log.Warnf("Could not parse duration: %v", rate.Duration)
  603. return -1 * time.Minute
  604. }
  605. return duration
  606. }
  607. func entityPrefixForRequest(req *ExecutionRequest) string {
  608. if req.Binding != nil && req.Binding.Entity != nil {
  609. return req.Binding.Entity.UniqueKey
  610. }
  611. return ""
  612. }
  613. func rateExecutionMatchesScope(logEntry *InternalLogEntry, req *ExecutionRequest, entityPrefix string) bool {
  614. if logEntry.EntityPrefix != entityPrefix {
  615. return false
  616. }
  617. return !logEntry.Queued && logEntry.ExecutionTrackingID != req.TrackingID
  618. }
  619. func logEntryStartedInWindow(logEntry *InternalLogEntry, windowStart time.Time) bool {
  620. return logEntry.DatetimeStarted.After(windowStart) && !logEntry.Blocked
  621. }
  622. func rateExecutionCountsForRate(logEntry *InternalLogEntry, req *ExecutionRequest, entityPrefix string, windowStart time.Time) bool {
  623. return rateExecutionMatchesScope(logEntry, req, entityPrefix) && logEntryStartedInWindow(logEntry, windowStart)
  624. }
  625. func countRateExecutions(logs []*InternalLogEntry, req *ExecutionRequest, entityPrefix string, windowStart time.Time) int {
  626. executions := 0
  627. for _, logEntry := range logs {
  628. if rateExecutionCountsForRate(logEntry, req, entityPrefix, windowStart) {
  629. executions += 1
  630. }
  631. }
  632. return executions
  633. }
  634. func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
  635. duration := parseDuration(rate)
  636. then := time.Now().Add(-duration)
  637. req.executor.logmutex.RLock()
  638. logs := req.executor.LogsByBindingId[req.Binding.ID]
  639. executions := countRateExecutions(logs, req, entityPrefixForRequest(req), then)
  640. req.executor.logmutex.RUnlock()
  641. return executions
  642. }
  643. func stepRateCheck(req *ExecutionRequest) bool {
  644. for _, rate := range req.Binding.Action.MaxRate {
  645. executions := getExecutionsCount(rate, req)
  646. if executions >= rate.Limit {
  647. log.WithFields(log.Fields{
  648. "actionTitle": req.logEntry.ActionTitle,
  649. "executions": executions,
  650. "limit": rate.Limit,
  651. "duration": rate.Duration,
  652. }).Infof("Blocked from executing due to rate limit")
  653. req.mutateLogEntry(func(entry *InternalLogEntry) {
  654. entry.Output = "Blocked from executing due to rate limit"
  655. entry.Blocked = true
  656. })
  657. return false
  658. }
  659. }
  660. return true
  661. }
  662. func stepACLCheck(req *ExecutionRequest) bool {
  663. canExec := acl.IsAllowedExec(req.Cfg, req.AuthenticatedUser, req.Binding.Action)
  664. if !canExec {
  665. req.mutateLogEntry(func(entry *InternalLogEntry) {
  666. entry.Output = "ACL check failed. Blocked from executing."
  667. entry.Blocked = true
  668. })
  669. log.WithFields(log.Fields{
  670. "actionTitle": req.logEntry.ActionTitle,
  671. }).Warnf("ACL check failed. Blocked from executing.")
  672. }
  673. return canExec
  674. }
  675. func stepParseArgs(req *ExecutionRequest) bool {
  676. ensureArgumentMap(req)
  677. if !hasBindingAndAction(req) {
  678. return fail(req, fmt.Errorf("cannot parse arguments: Binding or Action is nil"))
  679. }
  680. filterToDefinedArgumentsOnly(req)
  681. if err := injectSystemArgs(req); err != nil {
  682. return fail(req, err)
  683. }
  684. mangleInvalidArgumentValues(req)
  685. if hasExec(req) {
  686. return handleExecBranch(req)
  687. } else {
  688. return handleShellBranch(req)
  689. }
  690. }
  691. func handleExecBranch(req *ExecutionRequest) bool {
  692. args, err := parseActionExec(req.Arguments, req.Binding.Action, req.Binding.Entity)
  693. if err != nil {
  694. return fail(req, err)
  695. }
  696. req.useDirectExec = true
  697. req.execArgs = args
  698. return true
  699. }
  700. func handleShellBranch(req *ExecutionRequest) bool {
  701. if hasWebhookTag(req) {
  702. return fail(req, fmt.Errorf("webhooks cannot use Shell execution; use exec instead. See https://docs.olivetin.app/action_execution/shellvsexec.html"))
  703. }
  704. if err := checkShellArgumentSafety(req.Binding.Action); err != nil {
  705. return fail(req, err)
  706. }
  707. cmd, err := parseActionArguments(req)
  708. if err != nil {
  709. return fail(req, err)
  710. }
  711. req.useDirectExec = false
  712. req.finalParsedCommand = cmd
  713. return true
  714. }
  715. func ensureArgumentMap(req *ExecutionRequest) {
  716. if req.Arguments == nil {
  717. req.Arguments = make(map[string]string)
  718. }
  719. }
  720. func filterToDefinedArgumentsOnly(req *ExecutionRequest) {
  721. definedNames := make(map[string]struct{})
  722. for _, arg := range req.Binding.Action.Arguments {
  723. definedNames[arg.Name] = struct{}{}
  724. }
  725. filtered := make(map[string]string)
  726. for k, v := range req.Arguments {
  727. if keepArgument(k, definedNames) {
  728. filtered[k] = v
  729. }
  730. }
  731. req.Arguments = filtered
  732. }
  733. func keepArgument(name string, definedNames map[string]struct{}) bool {
  734. _, ok := definedNames[name]
  735. return ok
  736. }
  737. func hasWebhookTag(req *ExecutionRequest) bool {
  738. for _, tag := range req.Tags {
  739. if tag == "webhook" {
  740. return true
  741. }
  742. }
  743. return false
  744. }
// systemArgumentDefinitions holds the argument definitions against which the
// system-supplied values "ot_executionTrackingId" and "ot_username" are
// validated (see validatedSystemArgs). Both reject null values.
var systemArgumentDefinitions = []config.ActionArgument{
	{Name: "ot_executionTrackingId", Type: "ascii_identifier", RejectNull: true},
	{Name: "ot_username", Type: "shell_safe_identifier", RejectNull: true},
}
  749. func injectSystemArgs(req *ExecutionRequest) error {
  750. args, err := validatedSystemArgs(req)
  751. if err != nil {
  752. return err
  753. }
  754. for name, value := range args {
  755. req.Arguments[name] = value
  756. }
  757. return nil
  758. }
  759. func validatedSystemArgs(req *ExecutionRequest) (map[string]string, error) {
  760. values := map[string]string{
  761. "ot_executionTrackingId": req.TrackingID,
  762. "ot_username": req.AuthenticatedUser.Username,
  763. }
  764. for i := range systemArgumentDefinitions {
  765. arg := &systemArgumentDefinitions[i]
  766. if err := ValidateArgument(arg, values[arg.Name], req.Binding.Action); err != nil {
  767. return nil, fmt.Errorf("system argument %q failed validation: %w", arg.Name, err)
  768. }
  769. }
  770. return values, nil
  771. }
  772. func hasBindingAndAction(req *ExecutionRequest) bool {
  773. return !(req.Binding == nil || req.Binding.Action == nil)
  774. }
  775. func hasExec(req *ExecutionRequest) bool {
  776. return len(req.Binding.Action.Exec) > 0
  777. }
  778. func fail(req *ExecutionRequest, err error) bool {
  779. req.mutateLogEntry(func(entry *InternalLogEntry) {
  780. entry.Output = err.Error()
  781. })
  782. log.Warn(err.Error())
  783. return false
  784. }
  785. func stepRequestAction(req *ExecutionRequest) bool {
  786. metricActionsRequested.Inc()
  787. if !stepRequestActionHasBinding(req) {
  788. return false
  789. }
  790. stepRequestActionPopulateLogEntry(req)
  791. stepRequestActionRegisterLog(req)
  792. log.WithFields(log.Fields{
  793. "actionTitle": req.logEntry.ActionTitle,
  794. "tags": req.Tags,
  795. }).Infof("Action requested")
  796. notifyListenersStarted(req)
  797. return true
  798. }
  799. func stepRequestActionHasBinding(req *ExecutionRequest) bool {
  800. if req.Binding == nil || req.Binding.Action == nil {
  801. log.Warnf("Action request has no binding/action; skipping execution")
  802. return false
  803. }
  804. return true
  805. }
  806. func stepRequestActionPopulateLogEntry(req *ExecutionRequest) {
  807. req.mutateLogEntry(func(entry *InternalLogEntry) {
  808. entry.Binding = req.Binding
  809. entry.ActionConfigTitle = req.Binding.Action.Title
  810. entry.ActionTitle = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Title, req.Binding.Entity)
  811. entry.ActionIcon = req.Binding.Action.Icon
  812. entry.Tags = req.Tags
  813. entry.Justification = ResolveJustification(req)
  814. if req.Binding.Entity != nil {
  815. entry.EntityPrefix = req.Binding.Entity.UniqueKey
  816. }
  817. })
  818. }
  819. func stepRequestActionRegisterLog(req *ExecutionRequest) {
  820. req.executor.logmutex.Lock()
  821. defer req.executor.logmutex.Unlock()
  822. if _, containsKey := req.executor.LogsByBindingId[req.Binding.ID]; !containsKey {
  823. req.executor.LogsByBindingId[req.Binding.ID] = make([]*InternalLogEntry, 0)
  824. }
  825. req.executor.LogsByBindingId[req.Binding.ID] = append(req.executor.LogsByBindingId[req.Binding.ID], req.logEntry)
  826. }
  827. func stepLogStart(req *ExecutionRequest) bool {
  828. log.WithFields(log.Fields{
  829. "actionTitle": req.logEntry.ActionTitle,
  830. "timeout": req.Binding.Action.Timeout,
  831. }).Infof("Action started")
  832. return true
  833. }
  834. func stepLogFinish(req *ExecutionRequest) bool {
  835. req.mutateLogEntry(func(entry *InternalLogEntry) {
  836. entry.ExecutionFinished = true
  837. })
  838. log.WithFields(log.Fields{
  839. "actionTitle": req.logEntry.ActionTitle,
  840. "outputLength": len(req.logEntry.Output),
  841. "timedOut": req.logEntry.TimedOut,
  842. "exit": req.logEntry.ExitCode,
  843. }).Infof("Action finished")
  844. return true
  845. }
  846. func notifyListenersFinished(req *ExecutionRequest) {
  847. for _, listener := range req.executor.copyListeners() {
  848. listener.OnExecutionFinished(req.logEntry)
  849. }
  850. }
  851. func notifyListenersStarted(req *ExecutionRequest) {
  852. for _, listener := range req.executor.copyListeners() {
  853. listener.OnExecutionStarted(req.logEntry)
  854. }
  855. }
  856. func appendErrorToStderr(req *ExecutionRequest, err error) {
  857. if err == nil {
  858. return
  859. }
  860. req.mutateLogEntry(func(entry *InternalLogEntry) {
  861. entry.Output = err.Error() + "\n\n" + entry.Output
  862. })
  863. }
// OutputStreamer is a writer that forwards every chunk written to it to the
// executor's listeners and also accumulates the chunks in an internal buffer
// so the complete output can be retrieved with String.
type OutputStreamer struct {
	// Req is the execution request whose listeners and tracking ID are used
	// when forwarding output chunks.
	Req *ExecutionRequest
	// output accumulates everything written so far.
	output bytes.Buffer
}
  868. func (ost *OutputStreamer) Write(o []byte) (n int, err error) {
  869. for _, listener := range ost.Req.executor.copyListeners() {
  870. listener.OnOutputChunk(o, ost.Req.TrackingID)
  871. }
  872. return ost.output.Write(o)
  873. }
  874. func (ost *OutputStreamer) String() string {
  875. return ost.output.String()
  876. }
  877. func buildEnv(args map[string]string) []string {
  878. ret := append(os.Environ(), "OLIVETIN=1")
  879. for k, v := range args {
  880. varName := fmt.Sprintf("%v", strings.TrimSpace(strings.ToUpper(k)))
  881. // Skip arguments that might not have a name (eg, confirmation), as this causes weird bugs on Windows.
  882. if varName == "" {
  883. continue
  884. }
  885. ret = append(ret, fmt.Sprintf("%v=%v", varName, v))
  886. }
  887. return ret
  888. }
  889. func commandExitCode(cmd *exec.Cmd) int {
  890. if cmd == nil || cmd.ProcessState == nil {
  891. return -1
  892. }
  893. return cmd.ProcessState.ExitCode()
  894. }
  895. func stepExec(req *ExecutionRequest) bool {
  896. ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor)
  897. defer cancel()
  898. streamer := &OutputStreamer{Req: req}
  899. cmd := buildCommand(ctx, req)
  900. if cmd == nil {
  901. req.mutateLogEntry(func(entry *InternalLogEntry) {
  902. entry.Output = "Cannot execute: no command arguments provided"
  903. })
  904. log.Warn("Cannot execute: no command arguments provided")
  905. return false
  906. }
  907. prepareCommand(cmd, streamer, req)
  908. runerr := cmd.Start()
  909. req.mutateLogEntry(func(entry *InternalLogEntry) {
  910. entry.Process = cmd.Process
  911. })
  912. ctx.setProcess(cmd.Process)
  913. waiterr := cmd.Wait()
  914. req.mutateLogEntry(func(entry *InternalLogEntry) {
  915. entry.ExitCode = int32(commandExitCode(cmd))
  916. entry.Output = streamer.String()
  917. })
  918. appendErrorToStderr(req, runerr)
  919. appendErrorToStderr(req, waiterr)
  920. if ctx.Err() == context.DeadlineExceeded {
  921. log.WithFields(log.Fields{
  922. "actionTitle": req.logEntry.ActionTitle,
  923. }).Warnf("Action timed out")
  924. req.mutateLogEntry(func(entry *InternalLogEntry) {
  925. entry.TimedOut = true
  926. entry.Output += "OliveTin::timeout - this action timed out after " + fmt.Sprintf("%v", req.Binding.Action.Timeout) + " seconds. If you need more time for this action, set a longer timeout. See https://docs.olivetin.app/action_customization/timeouts.html for more help."
  927. })
  928. }
  929. req.mutateLogEntry(func(entry *InternalLogEntry) {
  930. entry.DatetimeFinished = time.Now()
  931. })
  932. return true
  933. }
  934. func buildCommand(ctx context.Context, req *ExecutionRequest) *exec.Cmd {
  935. if req.useDirectExec {
  936. return wrapCommandDirect(ctx, req.execArgs)
  937. }
  938. return wrapCommandInShell(ctx, req.finalParsedCommand)
  939. }
  940. func prepareCommand(cmd *exec.Cmd, streamer *OutputStreamer, req *ExecutionRequest) {
  941. cmd.Stdout = streamer
  942. cmd.Stderr = streamer
  943. cmd.Env = buildEnv(req.Arguments)
  944. started := false
  945. req.mutateLogEntry(func(entry *InternalLogEntry) {
  946. if entry.ExecutionStarted {
  947. return
  948. }
  949. entry.ExecutionStarted = true
  950. started = true
  951. })
  952. if started {
  953. notifyListenersStarted(req)
  954. }
  955. }
  956. func stepExecAfter(req *ExecutionRequest) bool {
  957. ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor)
  958. defer cancel()
  959. var stdout bytes.Buffer
  960. var stderr bytes.Buffer
  961. cmd, args, err := buildShellAfterCommand(ctx, req, &stdout, &stderr)
  962. if err != nil {
  963. return fail(req, err)
  964. }
  965. if cmd == nil {
  966. return true
  967. }
  968. cmd.Env = buildEnv(args)
  969. runerr := cmd.Start()
  970. ctx.setProcess(cmd.Process)
  971. waiterr := cmd.Wait()
  972. req.mutateLogEntry(func(entry *InternalLogEntry) {
  973. entry.Output += "\n"
  974. entry.Output += "OliveTin::shellAfterCompleted stdout\n"
  975. entry.Output += stdout.String()
  976. entry.Output += "OliveTin::shellAfterCompleted stderr\n"
  977. entry.Output += stderr.String()
  978. entry.Output += "OliveTin::shellAfterCompleted errors and summary\n"
  979. })
  980. appendErrorToStderr(req, runerr)
  981. appendErrorToStderr(req, waiterr)
  982. if ctx.Err() == context.DeadlineExceeded {
  983. req.mutateLogEntry(func(entry *InternalLogEntry) {
  984. entry.Output += "Your shellAfterCompleted command timed out."
  985. })
  986. }
  987. req.mutateLogEntry(func(entry *InternalLogEntry) {
  988. entry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", commandExitCode(cmd))
  989. entry.Output += "OliveTin::shellAfterCompleted output complete\n"
  990. })
  991. return true
  992. }
  993. func buildShellAfterCommand(ctx context.Context, req *ExecutionRequest, stdout, stderr *bytes.Buffer) (*exec.Cmd, map[string]string, error) {
  994. if req.Binding.Action.ShellAfterCompleted == "" {
  995. return nil, nil, nil
  996. }
  997. args, err := buildShellAfterArgs(req)
  998. if err != nil {
  999. return nil, nil, err
  1000. }
  1001. finalParsedCommand, err := tpl.ParseTemplateWithActionContext(req.Binding.Action.ShellAfterCompleted, req.Binding.Entity, args)
  1002. if err != nil {
  1003. msg := "Could not prepare shellAfterCompleted command: " + err.Error() + "\n"
  1004. req.mutateLogEntry(func(entry *InternalLogEntry) {
  1005. entry.Output += msg
  1006. })
  1007. log.Warn(msg)
  1008. return nil, nil, nil
  1009. }
  1010. cmd := wrapCommandInShell(ctx, finalParsedCommand)
  1011. cmd.Stdout = stdout
  1012. cmd.Stderr = stderr
  1013. return cmd, args, nil
  1014. }
  1015. func buildShellAfterArgs(req *ExecutionRequest) (map[string]string, error) {
  1016. args, err := validatedSystemArgs(req)
  1017. if err != nil {
  1018. return nil, err
  1019. }
  1020. args["output"] = req.logEntry.Output
  1021. args["exitCode"] = fmt.Sprintf("%v", req.logEntry.ExitCode)
  1022. return args, nil
  1023. }
  1024. //gocyclo:ignore
  1025. func stepTrigger(req *ExecutionRequest) bool {
  1026. if req.Binding.Action.Triggers == nil {
  1027. return true
  1028. }
  1029. if req.TriggerDepth >= MaxTriggerDepth {
  1030. log.WithFields(log.Fields{
  1031. "actionTitle": req.logEntry.ActionTitle,
  1032. "depth": req.TriggerDepth,
  1033. }).Warnf("Trigger action reached maximum depth of %v. Not triggering further actions.", MaxTriggerDepth)
  1034. req.mutateLogEntry(func(entry *InternalLogEntry) {
  1035. entry.Output += fmt.Sprintf("OliveTin::trigger - this action reached maximum trigger depth of %v. Not triggering further actions.", MaxTriggerDepth)
  1036. })
  1037. return true
  1038. }
  1039. if len(req.Tags) > 0 && req.Tags[0] == "trigger" {
  1040. log.Warnf("Trigger action is triggering another trigger action. This is allowed, but be careful not to create trigger loops.")
  1041. }
  1042. triggerLoop(req)
  1043. return true
  1044. }
  1045. func triggerLoop(req *ExecutionRequest) {
  1046. for _, triggerTitle := range req.Binding.Action.Triggers {
  1047. binding := req.executor.findBindingByActionTitle(triggerTitle, "")
  1048. if binding == nil {
  1049. log.WithFields(log.Fields{
  1050. "triggerTitle": triggerTitle,
  1051. "fromAction": req.logEntry.ActionTitle,
  1052. }).Warnf("Trigger references unknown action title; skipping")
  1053. continue
  1054. }
  1055. trigger := &ExecutionRequest{
  1056. Binding: binding,
  1057. TrackingID: uuid.NewString(),
  1058. Tags: []string{"trigger"},
  1059. AuthenticatedUser: req.AuthenticatedUser,
  1060. Arguments: req.Arguments,
  1061. Cfg: req.Cfg,
  1062. TriggerDepth: req.TriggerDepth + 1,
  1063. Justification: fmt.Sprintf("Triggered by action: %s", req.logEntry.ActionTitle),
  1064. }
  1065. req.executor.ExecRequest(trigger)
  1066. }
  1067. }
  1068. func stepSaveLog(req *ExecutionRequest) bool {
  1069. filename := fmt.Sprintf("%v.%v.%v", req.logEntry.ActionTitle, req.logEntry.DatetimeStarted.Unix(), req.logEntry.ExecutionTrackingID)
  1070. saveLogResults(req, filename)
  1071. saveLogOutput(req, filename)
  1072. return true
  1073. }
  1074. func firstNonEmpty(one, two string) string {
  1075. if one != "" {
  1076. return one
  1077. }
  1078. return two
  1079. }
  1080. func saveLogResults(req *ExecutionRequest, filename string) {
  1081. dir := firstNonEmpty(req.Binding.Action.SaveLogs.ResultsDirectory, req.Cfg.SaveLogs.ResultsDirectory)
  1082. if dir != "" {
  1083. data, err := yaml.Marshal(req.logEntry)
  1084. if err != nil {
  1085. log.Warnf("%v", err)
  1086. }
  1087. filepath := path.Join(dir, filename+".yaml")
  1088. err = os.WriteFile(filepath, data, 0600)
  1089. if err != nil {
  1090. log.Warnf("%v", err)
  1091. }
  1092. }
  1093. }
  1094. func saveLogOutput(req *ExecutionRequest, filename string) {
  1095. dir := firstNonEmpty(req.Binding.Action.SaveLogs.OutputDirectory, req.Cfg.SaveLogs.OutputDirectory)
  1096. if dir != "" {
  1097. data := req.logEntry.Output
  1098. filepath := path.Join(dir, filename+".log")
  1099. err := os.WriteFile(filepath, []byte(data), 0600)
  1100. if err != nil {
  1101. log.Warnf("%v", err)
  1102. }
  1103. }
  1104. }