| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- package executor
- import (
- "os"
- "path/filepath"
- "sort"
- "strings"
- log "github.com/sirupsen/logrus"
- "gopkg.in/yaml.v3"
- )
- // LoadLogsFromDisk loads persisted logs from YAML files on disk and restores them to the executor.
- // This should be called during startup if saveLogs is configured.
- func (e *Executor) LoadLogsFromDisk() {
- resultsDir := e.Cfg.SaveLogs.ResultsDirectory
- if resultsDir == "" {
- return
- }
- entries, skippedCount := e.readLogDirectory(resultsDir)
- if entries == nil {
- return
- }
- loadedLogs, skippedCount := e.parseLogFiles(resultsDir, entries, skippedCount)
- sort.Slice(loadedLogs, func(i, j int) bool {
- return loadedLogs[i].DatetimeStarted.Before(loadedLogs[j].DatetimeStarted)
- })
- skippedCount = e.restoreLogsToExecutor(loadedLogs, skippedCount)
- log.WithFields(log.Fields{
- "loaded": len(loadedLogs),
- "skipped": skippedCount,
- }).Info("Finished loading persisted logs from disk")
- }
- func (e *Executor) readLogDirectory(resultsDir string) ([]os.DirEntry, int) {
- if _, err := os.Stat(resultsDir); os.IsNotExist(err) {
- log.WithFields(log.Fields{
- "directory": resultsDir,
- }).Debug("Logs directory does not exist, skipping log loading")
- return nil, 0
- }
- log.WithFields(log.Fields{
- "directory": resultsDir,
- }).Info("Loading persisted logs from disk")
- entries, err := os.ReadDir(resultsDir)
- if err != nil {
- log.WithFields(log.Fields{
- "directory": resultsDir,
- "error": err,
- }).Warnf("Failed to read logs directory")
- return nil, 0
- }
- return entries, 0
- }
- func (e *Executor) parseLogFiles(resultsDir string, entries []os.DirEntry, skippedCount int) ([]*InternalLogEntry, int) {
- loadedLogs := make([]*InternalLogEntry, 0)
- for _, entry := range entries {
- if !e.shouldProcessLogEntry(entry) {
- continue
- }
- logEntry, newSkippedCount := e.processLogFileEntry(resultsDir, entry.Name())
- skippedCount += newSkippedCount
- if logEntry != nil {
- loadedLogs = append(loadedLogs, logEntry)
- }
- }
- return loadedLogs, skippedCount
- }
- func (e *Executor) shouldProcessLogEntry(entry os.DirEntry) bool {
- return !entry.IsDir() && strings.HasSuffix(entry.Name(), ".yaml")
- }
- func (e *Executor) processLogFileEntry(resultsDir, filename string) (*InternalLogEntry, int) {
- logEntry, ok := e.loadLogFileFromPath(resultsDir, filename)
- if !ok {
- return nil, 1
- }
- if logEntry.ExecutionTrackingID == "" {
- log.WithFields(log.Fields{
- "file": filepath.Join(resultsDir, filename),
- }).Warnf("Log file missing execution tracking ID, skipping")
- return nil, 1
- }
- e.restoreBindingForLogEntry(logEntry, filepath.Join(resultsDir, filename))
- return logEntry, 0
- }
- func (e *Executor) loadLogFileFromPath(resultsDir, filename string) (*InternalLogEntry, bool) {
- filepath := filepath.Join(resultsDir, filename)
- data, err := os.ReadFile(filepath)
- if err != nil {
- log.WithFields(log.Fields{
- "file": filepath,
- "error": err,
- }).Warnf("Failed to read log file")
- return nil, false
- }
- var logEntry InternalLogEntry
- if err := yaml.Unmarshal(data, &logEntry); err != nil {
- log.WithFields(log.Fields{
- "file": filepath,
- "error": err,
- }).Warnf("Failed to unmarshal log file")
- return nil, false
- }
- return &logEntry, true
- }
- // Skipped when the entry already has a valid binding or has no ActionConfigTitle (e.g. action/entity removed from config).
- func (e *Executor) restoreBindingForLogEntry(logEntry *InternalLogEntry, filepath string) {
- if e.hasValidBinding(logEntry) || logEntry.ActionConfigTitle == "" {
- return
- }
- binding := e.findBindingByActionTitle(logEntry.ActionConfigTitle, logEntry.EntityPrefix)
- if binding != nil {
- logEntry.Binding = binding
- return
- }
- e.logBindingNotFound(logEntry, filepath)
- logEntry.Binding = nil
- }
- func (e *Executor) hasValidBinding(logEntry *InternalLogEntry) bool {
- return logEntry.Binding != nil && logEntry.Binding.Action != nil
- }
- func (e *Executor) logBindingNotFound(logEntry *InternalLogEntry, filepath string) {
- log.WithFields(log.Fields{
- "file": filepath,
- "actionTitle": logEntry.ActionConfigTitle,
- "entityPrefix": logEntry.EntityPrefix,
- "trackingId": logEntry.ExecutionTrackingID,
- }).Debug("Could not find binding for log entry, loading without binding")
- }
- func (e *Executor) restoreLogsToExecutor(loadedLogs []*InternalLogEntry, skippedCount int) int {
- e.logmutex.Lock()
- defer e.logmutex.Unlock()
- for _, logEntry := range loadedLogs {
- if _, exists := e.logs[logEntry.ExecutionTrackingID]; exists {
- log.WithFields(log.Fields{
- "trackingId": logEntry.ExecutionTrackingID,
- }).Debug("Log entry already exists, skipping")
- skippedCount++
- continue
- }
- logEntry.Index = int64(len(e.logsTrackingIdsByDate))
- e.logs[logEntry.ExecutionTrackingID] = logEntry
- e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, logEntry.ExecutionTrackingID)
- if logEntry.Binding != nil {
- e.addLogToBindingMap(logEntry)
- }
- }
- return skippedCount
- }
- func (e *Executor) addLogToBindingMap(logEntry *InternalLogEntry) {
- if _, containsKey := e.LogsByBindingId[logEntry.Binding.ID]; !containsKey {
- e.LogsByBindingId[logEntry.Binding.ID] = make([]*InternalLogEntry, 0)
- }
- e.LogsByBindingId[logEntry.Binding.ID] = append(e.LogsByBindingId[logEntry.Binding.ID], logEntry)
- }
- func (e *Executor) findBindingByActionTitle(actionConfigTitle string, entityPrefix string) *ActionBinding {
- e.MapActionBindingsLock.RLock()
- defer e.MapActionBindingsLock.RUnlock()
- for _, binding := range e.MapActionBindings {
- if binding.Action.Title == actionConfigTitle && e.matchesEntityPrefix(binding, entityPrefix) {
- return binding
- }
- }
- return nil
- }
- func (e *Executor) matchesEntityPrefix(binding *ActionBinding, entityPrefix string) bool {
- if entityPrefix == "" {
- return binding.Entity == nil
- }
- return binding.Entity != nil && binding.Entity.UniqueKey == entityPrefix
- }
|