| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238 |
- package executor
- import (
- acl "github.com/OliveTin/OliveTin/internal/acl"
- "github.com/OliveTin/OliveTin/internal/auth"
- authpublic "github.com/OliveTin/OliveTin/internal/auth/authpublic"
- config "github.com/OliveTin/OliveTin/internal/config"
- "github.com/OliveTin/OliveTin/internal/entities"
- "github.com/OliveTin/OliveTin/internal/tpl"
- "github.com/google/uuid"
- log "github.com/sirupsen/logrus"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promauto"
- "gopkg.in/yaml.v3"
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "io"
- "maps"
- "os"
- "os/exec"
- "path"
- "strings"
- "sync"
- "time"
- )
- const (
- DefaultExitCodeNotExecuted = -1337
- MaxTriggerDepth = 10
- )
- var (
- metricActionsRequested = promauto.NewCounter(prometheus.CounterOpts{
- Name: "olivetin_actions_requested_count",
- Help: "The actions requested count",
- })
- )
- type ActionBinding struct {
- ID string
- Action *config.Action
- Entity *entities.Entity
- ConfigOrder int
- IsOnDashboard bool
- }
- // Executor represents a helper class for executing commands. It's main method
- // is ExecRequest
- type Executor struct {
- logs map[string]*InternalLogEntry
- logsTrackingIdsByDate []string
- LogsByBindingId map[string][]*InternalLogEntry
- logmutex sync.RWMutex
- MapActionBindings map[string]*ActionBinding
- MapActionBindingsLock sync.RWMutex
- Cfg *config.Config
- listeners []listener
- chainOfCommand []executorStepFunc
- }
- // ExecutionRequest is a request to execute an action. It's passed to an
- // Executor. They're created from the api.
- type ExecutionRequest struct {
- Binding *ActionBinding
- Arguments map[string]string
- TrackingID string
- Tags []string
- Cfg *config.Config
- AuthenticatedUser *authpublic.AuthenticatedUser
- TriggerDepth int
- logEntry *InternalLogEntry
- finalParsedCommand string
- execArgs []string
- useDirectExec bool
- useExecTool bool
- execToolName string
- execToolConfig []byte
- executor *Executor
- }
- // InternalLogEntry objects are created by an Executor, and represent the final
- // state of execution (even if the command is not executed). It's designed to be
- // easily serializable.
- type InternalLogEntry struct {
- Binding *ActionBinding
- DatetimeStarted time.Time
- DatetimeFinished time.Time
- Output string
- TimedOut bool
- Blocked bool
- ExitCode int32
- Tags []string
- ExecutionStarted bool
- ExecutionFinished bool
- ExecutionTrackingID string
- Process *os.Process
- Username string
- Index int64
- EntityPrefix string
- ActionConfigTitle string // This is the title of the action as defined in the config, not the final parsed title.
- /*
- The following 3 properties are obviously on Action normally, but it's useful
- that logs are lightweight (so we don't need to have an action associated to
- logs, etc. Therefore, we duplicate those values here.
- */
- ActionTitle string
- ActionIcon string
- Attributes map[string]string
- }
- // .Binding can be nil, so we need to handle that.
- func (e *InternalLogEntry) GetBindingId() string {
- if e.Binding == nil {
- return ""
- }
- return e.Binding.ID
- }
- type executorStepFunc func(*ExecutionRequest) bool
- // DefaultExecutor returns an Executor, with a sensible "chain of command" for
- // executing actions.
- func DefaultExecutor(cfg *config.Config) *Executor {
- e := Executor{}
- e.Cfg = cfg
- e.logs = make(map[string]*InternalLogEntry)
- e.logsTrackingIdsByDate = make([]string, 0)
- e.LogsByBindingId = make(map[string][]*InternalLogEntry)
- e.MapActionBindings = make(map[string]*ActionBinding)
- e.chainOfCommand = []executorStepFunc{
- stepRequestAction,
- stepConcurrencyCheck,
- stepRateCheck,
- stepACLCheck,
- stepParseArgs,
- stepLogStart,
- stepExec,
- stepExecAfter,
- stepLogFinish,
- stepSaveLog,
- stepTrigger,
- }
- return &e
- }
- type listener interface {
- OnExecutionStarted(logEntry *InternalLogEntry)
- OnExecutionFinished(logEntry *InternalLogEntry)
- OnOutputChunk(o []byte, executionTrackingId string)
- OnActionMapRebuilt()
- }
- func (e *Executor) AddListener(m listener) {
- e.listeners = append(e.listeners, m)
- }
- // getPagingStartIndex calculates the starting index for log pagination.
- // Parameters:
- //
- // startOffset: The offset from the most recent log (0 means start from the most recent)
- // totalLogCount: Total number of logs available
- // count: Number of logs to retrieve
- //
- // Returns: The calculated starting index for pagination
- func getPagingStartIndex(startOffset int64, totalLogCount int64) int64 {
- var startIndex int64
- if startOffset <= 0 {
- startIndex = totalLogCount
- } else {
- startIndex = (totalLogCount - startOffset)
- if startIndex < 0 {
- startIndex = 1
- }
- }
- return startIndex - 1
- }
- type PagingResult struct {
- CountRemaining int64
- PageSize int64
- TotalCount int64
- StartOffset int64
- }
- func (e *Executor) GetLogTrackingIds(startOffset int64, pageCount int64) ([]*InternalLogEntry, *PagingResult) {
- pagingResult := &PagingResult{
- CountRemaining: 0,
- PageSize: pageCount,
- TotalCount: 0,
- StartOffset: startOffset,
- }
- e.logmutex.RLock()
- totalLogCount := int64(len(e.logsTrackingIdsByDate))
- pagingResult.TotalCount = totalLogCount
- startIndex := getPagingStartIndex(startOffset, totalLogCount)
- pageCount = min(totalLogCount, pageCount)
- endIndex := max(0, (startIndex-pageCount)+1)
- log.WithFields(log.Fields{
- "startOffset": startOffset,
- "pageCount": pageCount,
- "total": totalLogCount,
- "startIndex": startIndex,
- "endIndex": endIndex,
- }).Tracef("GetLogTrackingIds")
- trackingIds := make([]*InternalLogEntry, 0, pageCount)
- if totalLogCount > 0 {
- for i := endIndex; i <= startIndex; i++ {
- trackingIds = append(trackingIds, e.logs[e.logsTrackingIdsByDate[i]])
- }
- }
- e.logmutex.RUnlock()
- pagingResult.CountRemaining = endIndex
- return trackingIds, pagingResult
- }
- func isValidLogEntryForACL(entry *InternalLogEntry) bool {
- return entry != nil && entry.Binding != nil && entry.Binding.Action != nil
- }
- func isLogEntryAllowedByACL(cfg *config.Config, user *authpublic.AuthenticatedUser, entry *InternalLogEntry) bool {
- return acl.IsAllowedLogs(cfg, user, entry.Binding.Action)
- }
- func (e *Executor) filterLogsByACL(cfg *config.Config, user *authpublic.AuthenticatedUser, dateFilter string) []*InternalLogEntry {
- e.logmutex.RLock()
- defer e.logmutex.RUnlock()
- filtered := make([]*InternalLogEntry, 0, len(e.logsTrackingIdsByDate))
- filterDate, hasDateFilter := parseDateFilter(dateFilter)
- for _, trackingId := range e.logsTrackingIdsByDate {
- entry := e.logs[trackingId]
- if shouldIncludeLogEntry(cfg, user, entry, filterDate, hasDateFilter) {
- filtered = append(filtered, entry)
- }
- }
- return filtered
- }
- // parseDateFilter parses the date filter string and returns filter information.
- func parseDateFilter(dateFilter string) (filterDate time.Time, hasDateFilter bool) {
- if dateFilter == "" {
- return time.Time{}, false
- }
- parsedDate, err := time.Parse("2006-01-02", dateFilter)
- if err != nil {
- log.WithFields(log.Fields{
- "dateFilter": dateFilter,
- "error": err,
- }).Errorf("Failed to parse date filter, expected format YYYY-MM-DD")
- return time.Time{}, false
- }
- return parsedDate, true
- }
- // shouldIncludeLogEntry determines if a log entry should be included based on ACL and date filter.
- func shouldIncludeLogEntry(cfg *config.Config, user *authpublic.AuthenticatedUser, entry *InternalLogEntry, filterDate time.Time, hasDateFilter bool) bool {
- if !isValidLogEntryForACL(entry) {
- return false
- }
- if !isLogEntryAllowedByACL(cfg, user, entry) {
- return false
- }
- return matchesDateFilter(entry, filterDate, hasDateFilter)
- }
- // matchesDateFilter checks if the log entry matches the date filter.
- func matchesDateFilter(entry *InternalLogEntry, filterDate time.Time, hasDateFilter bool) bool {
- if !hasDateFilter {
- return true
- }
- entryDate := entry.DatetimeStarted.UTC().Truncate(24 * time.Hour)
- filterDateUTC := filterDate.UTC().Truncate(24 * time.Hour)
- return entryDate.Equal(filterDateUTC)
- }
- // paginateFilteredLogs applies pagination to a filtered list of logs and returns
- // the paginated results along with pagination metadata.
- func paginateFilteredLogs(filtered []*InternalLogEntry, startOffset int64, pageCount int64) ([]*InternalLogEntry, *PagingResult) {
- total := int64(len(filtered))
- paging := &PagingResult{PageSize: pageCount, TotalCount: total, StartOffset: startOffset}
- if total == 0 {
- paging.CountRemaining = 0
- return []*InternalLogEntry{}, paging
- }
- startIndex := getPagingStartIndex(startOffset, total)
- pageCount = min(total, pageCount)
- endIndex := max(0, (startIndex-pageCount)+1)
- out := make([]*InternalLogEntry, 0, pageCount)
- for i := endIndex; i <= startIndex && i < int64(len(filtered)); i++ {
- out = append(out, filtered[i])
- }
- paging.CountRemaining = endIndex
- return out, paging
- }
- // GetLogTrackingIdsACL returns logs filtered by ACL visibility for the user and
- // paginated correctly based on the filtered set.
- // dateFilter is optional and should be in YYYY-MM-DD format. If empty, no date filtering is applied.
- func (e *Executor) GetLogTrackingIdsACL(cfg *config.Config, user *authpublic.AuthenticatedUser, startOffset int64, pageCount int64, dateFilter string) ([]*InternalLogEntry, *PagingResult) {
- filtered := e.filterLogsByACL(cfg, user, dateFilter)
- return paginateFilteredLogs(filtered, startOffset, pageCount)
- }
- func (e *Executor) GetLog(trackingID string) (*InternalLogEntry, bool) {
- e.logmutex.RLock()
- entry, found := e.logs[trackingID]
- e.logmutex.RUnlock()
- return entry, found
- }
- func (e *Executor) GetLogsByBindingId(bindingId string) []*InternalLogEntry {
- e.logmutex.RLock()
- logs, found := e.LogsByBindingId[bindingId]
- e.logmutex.RUnlock()
- if !found {
- return make([]*InternalLogEntry, 0)
- }
- return logs
- }
- // shouldCountExecution checks if a log entry should be counted for rate limiting.
- func shouldCountExecution(logEntry *InternalLogEntry, windowStart time.Time) bool {
- return !logEntry.Blocked && logEntry.DatetimeStarted.After(windowStart)
- }
- // updateOldestExecution updates the oldest execution time if this entry is older.
- func updateOldestExecution(oldestExecutionTime **time.Time, logEntry *InternalLogEntry) {
- if *oldestExecutionTime == nil {
- *oldestExecutionTime = &logEntry.DatetimeStarted
- } else if logEntry.DatetimeStarted.Before(**oldestExecutionTime) {
- *oldestExecutionTime = &logEntry.DatetimeStarted
- }
- }
- // findOldestExecutionInWindow finds the oldest execution within the time window and counts executions.
- // Returns the count of executions and the oldest execution time, or nil if none found.
- func findOldestExecutionInWindow(logs []*InternalLogEntry, windowStart time.Time) (int, *time.Time) {
- executions := 0
- var oldestExecutionTime *time.Time
- for _, logEntry := range logs {
- if !shouldCountExecution(logEntry, windowStart) {
- continue
- }
- executions++
- updateOldestExecution(&oldestExecutionTime, logEntry)
- }
- return executions, oldestExecutionTime
- }
- // calculateExpiryTime calculates when the oldest execution will fall outside the rate limit window.
- func calculateExpiryTime(oldestExecutionTime time.Time, duration time.Duration, now time.Time) time.Time {
- expiryTime := oldestExecutionTime.Add(duration)
- if !expiryTime.After(now) {
- return time.Time{}
- }
- return expiryTime
- }
- // updateMaxExpiryTime updates maxExpiryTime if expiryTime is later.
- func updateMaxExpiryTime(maxExpiryTime *time.Time, expiryTime time.Time) {
- if expiryTime.IsZero() {
- return
- }
- if maxExpiryTime.IsZero() || expiryTime.After(*maxExpiryTime) {
- *maxExpiryTime = expiryTime
- }
- }
- // calculateExpiryForRate calculates the expiry time for a single rate limit rule.
- // Returns the expiry time if the rate limit is exceeded, or zero time if not.
- func calculateExpiryForRate(rate config.RateSpec, logs []*InternalLogEntry, now time.Time) time.Time {
- duration := parseDuration(rate)
- if duration <= 0 {
- return time.Time{}
- }
- windowStart := now.Add(-duration)
- executions, oldestExecutionTime := findOldestExecutionInWindow(logs, windowStart)
- if executions < rate.Limit || oldestExecutionTime == nil {
- return time.Time{}
- }
- return calculateExpiryTime(*oldestExecutionTime, duration, now)
- }
- // getLogsForBinding retrieves logs for a binding ID.
- func (e *Executor) getLogsForBinding(bindingId string) []*InternalLogEntry {
- e.logmutex.RLock()
- logs, found := e.LogsByBindingId[bindingId]
- e.logmutex.RUnlock()
- if !found || len(logs) == 0 {
- return nil
- }
- return logs
- }
- // calculateMaxExpiryTimeFromRates calculates the maximum expiry time across all rate limit rules.
- func calculateMaxExpiryTimeFromRates(rates []config.RateSpec, logs []*InternalLogEntry, now time.Time) time.Time {
- var maxExpiryTime time.Time
- for _, rate := range rates {
- expiryTime := calculateExpiryForRate(rate, logs, now)
- updateMaxExpiryTime(&maxExpiryTime, expiryTime)
- }
- return maxExpiryTime
- }
- // GetTimeUntilAvailable calculates when an action will be available again based on rate limits.
- // Returns the Unix timestamp in seconds when the rate limit expires, or 0 if the action is available now.
- func (e *Executor) GetTimeUntilAvailable(binding *ActionBinding) int64 {
- if len(binding.Action.MaxRate) == 0 {
- return 0
- }
- logs := e.getLogsForBinding(binding.ID)
- if logs == nil {
- return 0
- }
- maxExpiryTime := calculateMaxExpiryTimeFromRates(binding.Action.MaxRate, logs, time.Now())
- if maxExpiryTime.IsZero() {
- return 0
- }
- return maxExpiryTime.Unix()
- }
- func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) {
- e.logmutex.Lock()
- entry.Index = int64(len(e.logsTrackingIdsByDate))
- e.logs[trackingID] = entry
- e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, trackingID)
- e.logmutex.Unlock()
- }
- // ExecRequest processes an ExecutionRequest
- func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string) {
- if req.AuthenticatedUser == nil {
- req.AuthenticatedUser = auth.UserGuest(req.Cfg)
- }
- req.executor = e
- req.logEntry = &InternalLogEntry{
- Binding: req.Binding,
- DatetimeStarted: time.Now(),
- ExecutionTrackingID: req.TrackingID,
- Output: "",
- ExitCode: DefaultExitCodeNotExecuted,
- ExecutionStarted: false,
- ExecutionFinished: false,
- ActionTitle: "notfound",
- ActionIcon: "💩",
- Username: req.AuthenticatedUser.Username,
- }
- _, isDuplicate := e.GetLog(req.TrackingID)
- if isDuplicate || req.TrackingID == "" {
- req.TrackingID = uuid.NewString()
- }
- // Update the log entry with the final tracking ID
- req.logEntry.ExecutionTrackingID = req.TrackingID
- log.Tracef("executor.ExecRequest(): %v", req)
- e.SetLog(req.TrackingID, req.logEntry)
- wg := new(sync.WaitGroup)
- wg.Add(1)
- go func() {
- e.execChain(req)
- defer wg.Done()
- }()
- return wg, req.TrackingID
- }
- func (e *Executor) execChain(req *ExecutionRequest) {
- for _, step := range e.chainOfCommand {
- if !step(req) {
- break
- }
- }
- // Ensure DatetimeFinished is set even if execution was blocked early
- if req.logEntry.DatetimeFinished.IsZero() {
- req.logEntry.DatetimeFinished = time.Now()
- }
- req.logEntry.ExecutionFinished = true
- // This isn't a step, because we want to notify all listeners, irrespective
- // of how many steps were actually executed.
- notifyListenersFinished(req)
- }
- func getConcurrentCount(req *ExecutionRequest) int {
- concurrentCount := 0
- req.executor.logmutex.RLock()
- for _, log := range req.executor.GetLogsByBindingId(req.Binding.ID) {
- if !log.ExecutionFinished {
- concurrentCount += 1
- }
- }
- req.executor.logmutex.RUnlock()
- return concurrentCount
- }
- func stepConcurrencyCheck(req *ExecutionRequest) bool {
- concurrentCount := getConcurrentCount(req)
- // Note that the current execution is counted int the logs, so when checking we +1
- if concurrentCount >= (req.Binding.Action.MaxConcurrent + 1) {
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- "concurrentCount": concurrentCount,
- "maxConcurrent": req.Binding.Action.MaxConcurrent,
- }).Warnf("Blocked from executing due to concurrency limit")
- req.logEntry.Output = "Blocked from executing due to concurrency limit"
- req.logEntry.Blocked = true
- return false
- }
- return true
- }
- func parseDuration(rate config.RateSpec) time.Duration {
- duration, err := time.ParseDuration(rate.Duration)
- if err != nil {
- log.Warnf("Could not parse duration: %v", rate.Duration)
- return -1 * time.Minute
- }
- return duration
- }
- //gocyclo:ignore
- func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
- executions := -1 // Because we will find ourself when checking execution logs
- duration := parseDuration(rate)
- then := time.Now().Add(-duration)
- for _, logEntry := range req.executor.GetLogsByBindingId(req.Binding.ID) {
- // FIXME
- /*
- if logEntry.EntityPrefix != req.EntityPrefix {
- continue
- }
- */
- if logEntry.DatetimeStarted.After(then) && !logEntry.Blocked {
- executions += 1
- }
- }
- return executions
- }
- func stepRateCheck(req *ExecutionRequest) bool {
- for _, rate := range req.Binding.Action.MaxRate {
- executions := getExecutionsCount(rate, req)
- if executions >= rate.Limit {
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- "executions": executions,
- "limit": rate.Limit,
- "duration": rate.Duration,
- }).Infof("Blocked from executing due to rate limit")
- req.logEntry.Output = "Blocked from executing due to rate limit"
- req.logEntry.Blocked = true
- return false
- }
- }
- return true
- }
- func stepACLCheck(req *ExecutionRequest) bool {
- canExec := acl.IsAllowedExec(req.Cfg, req.AuthenticatedUser, req.Binding.Action)
- if !canExec {
- req.logEntry.Output = "ACL check failed. Blocked from executing."
- req.logEntry.Blocked = true
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- }).Warnf("ACL check failed. Blocked from executing.")
- }
- return canExec
- }
- func stepParseArgs(req *ExecutionRequest) bool {
- ensureArgumentMap(req)
- injectSystemArgs(req)
- if !hasBindingAndAction(req) {
- return fail(req, fmt.Errorf("cannot parse arguments: Binding or Action is nil"))
- }
- mangleInvalidArgumentValues(req)
- if hasExec(req) {
- return handleExecBranch(req)
- }
- if hasExecTool(req) {
- return handleExecToolBranch(req)
- }
- return handleShellBranch(req)
- }
- func handleExecBranch(req *ExecutionRequest) bool {
- args, err := parseActionExec(req.Arguments, req.Binding.Action, req.Binding.Entity)
- if err != nil {
- return fail(req, err)
- }
- req.useDirectExec = true
- req.execArgs = args
- return true
- }
- func handleShellBranch(req *ExecutionRequest) bool {
- if err := checkShellArgumentSafety(req.Binding.Action); err != nil {
- return fail(req, err)
- }
- cmd, err := parseActionArguments(req)
- if err != nil {
- return fail(req, err)
- }
- req.useDirectExec = false
- req.finalParsedCommand = cmd
- return true
- }
- func ensureArgumentMap(req *ExecutionRequest) {
- if req.Arguments == nil {
- req.Arguments = make(map[string]string)
- }
- }
- func injectSystemArgs(req *ExecutionRequest) {
- req.Arguments["ot_executionTrackingId"] = req.TrackingID
- req.Arguments["ot_username"] = req.AuthenticatedUser.Username
- }
- func hasBindingAndAction(req *ExecutionRequest) bool {
- return !(req.Binding == nil || req.Binding.Action == nil)
- }
- func hasExec(req *ExecutionRequest) bool {
- return len(req.Binding.Action.Exec) > 0
- }
- func hasExecTool(req *ExecutionRequest) bool {
- return req.Binding.Action.ExecTool != nil
- }
- func handleExecToolBranch(req *ExecutionRequest) bool {
- if err := setupExecToolFields(req); err != nil {
- return fail(req, err)
- }
- return true
- }
- func setupExecToolFields(req *ExecutionRequest) error {
- cfg, err := validatedExecToolConfig(req)
- if err != nil {
- return err
- }
- applied, err := tpl.ApplyTemplatesToExecToolConfig(cfg, req.Binding.Entity, req.Arguments)
- if err != nil {
- return err
- }
- configJSON, err := json.Marshal(applied)
- if err != nil {
- return err
- }
- assignExecToolRequest(req, configJSON)
- return nil
- }
- func validatedExecToolConfig(req *ExecutionRequest) (map[string]any, error) {
- if err := validateArguments(req.Arguments, req.Binding.Action); err != nil {
- return nil, err
- }
- cfg := req.Binding.Action.ExecTool.Config
- if cfg == nil {
- return nil, fmt.Errorf("execTool config is nil")
- }
- return cfg, nil
- }
- func assignExecToolRequest(req *ExecutionRequest, configJSON []byte) {
- req.useExecTool = true
- req.execToolName = req.Binding.Action.ExecTool.Name
- req.execToolConfig = configJSON
- }
- func fail(req *ExecutionRequest, err error) bool {
- req.logEntry.Output = err.Error()
- log.Warn(err.Error())
- return false
- }
- func stepRequestAction(req *ExecutionRequest) bool {
- metricActionsRequested.Inc()
- // If there is no binding or action, do not proceed. Leave default
- // log entry values (icon/title/id) and stop execution gracefully.
- if req.Binding == nil || req.Binding.Action == nil {
- log.Warnf("Action request has no binding/action; skipping execution")
- return false
- }
- req.logEntry.Binding = req.Binding
- req.logEntry.ActionConfigTitle = req.Binding.Action.Title
- req.logEntry.ActionTitle = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Title, req.Binding.Entity)
- req.logEntry.ActionIcon = req.Binding.Action.Icon
- req.logEntry.Tags = req.Tags
- req.executor.logmutex.Lock()
- if _, containsKey := req.executor.LogsByBindingId[req.Binding.ID]; !containsKey {
- req.executor.LogsByBindingId[req.Binding.ID] = make([]*InternalLogEntry, 0)
- }
- req.executor.LogsByBindingId[req.Binding.ID] = append(req.executor.LogsByBindingId[req.Binding.ID], req.logEntry)
- req.executor.logmutex.Unlock()
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- "tags": req.Tags,
- }).Infof("Action requested")
- notifyListenersStarted(req)
- return true
- }
- func stepLogStart(req *ExecutionRequest) bool {
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- "timeout": req.Binding.Action.Timeout,
- }).Infof("Action started")
- return true
- }
- func stepLogFinish(req *ExecutionRequest) bool {
- req.logEntry.ExecutionFinished = true
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- "outputLength": len(req.logEntry.Output),
- "timedOut": req.logEntry.TimedOut,
- "exit": req.logEntry.ExitCode,
- }).Infof("Action finished")
- return true
- }
- func notifyListenersFinished(req *ExecutionRequest) {
- for _, listener := range req.executor.listeners {
- listener.OnExecutionFinished(req.logEntry)
- }
- }
- func notifyListenersStarted(req *ExecutionRequest) {
- for _, listener := range req.executor.listeners {
- listener.OnExecutionStarted(req.logEntry)
- }
- }
- func appendErrorToStderr(err error, logEntry *InternalLogEntry) {
- if err != nil {
- logEntry.Output = err.Error() + "\n\n" + logEntry.Output
- }
- }
- func finishExecutionLog(req *ExecutionRequest, ctx *timeoutContext, streamer *OutputStreamer, runerr, waiterr error, exitCode int32) {
- req.logEntry.ExitCode = exitCode
- req.logEntry.Output = streamer.String()
- appendErrorToStderr(runerr, req.logEntry)
- appendErrorToStderr(waiterr, req.logEntry)
- applyActionTimeoutToLog(req, ctx)
- req.logEntry.DatetimeFinished = time.Now()
- }
- func applyActionTimeoutToLog(req *ExecutionRequest, ctx *timeoutContext) {
- if ctx.Err() != context.DeadlineExceeded {
- return
- }
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- }).Warnf("Action timed out")
- req.logEntry.TimedOut = true
- req.logEntry.Output += "OliveTin::timeout - this action timed out after " + fmt.Sprintf("%v", req.Binding.Action.Timeout) + " seconds. If you need more time for this action, set a longer timeout. See https://docs.olivetin.app/action_customization/timeouts.html for more help."
- }
- type OutputStreamer struct {
- Req *ExecutionRequest
- output bytes.Buffer
- }
- func (ost *OutputStreamer) Write(o []byte) (n int, err error) {
- for _, listener := range ost.Req.executor.listeners {
- listener.OnOutputChunk(o, ost.Req.TrackingID)
- }
- return ost.output.Write(o)
- }
- func (ost *OutputStreamer) String() string {
- return ost.output.String()
- }
- const metadataMaxFirstLineLen = 64 * 1024
- type MetadataStreamFilter struct {
- w io.Writer
- logEntry *InternalLogEntry
- buf []byte
- done bool
- }
- func (m *MetadataStreamFilter) Write(p []byte) (n int, err error) {
- if m.done {
- return m.w.Write(p)
- }
- m.buf = append(m.buf, p...)
- if len(m.buf) > metadataMaxFirstLineLen {
- return m.finishMetadataScanAsPlaintext(p)
- }
- idx := bytes.IndexByte(m.buf, '\n')
- if idx < 0 {
- return len(p), nil
- }
- return m.finishMetadataScanAtNewline(idx, p)
- }
- func (m *MetadataStreamFilter) finishMetadataScanAsPlaintext(p []byte) (n int, err error) {
- m.done = true
- _, _ = m.w.Write(m.buf)
- m.buf = nil
- return len(p), nil
- }
- func (m *MetadataStreamFilter) finishMetadataScanAtNewline(idx int, p []byte) (n int, err error) {
- line := m.buf[:idx]
- m.buf = m.buf[idx+1:]
- m.done = true
- m.writeFirstLineAndMaybeMetadata(line)
- if len(m.buf) > 0 {
- _, _ = m.w.Write(m.buf)
- m.buf = nil
- }
- return len(p), nil
- }
- func (m *MetadataStreamFilter) writeFirstLineAndMaybeMetadata(line []byte) {
- if bytes.HasPrefix(line, []byte("OLIVETIN_METADATA ")) {
- m.mergeMetadataLine(line[len("OLIVETIN_METADATA "):])
- return
- }
- _, _ = m.w.Write(line)
- _, _ = m.w.Write([]byte{'\n'})
- }
- func (m *MetadataStreamFilter) mergeMetadataLine(jsonPart []byte) {
- attrs, ok := parseMetadataAttrsJSON(jsonPart)
- if !ok {
- return
- }
- if m.logEntry.Attributes == nil {
- m.logEntry.Attributes = make(map[string]string)
- }
- maps.Copy(m.logEntry.Attributes, attrs)
- }
- func parseMetadataAttrsJSON(jsonPart []byte) (map[string]string, bool) {
- var attrs map[string]string
- if err := json.Unmarshal(jsonPart, &attrs); err != nil {
- return nil, false
- }
- if attrs == nil {
- return nil, false
- }
- return attrs, true
- }
- func buildEnv(args map[string]string) []string {
- ret := append(os.Environ(), "OLIVETIN=1")
- for k, v := range args {
- varName := fmt.Sprintf("%v", strings.TrimSpace(strings.ToUpper(k)))
- // Skip arguments that might not have a name (eg, confirmation), as this causes weird bugs on Windows.
- if varName == "" {
- continue
- }
- ret = append(ret, fmt.Sprintf("%v=%v", varName, v))
- }
- return ret
- }
- func stepExec(req *ExecutionRequest) bool {
- ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor, req.logEntry)
- defer cancel()
- streamer := &OutputStreamer{Req: req}
- if req.useExecTool {
- return stepExecTool(req, ctx, streamer)
- }
- cmd := buildCommand(ctx, req)
- if cmd == nil {
- req.logEntry.Output = "Cannot execute: no command arguments provided"
- log.Warn("Cannot execute: no command arguments provided")
- return false
- }
- prepareCommand(cmd, streamer, req)
- runerr := cmd.Start()
- req.logEntry.Process = cmd.Process
- ctx.setProcess(cmd.Process)
- waiterr := cmd.Wait()
- finishExecutionLog(req, ctx, streamer, runerr, waiterr, int32(cmd.ProcessState.ExitCode()))
- return true
- }
- func stepExecTool(req *ExecutionRequest, ctx *timeoutContext, streamer *OutputStreamer) bool {
- toolName := "olivetin-" + req.execToolName
- if _, err := exec.LookPath(toolName); err != nil {
- req.logEntry.Output = fmt.Sprintf("exec tool %s not found in PATH", toolName)
- log.Warnf("exec tool %s not found in PATH", toolName)
- return false
- }
- cmd := wrapCommandExecTool(ctx.Context, req.execToolName)
- if cmd == nil {
- return false
- }
- return runExecToolCommand(req, ctx, streamer, cmd)
- }
- func runExecToolCommand(req *ExecutionRequest, ctx *timeoutContext, streamer *OutputStreamer, cmd *exec.Cmd) bool {
- stdinPayload := buildExecToolStdinPayload(req)
- filter := &MetadataStreamFilter{w: streamer, logEntry: req.logEntry}
- cmd.Stdout = filter
- cmd.Stderr = streamer
- cmd.Env = buildExecToolEnv(req)
- stdinPipe, err := cmd.StdinPipe()
- if err != nil {
- req.logEntry.Output = "Failed to create stdin pipe: " + err.Error()
- return false
- }
- req.logEntry.ExecutionStarted = true
- runerr := cmd.Start()
- req.logEntry.Process = cmd.Process
- ctx.setProcess(cmd.Process)
- _, _ = stdinPipe.Write(stdinPayload)
- _ = stdinPipe.Close()
- waiterr := cmd.Wait()
- finishExecutionLog(req, ctx, streamer, runerr, waiterr, int32(cmd.ProcessState.ExitCode()))
- return true
- }
- func buildExecToolStdinPayload(req *ExecutionRequest) []byte {
- var configAny any
- _ = json.Unmarshal(req.execToolConfig, &configAny)
- payload := map[string]any{
- "config": configAny,
- "arguments": req.Arguments,
- "timeout": req.Binding.Action.Timeout,
- "tracking_id": req.TrackingID,
- }
- data, _ := json.Marshal(payload)
- return data
- }
- func buildExecToolEnv(req *ExecutionRequest) []string {
- env := append(os.Environ(), "OLIVETIN=1")
- env = append(env, "OLIVETIN_TRACKING_ID="+req.TrackingID)
- env = append(env, "OLIVETIN_ACTION_TITLE="+req.logEntry.ActionTitle)
- env = append(env, fmt.Sprintf("OLIVETIN_TIMEOUT=%d", req.Binding.Action.Timeout))
- for k, v := range req.Arguments {
- varName := fmt.Sprintf("%v", strings.TrimSpace(strings.ToUpper(k)))
- if varName == "" {
- continue
- }
- env = append(env, fmt.Sprintf("%v=%v", varName, v))
- }
- return env
- }
- func buildCommand(ctx context.Context, req *ExecutionRequest) *exec.Cmd {
- if req.useDirectExec {
- return wrapCommandDirect(ctx, req.execArgs)
- }
- return wrapCommandInShell(ctx, req.finalParsedCommand)
- }
- func prepareCommand(cmd *exec.Cmd, streamer *OutputStreamer, req *ExecutionRequest) {
- cmd.Stdout = streamer
- cmd.Stderr = streamer
- cmd.Env = buildEnv(req.Arguments)
- req.logEntry.ExecutionStarted = true
- }
- func stepExecAfter(req *ExecutionRequest) bool {
- if req.Binding.Action.ShellAfterCompleted == "" {
- return true
- }
- ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor, nil)
- defer cancel()
- var stdout bytes.Buffer
- var stderr bytes.Buffer
- args := map[string]string{
- "output": req.logEntry.Output,
- "exitCode": fmt.Sprintf("%v", req.logEntry.ExitCode),
- "ot_executionTrackingId": req.TrackingID,
- "ot_username": req.AuthenticatedUser.Username,
- }
- finalParsedCommand, err := tpl.ParseTemplateWithActionContext(req.Binding.Action.ShellAfterCompleted, req.Binding.Entity, args)
- if err != nil {
- msg := "Could not prepare shellAfterCompleted command: " + err.Error() + "\n"
- req.logEntry.Output += msg
- log.Warn(msg)
- return true
- }
- cmd := wrapCommandInShell(ctx, finalParsedCommand)
- cmd.Stdout = &stdout
- cmd.Stderr = &stderr
- cmd.Env = buildEnv(args)
- runerr := cmd.Start()
- ctx.setProcess(cmd.Process)
- waiterr := cmd.Wait()
- req.logEntry.Output += "\n"
- req.logEntry.Output += "OliveTin::shellAfterCompleted stdout\n"
- req.logEntry.Output += stdout.String()
- req.logEntry.Output += "OliveTin::shellAfterCompleted stderr\n"
- req.logEntry.Output += stderr.String()
- req.logEntry.Output += "OliveTin::shellAfterCompleted errors and summary\n"
- appendErrorToStderr(runerr, req.logEntry)
- appendErrorToStderr(waiterr, req.logEntry)
- if ctx.Err() == context.DeadlineExceeded {
- req.logEntry.Output += "Your shellAfterCompleted command timed out."
- }
- req.logEntry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", cmd.ProcessState.ExitCode())
- req.logEntry.Output += "OliveTin::shellAfterCompleted output complete\n"
- return true
- }
- //gocyclo:ignore
- func stepTrigger(req *ExecutionRequest) bool {
- if req.Binding.Action.Triggers == nil {
- return true
- }
- if req.TriggerDepth >= MaxTriggerDepth {
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- "depth": req.TriggerDepth,
- }).Warnf("Trigger action reached maximum depth of %v. Not triggering further actions.", MaxTriggerDepth)
- req.logEntry.Output += fmt.Sprintf("OliveTin::trigger - this action reached maximum trigger depth of %v. Not triggering further actions.", MaxTriggerDepth)
- return true
- }
- if len(req.Tags) > 0 && req.Tags[0] == "trigger" {
- log.Warnf("Trigger action is triggering another trigger action. This is allowed, but be careful not to create trigger loops.")
- }
- triggerLoop(req)
- return true
- }
- func triggerLoop(req *ExecutionRequest) {
- for _, triggerReq := range req.Binding.Action.Triggers {
- binding := req.executor.FindBindingByID(triggerReq)
- trigger := &ExecutionRequest{
- Binding: binding,
- TrackingID: uuid.NewString(),
- Tags: []string{"trigger"},
- AuthenticatedUser: req.AuthenticatedUser,
- Arguments: req.Arguments,
- Cfg: req.Cfg,
- TriggerDepth: req.TriggerDepth + 1,
- }
- req.executor.ExecRequest(trigger)
- }
- }
- func stepSaveLog(req *ExecutionRequest) bool {
- filename := fmt.Sprintf("%v.%v.%v", req.logEntry.ActionTitle, req.logEntry.DatetimeStarted.Unix(), req.logEntry.ExecutionTrackingID)
- saveLogResults(req, filename)
- saveLogOutput(req, filename)
- return true
- }
- func firstNonEmpty(one, two string) string {
- if one != "" {
- return one
- }
- return two
- }
- func saveLogResults(req *ExecutionRequest, filename string) {
- dir := firstNonEmpty(req.Binding.Action.SaveLogs.ResultsDirectory, req.Cfg.SaveLogs.ResultsDirectory)
- if dir != "" {
- data, err := yaml.Marshal(req.logEntry)
- if err != nil {
- log.Warnf("%v", err)
- }
- filepath := path.Join(dir, filename+".yaml")
- err = os.WriteFile(filepath, data, 0644)
- if err != nil {
- log.Warnf("%v", err)
- }
- }
- }
- func saveLogOutput(req *ExecutionRequest, filename string) {
- dir := firstNonEmpty(req.Binding.Action.SaveLogs.OutputDirectory, req.Cfg.SaveLogs.OutputDirectory)
- if dir != "" {
- data := req.logEntry.Output
- filepath := path.Join(dir, filename+".log")
- err := os.WriteFile(filepath, []byte(data), 0644)
- if err != nil {
- log.Warnf("%v", err)
- }
- }
- }
|