executor.go 30 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138
  1. package executor
  2. import (
  3. acl "github.com/OliveTin/OliveTin/internal/acl"
  4. "github.com/OliveTin/OliveTin/internal/auth"
  5. authpublic "github.com/OliveTin/OliveTin/internal/auth/authpublic"
  6. config "github.com/OliveTin/OliveTin/internal/config"
  7. "github.com/OliveTin/OliveTin/internal/entities"
  8. "github.com/OliveTin/OliveTin/internal/fileupload"
  9. "github.com/OliveTin/OliveTin/internal/tpl"
  10. "github.com/google/uuid"
  11. log "github.com/sirupsen/logrus"
  12. "github.com/prometheus/client_golang/prometheus"
  13. "github.com/prometheus/client_golang/prometheus/promauto"
  14. "gopkg.in/yaml.v3"
  15. "bytes"
  16. "context"
  17. "fmt"
  18. "os"
  19. "os/exec"
  20. "path"
  21. "regexp"
  22. "strings"
  23. "sync"
  24. "time"
  25. )
  26. const (
  27. DefaultExitCodeNotExecuted = -1337
  28. MaxTriggerDepth = 10
  29. )
  30. var validTrackingIDPattern = regexp.MustCompile(`^[a-fA-F0-9\-]+$`)
  31. func isValidTrackingID(id string) bool {
  32. const MaxTrackingIDLength = 36
  33. return id != "" && len(id) <= MaxTrackingIDLength && validTrackingIDPattern.MatchString(id)
  34. }
  35. var (
  36. metricActionsRequested = promauto.NewCounter(prometheus.CounterOpts{
  37. Name: "olivetin_actions_requested_count",
  38. Help: "The actions requested count",
  39. })
  40. )
  41. type ActionBinding struct {
  42. ID string
  43. Action *config.Action
  44. Entity *entities.Entity
  45. ConfigOrder int
  46. IsOnDashboard bool
  47. }
  48. // Executor represents a helper class for executing commands. It's main method
  49. // is ExecRequest
  50. type Executor struct {
  51. logs map[string]*InternalLogEntry
  52. logsTrackingIdsByDate []string
  53. LogsByBindingId map[string][]*InternalLogEntry
  54. logmutex sync.RWMutex
  55. MapActionBindings map[string]*ActionBinding
  56. MapActionBindingsLock sync.RWMutex
  57. Cfg *config.Config
  58. UploadRegistry *fileupload.Registry
  59. listeners []listener
  60. chainOfCommand []executorStepFunc
  61. }
  62. // ExecutionRequest is a request to execute an action. It's passed to an
  63. // Executor. They're created from the api.
  64. type ExecutionRequest struct {
  65. Binding *ActionBinding
  66. Arguments map[string]string
  67. TrackingID string
  68. Tags []string
  69. Cfg *config.Config
  70. AuthenticatedUser *authpublic.AuthenticatedUser
  71. TriggerDepth int
  72. FileArgData map[string]*tpl.FileUpload
  73. UploadTempPaths []string
  74. logEntry *InternalLogEntry
  75. finalParsedCommand string
  76. execArgs []string
  77. useDirectExec bool
  78. executor *Executor
  79. }
  80. // InternalLogEntry objects are created by an Executor, and represent the final
  81. // state of execution (even if the command is not executed). It's designed to be
  82. // easily serializable.
  83. type InternalLogEntry struct {
  84. Binding *ActionBinding
  85. DatetimeStarted time.Time
  86. DatetimeFinished time.Time
  87. Output string
  88. TimedOut bool
  89. Blocked bool
  90. ExitCode int32
  91. Tags []string
  92. ExecutionStarted bool
  93. ExecutionFinished bool
  94. ExecutionTrackingID string
  95. Process *os.Process
  96. Username string
  97. Index int64
  98. EntityPrefix string
  99. ActionConfigTitle string // This is the title of the action as defined in the config, not the final parsed title.
  100. /*
  101. The following 3 properties are obviously on Action normally, but it's useful
  102. that logs are lightweight (so we don't need to have an action associated to
  103. logs, etc. Therefore, we duplicate those values here.
  104. */
  105. ActionTitle string
  106. ActionIcon string
  107. }
  108. // .Binding can be nil, so we need to handle that.
  109. func (e *InternalLogEntry) GetBindingId() string {
  110. if e.Binding == nil {
  111. return ""
  112. }
  113. return e.Binding.ID
  114. }
  115. type executorStepFunc func(*ExecutionRequest) bool
  116. // DefaultExecutor returns an Executor, with a sensible "chain of command" for
  117. // executing actions.
  118. func DefaultExecutor(cfg *config.Config) *Executor {
  119. e := Executor{}
  120. e.Cfg = cfg
  121. e.logs = make(map[string]*InternalLogEntry)
  122. e.logsTrackingIdsByDate = make([]string, 0)
  123. e.LogsByBindingId = make(map[string][]*InternalLogEntry)
  124. e.MapActionBindings = make(map[string]*ActionBinding)
  125. e.chainOfCommand = []executorStepFunc{
  126. stepRequestAction,
  127. stepConcurrencyCheck,
  128. stepRateCheck,
  129. stepACLCheck,
  130. stepParseArgs,
  131. stepLogStart,
  132. stepExec,
  133. stepExecAfter,
  134. stepLogFinish,
  135. stepSaveLog,
  136. stepTrigger,
  137. stepCleanupUploadTemps,
  138. }
  139. reg, err := fileupload.NewRegistry(cfg)
  140. if err != nil {
  141. log.WithError(err).Warn("File uploads are disabled (could not initialize staging directory)")
  142. } else {
  143. e.UploadRegistry = reg
  144. reg.StartPeriodicPrune()
  145. }
  146. return &e
  147. }
  148. type listener interface {
  149. OnExecutionStarted(logEntry *InternalLogEntry)
  150. OnExecutionFinished(logEntry *InternalLogEntry)
  151. OnOutputChunk(o []byte, executionTrackingId string)
  152. OnActionMapRebuilt()
  153. }
  154. func (e *Executor) AddListener(m listener) {
  155. e.listeners = append(e.listeners, m)
  156. }
  157. // getPagingStartIndex calculates the starting index for log pagination.
  158. // Parameters:
  159. //
  160. // startOffset: The offset from the most recent log (0 means start from the most recent)
  161. // totalLogCount: Total number of logs available
  162. // count: Number of logs to retrieve
  163. //
  164. // Returns: The calculated starting index for pagination
  165. func getPagingStartIndex(startOffset int64, totalLogCount int64) int64 {
  166. var startIndex int64
  167. if startOffset <= 0 {
  168. startIndex = totalLogCount
  169. } else {
  170. startIndex = (totalLogCount - startOffset)
  171. if startIndex < 0 {
  172. startIndex = 1
  173. }
  174. }
  175. return startIndex - 1
  176. }
  177. type PagingResult struct {
  178. CountRemaining int64
  179. PageSize int64
  180. TotalCount int64
  181. StartOffset int64
  182. }
  183. func (e *Executor) GetLogTrackingIds(startOffset int64, pageCount int64) ([]*InternalLogEntry, *PagingResult) {
  184. pagingResult := &PagingResult{
  185. CountRemaining: 0,
  186. PageSize: pageCount,
  187. TotalCount: 0,
  188. StartOffset: startOffset,
  189. }
  190. e.logmutex.RLock()
  191. totalLogCount := int64(len(e.logsTrackingIdsByDate))
  192. pagingResult.TotalCount = totalLogCount
  193. startIndex := getPagingStartIndex(startOffset, totalLogCount)
  194. pageCount = min(totalLogCount, pageCount)
  195. endIndex := max(0, (startIndex-pageCount)+1)
  196. log.WithFields(log.Fields{
  197. "startOffset": startOffset,
  198. "pageCount": pageCount,
  199. "total": totalLogCount,
  200. "startIndex": startIndex,
  201. "endIndex": endIndex,
  202. }).Tracef("GetLogTrackingIds")
  203. trackingIds := make([]*InternalLogEntry, 0, pageCount)
  204. if totalLogCount > 0 {
  205. for i := endIndex; i <= startIndex; i++ {
  206. trackingIds = append(trackingIds, e.logs[e.logsTrackingIdsByDate[i]])
  207. }
  208. }
  209. e.logmutex.RUnlock()
  210. pagingResult.CountRemaining = endIndex
  211. return trackingIds, pagingResult
  212. }
  213. func isValidLogEntryForACL(entry *InternalLogEntry) bool {
  214. return entry != nil && entry.Binding != nil && entry.Binding.Action != nil
  215. }
  216. func isLogEntryAllowedByACL(cfg *config.Config, user *authpublic.AuthenticatedUser, entry *InternalLogEntry) bool {
  217. return acl.IsAllowedLogs(cfg, user, entry.Binding.Action)
  218. }
  219. func (e *Executor) filterLogsByACL(cfg *config.Config, user *authpublic.AuthenticatedUser, dateFilter string) []*InternalLogEntry {
  220. e.logmutex.RLock()
  221. defer e.logmutex.RUnlock()
  222. filtered := make([]*InternalLogEntry, 0, len(e.logsTrackingIdsByDate))
  223. filterDate, hasDateFilter := parseDateFilter(dateFilter)
  224. for _, trackingId := range e.logsTrackingIdsByDate {
  225. entry := e.logs[trackingId]
  226. if shouldIncludeLogEntry(cfg, user, entry, filterDate, hasDateFilter) {
  227. filtered = append(filtered, entry)
  228. }
  229. }
  230. return filtered
  231. }
  232. // parseDateFilter parses the date filter string and returns filter information.
  233. func parseDateFilter(dateFilter string) (filterDate time.Time, hasDateFilter bool) {
  234. if dateFilter == "" {
  235. return time.Time{}, false
  236. }
  237. parsedDate, err := time.Parse("2006-01-02", dateFilter)
  238. if err != nil {
  239. log.WithFields(log.Fields{
  240. "dateFilter": dateFilter,
  241. "error": err,
  242. }).Errorf("Failed to parse date filter, expected format YYYY-MM-DD")
  243. return time.Time{}, false
  244. }
  245. return parsedDate, true
  246. }
  247. // shouldIncludeLogEntry determines if a log entry should be included based on ACL and date filter.
  248. func shouldIncludeLogEntry(cfg *config.Config, user *authpublic.AuthenticatedUser, entry *InternalLogEntry, filterDate time.Time, hasDateFilter bool) bool {
  249. if !isValidLogEntryForACL(entry) {
  250. return false
  251. }
  252. if !isLogEntryAllowedByACL(cfg, user, entry) {
  253. return false
  254. }
  255. return matchesDateFilter(entry, filterDate, hasDateFilter)
  256. }
  257. // matchesDateFilter checks if the log entry matches the date filter.
  258. func matchesDateFilter(entry *InternalLogEntry, filterDate time.Time, hasDateFilter bool) bool {
  259. if !hasDateFilter {
  260. return true
  261. }
  262. entryDate := entry.DatetimeStarted.UTC().Truncate(24 * time.Hour)
  263. filterDateUTC := filterDate.UTC().Truncate(24 * time.Hour)
  264. return entryDate.Equal(filterDateUTC)
  265. }
  266. // paginateFilteredLogs applies pagination to a filtered list of logs and returns
  267. // the paginated results along with pagination metadata.
  268. func paginateFilteredLogs(filtered []*InternalLogEntry, startOffset int64, pageCount int64) ([]*InternalLogEntry, *PagingResult) {
  269. total := int64(len(filtered))
  270. paging := &PagingResult{PageSize: pageCount, TotalCount: total, StartOffset: startOffset}
  271. if total == 0 {
  272. paging.CountRemaining = 0
  273. return []*InternalLogEntry{}, paging
  274. }
  275. startIndex := getPagingStartIndex(startOffset, total)
  276. pageCount = min(total, pageCount)
  277. endIndex := max(0, (startIndex-pageCount)+1)
  278. out := make([]*InternalLogEntry, 0, pageCount)
  279. for i := endIndex; i <= startIndex && i < int64(len(filtered)); i++ {
  280. out = append(out, filtered[i])
  281. }
  282. paging.CountRemaining = endIndex
  283. return out, paging
  284. }
  285. // GetLogTrackingIdsACL returns logs filtered by ACL visibility for the user and
  286. // paginated correctly based on the filtered set.
  287. // dateFilter is optional and should be in YYYY-MM-DD format. If empty, no date filtering is applied.
  288. func (e *Executor) GetLogTrackingIdsACL(cfg *config.Config, user *authpublic.AuthenticatedUser, startOffset int64, pageCount int64, dateFilter string) ([]*InternalLogEntry, *PagingResult) {
  289. filtered := e.filterLogsByACL(cfg, user, dateFilter)
  290. return paginateFilteredLogs(filtered, startOffset, pageCount)
  291. }
  292. func (e *Executor) GetLog(trackingID string) (*InternalLogEntry, bool) {
  293. e.logmutex.RLock()
  294. entry, found := e.logs[trackingID]
  295. e.logmutex.RUnlock()
  296. return entry, found
  297. }
  298. func (e *Executor) GetLogsByBindingId(bindingId string) []*InternalLogEntry {
  299. e.logmutex.RLock()
  300. logs, found := e.LogsByBindingId[bindingId]
  301. e.logmutex.RUnlock()
  302. if !found {
  303. return make([]*InternalLogEntry, 0)
  304. }
  305. return logs
  306. }
  307. // shouldCountExecution checks if a log entry should be counted for rate limiting.
  308. func shouldCountExecution(logEntry *InternalLogEntry, windowStart time.Time) bool {
  309. return !logEntry.Blocked && logEntry.DatetimeStarted.After(windowStart)
  310. }
  311. // updateOldestExecution updates the oldest execution time if this entry is older.
  312. func updateOldestExecution(oldestExecutionTime **time.Time, logEntry *InternalLogEntry) {
  313. if *oldestExecutionTime == nil {
  314. *oldestExecutionTime = &logEntry.DatetimeStarted
  315. } else if logEntry.DatetimeStarted.Before(**oldestExecutionTime) {
  316. *oldestExecutionTime = &logEntry.DatetimeStarted
  317. }
  318. }
  319. // findOldestExecutionInWindow finds the oldest execution within the time window and counts executions.
  320. // Returns the count of executions and the oldest execution time, or nil if none found.
  321. func findOldestExecutionInWindow(logs []*InternalLogEntry, windowStart time.Time) (int, *time.Time) {
  322. executions := 0
  323. var oldestExecutionTime *time.Time
  324. for _, logEntry := range logs {
  325. if !shouldCountExecution(logEntry, windowStart) {
  326. continue
  327. }
  328. executions++
  329. updateOldestExecution(&oldestExecutionTime, logEntry)
  330. }
  331. return executions, oldestExecutionTime
  332. }
  333. // calculateExpiryTime calculates when the oldest execution will fall outside the rate limit window.
  334. func calculateExpiryTime(oldestExecutionTime time.Time, duration time.Duration, now time.Time) time.Time {
  335. expiryTime := oldestExecutionTime.Add(duration)
  336. if !expiryTime.After(now) {
  337. return time.Time{}
  338. }
  339. return expiryTime
  340. }
  341. // updateMaxExpiryTime updates maxExpiryTime if expiryTime is later.
  342. func updateMaxExpiryTime(maxExpiryTime *time.Time, expiryTime time.Time) {
  343. if expiryTime.IsZero() {
  344. return
  345. }
  346. if maxExpiryTime.IsZero() || expiryTime.After(*maxExpiryTime) {
  347. *maxExpiryTime = expiryTime
  348. }
  349. }
  350. // calculateExpiryForRate calculates the expiry time for a single rate limit rule.
  351. // Returns the expiry time if the rate limit is exceeded, or zero time if not.
  352. func calculateExpiryForRate(rate config.RateSpec, logs []*InternalLogEntry, now time.Time) time.Time {
  353. duration := parseDuration(rate)
  354. if duration <= 0 {
  355. return time.Time{}
  356. }
  357. windowStart := now.Add(-duration)
  358. executions, oldestExecutionTime := findOldestExecutionInWindow(logs, windowStart)
  359. if executions < rate.Limit || oldestExecutionTime == nil {
  360. return time.Time{}
  361. }
  362. return calculateExpiryTime(*oldestExecutionTime, duration, now)
  363. }
  364. // getLogsForBinding retrieves logs for a binding ID.
  365. func (e *Executor) getLogsForBinding(bindingId string) []*InternalLogEntry {
  366. e.logmutex.RLock()
  367. logs, found := e.LogsByBindingId[bindingId]
  368. e.logmutex.RUnlock()
  369. if !found || len(logs) == 0 {
  370. return nil
  371. }
  372. return logs
  373. }
  374. // calculateMaxExpiryTimeFromRates calculates the maximum expiry time across all rate limit rules.
  375. func calculateMaxExpiryTimeFromRates(rates []config.RateSpec, logs []*InternalLogEntry, now time.Time) time.Time {
  376. var maxExpiryTime time.Time
  377. for _, rate := range rates {
  378. expiryTime := calculateExpiryForRate(rate, logs, now)
  379. updateMaxExpiryTime(&maxExpiryTime, expiryTime)
  380. }
  381. return maxExpiryTime
  382. }
  383. // GetTimeUntilAvailable calculates when an action will be available again based on rate limits.
  384. // Returns the Unix timestamp in seconds when the rate limit expires, or 0 if the action is available now.
  385. func (e *Executor) GetTimeUntilAvailable(binding *ActionBinding) int64 {
  386. if len(binding.Action.MaxRate) == 0 {
  387. return 0
  388. }
  389. logs := e.getLogsForBinding(binding.ID)
  390. if logs == nil {
  391. return 0
  392. }
  393. maxExpiryTime := calculateMaxExpiryTimeFromRates(binding.Action.MaxRate, logs, time.Now())
  394. if maxExpiryTime.IsZero() {
  395. return 0
  396. }
  397. return maxExpiryTime.Unix()
  398. }
  399. func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) {
  400. e.logmutex.Lock()
  401. entry.Index = int64(len(e.logsTrackingIdsByDate))
  402. e.logs[trackingID] = entry
  403. e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, trackingID)
  404. e.logmutex.Unlock()
  405. }
  406. // ExecRequest processes an ExecutionRequest
  407. func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string) {
  408. if req.AuthenticatedUser == nil {
  409. req.AuthenticatedUser = auth.UserGuest(req.Cfg)
  410. }
  411. req.executor = e
  412. req.logEntry = &InternalLogEntry{
  413. Binding: req.Binding,
  414. DatetimeStarted: time.Now(),
  415. ExecutionTrackingID: req.TrackingID,
  416. Output: "",
  417. ExitCode: DefaultExitCodeNotExecuted,
  418. ExecutionStarted: false,
  419. ExecutionFinished: false,
  420. ActionTitle: "notfound",
  421. ActionIcon: "&#x1f4a9;",
  422. Username: req.AuthenticatedUser.Username,
  423. }
  424. _, isDuplicate := e.GetLog(req.TrackingID)
  425. if isDuplicate || !isValidTrackingID(req.TrackingID) {
  426. req.TrackingID = uuid.NewString()
  427. }
  428. // Update the log entry with the final tracking ID
  429. req.logEntry.ExecutionTrackingID = req.TrackingID
  430. log.Tracef("executor.ExecRequest(): %v", req)
  431. e.SetLog(req.TrackingID, req.logEntry)
  432. wg := new(sync.WaitGroup)
  433. wg.Add(1)
  434. go func() {
  435. e.execChain(req)
  436. defer wg.Done()
  437. }()
  438. return wg, req.TrackingID
  439. }
  440. func (e *Executor) execChain(req *ExecutionRequest) {
  441. for _, step := range e.chainOfCommand {
  442. if !step(req) {
  443. break
  444. }
  445. }
  446. // Ensure DatetimeFinished is set even if execution was blocked early
  447. if req.logEntry.DatetimeFinished.IsZero() {
  448. req.logEntry.DatetimeFinished = time.Now()
  449. }
  450. req.logEntry.ExecutionFinished = true
  451. // This isn't a step, because we want to notify all listeners, irrespective
  452. // of how many steps were actually executed.
  453. notifyListenersFinished(req)
  454. }
  455. func getConcurrentCount(req *ExecutionRequest) int {
  456. concurrentCount := 0
  457. req.executor.logmutex.RLock()
  458. for _, log := range req.executor.GetLogsByBindingId(req.Binding.ID) {
  459. if !log.ExecutionFinished {
  460. concurrentCount += 1
  461. }
  462. }
  463. req.executor.logmutex.RUnlock()
  464. return concurrentCount
  465. }
  466. func stepConcurrencyCheck(req *ExecutionRequest) bool {
  467. concurrentCount := getConcurrentCount(req)
  468. // Note that the current execution is counted int the logs, so when checking we +1
  469. if concurrentCount >= (req.Binding.Action.MaxConcurrent + 1) {
  470. log.WithFields(log.Fields{
  471. "actionTitle": req.logEntry.ActionTitle,
  472. "concurrentCount": concurrentCount,
  473. "maxConcurrent": req.Binding.Action.MaxConcurrent,
  474. }).Warnf("Blocked from executing due to concurrency limit")
  475. req.logEntry.Output = "Blocked from executing due to concurrency limit"
  476. req.logEntry.Blocked = true
  477. return false
  478. }
  479. return true
  480. }
  481. func parseDuration(rate config.RateSpec) time.Duration {
  482. duration, err := time.ParseDuration(rate.Duration)
  483. if err != nil {
  484. log.Warnf("Could not parse duration: %v", rate.Duration)
  485. return -1 * time.Minute
  486. }
  487. return duration
  488. }
  489. //gocyclo:ignore
  490. func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
  491. executions := -1 // Because we will find ourself when checking execution logs
  492. duration := parseDuration(rate)
  493. then := time.Now().Add(-duration)
  494. currentEntityPrefix := ""
  495. if req.Binding != nil && req.Binding.Entity != nil {
  496. currentEntityPrefix = req.Binding.Entity.UniqueKey
  497. }
  498. for _, logEntry := range req.executor.GetLogsByBindingId(req.Binding.ID) {
  499. if logEntry.EntityPrefix != currentEntityPrefix {
  500. continue
  501. }
  502. if logEntry.DatetimeStarted.After(then) && !logEntry.Blocked {
  503. executions += 1
  504. }
  505. }
  506. return executions
  507. }
  508. func stepRateCheck(req *ExecutionRequest) bool {
  509. for _, rate := range req.Binding.Action.MaxRate {
  510. executions := getExecutionsCount(rate, req)
  511. if executions >= rate.Limit {
  512. log.WithFields(log.Fields{
  513. "actionTitle": req.logEntry.ActionTitle,
  514. "executions": executions,
  515. "limit": rate.Limit,
  516. "duration": rate.Duration,
  517. }).Infof("Blocked from executing due to rate limit")
  518. req.logEntry.Output = "Blocked from executing due to rate limit"
  519. req.logEntry.Blocked = true
  520. return false
  521. }
  522. }
  523. return true
  524. }
  525. func stepACLCheck(req *ExecutionRequest) bool {
  526. canExec := acl.IsAllowedExec(req.Cfg, req.AuthenticatedUser, req.Binding.Action)
  527. if !canExec {
  528. req.logEntry.Output = "ACL check failed. Blocked from executing."
  529. req.logEntry.Blocked = true
  530. log.WithFields(log.Fields{
  531. "actionTitle": req.logEntry.ActionTitle,
  532. }).Warnf("ACL check failed. Blocked from executing.")
  533. }
  534. return canExec
  535. }
  536. func stepParseArgs(req *ExecutionRequest) bool {
  537. ensureArgumentMap(req)
  538. injectSystemArgs(req)
  539. if !hasBindingAndAction(req) {
  540. return fail(req, fmt.Errorf("cannot parse arguments: Binding or Action is nil"))
  541. }
  542. filterToDefinedArgumentsOnly(req)
  543. mangleInvalidArgumentValues(req)
  544. if hasExec(req) {
  545. return handleExecBranch(req)
  546. } else {
  547. return handleShellBranch(req)
  548. }
  549. }
  550. func handleExecBranch(req *ExecutionRequest) bool {
  551. args, err := parseActionExec(req)
  552. if err != nil {
  553. return fail(req, err)
  554. }
  555. req.useDirectExec = true
  556. req.execArgs = args
  557. return true
  558. }
  559. func handleShellBranch(req *ExecutionRequest) bool {
  560. if hasWebhookTag(req) {
  561. return fail(req, fmt.Errorf("webhooks cannot use Shell execution; use exec instead. See https://docs.olivetin.app/action_execution/shellvsexec.html"))
  562. }
  563. if err := checkShellArgumentSafety(req.Binding.Action); err != nil {
  564. return fail(req, err)
  565. }
  566. cmd, err := parseActionArguments(req)
  567. if err != nil {
  568. return fail(req, err)
  569. }
  570. req.useDirectExec = false
  571. req.finalParsedCommand = cmd
  572. return true
  573. }
  574. func ensureArgumentMap(req *ExecutionRequest) {
  575. if req.Arguments == nil {
  576. req.Arguments = make(map[string]string)
  577. }
  578. }
  579. func filterToDefinedArgumentsOnly(req *ExecutionRequest) {
  580. definedNames := make(map[string]struct{})
  581. for _, arg := range req.Binding.Action.Arguments {
  582. definedNames[arg.Name] = struct{}{}
  583. }
  584. filtered := make(map[string]string)
  585. for k, v := range req.Arguments {
  586. if keepArgument(k, definedNames) {
  587. filtered[k] = v
  588. }
  589. }
  590. req.Arguments = filtered
  591. }
  592. func keepArgument(name string, definedNames map[string]struct{}) bool {
  593. _, ok := definedNames[name]
  594. return ok || strings.HasPrefix(name, "ot_")
  595. }
  596. func hasWebhookTag(req *ExecutionRequest) bool {
  597. for _, tag := range req.Tags {
  598. if tag == "webhook" {
  599. return true
  600. }
  601. }
  602. return false
  603. }
  604. func injectSystemArgs(req *ExecutionRequest) {
  605. req.Arguments["ot_executionTrackingId"] = req.TrackingID
  606. req.Arguments["ot_username"] = req.AuthenticatedUser.Username
  607. }
  608. func hasBindingAndAction(req *ExecutionRequest) bool {
  609. if req == nil {
  610. return false
  611. }
  612. return !(req.Binding == nil || req.Binding.Action == nil)
  613. }
  614. func hasExec(req *ExecutionRequest) bool {
  615. return len(req.Binding.Action.Exec) > 0
  616. }
  617. func fail(req *ExecutionRequest, err error) bool {
  618. req.logEntry.Output = err.Error()
  619. log.Warn(err.Error())
  620. removeUploadTempFiles(req)
  621. return false
  622. }
  623. func stepCleanupUploadTemps(req *ExecutionRequest) bool {
  624. removeUploadTempFiles(req)
  625. return true
  626. }
  627. func removeUploadTempFiles(req *ExecutionRequest) {
  628. reg := registryForUploadCleanup(req)
  629. if reg == nil {
  630. return
  631. }
  632. for _, p := range req.UploadTempPaths {
  633. reg.DeleteTempFile(p)
  634. }
  635. req.UploadTempPaths = nil
  636. }
  637. func registryForUploadCleanup(req *ExecutionRequest) *fileupload.Registry {
  638. if req == nil || req.executor == nil || req.executor.UploadRegistry == nil {
  639. return nil
  640. }
  641. return req.executor.UploadRegistry
  642. }
  643. func stepRequestAction(req *ExecutionRequest) bool {
  644. metricActionsRequested.Inc()
  645. if !stepRequestActionHasBinding(req) {
  646. return false
  647. }
  648. stepRequestActionPopulateLogEntry(req)
  649. stepRequestActionRegisterLog(req)
  650. log.WithFields(log.Fields{
  651. "actionTitle": req.logEntry.ActionTitle,
  652. "tags": req.Tags,
  653. }).Infof("Action requested")
  654. notifyListenersStarted(req)
  655. return true
  656. }
  657. func stepRequestActionHasBinding(req *ExecutionRequest) bool {
  658. if req.Binding == nil || req.Binding.Action == nil {
  659. log.Warnf("Action request has no binding/action; skipping execution")
  660. return false
  661. }
  662. return true
  663. }
  664. func stepRequestActionPopulateLogEntry(req *ExecutionRequest) {
  665. req.logEntry.Binding = req.Binding
  666. req.logEntry.ActionConfigTitle = req.Binding.Action.Title
  667. req.logEntry.ActionTitle = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Title, req.Binding.Entity)
  668. req.logEntry.ActionIcon = req.Binding.Action.Icon
  669. req.logEntry.Tags = req.Tags
  670. if req.Binding.Entity != nil {
  671. req.logEntry.EntityPrefix = req.Binding.Entity.UniqueKey
  672. }
  673. }
  674. func stepRequestActionRegisterLog(req *ExecutionRequest) {
  675. req.executor.logmutex.Lock()
  676. defer req.executor.logmutex.Unlock()
  677. if _, containsKey := req.executor.LogsByBindingId[req.Binding.ID]; !containsKey {
  678. req.executor.LogsByBindingId[req.Binding.ID] = make([]*InternalLogEntry, 0)
  679. }
  680. req.executor.LogsByBindingId[req.Binding.ID] = append(req.executor.LogsByBindingId[req.Binding.ID], req.logEntry)
  681. }
  682. func stepLogStart(req *ExecutionRequest) bool {
  683. log.WithFields(log.Fields{
  684. "actionTitle": req.logEntry.ActionTitle,
  685. "timeout": req.Binding.Action.Timeout,
  686. }).Infof("Action started")
  687. return true
  688. }
  689. func stepLogFinish(req *ExecutionRequest) bool {
  690. req.logEntry.ExecutionFinished = true
  691. log.WithFields(log.Fields{
  692. "actionTitle": req.logEntry.ActionTitle,
  693. "outputLength": len(req.logEntry.Output),
  694. "timedOut": req.logEntry.TimedOut,
  695. "exit": req.logEntry.ExitCode,
  696. }).Infof("Action finished")
  697. return true
  698. }
  699. func notifyListenersFinished(req *ExecutionRequest) {
  700. for _, listener := range req.executor.listeners {
  701. listener.OnExecutionFinished(req.logEntry)
  702. }
  703. }
  704. func notifyListenersStarted(req *ExecutionRequest) {
  705. for _, listener := range req.executor.listeners {
  706. listener.OnExecutionStarted(req.logEntry)
  707. }
  708. }
  709. func appendErrorToStderr(err error, logEntry *InternalLogEntry) {
  710. if err != nil {
  711. logEntry.Output = err.Error() + "\n\n" + logEntry.Output
  712. }
  713. }
  714. type OutputStreamer struct {
  715. Req *ExecutionRequest
  716. output bytes.Buffer
  717. }
  718. func (ost *OutputStreamer) Write(o []byte) (n int, err error) {
  719. for _, listener := range ost.Req.executor.listeners {
  720. listener.OnOutputChunk(o, ost.Req.TrackingID)
  721. }
  722. return ost.output.Write(o)
  723. }
  724. func (ost *OutputStreamer) String() string {
  725. return ost.output.String()
  726. }
  727. func buildEnv(args map[string]string) []string {
  728. ret := append(os.Environ(), "OLIVETIN=1")
  729. for k, v := range args {
  730. varName := fmt.Sprintf("%v", strings.TrimSpace(strings.ToUpper(k)))
  731. // Skip arguments that might not have a name (eg, confirmation), as this causes weird bugs on Windows.
  732. if varName == "" {
  733. continue
  734. }
  735. ret = append(ret, fmt.Sprintf("%v=%v", varName, v))
  736. }
  737. return ret
  738. }
  739. func stepExec(req *ExecutionRequest) bool {
  740. ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor)
  741. defer cancel()
  742. streamer := &OutputStreamer{Req: req}
  743. cmd := buildCommand(ctx, req)
  744. if cmd == nil {
  745. req.logEntry.Output = "Cannot execute: no command arguments provided"
  746. log.Warn("Cannot execute: no command arguments provided")
  747. return false
  748. }
  749. prepareCommand(cmd, streamer, req)
  750. runerr := cmd.Start()
  751. req.logEntry.Process = cmd.Process
  752. ctx.setProcess(cmd.Process)
  753. waiterr := cmd.Wait()
  754. req.logEntry.ExitCode = int32(cmd.ProcessState.ExitCode())
  755. req.logEntry.Output = streamer.String()
  756. appendErrorToStderr(runerr, req.logEntry)
  757. appendErrorToStderr(waiterr, req.logEntry)
  758. if ctx.Err() == context.DeadlineExceeded {
  759. log.WithFields(log.Fields{
  760. "actionTitle": req.logEntry.ActionTitle,
  761. }).Warnf("Action timed out")
  762. req.logEntry.TimedOut = true
  763. req.logEntry.Output += "OliveTin::timeout - this action timed out after " + fmt.Sprintf("%v", req.Binding.Action.Timeout) + " seconds. If you need more time for this action, set a longer timeout. See https://docs.olivetin.app/action_customization/timeouts.html for more help."
  764. }
  765. req.logEntry.DatetimeFinished = time.Now()
  766. return true
  767. }
  768. func buildCommand(ctx context.Context, req *ExecutionRequest) *exec.Cmd {
  769. if req.useDirectExec {
  770. return wrapCommandDirect(ctx, req.execArgs)
  771. }
  772. return wrapCommandInShell(ctx, req.finalParsedCommand)
  773. }
  774. func prepareCommand(cmd *exec.Cmd, streamer *OutputStreamer, req *ExecutionRequest) {
  775. cmd.Stdout = streamer
  776. cmd.Stderr = streamer
  777. cmd.Env = buildEnv(req.Arguments)
  778. req.logEntry.ExecutionStarted = true
  779. }
  780. func stepExecAfter(req *ExecutionRequest) bool {
  781. if req.Binding.Action.ShellAfterCompleted == "" {
  782. return true
  783. }
  784. ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor)
  785. defer cancel()
  786. var stdout bytes.Buffer
  787. var stderr bytes.Buffer
  788. merged := buildTemplateArgumentMap(req)
  789. merged["output"] = req.logEntry.Output
  790. merged["exitCode"] = fmt.Sprintf("%v", req.logEntry.ExitCode)
  791. merged["ot_executionTrackingId"] = req.TrackingID
  792. merged["ot_username"] = req.AuthenticatedUser.Username
  793. finalParsedCommand, err := tpl.ParseTemplateWithActionContext(req.Binding.Action.ShellAfterCompleted, req.Binding.Entity, merged)
  794. if err != nil {
  795. msg := "Could not prepare shellAfterCompleted command: " + err.Error() + "\n"
  796. req.logEntry.Output += msg
  797. log.Warn(msg)
  798. return true
  799. }
  800. cmd := wrapCommandInShell(ctx, finalParsedCommand)
  801. cmd.Stdout = &stdout
  802. cmd.Stderr = &stderr
  803. cmd.Env = buildEnv(req.Arguments)
  804. runerr := cmd.Start()
  805. ctx.setProcess(cmd.Process)
  806. waiterr := cmd.Wait()
  807. req.logEntry.Output += "\n"
  808. req.logEntry.Output += "OliveTin::shellAfterCompleted stdout\n"
  809. req.logEntry.Output += stdout.String()
  810. req.logEntry.Output += "OliveTin::shellAfterCompleted stderr\n"
  811. req.logEntry.Output += stderr.String()
  812. req.logEntry.Output += "OliveTin::shellAfterCompleted errors and summary\n"
  813. appendErrorToStderr(runerr, req.logEntry)
  814. appendErrorToStderr(waiterr, req.logEntry)
  815. if ctx.Err() == context.DeadlineExceeded {
  816. req.logEntry.Output += "Your shellAfterCompleted command timed out."
  817. }
  818. req.logEntry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", cmd.ProcessState.ExitCode())
  819. req.logEntry.Output += "OliveTin::shellAfterCompleted output complete\n"
  820. return true
  821. }
  822. //gocyclo:ignore
  823. func stepTrigger(req *ExecutionRequest) bool {
  824. if req.Binding.Action.Triggers == nil {
  825. return true
  826. }
  827. if req.TriggerDepth >= MaxTriggerDepth {
  828. log.WithFields(log.Fields{
  829. "actionTitle": req.logEntry.ActionTitle,
  830. "depth": req.TriggerDepth,
  831. }).Warnf("Trigger action reached maximum depth of %v. Not triggering further actions.", MaxTriggerDepth)
  832. req.logEntry.Output += fmt.Sprintf("OliveTin::trigger - this action reached maximum trigger depth of %v. Not triggering further actions.", MaxTriggerDepth)
  833. return true
  834. }
  835. if len(req.Tags) > 0 && req.Tags[0] == "trigger" {
  836. log.Warnf("Trigger action is triggering another trigger action. This is allowed, but be careful not to create trigger loops.")
  837. }
  838. triggerLoop(req)
  839. return true
  840. }
  841. func triggerLoop(req *ExecutionRequest) {
  842. for _, triggerTitle := range req.Binding.Action.Triggers {
  843. binding := req.executor.findBindingByActionTitle(triggerTitle, "")
  844. if binding == nil {
  845. log.WithFields(log.Fields{
  846. "triggerTitle": triggerTitle,
  847. "fromAction": req.logEntry.ActionTitle,
  848. }).Warnf("Trigger references unknown action title; skipping")
  849. continue
  850. }
  851. trigger := &ExecutionRequest{
  852. Binding: binding,
  853. TrackingID: uuid.NewString(),
  854. Tags: []string{"trigger"},
  855. AuthenticatedUser: req.AuthenticatedUser,
  856. Arguments: triggerArgumentsWithoutUploads(req),
  857. Cfg: req.Cfg,
  858. TriggerDepth: req.TriggerDepth + 1,
  859. }
  860. req.executor.ExecRequest(trigger)
  861. }
  862. }
  863. func stepSaveLog(req *ExecutionRequest) bool {
  864. filename := fmt.Sprintf("%v.%v.%v", req.logEntry.ActionTitle, req.logEntry.DatetimeStarted.Unix(), req.logEntry.ExecutionTrackingID)
  865. saveLogResults(req, filename)
  866. saveLogOutput(req, filename)
  867. return true
  868. }
  869. func firstNonEmpty(one, two string) string {
  870. if one != "" {
  871. return one
  872. }
  873. return two
  874. }
  875. func saveLogResults(req *ExecutionRequest, filename string) {
  876. dir := firstNonEmpty(req.Binding.Action.SaveLogs.ResultsDirectory, req.Cfg.SaveLogs.ResultsDirectory)
  877. if dir != "" {
  878. data, err := yaml.Marshal(req.logEntry)
  879. if err != nil {
  880. log.Warnf("%v", err)
  881. }
  882. filepath := path.Join(dir, filename+".yaml")
  883. err = os.WriteFile(filepath, data, 0600)
  884. if err != nil {
  885. log.Warnf("%v", err)
  886. }
  887. }
  888. }
  889. func saveLogOutput(req *ExecutionRequest, filename string) {
  890. dir := firstNonEmpty(req.Binding.Action.SaveLogs.OutputDirectory, req.Cfg.SaveLogs.OutputDirectory)
  891. if dir != "" {
  892. data := req.logEntry.Output
  893. filepath := path.Join(dir, filename+".log")
  894. err := os.WriteFile(filepath, []byte(data), 0600)
  895. if err != nil {
  896. log.Warnf("%v", err)
  897. }
  898. }
  899. }