executor.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238
  1. package executor
  2. import (
  3. acl "github.com/OliveTin/OliveTin/internal/acl"
  4. "github.com/OliveTin/OliveTin/internal/auth"
  5. authpublic "github.com/OliveTin/OliveTin/internal/auth/authpublic"
  6. config "github.com/OliveTin/OliveTin/internal/config"
  7. "github.com/OliveTin/OliveTin/internal/entities"
  8. "github.com/OliveTin/OliveTin/internal/tpl"
  9. "github.com/google/uuid"
  10. log "github.com/sirupsen/logrus"
  11. "github.com/prometheus/client_golang/prometheus"
  12. "github.com/prometheus/client_golang/prometheus/promauto"
  13. "gopkg.in/yaml.v3"
  14. "bytes"
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "io"
  19. "maps"
  20. "os"
  21. "os/exec"
  22. "path"
  23. "strings"
  24. "sync"
  25. "time"
  26. )
  // Sentinel values and limits used by the executor.
  const (
  	// DefaultExitCodeNotExecuted is the exit code a freshly created log entry
  	// carries until an execution reports a real exit code.
  	DefaultExitCodeNotExecuted = -1337

  	// MaxTriggerDepth is presumably the upper bound for chained triggers
  	// (see ExecutionRequest.TriggerDepth) — TODO confirm at the use site.
  	MaxTriggerDepth = 10
  )
  31. var (
  32. metricActionsRequested = promauto.NewCounter(prometheus.CounterOpts{
  33. Name: "olivetin_actions_requested_count",
  34. Help: "The actions requested count",
  35. })
  36. )
  37. type ActionBinding struct {
  38. ID string
  39. Action *config.Action
  40. Entity *entities.Entity
  41. ConfigOrder int
  42. IsOnDashboard bool
  43. }
  44. // Executor represents a helper class for executing commands. It's main method
  45. // is ExecRequest
  46. type Executor struct {
  47. logs map[string]*InternalLogEntry
  48. logsTrackingIdsByDate []string
  49. LogsByBindingId map[string][]*InternalLogEntry
  50. logmutex sync.RWMutex
  51. MapActionBindings map[string]*ActionBinding
  52. MapActionBindingsLock sync.RWMutex
  53. Cfg *config.Config
  54. listeners []listener
  55. chainOfCommand []executorStepFunc
  56. }
  57. // ExecutionRequest is a request to execute an action. It's passed to an
  58. // Executor. They're created from the api.
  59. type ExecutionRequest struct {
  60. Binding *ActionBinding
  61. Arguments map[string]string
  62. TrackingID string
  63. Tags []string
  64. Cfg *config.Config
  65. AuthenticatedUser *authpublic.AuthenticatedUser
  66. TriggerDepth int
  67. logEntry *InternalLogEntry
  68. finalParsedCommand string
  69. execArgs []string
  70. useDirectExec bool
  71. useExecTool bool
  72. execToolName string
  73. execToolConfig []byte
  74. executor *Executor
  75. }
  76. // InternalLogEntry objects are created by an Executor, and represent the final
  77. // state of execution (even if the command is not executed). It's designed to be
  78. // easily serializable.
  79. type InternalLogEntry struct {
  80. Binding *ActionBinding
  81. DatetimeStarted time.Time
  82. DatetimeFinished time.Time
  83. Output string
  84. TimedOut bool
  85. Blocked bool
  86. ExitCode int32
  87. Tags []string
  88. ExecutionStarted bool
  89. ExecutionFinished bool
  90. ExecutionTrackingID string
  91. Process *os.Process
  92. Username string
  93. Index int64
  94. EntityPrefix string
  95. ActionConfigTitle string // This is the title of the action as defined in the config, not the final parsed title.
  96. /*
  97. The following 3 properties are obviously on Action normally, but it's useful
  98. that logs are lightweight (so we don't need to have an action associated to
  99. logs, etc. Therefore, we duplicate those values here.
  100. */
  101. ActionTitle string
  102. ActionIcon string
  103. Attributes map[string]string
  104. }
  105. // .Binding can be nil, so we need to handle that.
  106. func (e *InternalLogEntry) GetBindingId() string {
  107. if e.Binding == nil {
  108. return ""
  109. }
  110. return e.Binding.ID
  111. }
  112. type executorStepFunc func(*ExecutionRequest) bool
  113. // DefaultExecutor returns an Executor, with a sensible "chain of command" for
  114. // executing actions.
  115. func DefaultExecutor(cfg *config.Config) *Executor {
  116. e := Executor{}
  117. e.Cfg = cfg
  118. e.logs = make(map[string]*InternalLogEntry)
  119. e.logsTrackingIdsByDate = make([]string, 0)
  120. e.LogsByBindingId = make(map[string][]*InternalLogEntry)
  121. e.MapActionBindings = make(map[string]*ActionBinding)
  122. e.chainOfCommand = []executorStepFunc{
  123. stepRequestAction,
  124. stepConcurrencyCheck,
  125. stepRateCheck,
  126. stepACLCheck,
  127. stepParseArgs,
  128. stepLogStart,
  129. stepExec,
  130. stepExecAfter,
  131. stepLogFinish,
  132. stepSaveLog,
  133. stepTrigger,
  134. }
  135. return &e
  136. }
  137. type listener interface {
  138. OnExecutionStarted(logEntry *InternalLogEntry)
  139. OnExecutionFinished(logEntry *InternalLogEntry)
  140. OnOutputChunk(o []byte, executionTrackingId string)
  141. OnActionMapRebuilt()
  142. }
  143. func (e *Executor) AddListener(m listener) {
  144. e.listeners = append(e.listeners, m)
  145. }
  // getPagingStartIndex calculates the starting index for log pagination.
  // Parameters:
  //
  //	startOffset: The offset from the most recent log (0 or less means start
  //	             from the most recent)
  //	totalLogCount: Total number of logs available
  //
  // Returns: the index of the first (newest) entry of the page. That is
  // totalLogCount-1 when no offset is given, totalLogCount-startOffset-1 for an
  // offset within range (so -1 when the offset equals the total), and 0 when
  // the offset is larger than the total.
  func getPagingStartIndex(startOffset int64, totalLogCount int64) int64 {
  	if startOffset <= 0 {
  		return totalLogCount - 1
  	}

  	remaining := totalLogCount - startOffset
  	if remaining < 0 {
  		return 0
  	}

  	return remaining - 1
  }
  // PagingResult carries the pagination metadata returned alongside a page of
  // log entries.
  type PagingResult struct {
  	// CountRemaining is the index of the oldest entry on the page, i.e. the
  	// number of older entries that precede the page.
  	CountRemaining int64
  	// PageSize is the page size that was requested.
  	PageSize int64
  	// TotalCount is the number of entries that were paginated over.
  	TotalCount int64
  	// StartOffset is the requested offset from the most recent entry.
  	StartOffset int64
  }
  172. func (e *Executor) GetLogTrackingIds(startOffset int64, pageCount int64) ([]*InternalLogEntry, *PagingResult) {
  173. pagingResult := &PagingResult{
  174. CountRemaining: 0,
  175. PageSize: pageCount,
  176. TotalCount: 0,
  177. StartOffset: startOffset,
  178. }
  179. e.logmutex.RLock()
  180. totalLogCount := int64(len(e.logsTrackingIdsByDate))
  181. pagingResult.TotalCount = totalLogCount
  182. startIndex := getPagingStartIndex(startOffset, totalLogCount)
  183. pageCount = min(totalLogCount, pageCount)
  184. endIndex := max(0, (startIndex-pageCount)+1)
  185. log.WithFields(log.Fields{
  186. "startOffset": startOffset,
  187. "pageCount": pageCount,
  188. "total": totalLogCount,
  189. "startIndex": startIndex,
  190. "endIndex": endIndex,
  191. }).Tracef("GetLogTrackingIds")
  192. trackingIds := make([]*InternalLogEntry, 0, pageCount)
  193. if totalLogCount > 0 {
  194. for i := endIndex; i <= startIndex; i++ {
  195. trackingIds = append(trackingIds, e.logs[e.logsTrackingIdsByDate[i]])
  196. }
  197. }
  198. e.logmutex.RUnlock()
  199. pagingResult.CountRemaining = endIndex
  200. return trackingIds, pagingResult
  201. }
  202. func isValidLogEntryForACL(entry *InternalLogEntry) bool {
  203. return entry != nil && entry.Binding != nil && entry.Binding.Action != nil
  204. }
  205. func isLogEntryAllowedByACL(cfg *config.Config, user *authpublic.AuthenticatedUser, entry *InternalLogEntry) bool {
  206. return acl.IsAllowedLogs(cfg, user, entry.Binding.Action)
  207. }
  208. func (e *Executor) filterLogsByACL(cfg *config.Config, user *authpublic.AuthenticatedUser, dateFilter string) []*InternalLogEntry {
  209. e.logmutex.RLock()
  210. defer e.logmutex.RUnlock()
  211. filtered := make([]*InternalLogEntry, 0, len(e.logsTrackingIdsByDate))
  212. filterDate, hasDateFilter := parseDateFilter(dateFilter)
  213. for _, trackingId := range e.logsTrackingIdsByDate {
  214. entry := e.logs[trackingId]
  215. if shouldIncludeLogEntry(cfg, user, entry, filterDate, hasDateFilter) {
  216. filtered = append(filtered, entry)
  217. }
  218. }
  219. return filtered
  220. }
  221. // parseDateFilter parses the date filter string and returns filter information.
  222. func parseDateFilter(dateFilter string) (filterDate time.Time, hasDateFilter bool) {
  223. if dateFilter == "" {
  224. return time.Time{}, false
  225. }
  226. parsedDate, err := time.Parse("2006-01-02", dateFilter)
  227. if err != nil {
  228. log.WithFields(log.Fields{
  229. "dateFilter": dateFilter,
  230. "error": err,
  231. }).Errorf("Failed to parse date filter, expected format YYYY-MM-DD")
  232. return time.Time{}, false
  233. }
  234. return parsedDate, true
  235. }
  236. // shouldIncludeLogEntry determines if a log entry should be included based on ACL and date filter.
  237. func shouldIncludeLogEntry(cfg *config.Config, user *authpublic.AuthenticatedUser, entry *InternalLogEntry, filterDate time.Time, hasDateFilter bool) bool {
  238. if !isValidLogEntryForACL(entry) {
  239. return false
  240. }
  241. if !isLogEntryAllowedByACL(cfg, user, entry) {
  242. return false
  243. }
  244. return matchesDateFilter(entry, filterDate, hasDateFilter)
  245. }
  246. // matchesDateFilter checks if the log entry matches the date filter.
  247. func matchesDateFilter(entry *InternalLogEntry, filterDate time.Time, hasDateFilter bool) bool {
  248. if !hasDateFilter {
  249. return true
  250. }
  251. entryDate := entry.DatetimeStarted.UTC().Truncate(24 * time.Hour)
  252. filterDateUTC := filterDate.UTC().Truncate(24 * time.Hour)
  253. return entryDate.Equal(filterDateUTC)
  254. }
  255. // paginateFilteredLogs applies pagination to a filtered list of logs and returns
  256. // the paginated results along with pagination metadata.
  257. func paginateFilteredLogs(filtered []*InternalLogEntry, startOffset int64, pageCount int64) ([]*InternalLogEntry, *PagingResult) {
  258. total := int64(len(filtered))
  259. paging := &PagingResult{PageSize: pageCount, TotalCount: total, StartOffset: startOffset}
  260. if total == 0 {
  261. paging.CountRemaining = 0
  262. return []*InternalLogEntry{}, paging
  263. }
  264. startIndex := getPagingStartIndex(startOffset, total)
  265. pageCount = min(total, pageCount)
  266. endIndex := max(0, (startIndex-pageCount)+1)
  267. out := make([]*InternalLogEntry, 0, pageCount)
  268. for i := endIndex; i <= startIndex && i < int64(len(filtered)); i++ {
  269. out = append(out, filtered[i])
  270. }
  271. paging.CountRemaining = endIndex
  272. return out, paging
  273. }
  274. // GetLogTrackingIdsACL returns logs filtered by ACL visibility for the user and
  275. // paginated correctly based on the filtered set.
  276. // dateFilter is optional and should be in YYYY-MM-DD format. If empty, no date filtering is applied.
  277. func (e *Executor) GetLogTrackingIdsACL(cfg *config.Config, user *authpublic.AuthenticatedUser, startOffset int64, pageCount int64, dateFilter string) ([]*InternalLogEntry, *PagingResult) {
  278. filtered := e.filterLogsByACL(cfg, user, dateFilter)
  279. return paginateFilteredLogs(filtered, startOffset, pageCount)
  280. }
  281. func (e *Executor) GetLog(trackingID string) (*InternalLogEntry, bool) {
  282. e.logmutex.RLock()
  283. entry, found := e.logs[trackingID]
  284. e.logmutex.RUnlock()
  285. return entry, found
  286. }
  287. func (e *Executor) GetLogsByBindingId(bindingId string) []*InternalLogEntry {
  288. e.logmutex.RLock()
  289. logs, found := e.LogsByBindingId[bindingId]
  290. e.logmutex.RUnlock()
  291. if !found {
  292. return make([]*InternalLogEntry, 0)
  293. }
  294. return logs
  295. }
  296. // shouldCountExecution checks if a log entry should be counted for rate limiting.
  297. func shouldCountExecution(logEntry *InternalLogEntry, windowStart time.Time) bool {
  298. return !logEntry.Blocked && logEntry.DatetimeStarted.After(windowStart)
  299. }
  300. // updateOldestExecution updates the oldest execution time if this entry is older.
  301. func updateOldestExecution(oldestExecutionTime **time.Time, logEntry *InternalLogEntry) {
  302. if *oldestExecutionTime == nil {
  303. *oldestExecutionTime = &logEntry.DatetimeStarted
  304. } else if logEntry.DatetimeStarted.Before(**oldestExecutionTime) {
  305. *oldestExecutionTime = &logEntry.DatetimeStarted
  306. }
  307. }
  308. // findOldestExecutionInWindow finds the oldest execution within the time window and counts executions.
  309. // Returns the count of executions and the oldest execution time, or nil if none found.
  310. func findOldestExecutionInWindow(logs []*InternalLogEntry, windowStart time.Time) (int, *time.Time) {
  311. executions := 0
  312. var oldestExecutionTime *time.Time
  313. for _, logEntry := range logs {
  314. if !shouldCountExecution(logEntry, windowStart) {
  315. continue
  316. }
  317. executions++
  318. updateOldestExecution(&oldestExecutionTime, logEntry)
  319. }
  320. return executions, oldestExecutionTime
  321. }
  // calculateExpiryTime returns the moment the oldest execution leaves the rate
  // limit window (its start time plus duration). When that moment is not after
  // now, the zero time is returned instead.
  func calculateExpiryTime(oldestExecutionTime time.Time, duration time.Duration, now time.Time) time.Time {
  	if expiry := oldestExecutionTime.Add(duration); expiry.After(now) {
  		return expiry
  	}

  	return time.Time{}
  }
  // updateMaxExpiryTime raises *maxExpiryTime to expiryTime when expiryTime is
  // later, or when no maximum has been recorded yet. A zero expiryTime is
  // ignored.
  func updateMaxExpiryTime(maxExpiryTime *time.Time, expiryTime time.Time) {
  	switch {
  	case expiryTime.IsZero():
  		return
  	case maxExpiryTime.IsZero(), expiryTime.After(*maxExpiryTime):
  		*maxExpiryTime = expiryTime
  	}
  }
  339. // calculateExpiryForRate calculates the expiry time for a single rate limit rule.
  340. // Returns the expiry time if the rate limit is exceeded, or zero time if not.
  341. func calculateExpiryForRate(rate config.RateSpec, logs []*InternalLogEntry, now time.Time) time.Time {
  342. duration := parseDuration(rate)
  343. if duration <= 0 {
  344. return time.Time{}
  345. }
  346. windowStart := now.Add(-duration)
  347. executions, oldestExecutionTime := findOldestExecutionInWindow(logs, windowStart)
  348. if executions < rate.Limit || oldestExecutionTime == nil {
  349. return time.Time{}
  350. }
  351. return calculateExpiryTime(*oldestExecutionTime, duration, now)
  352. }
  353. // getLogsForBinding retrieves logs for a binding ID.
  354. func (e *Executor) getLogsForBinding(bindingId string) []*InternalLogEntry {
  355. e.logmutex.RLock()
  356. logs, found := e.LogsByBindingId[bindingId]
  357. e.logmutex.RUnlock()
  358. if !found || len(logs) == 0 {
  359. return nil
  360. }
  361. return logs
  362. }
  363. // calculateMaxExpiryTimeFromRates calculates the maximum expiry time across all rate limit rules.
  364. func calculateMaxExpiryTimeFromRates(rates []config.RateSpec, logs []*InternalLogEntry, now time.Time) time.Time {
  365. var maxExpiryTime time.Time
  366. for _, rate := range rates {
  367. expiryTime := calculateExpiryForRate(rate, logs, now)
  368. updateMaxExpiryTime(&maxExpiryTime, expiryTime)
  369. }
  370. return maxExpiryTime
  371. }
  372. // GetTimeUntilAvailable calculates when an action will be available again based on rate limits.
  373. // Returns the Unix timestamp in seconds when the rate limit expires, or 0 if the action is available now.
  374. func (e *Executor) GetTimeUntilAvailable(binding *ActionBinding) int64 {
  375. if len(binding.Action.MaxRate) == 0 {
  376. return 0
  377. }
  378. logs := e.getLogsForBinding(binding.ID)
  379. if logs == nil {
  380. return 0
  381. }
  382. maxExpiryTime := calculateMaxExpiryTimeFromRates(binding.Action.MaxRate, logs, time.Now())
  383. if maxExpiryTime.IsZero() {
  384. return 0
  385. }
  386. return maxExpiryTime.Unix()
  387. }
  388. func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) {
  389. e.logmutex.Lock()
  390. entry.Index = int64(len(e.logsTrackingIdsByDate))
  391. e.logs[trackingID] = entry
  392. e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, trackingID)
  393. e.logmutex.Unlock()
  394. }
  395. // ExecRequest processes an ExecutionRequest
  396. func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string) {
  397. if req.AuthenticatedUser == nil {
  398. req.AuthenticatedUser = auth.UserGuest(req.Cfg)
  399. }
  400. req.executor = e
  401. req.logEntry = &InternalLogEntry{
  402. Binding: req.Binding,
  403. DatetimeStarted: time.Now(),
  404. ExecutionTrackingID: req.TrackingID,
  405. Output: "",
  406. ExitCode: DefaultExitCodeNotExecuted,
  407. ExecutionStarted: false,
  408. ExecutionFinished: false,
  409. ActionTitle: "notfound",
  410. ActionIcon: "&#x1f4a9;",
  411. Username: req.AuthenticatedUser.Username,
  412. }
  413. _, isDuplicate := e.GetLog(req.TrackingID)
  414. if isDuplicate || req.TrackingID == "" {
  415. req.TrackingID = uuid.NewString()
  416. }
  417. // Update the log entry with the final tracking ID
  418. req.logEntry.ExecutionTrackingID = req.TrackingID
  419. log.Tracef("executor.ExecRequest(): %v", req)
  420. e.SetLog(req.TrackingID, req.logEntry)
  421. wg := new(sync.WaitGroup)
  422. wg.Add(1)
  423. go func() {
  424. e.execChain(req)
  425. defer wg.Done()
  426. }()
  427. return wg, req.TrackingID
  428. }
  429. func (e *Executor) execChain(req *ExecutionRequest) {
  430. for _, step := range e.chainOfCommand {
  431. if !step(req) {
  432. break
  433. }
  434. }
  435. // Ensure DatetimeFinished is set even if execution was blocked early
  436. if req.logEntry.DatetimeFinished.IsZero() {
  437. req.logEntry.DatetimeFinished = time.Now()
  438. }
  439. req.logEntry.ExecutionFinished = true
  440. // This isn't a step, because we want to notify all listeners, irrespective
  441. // of how many steps were actually executed.
  442. notifyListenersFinished(req)
  443. }
  444. func getConcurrentCount(req *ExecutionRequest) int {
  445. concurrentCount := 0
  446. req.executor.logmutex.RLock()
  447. for _, log := range req.executor.GetLogsByBindingId(req.Binding.ID) {
  448. if !log.ExecutionFinished {
  449. concurrentCount += 1
  450. }
  451. }
  452. req.executor.logmutex.RUnlock()
  453. return concurrentCount
  454. }
  455. func stepConcurrencyCheck(req *ExecutionRequest) bool {
  456. concurrentCount := getConcurrentCount(req)
  457. // Note that the current execution is counted int the logs, so when checking we +1
  458. if concurrentCount >= (req.Binding.Action.MaxConcurrent + 1) {
  459. log.WithFields(log.Fields{
  460. "actionTitle": req.logEntry.ActionTitle,
  461. "concurrentCount": concurrentCount,
  462. "maxConcurrent": req.Binding.Action.MaxConcurrent,
  463. }).Warnf("Blocked from executing due to concurrency limit")
  464. req.logEntry.Output = "Blocked from executing due to concurrency limit"
  465. req.logEntry.Blocked = true
  466. return false
  467. }
  468. return true
  469. }
  470. func parseDuration(rate config.RateSpec) time.Duration {
  471. duration, err := time.ParseDuration(rate.Duration)
  472. if err != nil {
  473. log.Warnf("Could not parse duration: %v", rate.Duration)
  474. return -1 * time.Minute
  475. }
  476. return duration
  477. }
  478. //gocyclo:ignore
  479. func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
  480. executions := -1 // Because we will find ourself when checking execution logs
  481. duration := parseDuration(rate)
  482. then := time.Now().Add(-duration)
  483. for _, logEntry := range req.executor.GetLogsByBindingId(req.Binding.ID) {
  484. // FIXME
  485. /*
  486. if logEntry.EntityPrefix != req.EntityPrefix {
  487. continue
  488. }
  489. */
  490. if logEntry.DatetimeStarted.After(then) && !logEntry.Blocked {
  491. executions += 1
  492. }
  493. }
  494. return executions
  495. }
  496. func stepRateCheck(req *ExecutionRequest) bool {
  497. for _, rate := range req.Binding.Action.MaxRate {
  498. executions := getExecutionsCount(rate, req)
  499. if executions >= rate.Limit {
  500. log.WithFields(log.Fields{
  501. "actionTitle": req.logEntry.ActionTitle,
  502. "executions": executions,
  503. "limit": rate.Limit,
  504. "duration": rate.Duration,
  505. }).Infof("Blocked from executing due to rate limit")
  506. req.logEntry.Output = "Blocked from executing due to rate limit"
  507. req.logEntry.Blocked = true
  508. return false
  509. }
  510. }
  511. return true
  512. }
  513. func stepACLCheck(req *ExecutionRequest) bool {
  514. canExec := acl.IsAllowedExec(req.Cfg, req.AuthenticatedUser, req.Binding.Action)
  515. if !canExec {
  516. req.logEntry.Output = "ACL check failed. Blocked from executing."
  517. req.logEntry.Blocked = true
  518. log.WithFields(log.Fields{
  519. "actionTitle": req.logEntry.ActionTitle,
  520. }).Warnf("ACL check failed. Blocked from executing.")
  521. }
  522. return canExec
  523. }
  524. func stepParseArgs(req *ExecutionRequest) bool {
  525. ensureArgumentMap(req)
  526. injectSystemArgs(req)
  527. if !hasBindingAndAction(req) {
  528. return fail(req, fmt.Errorf("cannot parse arguments: Binding or Action is nil"))
  529. }
  530. mangleInvalidArgumentValues(req)
  531. if hasExec(req) {
  532. return handleExecBranch(req)
  533. }
  534. if hasExecTool(req) {
  535. return handleExecToolBranch(req)
  536. }
  537. return handleShellBranch(req)
  538. }
  539. func handleExecBranch(req *ExecutionRequest) bool {
  540. args, err := parseActionExec(req.Arguments, req.Binding.Action, req.Binding.Entity)
  541. if err != nil {
  542. return fail(req, err)
  543. }
  544. req.useDirectExec = true
  545. req.execArgs = args
  546. return true
  547. }
  548. func handleShellBranch(req *ExecutionRequest) bool {
  549. if err := checkShellArgumentSafety(req.Binding.Action); err != nil {
  550. return fail(req, err)
  551. }
  552. cmd, err := parseActionArguments(req)
  553. if err != nil {
  554. return fail(req, err)
  555. }
  556. req.useDirectExec = false
  557. req.finalParsedCommand = cmd
  558. return true
  559. }
  560. func ensureArgumentMap(req *ExecutionRequest) {
  561. if req.Arguments == nil {
  562. req.Arguments = make(map[string]string)
  563. }
  564. }
  565. func injectSystemArgs(req *ExecutionRequest) {
  566. req.Arguments["ot_executionTrackingId"] = req.TrackingID
  567. req.Arguments["ot_username"] = req.AuthenticatedUser.Username
  568. }
  569. func hasBindingAndAction(req *ExecutionRequest) bool {
  570. return !(req.Binding == nil || req.Binding.Action == nil)
  571. }
  572. func hasExec(req *ExecutionRequest) bool {
  573. return len(req.Binding.Action.Exec) > 0
  574. }
  575. func hasExecTool(req *ExecutionRequest) bool {
  576. return req.Binding.Action.ExecTool != nil
  577. }
  578. func handleExecToolBranch(req *ExecutionRequest) bool {
  579. if err := setupExecToolFields(req); err != nil {
  580. return fail(req, err)
  581. }
  582. return true
  583. }
  584. func setupExecToolFields(req *ExecutionRequest) error {
  585. cfg, err := validatedExecToolConfig(req)
  586. if err != nil {
  587. return err
  588. }
  589. applied, err := tpl.ApplyTemplatesToExecToolConfig(cfg, req.Binding.Entity, req.Arguments)
  590. if err != nil {
  591. return err
  592. }
  593. configJSON, err := json.Marshal(applied)
  594. if err != nil {
  595. return err
  596. }
  597. assignExecToolRequest(req, configJSON)
  598. return nil
  599. }
  600. func validatedExecToolConfig(req *ExecutionRequest) (map[string]any, error) {
  601. if err := validateArguments(req.Arguments, req.Binding.Action); err != nil {
  602. return nil, err
  603. }
  604. cfg := req.Binding.Action.ExecTool.Config
  605. if cfg == nil {
  606. return nil, fmt.Errorf("execTool config is nil")
  607. }
  608. return cfg, nil
  609. }
  610. func assignExecToolRequest(req *ExecutionRequest, configJSON []byte) {
  611. req.useExecTool = true
  612. req.execToolName = req.Binding.Action.ExecTool.Name
  613. req.execToolConfig = configJSON
  614. }
  615. func fail(req *ExecutionRequest, err error) bool {
  616. req.logEntry.Output = err.Error()
  617. log.Warn(err.Error())
  618. return false
  619. }
  620. func stepRequestAction(req *ExecutionRequest) bool {
  621. metricActionsRequested.Inc()
  622. // If there is no binding or action, do not proceed. Leave default
  623. // log entry values (icon/title/id) and stop execution gracefully.
  624. if req.Binding == nil || req.Binding.Action == nil {
  625. log.Warnf("Action request has no binding/action; skipping execution")
  626. return false
  627. }
  628. req.logEntry.Binding = req.Binding
  629. req.logEntry.ActionConfigTitle = req.Binding.Action.Title
  630. req.logEntry.ActionTitle = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Title, req.Binding.Entity)
  631. req.logEntry.ActionIcon = req.Binding.Action.Icon
  632. req.logEntry.Tags = req.Tags
  633. req.executor.logmutex.Lock()
  634. if _, containsKey := req.executor.LogsByBindingId[req.Binding.ID]; !containsKey {
  635. req.executor.LogsByBindingId[req.Binding.ID] = make([]*InternalLogEntry, 0)
  636. }
  637. req.executor.LogsByBindingId[req.Binding.ID] = append(req.executor.LogsByBindingId[req.Binding.ID], req.logEntry)
  638. req.executor.logmutex.Unlock()
  639. log.WithFields(log.Fields{
  640. "actionTitle": req.logEntry.ActionTitle,
  641. "tags": req.Tags,
  642. }).Infof("Action requested")
  643. notifyListenersStarted(req)
  644. return true
  645. }
  646. func stepLogStart(req *ExecutionRequest) bool {
  647. log.WithFields(log.Fields{
  648. "actionTitle": req.logEntry.ActionTitle,
  649. "timeout": req.Binding.Action.Timeout,
  650. }).Infof("Action started")
  651. return true
  652. }
  653. func stepLogFinish(req *ExecutionRequest) bool {
  654. req.logEntry.ExecutionFinished = true
  655. log.WithFields(log.Fields{
  656. "actionTitle": req.logEntry.ActionTitle,
  657. "outputLength": len(req.logEntry.Output),
  658. "timedOut": req.logEntry.TimedOut,
  659. "exit": req.logEntry.ExitCode,
  660. }).Infof("Action finished")
  661. return true
  662. }
  663. func notifyListenersFinished(req *ExecutionRequest) {
  664. for _, listener := range req.executor.listeners {
  665. listener.OnExecutionFinished(req.logEntry)
  666. }
  667. }
  668. func notifyListenersStarted(req *ExecutionRequest) {
  669. for _, listener := range req.executor.listeners {
  670. listener.OnExecutionStarted(req.logEntry)
  671. }
  672. }
  673. func appendErrorToStderr(err error, logEntry *InternalLogEntry) {
  674. if err != nil {
  675. logEntry.Output = err.Error() + "\n\n" + logEntry.Output
  676. }
  677. }
  678. func finishExecutionLog(req *ExecutionRequest, ctx *timeoutContext, streamer *OutputStreamer, runerr, waiterr error, exitCode int32) {
  679. req.logEntry.ExitCode = exitCode
  680. req.logEntry.Output = streamer.String()
  681. appendErrorToStderr(runerr, req.logEntry)
  682. appendErrorToStderr(waiterr, req.logEntry)
  683. applyActionTimeoutToLog(req, ctx)
  684. req.logEntry.DatetimeFinished = time.Now()
  685. }
  686. func applyActionTimeoutToLog(req *ExecutionRequest, ctx *timeoutContext) {
  687. if ctx.Err() != context.DeadlineExceeded {
  688. return
  689. }
  690. log.WithFields(log.Fields{
  691. "actionTitle": req.logEntry.ActionTitle,
  692. }).Warnf("Action timed out")
  693. req.logEntry.TimedOut = true
  694. req.logEntry.Output += "OliveTin::timeout - this action timed out after " + fmt.Sprintf("%v", req.Binding.Action.Timeout) + " seconds. If you need more time for this action, set a longer timeout. See https://docs.olivetin.app/action_customization/timeouts.html for more help."
  695. }
  696. type OutputStreamer struct {
  697. Req *ExecutionRequest
  698. output bytes.Buffer
  699. }
  700. func (ost *OutputStreamer) Write(o []byte) (n int, err error) {
  701. for _, listener := range ost.Req.executor.listeners {
  702. listener.OnOutputChunk(o, ost.Req.TrackingID)
  703. }
  704. return ost.output.Write(o)
  705. }
  706. func (ost *OutputStreamer) String() string {
  707. return ost.output.String()
  708. }
  // metadataMaxFirstLineLen (64 KiB) caps how many bytes MetadataStreamFilter
  // buffers while it is still looking for the end of the first output line.
  const metadataMaxFirstLineLen = 64 * 1024
  710. type MetadataStreamFilter struct {
  711. w io.Writer
  712. logEntry *InternalLogEntry
  713. buf []byte
  714. done bool
  715. }
  716. func (m *MetadataStreamFilter) Write(p []byte) (n int, err error) {
  717. if m.done {
  718. return m.w.Write(p)
  719. }
  720. m.buf = append(m.buf, p...)
  721. if len(m.buf) > metadataMaxFirstLineLen {
  722. return m.finishMetadataScanAsPlaintext(p)
  723. }
  724. idx := bytes.IndexByte(m.buf, '\n')
  725. if idx < 0 {
  726. return len(p), nil
  727. }
  728. return m.finishMetadataScanAtNewline(idx, p)
  729. }
  730. func (m *MetadataStreamFilter) finishMetadataScanAsPlaintext(p []byte) (n int, err error) {
  731. m.done = true
  732. _, _ = m.w.Write(m.buf)
  733. m.buf = nil
  734. return len(p), nil
  735. }
  736. func (m *MetadataStreamFilter) finishMetadataScanAtNewline(idx int, p []byte) (n int, err error) {
  737. line := m.buf[:idx]
  738. m.buf = m.buf[idx+1:]
  739. m.done = true
  740. m.writeFirstLineAndMaybeMetadata(line)
  741. if len(m.buf) > 0 {
  742. _, _ = m.w.Write(m.buf)
  743. m.buf = nil
  744. }
  745. return len(p), nil
  746. }
  747. func (m *MetadataStreamFilter) writeFirstLineAndMaybeMetadata(line []byte) {
  748. if bytes.HasPrefix(line, []byte("OLIVETIN_METADATA ")) {
  749. m.mergeMetadataLine(line[len("OLIVETIN_METADATA "):])
  750. return
  751. }
  752. _, _ = m.w.Write(line)
  753. _, _ = m.w.Write([]byte{'\n'})
  754. }
  755. func (m *MetadataStreamFilter) mergeMetadataLine(jsonPart []byte) {
  756. attrs, ok := parseMetadataAttrsJSON(jsonPart)
  757. if !ok {
  758. return
  759. }
  760. if m.logEntry.Attributes == nil {
  761. m.logEntry.Attributes = make(map[string]string)
  762. }
  763. maps.Copy(m.logEntry.Attributes, attrs)
  764. }
  765. func parseMetadataAttrsJSON(jsonPart []byte) (map[string]string, bool) {
  766. var attrs map[string]string
  767. if err := json.Unmarshal(jsonPart, &attrs); err != nil {
  768. return nil, false
  769. }
  770. if attrs == nil {
  771. return nil, false
  772. }
  773. return attrs, true
  774. }
  775. func buildEnv(args map[string]string) []string {
  776. ret := append(os.Environ(), "OLIVETIN=1")
  777. for k, v := range args {
  778. varName := fmt.Sprintf("%v", strings.TrimSpace(strings.ToUpper(k)))
  779. // Skip arguments that might not have a name (eg, confirmation), as this causes weird bugs on Windows.
  780. if varName == "" {
  781. continue
  782. }
  783. ret = append(ret, fmt.Sprintf("%v=%v", varName, v))
  784. }
  785. return ret
  786. }
  787. func stepExec(req *ExecutionRequest) bool {
  788. ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor, req.logEntry)
  789. defer cancel()
  790. streamer := &OutputStreamer{Req: req}
  791. if req.useExecTool {
  792. return stepExecTool(req, ctx, streamer)
  793. }
  794. cmd := buildCommand(ctx, req)
  795. if cmd == nil {
  796. req.logEntry.Output = "Cannot execute: no command arguments provided"
  797. log.Warn("Cannot execute: no command arguments provided")
  798. return false
  799. }
  800. prepareCommand(cmd, streamer, req)
  801. runerr := cmd.Start()
  802. req.logEntry.Process = cmd.Process
  803. ctx.setProcess(cmd.Process)
  804. waiterr := cmd.Wait()
  805. finishExecutionLog(req, ctx, streamer, runerr, waiterr, int32(cmd.ProcessState.ExitCode()))
  806. return true
  807. }
  808. func stepExecTool(req *ExecutionRequest, ctx *timeoutContext, streamer *OutputStreamer) bool {
  809. toolName := "olivetin-" + req.execToolName
  810. if _, err := exec.LookPath(toolName); err != nil {
  811. req.logEntry.Output = fmt.Sprintf("exec tool %s not found in PATH", toolName)
  812. log.Warnf("exec tool %s not found in PATH", toolName)
  813. return false
  814. }
  815. cmd := wrapCommandExecTool(ctx.Context, req.execToolName)
  816. if cmd == nil {
  817. return false
  818. }
  819. return runExecToolCommand(req, ctx, streamer, cmd)
  820. }
  821. func runExecToolCommand(req *ExecutionRequest, ctx *timeoutContext, streamer *OutputStreamer, cmd *exec.Cmd) bool {
  822. stdinPayload := buildExecToolStdinPayload(req)
  823. filter := &MetadataStreamFilter{w: streamer, logEntry: req.logEntry}
  824. cmd.Stdout = filter
  825. cmd.Stderr = streamer
  826. cmd.Env = buildExecToolEnv(req)
  827. stdinPipe, err := cmd.StdinPipe()
  828. if err != nil {
  829. req.logEntry.Output = "Failed to create stdin pipe: " + err.Error()
  830. return false
  831. }
  832. req.logEntry.ExecutionStarted = true
  833. runerr := cmd.Start()
  834. req.logEntry.Process = cmd.Process
  835. ctx.setProcess(cmd.Process)
  836. _, _ = stdinPipe.Write(stdinPayload)
  837. _ = stdinPipe.Close()
  838. waiterr := cmd.Wait()
  839. finishExecutionLog(req, ctx, streamer, runerr, waiterr, int32(cmd.ProcessState.ExitCode()))
  840. return true
  841. }
  842. func buildExecToolStdinPayload(req *ExecutionRequest) []byte {
  843. var configAny any
  844. _ = json.Unmarshal(req.execToolConfig, &configAny)
  845. payload := map[string]any{
  846. "config": configAny,
  847. "arguments": req.Arguments,
  848. "timeout": req.Binding.Action.Timeout,
  849. "tracking_id": req.TrackingID,
  850. }
  851. data, _ := json.Marshal(payload)
  852. return data
  853. }
  854. func buildExecToolEnv(req *ExecutionRequest) []string {
  855. env := append(os.Environ(), "OLIVETIN=1")
  856. env = append(env, "OLIVETIN_TRACKING_ID="+req.TrackingID)
  857. env = append(env, "OLIVETIN_ACTION_TITLE="+req.logEntry.ActionTitle)
  858. env = append(env, fmt.Sprintf("OLIVETIN_TIMEOUT=%d", req.Binding.Action.Timeout))
  859. for k, v := range req.Arguments {
  860. varName := fmt.Sprintf("%v", strings.TrimSpace(strings.ToUpper(k)))
  861. if varName == "" {
  862. continue
  863. }
  864. env = append(env, fmt.Sprintf("%v=%v", varName, v))
  865. }
  866. return env
  867. }
  868. func buildCommand(ctx context.Context, req *ExecutionRequest) *exec.Cmd {
  869. if req.useDirectExec {
  870. return wrapCommandDirect(ctx, req.execArgs)
  871. }
  872. return wrapCommandInShell(ctx, req.finalParsedCommand)
  873. }
  874. func prepareCommand(cmd *exec.Cmd, streamer *OutputStreamer, req *ExecutionRequest) {
  875. cmd.Stdout = streamer
  876. cmd.Stderr = streamer
  877. cmd.Env = buildEnv(req.Arguments)
  878. req.logEntry.ExecutionStarted = true
  879. }
  880. func stepExecAfter(req *ExecutionRequest) bool {
  881. if req.Binding.Action.ShellAfterCompleted == "" {
  882. return true
  883. }
  884. ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor, nil)
  885. defer cancel()
  886. var stdout bytes.Buffer
  887. var stderr bytes.Buffer
  888. args := map[string]string{
  889. "output": req.logEntry.Output,
  890. "exitCode": fmt.Sprintf("%v", req.logEntry.ExitCode),
  891. "ot_executionTrackingId": req.TrackingID,
  892. "ot_username": req.AuthenticatedUser.Username,
  893. }
  894. finalParsedCommand, err := tpl.ParseTemplateWithActionContext(req.Binding.Action.ShellAfterCompleted, req.Binding.Entity, args)
  895. if err != nil {
  896. msg := "Could not prepare shellAfterCompleted command: " + err.Error() + "\n"
  897. req.logEntry.Output += msg
  898. log.Warn(msg)
  899. return true
  900. }
  901. cmd := wrapCommandInShell(ctx, finalParsedCommand)
  902. cmd.Stdout = &stdout
  903. cmd.Stderr = &stderr
  904. cmd.Env = buildEnv(args)
  905. runerr := cmd.Start()
  906. ctx.setProcess(cmd.Process)
  907. waiterr := cmd.Wait()
  908. req.logEntry.Output += "\n"
  909. req.logEntry.Output += "OliveTin::shellAfterCompleted stdout\n"
  910. req.logEntry.Output += stdout.String()
  911. req.logEntry.Output += "OliveTin::shellAfterCompleted stderr\n"
  912. req.logEntry.Output += stderr.String()
  913. req.logEntry.Output += "OliveTin::shellAfterCompleted errors and summary\n"
  914. appendErrorToStderr(runerr, req.logEntry)
  915. appendErrorToStderr(waiterr, req.logEntry)
  916. if ctx.Err() == context.DeadlineExceeded {
  917. req.logEntry.Output += "Your shellAfterCompleted command timed out."
  918. }
  919. req.logEntry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", cmd.ProcessState.ExitCode())
  920. req.logEntry.Output += "OliveTin::shellAfterCompleted output complete\n"
  921. return true
  922. }
  923. //gocyclo:ignore
  924. func stepTrigger(req *ExecutionRequest) bool {
  925. if req.Binding.Action.Triggers == nil {
  926. return true
  927. }
  928. if req.TriggerDepth >= MaxTriggerDepth {
  929. log.WithFields(log.Fields{
  930. "actionTitle": req.logEntry.ActionTitle,
  931. "depth": req.TriggerDepth,
  932. }).Warnf("Trigger action reached maximum depth of %v. Not triggering further actions.", MaxTriggerDepth)
  933. req.logEntry.Output += fmt.Sprintf("OliveTin::trigger - this action reached maximum trigger depth of %v. Not triggering further actions.", MaxTriggerDepth)
  934. return true
  935. }
  936. if len(req.Tags) > 0 && req.Tags[0] == "trigger" {
  937. log.Warnf("Trigger action is triggering another trigger action. This is allowed, but be careful not to create trigger loops.")
  938. }
  939. triggerLoop(req)
  940. return true
  941. }
  942. func triggerLoop(req *ExecutionRequest) {
  943. for _, triggerReq := range req.Binding.Action.Triggers {
  944. binding := req.executor.FindBindingByID(triggerReq)
  945. trigger := &ExecutionRequest{
  946. Binding: binding,
  947. TrackingID: uuid.NewString(),
  948. Tags: []string{"trigger"},
  949. AuthenticatedUser: req.AuthenticatedUser,
  950. Arguments: req.Arguments,
  951. Cfg: req.Cfg,
  952. TriggerDepth: req.TriggerDepth + 1,
  953. }
  954. req.executor.ExecRequest(trigger)
  955. }
  956. }
  957. func stepSaveLog(req *ExecutionRequest) bool {
  958. filename := fmt.Sprintf("%v.%v.%v", req.logEntry.ActionTitle, req.logEntry.DatetimeStarted.Unix(), req.logEntry.ExecutionTrackingID)
  959. saveLogResults(req, filename)
  960. saveLogOutput(req, filename)
  961. return true
  962. }
  963. func firstNonEmpty(one, two string) string {
  964. if one != "" {
  965. return one
  966. }
  967. return two
  968. }
  969. func saveLogResults(req *ExecutionRequest, filename string) {
  970. dir := firstNonEmpty(req.Binding.Action.SaveLogs.ResultsDirectory, req.Cfg.SaveLogs.ResultsDirectory)
  971. if dir != "" {
  972. data, err := yaml.Marshal(req.logEntry)
  973. if err != nil {
  974. log.Warnf("%v", err)
  975. }
  976. filepath := path.Join(dir, filename+".yaml")
  977. err = os.WriteFile(filepath, data, 0644)
  978. if err != nil {
  979. log.Warnf("%v", err)
  980. }
  981. }
  982. }
  983. func saveLogOutput(req *ExecutionRequest, filename string) {
  984. dir := firstNonEmpty(req.Binding.Action.SaveLogs.OutputDirectory, req.Cfg.SaveLogs.OutputDirectory)
  985. if dir != "" {
  986. data := req.logEntry.Output
  987. filepath := path.Join(dir, filename+".log")
  988. err := os.WriteFile(filepath, []byte(data), 0644)
  989. if err != nil {
  990. log.Warnf("%v", err)
  991. }
  992. }
  993. }