| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379 |
- package executor
- import (
- acl "github.com/OliveTin/OliveTin/internal/acl"
- "github.com/OliveTin/OliveTin/internal/auth"
- authpublic "github.com/OliveTin/OliveTin/internal/auth/authpublic"
- config "github.com/OliveTin/OliveTin/internal/config"
- "github.com/OliveTin/OliveTin/internal/entities"
- "github.com/OliveTin/OliveTin/internal/logfilter"
- "github.com/OliveTin/OliveTin/internal/tpl"
- "github.com/google/uuid"
- log "github.com/sirupsen/logrus"
- "gopkg.in/yaml.v3"
- "bytes"
- "context"
- "fmt"
- "os"
- "os/exec"
- "path"
- "regexp"
- "strings"
- "sync"
- "time"
- )
- const (
- DefaultExitCodeNotExecuted = -1337
- MaxTriggerDepth = 10
- )
- var validTrackingIDPattern = regexp.MustCompile(`^[a-fA-F0-9\-]+$`)
- func isValidTrackingID(id string) bool {
- const MaxTrackingIDLength = 36
- return id != "" && len(id) <= MaxTrackingIDLength && validTrackingIDPattern.MatchString(id)
- }
- type ActionBinding struct {
- ID string
- Action *config.Action
- Entity *entities.Entity
- ConfigOrder int
- OnDashboards []DashboardNavigationTarget
- }
- // Executor represents a helper class for executing commands. It's main method
- // is ExecRequest
- type Executor struct {
- logs map[string]*InternalLogEntry
- logsTrackingIdsByDate []string
- LogsByBindingId map[string][]*InternalLogEntry
- logmutex sync.RWMutex
- MapActionBindings map[string]*ActionBinding
- MapActionBindingsLock sync.RWMutex
- Cfg *config.Config
- listeners []listener
- listenersMu sync.RWMutex
- chainOfCommand []executorStepFunc
- groupQueue []*queuedExecution
- groupQueueMu sync.Mutex
- }
- // ExecutionRequest is a request to execute an action. It's passed to an
- // Executor. They're created from the api.
- type ExecutionRequest struct {
- Binding *ActionBinding
- Arguments map[string]string
- TrackingID string
- Tags []string
- Cfg *config.Config
- AuthenticatedUser *authpublic.AuthenticatedUser
- TriggerDepth int
- Justification string
- logEntry *InternalLogEntry
- finalParsedCommand string
- execArgs []string
- useDirectExec bool
- executor *Executor
- skipRequestRegistration bool
- }
- func (req *ExecutionRequest) mutateLogEntry(mutator func(*InternalLogEntry)) {
- if req.executor == nil {
- mutator(req.logEntry)
- return
- }
- req.executor.logmutex.Lock()
- defer req.executor.logmutex.Unlock()
- mutator(req.logEntry)
- }
- // LogEntrySnapshot is a copy of selected log entry fields for race-safe reads.
- type LogEntrySnapshot struct {
- Queued bool
- Blocked bool
- ExecutionStarted bool
- ExecutionFinished bool
- ExitCode int32
- Output string
- }
- // SnapshotLog returns a copy of selected log entry fields under read lock.
- func (e *Executor) SnapshotLog(trackingID string) (LogEntrySnapshot, bool) {
- e.logmutex.RLock()
- defer e.logmutex.RUnlock()
- entry, found := e.logs[trackingID]
- if !found {
- return LogEntrySnapshot{}, false
- }
- return LogEntrySnapshot{
- Queued: entry.Queued,
- Blocked: entry.Blocked,
- ExecutionStarted: entry.ExecutionStarted,
- ExecutionFinished: entry.ExecutionFinished,
- ExitCode: entry.ExitCode,
- Output: entry.Output,
- }, true
- }
- // InternalLogEntry objects are created by an Executor, and represent the final
- // state of execution (even if the command is not executed). It's designed to be
- // easily serializable.
- type InternalLogEntry struct {
- Binding *ActionBinding
- DatetimeStarted time.Time
- DatetimeFinished time.Time
- Output string
- TimedOut bool
- Blocked bool
- Queued bool
- QueuedForGroup string
- ExitCode int32
- Tags []string
- ExecutionStarted bool
- ExecutionFinished bool
- ExecutionTrackingID string
- Process *os.Process
- Username string
- Index int64
- EntityPrefix string
- ActionConfigTitle string // This is the title of the action as defined in the config, not the final parsed title.
- /*
- The following 3 properties are obviously on Action normally, but it's useful
- that logs are lightweight (so we don't need to have an action associated to
- logs, etc. Therefore, we duplicate those values here.
- */
- ActionTitle string
- ActionIcon string
- Justification string
- Arguments map[string]string
- }
- // .Binding can be nil, so we need to handle that.
- func (e *InternalLogEntry) GetBindingId() string {
- if e.Binding == nil {
- return ""
- }
- return e.Binding.ID
- }
- type executorStepFunc func(*ExecutionRequest) bool
- // DefaultExecutor returns an Executor, with a sensible "chain of command" for
- // executing actions.
- func DefaultExecutor(cfg *config.Config) *Executor {
- e := Executor{}
- e.Cfg = cfg
- e.logs = make(map[string]*InternalLogEntry)
- e.logsTrackingIdsByDate = make([]string, 0)
- e.LogsByBindingId = make(map[string][]*InternalLogEntry)
- e.MapActionBindings = make(map[string]*ActionBinding)
- e.chainOfCommand = []executorStepFunc{
- stepRequestAction,
- stepConcurrencyCheck,
- stepRateCheck,
- stepACLCheck,
- stepParseArgs,
- stepLogStart,
- stepExec,
- stepExecAfter,
- stepLogFinish,
- stepSaveLog,
- stepTrigger,
- }
- return &e
- }
- type listener interface {
- OnExecutionStarted(logEntry *InternalLogEntry)
- OnExecutionFinished(logEntry *InternalLogEntry)
- OnOutputChunk(o []byte, executionTrackingId string)
- OnActionMapRebuilt()
- }
- func (e *Executor) AddListener(m listener) {
- e.listenersMu.Lock()
- defer e.listenersMu.Unlock()
- e.listeners = append(e.listeners, m)
- }
- func (e *Executor) copyListeners() []listener {
- e.listenersMu.RLock()
- defer e.listenersMu.RUnlock()
- out := make([]listener, len(e.listeners))
- copy(out, e.listeners)
- return out
- }
- // getPagingStartIndex calculates the starting index for log pagination.
- // Parameters:
- //
- // startOffset: The offset from the most recent log (0 means start from the most recent)
- // totalLogCount: Total number of logs available
- // count: Number of logs to retrieve
- //
- // Returns: The calculated starting index for pagination
- func getPagingStartIndex(startOffset int64, totalLogCount int64) int64 {
- var startIndex int64
- if startOffset <= 0 {
- startIndex = totalLogCount
- } else {
- startIndex = (totalLogCount - startOffset)
- if startIndex < 0 {
- startIndex = 1
- }
- }
- return startIndex - 1
- }
- type PagingResult struct {
- CountRemaining int64
- PageSize int64
- TotalCount int64
- StartOffset int64
- }
- func (e *Executor) GetLogTrackingIds(startOffset int64, pageCount int64) ([]*InternalLogEntry, *PagingResult) {
- pagingResult := &PagingResult{
- CountRemaining: 0,
- PageSize: pageCount,
- TotalCount: 0,
- StartOffset: startOffset,
- }
- e.logmutex.RLock()
- totalLogCount := int64(len(e.logsTrackingIdsByDate))
- pagingResult.TotalCount = totalLogCount
- startIndex := getPagingStartIndex(startOffset, totalLogCount)
- pageCount = min(totalLogCount, pageCount)
- endIndex := max(0, (startIndex-pageCount)+1)
- log.WithFields(log.Fields{
- "startOffset": startOffset,
- "pageCount": pageCount,
- "total": totalLogCount,
- "startIndex": startIndex,
- "endIndex": endIndex,
- }).Tracef("GetLogTrackingIds")
- trackingIds := make([]*InternalLogEntry, 0, pageCount)
- if totalLogCount > 0 {
- for i := startIndex; i >= endIndex; i-- {
- trackingIds = append(trackingIds, e.logs[e.logsTrackingIdsByDate[i]])
- }
- }
- e.logmutex.RUnlock()
- pagingResult.CountRemaining = endIndex
- return trackingIds, pagingResult
- }
- func isValidLogEntryForACL(entry *InternalLogEntry) bool {
- return entry != nil && entry.Binding != nil && entry.Binding.Action != nil
- }
- func isLogEntryAllowedByACL(cfg *config.Config, user *authpublic.AuthenticatedUser, entry *InternalLogEntry) bool {
- return acl.IsAllowedLogs(cfg, user, entry.Binding.Action)
- }
- func (e *Executor) filterLogsByACL(cfg *config.Config, user *authpublic.AuthenticatedUser, dateFilter string) []*InternalLogEntry {
- e.logmutex.RLock()
- defer e.logmutex.RUnlock()
- filtered := make([]*InternalLogEntry, 0, len(e.logsTrackingIdsByDate))
- filterDate, hasDateFilter := parseDateFilter(dateFilter)
- for _, trackingId := range e.logsTrackingIdsByDate {
- entry := e.logs[trackingId]
- if shouldIncludeLogEntry(cfg, user, entry, filterDate, hasDateFilter) {
- filtered = append(filtered, entry)
- }
- }
- return filtered
- }
- // parseDateFilter parses the date filter string and returns filter information.
- func parseDateFilter(dateFilter string) (filterDate time.Time, hasDateFilter bool) {
- if dateFilter == "" {
- return time.Time{}, false
- }
- parsedDate, err := time.Parse("2006-01-02", dateFilter)
- if err != nil {
- log.WithFields(log.Fields{
- "dateFilter": dateFilter,
- "error": err,
- }).Errorf("Failed to parse date filter, expected format YYYY-MM-DD")
- return time.Time{}, false
- }
- return parsedDate, true
- }
- // shouldIncludeLogEntry determines if a log entry should be included based on ACL and date filter.
- func shouldIncludeLogEntry(cfg *config.Config, user *authpublic.AuthenticatedUser, entry *InternalLogEntry, filterDate time.Time, hasDateFilter bool) bool {
- if !isValidLogEntryForACL(entry) {
- return false
- }
- if !isLogEntryAllowedByACL(cfg, user, entry) {
- return false
- }
- return matchesDateFilter(entry, filterDate, hasDateFilter)
- }
- // matchesDateFilter checks if the log entry matches the date filter.
- func matchesDateFilter(entry *InternalLogEntry, filterDate time.Time, hasDateFilter bool) bool {
- if !hasDateFilter {
- return true
- }
- entryDate := entry.DatetimeStarted.UTC().Truncate(24 * time.Hour)
- filterDateUTC := filterDate.UTC().Truncate(24 * time.Hour)
- return entryDate.Equal(filterDateUTC)
- }
- // paginateFilteredLogs applies pagination to a filtered list of logs and returns
- // the paginated results along with pagination metadata.
- func paginateFilteredLogs(filtered []*InternalLogEntry, startOffset int64, pageCount int64) ([]*InternalLogEntry, *PagingResult) {
- total := int64(len(filtered))
- paging := &PagingResult{PageSize: pageCount, TotalCount: total, StartOffset: startOffset}
- if total == 0 {
- paging.CountRemaining = 0
- return []*InternalLogEntry{}, paging
- }
- startIndex := getPagingStartIndex(startOffset, total)
- pageCount = min(total, pageCount)
- endIndex := max(0, (startIndex-pageCount)+1)
- out := make([]*InternalLogEntry, 0, pageCount)
- for i := startIndex; i >= endIndex && i < int64(len(filtered)); i-- {
- out = append(out, filtered[i])
- }
- paging.CountRemaining = endIndex
- return out, paging
- }
- // GetLogTrackingIdsACL returns logs filtered by ACL visibility for the user and
- // paginated correctly based on the filtered set.
- // dateFilter is optional and should be in YYYY-MM-DD format. If empty, no date filtering is applied.
- // expressionFilter is an optional filter expression applied after ACL checks.
- func (e *Executor) GetLogTrackingIdsACL(cfg *config.Config, user *authpublic.AuthenticatedUser, startOffset int64, pageCount int64, dateFilter string, expressionFilter string) ([]*InternalLogEntry, *PagingResult, error) {
- filtered := e.filterLogsByACL(cfg, user, dateFilter)
- program, err := logfilter.Compile(expressionFilter)
- if err != nil {
- return nil, nil, err
- }
- filtered, err = applyLogFilter(filtered, program)
- if err != nil {
- return nil, nil, err
- }
- logs, paging := paginateFilteredLogs(filtered, startOffset, pageCount)
- return logs, paging, nil
- }
- func (e *Executor) GetLog(trackingID string) (*InternalLogEntry, bool) {
- e.logmutex.RLock()
- entry, found := e.logs[trackingID]
- e.logmutex.RUnlock()
- return entry, found
- }
- func (e *Executor) GetLogsByBindingId(bindingId string) []*InternalLogEntry {
- e.logmutex.RLock()
- logs, found := e.LogsByBindingId[bindingId]
- e.logmutex.RUnlock()
- if !found {
- return make([]*InternalLogEntry, 0)
- }
- return logs
- }
- // shouldCountExecution checks if a log entry should be counted for rate limiting.
- func shouldCountExecution(logEntry *InternalLogEntry, windowStart time.Time) bool {
- return !logEntry.Blocked && !logEntry.Queued && logEntry.DatetimeStarted.After(windowStart)
- }
- // updateOldestExecution updates the oldest execution time if this entry is older.
- func updateOldestExecution(oldestExecutionTime **time.Time, logEntry *InternalLogEntry) {
- if *oldestExecutionTime == nil {
- *oldestExecutionTime = &logEntry.DatetimeStarted
- } else if logEntry.DatetimeStarted.Before(**oldestExecutionTime) {
- *oldestExecutionTime = &logEntry.DatetimeStarted
- }
- }
- // findOldestExecutionInWindow finds the oldest execution within the time window and counts executions.
- // Returns the count of executions and the oldest execution time, or nil if none found.
- func findOldestExecutionInWindow(logs []*InternalLogEntry, windowStart time.Time) (int, *time.Time) {
- executions := 0
- var oldestExecutionTime *time.Time
- for _, logEntry := range logs {
- if !shouldCountExecution(logEntry, windowStart) {
- continue
- }
- executions++
- updateOldestExecution(&oldestExecutionTime, logEntry)
- }
- return executions, oldestExecutionTime
- }
- // calculateExpiryTime calculates when the oldest execution will fall outside the rate limit window.
- func calculateExpiryTime(oldestExecutionTime time.Time, duration time.Duration, now time.Time) time.Time {
- expiryTime := oldestExecutionTime.Add(duration)
- if !expiryTime.After(now) {
- return time.Time{}
- }
- return expiryTime
- }
- // updateMaxExpiryTime updates maxExpiryTime if expiryTime is later.
- func updateMaxExpiryTime(maxExpiryTime *time.Time, expiryTime time.Time) {
- if expiryTime.IsZero() {
- return
- }
- if maxExpiryTime.IsZero() || expiryTime.After(*maxExpiryTime) {
- *maxExpiryTime = expiryTime
- }
- }
- // calculateExpiryForRate calculates the expiry time for a single rate limit rule.
- // Returns the expiry time if the rate limit is exceeded, or zero time if not.
- func calculateExpiryForRate(rate config.RateSpec, logs []*InternalLogEntry, now time.Time) time.Time {
- duration := parseDuration(rate)
- if duration <= 0 {
- return time.Time{}
- }
- windowStart := now.Add(-duration)
- executions, oldestExecutionTime := findOldestExecutionInWindow(logs, windowStart)
- if executions < rate.Limit || oldestExecutionTime == nil {
- return time.Time{}
- }
- return calculateExpiryTime(*oldestExecutionTime, duration, now)
- }
- // getLogsForBinding retrieves logs for a binding ID.
- func (e *Executor) getLogsForBinding(bindingId string) []*InternalLogEntry {
- e.logmutex.RLock()
- logs, found := e.LogsByBindingId[bindingId]
- e.logmutex.RUnlock()
- if !found || len(logs) == 0 {
- return nil
- }
- return logs
- }
- // calculateMaxExpiryTimeFromRates calculates the maximum expiry time across all rate limit rules.
- func calculateMaxExpiryTimeFromRates(rates []config.RateSpec, logs []*InternalLogEntry, now time.Time) time.Time {
- var maxExpiryTime time.Time
- for _, rate := range rates {
- expiryTime := calculateExpiryForRate(rate, logs, now)
- updateMaxExpiryTime(&maxExpiryTime, expiryTime)
- }
- return maxExpiryTime
- }
- // GetTimeUntilAvailable calculates when an action will be available again based on rate limits.
- // Returns the Unix timestamp in seconds when the rate limit expires, or 0 if the action is available now.
- func (e *Executor) GetTimeUntilAvailable(binding *ActionBinding) int64 {
- if len(binding.Action.MaxRate) == 0 {
- return 0
- }
- logs := e.getLogsForBinding(binding.ID)
- if logs == nil {
- return 0
- }
- maxExpiryTime := calculateMaxExpiryTimeFromRates(binding.Action.MaxRate, logs, time.Now())
- if maxExpiryTime.IsZero() {
- return 0
- }
- return maxExpiryTime.Unix()
- }
- func (e *Executor) SetLog(trackingID string, entry *InternalLogEntry) string {
- e.logmutex.Lock()
- defer e.logmutex.Unlock()
- if _, found := e.logs[trackingID]; found || !isValidTrackingID(trackingID) {
- trackingID = uuid.NewString()
- entry.ExecutionTrackingID = trackingID
- }
- entry.Index = int64(len(e.logsTrackingIdsByDate))
- e.logs[trackingID] = entry
- e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, trackingID)
- return trackingID
- }
- // ExecRequest processes an ExecutionRequest
- func (e *Executor) ExecRequest(req *ExecutionRequest) (*sync.WaitGroup, string) {
- e.initializeExecRequest(req)
- log.Tracef("executor.ExecRequest(): trackingID=%s bindingID=%s", req.TrackingID, bindingIDForTrace(req))
- req.TrackingID = e.SetLog(req.TrackingID, req.logEntry)
- wg := new(sync.WaitGroup)
- wg.Add(1)
- go func() {
- queued := e.execChain(req, wg)
- if !queued {
- wg.Done()
- }
- }()
- return wg, req.TrackingID
- }
- func (e *Executor) initializeExecRequest(req *ExecutionRequest) {
- if req.AuthenticatedUser == nil {
- req.AuthenticatedUser = auth.UserGuest(req.Cfg)
- }
- req.executor = e
- req.logEntry = &InternalLogEntry{
- Binding: req.Binding,
- DatetimeStarted: time.Now(),
- ExecutionTrackingID: req.TrackingID,
- Output: "",
- ExitCode: DefaultExitCodeNotExecuted,
- ExecutionStarted: false,
- ExecutionFinished: false,
- ActionTitle: "notfound",
- ActionIcon: "💩",
- Username: req.AuthenticatedUser.Username,
- }
- }
- func bindingIDForTrace(req *ExecutionRequest) string {
- if req.Binding == nil {
- return ""
- }
- return req.Binding.ID
- }
- func (e *Executor) execChain(req *ExecutionRequest, wg *sync.WaitGroup) bool {
- if !req.skipRequestRegistration {
- finished, queued := e.registerOrQueueRequest(req, wg)
- if finished || queued {
- return queued
- }
- }
- e.runExecutionSteps(req)
- e.finishExecChain(req)
- return false
- }
- func (e *Executor) registerOrQueueRequest(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
- if !stepRequestAction(req) {
- e.finishExecChain(req)
- return true, false
- }
- if e.finishIfConcurrencyBlocked(req) {
- return true, false
- }
- return e.queueRequestIfGroupLimited(req, wg)
- }
- func (e *Executor) finishIfConcurrencyBlocked(req *ExecutionRequest) bool {
- if actionNeedsGroupLimit(req) {
- return false
- }
- if stepConcurrencyCheck(req) {
- return false
- }
- e.finishExecChain(req)
- return true
- }
- func (e *Executor) queueRequestIfGroupLimited(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
- if !actionNeedsGroupLimit(req) || e.groupsHaveCapacityForActive(req) {
- return false, false
- }
- return e.queueRequestAfterACL(req, wg)
- }
- func (e *Executor) queueRequestAfterACL(req *ExecutionRequest, wg *sync.WaitGroup) (finished bool, queued bool) {
- if !stepACLCheck(req) {
- e.finishExecChain(req)
- return true, false
- }
- if e.queueRequest(req, wg) {
- e.finishExecChain(req)
- return true, false
- }
- notifyListenersStarted(req)
- return false, true
- }
- func (e *Executor) runExecutionSteps(req *ExecutionRequest) {
- for _, step := range e.chainOfCommand[1:] {
- if !step(req) {
- break
- }
- }
- }
- func (e *Executor) finishExecChain(req *ExecutionRequest) {
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- if entry.DatetimeFinished.IsZero() {
- entry.DatetimeFinished = time.Now()
- }
- entry.ExecutionFinished = true
- })
- recordExecutionMetrics(req.logEntry)
- notifyListenersFinished(req)
- e.drainGroupQueue()
- }
- func getConcurrentCount(req *ExecutionRequest) int {
- concurrentCount := 0
- req.executor.logmutex.RLock()
- logs := req.executor.LogsByBindingId[req.Binding.ID]
- for _, logEntry := range logs {
- if !logEntry.ExecutionFinished && !logEntry.Queued {
- concurrentCount += 1
- }
- }
- req.executor.logmutex.RUnlock()
- return concurrentCount
- }
- func stepConcurrencyCheck(req *ExecutionRequest) bool {
- if actionNeedsGroupLimit(req) {
- return true
- }
- concurrentCount := getConcurrentCount(req)
- // Note that the current execution is counted int the logs, so when checking we +1
- if concurrentCount >= (req.Binding.Action.MaxConcurrent + 1) {
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- "concurrentCount": concurrentCount,
- "maxConcurrent": req.Binding.Action.MaxConcurrent,
- }).Warnf("Blocked from executing due to concurrency limit")
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Output = "Blocked from executing due to concurrency limit"
- entry.Blocked = true
- })
- return false
- }
- return true
- }
- func parseDuration(rate config.RateSpec) time.Duration {
- duration, err := time.ParseDuration(rate.Duration)
- if err != nil {
- log.Warnf("Could not parse duration: %v", rate.Duration)
- return -1 * time.Minute
- }
- return duration
- }
- func entityPrefixForRequest(req *ExecutionRequest) string {
- if req.Binding != nil && req.Binding.Entity != nil {
- return req.Binding.Entity.UniqueKey
- }
- return ""
- }
- func rateExecutionMatchesScope(logEntry *InternalLogEntry, req *ExecutionRequest, entityPrefix string) bool {
- if logEntry.EntityPrefix != entityPrefix {
- return false
- }
- return !logEntry.Queued && logEntry.ExecutionTrackingID != req.TrackingID
- }
- func logEntryStartedInWindow(logEntry *InternalLogEntry, windowStart time.Time) bool {
- return logEntry.DatetimeStarted.After(windowStart) && !logEntry.Blocked
- }
- func rateExecutionCountsForRate(logEntry *InternalLogEntry, req *ExecutionRequest, entityPrefix string, windowStart time.Time) bool {
- return rateExecutionMatchesScope(logEntry, req, entityPrefix) && logEntryStartedInWindow(logEntry, windowStart)
- }
- func countRateExecutions(logs []*InternalLogEntry, req *ExecutionRequest, entityPrefix string, windowStart time.Time) int {
- executions := 0
- for _, logEntry := range logs {
- if rateExecutionCountsForRate(logEntry, req, entityPrefix, windowStart) {
- executions += 1
- }
- }
- return executions
- }
- func getExecutionsCount(rate config.RateSpec, req *ExecutionRequest) int {
- duration := parseDuration(rate)
- then := time.Now().Add(-duration)
- req.executor.logmutex.RLock()
- logs := req.executor.LogsByBindingId[req.Binding.ID]
- executions := countRateExecutions(logs, req, entityPrefixForRequest(req), then)
- req.executor.logmutex.RUnlock()
- return executions
- }
- func stepRateCheck(req *ExecutionRequest) bool {
- for _, rate := range req.Binding.Action.MaxRate {
- executions := getExecutionsCount(rate, req)
- if executions >= rate.Limit {
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- "executions": executions,
- "limit": rate.Limit,
- "duration": rate.Duration,
- }).Infof("Blocked from executing due to rate limit")
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Output = "Blocked from executing due to rate limit"
- entry.Blocked = true
- })
- return false
- }
- }
- return true
- }
- func stepACLCheck(req *ExecutionRequest) bool {
- canExec := acl.IsAllowedExec(req.Cfg, req.AuthenticatedUser, req.Binding.Action)
- if !canExec {
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Output = "ACL check failed. Blocked from executing."
- entry.Blocked = true
- })
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- }).Warnf("ACL check failed. Blocked from executing.")
- }
- return canExec
- }
- func stepParseArgs(req *ExecutionRequest) bool {
- ensureArgumentMap(req)
- if !hasBindingAndAction(req) {
- return fail(req, fmt.Errorf("cannot parse arguments: Binding or Action is nil"))
- }
- filterToDefinedArgumentsOnly(req)
- if err := injectSystemArgs(req); err != nil {
- return fail(req, err)
- }
- mangleInvalidArgumentValues(req)
- copyStorableArgumentsToLogEntry(req)
- if hasExec(req) {
- return handleExecBranch(req)
- } else {
- return handleShellBranch(req)
- }
- }
- func handleExecBranch(req *ExecutionRequest) bool {
- args, err := parseActionExec(req.Arguments, req.Binding.Action, req.Binding.Entity)
- if err != nil {
- return fail(req, err)
- }
- req.useDirectExec = true
- req.execArgs = args
- return true
- }
- func handleShellBranch(req *ExecutionRequest) bool {
- if hasWebhookTag(req) {
- return fail(req, fmt.Errorf("webhooks cannot use Shell execution; use exec instead. See https://docs.olivetin.app/action_execution/shellvsexec.html"))
- }
- if err := checkShellArgumentSafety(req.Binding.Action); err != nil {
- return fail(req, err)
- }
- cmd, err := parseActionArguments(req)
- if err != nil {
- return fail(req, err)
- }
- req.useDirectExec = false
- req.finalParsedCommand = cmd
- return true
- }
- func ensureArgumentMap(req *ExecutionRequest) {
- if req.Arguments == nil {
- req.Arguments = make(map[string]string)
- }
- }
- func filterToDefinedArgumentsOnly(req *ExecutionRequest) {
- definedNames := make(map[string]struct{})
- for _, arg := range req.Binding.Action.Arguments {
- definedNames[arg.Name] = struct{}{}
- }
- filtered := make(map[string]string)
- for k, v := range req.Arguments {
- if keepArgument(k, definedNames) {
- filtered[k] = v
- }
- }
- req.Arguments = filtered
- }
- func keepArgument(name string, definedNames map[string]struct{}) bool {
- _, ok := definedNames[name]
- return ok
- }
- func hasWebhookTag(req *ExecutionRequest) bool {
- for _, tag := range req.Tags {
- if tag == "webhook" {
- return true
- }
- }
- return false
- }
- var systemArgumentDefinitions = []config.ActionArgument{
- {Name: "ot_executionTrackingId", Type: "ascii_identifier", RejectNull: true},
- {Name: "ot_username", Type: "shell_safe_identifier", RejectNull: true},
- }
- func injectSystemArgs(req *ExecutionRequest) error {
- args, err := validatedSystemArgs(req)
- if err != nil {
- return err
- }
- for name, value := range args {
- req.Arguments[name] = value
- }
- return nil
- }
- func validatedSystemArgs(req *ExecutionRequest) (map[string]string, error) {
- values := map[string]string{
- "ot_executionTrackingId": req.TrackingID,
- "ot_username": req.AuthenticatedUser.Username,
- }
- for i := range systemArgumentDefinitions {
- arg := &systemArgumentDefinitions[i]
- if err := ValidateArgument(arg, values[arg.Name], req.Binding.Action); err != nil {
- return nil, fmt.Errorf("system argument %q failed validation: %w", arg.Name, err)
- }
- }
- return values, nil
- }
- func hasBindingAndAction(req *ExecutionRequest) bool {
- return !(req.Binding == nil || req.Binding.Action == nil)
- }
- func hasExec(req *ExecutionRequest) bool {
- return len(req.Binding.Action.Exec) > 0
- }
- func fail(req *ExecutionRequest, err error) bool {
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Output = err.Error()
- })
- log.Warn(err.Error())
- return false
- }
- func stepRequestAction(req *ExecutionRequest) bool {
- metricActionsRequested.Inc()
- if !stepRequestActionHasBinding(req) {
- return false
- }
- stepRequestActionPopulateLogEntry(req)
- stepRequestActionRegisterLog(req)
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- "tags": req.Tags,
- }).Infof("Action requested")
- notifyListenersStarted(req)
- return true
- }
- func stepRequestActionHasBinding(req *ExecutionRequest) bool {
- if req.Binding == nil || req.Binding.Action == nil {
- log.Warnf("Action request has no binding/action; skipping execution")
- return false
- }
- return true
- }
- func stepRequestActionPopulateLogEntry(req *ExecutionRequest) {
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Binding = req.Binding
- entry.ActionConfigTitle = req.Binding.Action.Title
- entry.ActionTitle = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Title, req.Binding.Entity)
- entry.ActionIcon = tpl.ParseTemplateOfActionBeforeExec(req.Binding.Action.Icon, req.Binding.Entity)
- entry.Tags = req.Tags
- entry.Justification = ResolveJustification(req)
- if req.Binding.Entity != nil {
- entry.EntityPrefix = req.Binding.Entity.UniqueKey
- }
- })
- }
- func stepRequestActionRegisterLog(req *ExecutionRequest) {
- req.executor.logmutex.Lock()
- defer req.executor.logmutex.Unlock()
- if _, containsKey := req.executor.LogsByBindingId[req.Binding.ID]; !containsKey {
- req.executor.LogsByBindingId[req.Binding.ID] = make([]*InternalLogEntry, 0)
- }
- req.executor.LogsByBindingId[req.Binding.ID] = append(req.executor.LogsByBindingId[req.Binding.ID], req.logEntry)
- }
- func stepLogStart(req *ExecutionRequest) bool {
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- "timeout": req.Binding.Action.Timeout,
- }).Infof("Action started")
- return true
- }
- func stepLogFinish(req *ExecutionRequest) bool {
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.ExecutionFinished = true
- })
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- "outputLength": len(req.logEntry.Output),
- "timedOut": req.logEntry.TimedOut,
- "exit": req.logEntry.ExitCode,
- }).Infof("Action finished")
- return true
- }
- func notifyListenersFinished(req *ExecutionRequest) {
- for _, listener := range req.executor.copyListeners() {
- listener.OnExecutionFinished(req.logEntry)
- }
- }
- func notifyListenersStarted(req *ExecutionRequest) {
- for _, listener := range req.executor.copyListeners() {
- listener.OnExecutionStarted(req.logEntry)
- }
- }
- func appendErrorToStderr(req *ExecutionRequest, err error) {
- if err == nil {
- return
- }
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Output = err.Error() + "\n\n" + entry.Output
- })
- }
- type OutputStreamer struct {
- Req *ExecutionRequest
- output bytes.Buffer
- }
- func (ost *OutputStreamer) Write(o []byte) (n int, err error) {
- for _, listener := range ost.Req.executor.copyListeners() {
- listener.OnOutputChunk(o, ost.Req.TrackingID)
- }
- return ost.output.Write(o)
- }
- func (ost *OutputStreamer) String() string {
- return ost.output.String()
- }
- func buildEnv(args map[string]string) []string {
- ret := append(os.Environ(), "OLIVETIN=1")
- for k, v := range args {
- varName := fmt.Sprintf("%v", strings.TrimSpace(strings.ToUpper(k)))
- // Skip arguments that might not have a name (eg, confirmation), as this causes weird bugs on Windows.
- if varName == "" {
- continue
- }
- ret = append(ret, fmt.Sprintf("%v=%v", varName, v))
- }
- return ret
- }
- func commandExitCode(cmd *exec.Cmd) int {
- if cmd == nil || cmd.ProcessState == nil {
- return -1
- }
- return cmd.ProcessState.ExitCode()
- }
- func stepExec(req *ExecutionRequest) bool {
- ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor)
- defer cancel()
- streamer := &OutputStreamer{Req: req}
- cmd := buildCommand(ctx, req)
- if cmd == nil {
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Output = "Cannot execute: no command arguments provided"
- })
- log.Warn("Cannot execute: no command arguments provided")
- return false
- }
- prepareCommand(cmd, streamer, req)
- runerr := cmd.Start()
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Process = cmd.Process
- })
- ctx.setProcess(cmd.Process)
- waiterr := cmd.Wait()
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.ExitCode = int32(commandExitCode(cmd))
- entry.Output = streamer.String()
- })
- appendErrorToStderr(req, runerr)
- appendErrorToStderr(req, waiterr)
- if ctx.Err() == context.DeadlineExceeded {
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- }).Warnf("Action timed out")
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.TimedOut = true
- entry.Output += "OliveTin::timeout - this action timed out after " + fmt.Sprintf("%v", req.Binding.Action.Timeout) + " seconds. If you need more time for this action, set a longer timeout. See https://docs.olivetin.app/action_customization/timeouts.html for more help."
- })
- }
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.DatetimeFinished = time.Now()
- })
- return true
- }
- func buildCommand(ctx context.Context, req *ExecutionRequest) *exec.Cmd {
- if req.useDirectExec {
- return wrapCommandDirect(ctx, req.execArgs)
- }
- return wrapCommandInShell(ctx, req.finalParsedCommand)
- }
- func prepareCommand(cmd *exec.Cmd, streamer *OutputStreamer, req *ExecutionRequest) {
- cmd.Stdout = streamer
- cmd.Stderr = streamer
- cmd.Env = buildEnv(req.Arguments)
- started := false
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- if entry.ExecutionStarted {
- return
- }
- entry.ExecutionStarted = true
- started = true
- })
- if started {
- notifyListenersStarted(req)
- }
- }
- func stepExecAfter(req *ExecutionRequest) bool {
- ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor)
- defer cancel()
- var stdout bytes.Buffer
- var stderr bytes.Buffer
- cmd, args, err := buildShellAfterCommand(ctx, req, &stdout, &stderr)
- if err != nil {
- return fail(req, err)
- }
- if cmd == nil {
- return true
- }
- cmd.Env = buildEnv(args)
- runerr := cmd.Start()
- ctx.setProcess(cmd.Process)
- waiterr := cmd.Wait()
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Output += "\n"
- entry.Output += "OliveTin::shellAfterCompleted stdout\n"
- entry.Output += stdout.String()
- entry.Output += "OliveTin::shellAfterCompleted stderr\n"
- entry.Output += stderr.String()
- entry.Output += "OliveTin::shellAfterCompleted errors and summary\n"
- })
- appendErrorToStderr(req, runerr)
- appendErrorToStderr(req, waiterr)
- if ctx.Err() == context.DeadlineExceeded {
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Output += "Your shellAfterCompleted command timed out."
- })
- }
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", commandExitCode(cmd))
- entry.Output += "OliveTin::shellAfterCompleted output complete\n"
- })
- return true
- }
- func buildShellAfterCommand(ctx context.Context, req *ExecutionRequest, stdout, stderr *bytes.Buffer) (*exec.Cmd, map[string]string, error) {
- if req.Binding.Action.ShellAfterCompleted == "" {
- return nil, nil, nil
- }
- args, err := buildShellAfterArgs(req)
- if err != nil {
- return nil, nil, err
- }
- finalParsedCommand, err := tpl.ParseTemplateWithActionContext(req.Binding.Action.ShellAfterCompleted, req.Binding.Entity, args)
- if err != nil {
- msg := "Could not prepare shellAfterCompleted command: " + err.Error() + "\n"
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Output += msg
- })
- log.Warn(msg)
- return nil, nil, nil
- }
- cmd := wrapCommandInShell(ctx, finalParsedCommand)
- cmd.Stdout = stdout
- cmd.Stderr = stderr
- return cmd, args, nil
- }
- func buildShellAfterArgs(req *ExecutionRequest) (map[string]string, error) {
- args, err := validatedSystemArgs(req)
- if err != nil {
- return nil, err
- }
- args["output"] = req.logEntry.Output
- args["exitCode"] = fmt.Sprintf("%v", req.logEntry.ExitCode)
- return args, nil
- }
- //gocyclo:ignore
- func stepTrigger(req *ExecutionRequest) bool {
- if req.Binding.Action.Triggers == nil {
- return true
- }
- if req.TriggerDepth >= MaxTriggerDepth {
- log.WithFields(log.Fields{
- "actionTitle": req.logEntry.ActionTitle,
- "depth": req.TriggerDepth,
- }).Warnf("Trigger action reached maximum depth of %v. Not triggering further actions.", MaxTriggerDepth)
- req.mutateLogEntry(func(entry *InternalLogEntry) {
- entry.Output += fmt.Sprintf("OliveTin::trigger - this action reached maximum trigger depth of %v. Not triggering further actions.", MaxTriggerDepth)
- })
- return true
- }
- if len(req.Tags) > 0 && req.Tags[0] == "trigger" {
- log.Warnf("Trigger action is triggering another trigger action. This is allowed, but be careful not to create trigger loops.")
- }
- triggerLoop(req)
- return true
- }
- func triggerLoop(req *ExecutionRequest) {
- for _, triggerTitle := range req.Binding.Action.Triggers {
- binding := req.executor.findBindingByActionTitle(triggerTitle, "")
- if binding == nil {
- log.WithFields(log.Fields{
- "triggerTitle": triggerTitle,
- "fromAction": req.logEntry.ActionTitle,
- }).Warnf("Trigger references unknown action title; skipping")
- continue
- }
- trigger := &ExecutionRequest{
- Binding: binding,
- TrackingID: uuid.NewString(),
- Tags: []string{"trigger"},
- AuthenticatedUser: req.AuthenticatedUser,
- Arguments: req.Arguments,
- Cfg: req.Cfg,
- TriggerDepth: req.TriggerDepth + 1,
- Justification: fmt.Sprintf("Triggered by action: %s", req.logEntry.ActionTitle),
- }
- req.executor.ExecRequest(trigger)
- }
- }
- func stepSaveLog(req *ExecutionRequest) bool {
- filename := fmt.Sprintf("%v.%v.%v", req.logEntry.ActionTitle, req.logEntry.DatetimeStarted.Unix(), req.logEntry.ExecutionTrackingID)
- saveLogResults(req, filename)
- saveLogOutput(req, filename)
- return true
- }
- func firstNonEmpty(one, two string) string {
- if one != "" {
- return one
- }
- return two
- }
- func saveLogResults(req *ExecutionRequest, filename string) {
- dir := firstNonEmpty(req.Binding.Action.SaveLogs.ResultsDirectory, req.Cfg.SaveLogs.ResultsDirectory)
- if dir != "" {
- data, err := yaml.Marshal(req.logEntry)
- if err != nil {
- log.Warnf("%v", err)
- }
- filepath := path.Join(dir, filename+".yaml")
- err = os.WriteFile(filepath, data, 0600)
- if err != nil {
- log.Warnf("%v", err)
- }
- }
- }
- func saveLogOutput(req *ExecutionRequest, filename string) {
- dir := firstNonEmpty(req.Binding.Action.SaveLogs.OutputDirectory, req.Cfg.SaveLogs.OutputDirectory)
- if dir != "" {
- data := req.logEntry.Output
- filepath := path.Join(dir, filename+".log")
- err := os.WriteFile(filepath, []byte(data), 0600)
- if err != nil {
- log.Warnf("%v", err)
- }
- }
- }
|