loadlogs.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package executor
  2. import (
  3. "os"
  4. "path/filepath"
  5. "sort"
  6. "strings"
  7. log "github.com/sirupsen/logrus"
  8. "gopkg.in/yaml.v3"
  9. )
  10. // LoadLogsFromDisk loads persisted logs from YAML files on disk and restores them to the executor.
  11. // This should be called during startup if saveLogs is configured.
  12. func (e *Executor) LoadLogsFromDisk() {
  13. resultsDir := e.Cfg.SaveLogs.ResultsDirectory
  14. if resultsDir == "" {
  15. return
  16. }
  17. entries, skippedCount := e.readLogDirectory(resultsDir)
  18. if entries == nil {
  19. return
  20. }
  21. loadedLogs, skippedCount := e.parseLogFiles(resultsDir, entries, skippedCount)
  22. sort.Slice(loadedLogs, func(i, j int) bool {
  23. return loadedLogs[i].DatetimeStarted.Before(loadedLogs[j].DatetimeStarted)
  24. })
  25. skippedCount = e.restoreLogsToExecutor(loadedLogs, skippedCount)
  26. log.WithFields(log.Fields{
  27. "loaded": len(loadedLogs),
  28. "skipped": skippedCount,
  29. }).Info("Finished loading persisted logs from disk")
  30. }
  31. func (e *Executor) readLogDirectory(resultsDir string) ([]os.DirEntry, int) {
  32. if _, err := os.Stat(resultsDir); os.IsNotExist(err) {
  33. log.WithFields(log.Fields{
  34. "directory": resultsDir,
  35. }).Debug("Logs directory does not exist, skipping log loading")
  36. return nil, 0
  37. }
  38. log.WithFields(log.Fields{
  39. "directory": resultsDir,
  40. }).Info("Loading persisted logs from disk")
  41. entries, err := os.ReadDir(resultsDir)
  42. if err != nil {
  43. log.WithFields(log.Fields{
  44. "directory": resultsDir,
  45. "error": err,
  46. }).Warnf("Failed to read logs directory")
  47. return nil, 0
  48. }
  49. return entries, 0
  50. }
  51. func (e *Executor) parseLogFiles(resultsDir string, entries []os.DirEntry, skippedCount int) ([]*InternalLogEntry, int) {
  52. loadedLogs := make([]*InternalLogEntry, 0)
  53. for _, entry := range entries {
  54. if !e.shouldProcessLogEntry(entry) {
  55. continue
  56. }
  57. logEntry, newSkippedCount := e.processLogFileEntry(resultsDir, entry.Name())
  58. skippedCount += newSkippedCount
  59. if logEntry != nil {
  60. loadedLogs = append(loadedLogs, logEntry)
  61. }
  62. }
  63. return loadedLogs, skippedCount
  64. }
  65. func (e *Executor) shouldProcessLogEntry(entry os.DirEntry) bool {
  66. return !entry.IsDir() && strings.HasSuffix(entry.Name(), ".yaml")
  67. }
  68. func (e *Executor) processLogFileEntry(resultsDir, filename string) (*InternalLogEntry, int) {
  69. logEntry, ok := e.loadLogFileFromPath(resultsDir, filename)
  70. if !ok {
  71. return nil, 1
  72. }
  73. if logEntry.ExecutionTrackingID == "" {
  74. log.WithFields(log.Fields{
  75. "file": filepath.Join(resultsDir, filename),
  76. }).Warnf("Log file missing execution tracking ID, skipping")
  77. return nil, 1
  78. }
  79. e.restoreBindingForLogEntry(logEntry, filepath.Join(resultsDir, filename))
  80. return logEntry, 0
  81. }
  82. func (e *Executor) loadLogFileFromPath(resultsDir, filename string) (*InternalLogEntry, bool) {
  83. filepath := filepath.Join(resultsDir, filename)
  84. data, err := os.ReadFile(filepath)
  85. if err != nil {
  86. log.WithFields(log.Fields{
  87. "file": filepath,
  88. "error": err,
  89. }).Warnf("Failed to read log file")
  90. return nil, false
  91. }
  92. var logEntry InternalLogEntry
  93. if err := yaml.Unmarshal(data, &logEntry); err != nil {
  94. log.WithFields(log.Fields{
  95. "file": filepath,
  96. "error": err,
  97. }).Warnf("Failed to unmarshal log file")
  98. return nil, false
  99. }
  100. return &logEntry, true
  101. }
  102. // Skipped when the entry already has a valid binding or has no ActionConfigTitle (e.g. action/entity removed from config).
  103. func (e *Executor) restoreBindingForLogEntry(logEntry *InternalLogEntry, filepath string) {
  104. if e.hasValidBinding(logEntry) || logEntry.ActionConfigTitle == "" {
  105. return
  106. }
  107. binding := e.findBindingByActionTitle(logEntry.ActionConfigTitle, logEntry.EntityPrefix)
  108. if binding != nil {
  109. logEntry.Binding = binding
  110. return
  111. }
  112. e.logBindingNotFound(logEntry, filepath)
  113. logEntry.Binding = nil
  114. }
  115. func (e *Executor) hasValidBinding(logEntry *InternalLogEntry) bool {
  116. return logEntry.Binding != nil && logEntry.Binding.Action != nil
  117. }
  118. func (e *Executor) logBindingNotFound(logEntry *InternalLogEntry, filepath string) {
  119. log.WithFields(log.Fields{
  120. "file": filepath,
  121. "actionTitle": logEntry.ActionConfigTitle,
  122. "entityPrefix": logEntry.EntityPrefix,
  123. "trackingId": logEntry.ExecutionTrackingID,
  124. }).Debug("Could not find binding for log entry, loading without binding")
  125. }
  126. func (e *Executor) restoreLogsToExecutor(loadedLogs []*InternalLogEntry, skippedCount int) int {
  127. e.logmutex.Lock()
  128. defer e.logmutex.Unlock()
  129. for _, logEntry := range loadedLogs {
  130. if _, exists := e.logs[logEntry.ExecutionTrackingID]; exists {
  131. log.WithFields(log.Fields{
  132. "trackingId": logEntry.ExecutionTrackingID,
  133. }).Debug("Log entry already exists, skipping")
  134. skippedCount++
  135. continue
  136. }
  137. logEntry.Index = int64(len(e.logsTrackingIdsByDate))
  138. e.logs[logEntry.ExecutionTrackingID] = logEntry
  139. e.logsTrackingIdsByDate = append(e.logsTrackingIdsByDate, logEntry.ExecutionTrackingID)
  140. if logEntry.Binding != nil {
  141. e.addLogToBindingMap(logEntry)
  142. }
  143. }
  144. return skippedCount
  145. }
  146. func (e *Executor) addLogToBindingMap(logEntry *InternalLogEntry) {
  147. if _, containsKey := e.LogsByBindingId[logEntry.Binding.ID]; !containsKey {
  148. e.LogsByBindingId[logEntry.Binding.ID] = make([]*InternalLogEntry, 0)
  149. }
  150. e.LogsByBindingId[logEntry.Binding.ID] = append(e.LogsByBindingId[logEntry.Binding.ID], logEntry)
  151. }
  152. func (e *Executor) findBindingByActionTitle(actionConfigTitle string, entityPrefix string) *ActionBinding {
  153. e.MapActionBindingsLock.RLock()
  154. defer e.MapActionBindingsLock.RUnlock()
  155. for _, binding := range e.MapActionBindings {
  156. if binding.Action.Title == actionConfigTitle && e.matchesEntityPrefix(binding, entityPrefix) {
  157. return binding
  158. }
  159. }
  160. return nil
  161. }
  162. func (e *Executor) matchesEntityPrefix(binding *ActionBinding, entityPrefix string) bool {
  163. if entityPrefix == "" {
  164. return binding.Entity == nil
  165. }
  166. return binding.Entity != nil && binding.Entity.UniqueKey == entityPrefix
  167. }