Просмотр исходного кода

fix: Port the calender schedulding fix from 2k to 3k (#830)

jamesread 5 месяцев назад
Родитель
Сommit
56786491e8
4 измененных файлов с 108 добавлено и 4 удалено
  1. 23 0
      .pre-commit-config.yaml
  2. 6 0
      Makefile
  3. 1 1
      frontend/js/websocket.js
  4. 78 3
      service/internal/oncalendarfile/calendar.go

+ 23 - 0
.pre-commit-config.yaml

@@ -17,3 +17,26 @@ repos:
       - id: conventional-pre-commit
         stages: [commit-msg]
         args: [] # optional: list of Conventional Commits types to allow e.g. [feat, fix, ci, chore, test]
+
+  - repo: local
+    hooks:
+      - id: service-codestyle
+        name: service-codestyle
+        entry: make service-codestyle
+        language: system
+        pass_filenames: false
+        always_run: true
+
+      - id: frontend-codestyle
+        name: frontend-codestyle
+        entry: make frontend-codestyle
+        language: system
+        pass_filenames: false
+        always_run: true
+
+      - id: it
+        name: it
+        entry: make service-codestyle frontend-codestyle
+        language: system
+        pass_filenames: false
+        always_run: true

+ 6 - 0
Makefile

@@ -11,6 +11,12 @@ service-prep:
 service-unittests:
 	$(MAKE) -wC service unittests
 
+service-codestyle:
+	$(MAKE) -wC service codestyle
+
+frontend-codestyle:
+	$(MAKE) -wC frontend codestyle
+
 it:
 	$(MAKE) -wC integration-tests
 

+ 1 - 1
frontend/js/websocket.js

@@ -76,4 +76,4 @@ function onExecutionChanged (evt) {
     // Clear rate limit if not set
     rateLimits[logEntry.bindingId] = 0
   }
-}
+}

+ 78 - 3
service/internal/oncalendarfile/calendar.go

@@ -3,6 +3,7 @@ package oncalendarfile
 import (
 	"context"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/OliveTin/OliveTin/internal/auth"
@@ -13,11 +14,27 @@ import (
 	"gopkg.in/yaml.v3"
 )
 
+type timerEntry struct {
+	timer  *time.Timer
+	cancel context.CancelFunc
+}
+
+type existingTimers struct {
+	timers map[time.Time]timerEntry
+}
+
+var (
+	scheduleMap      = make(map[string]existingTimers)
+	scheduleMapMutex sync.RWMutex
+)
+
 func Schedule(cfg *config.Config, ex *executor.Executor) {
 	for _, action := range cfg.Actions {
+		captured := action
+
 		if action.ExecOnCalendarFile != "" {
 			x := func(filename string) {
-				parseCalendarFile(action, cfg, ex, filename)
+				parseCalendarFile(captured, cfg, ex, filename)
 			}
 
 			go filehelper.WatchFileWrite(action.ExecOnCalendarFile, x)
@@ -27,7 +44,30 @@ func Schedule(cfg *config.Config, ex *executor.Executor) {
 	}
 }
 
+func clearExistingTimers(action *config.Action) {
+	scheduleMapMutex.Lock()
+	defer scheduleMapMutex.Unlock()
+
+	if _, exists := scheduleMap[action.ID]; exists {
+		for instant, entry := range scheduleMap[action.ID].timers {
+			log.WithFields(log.Fields{
+				"instant":     instant,
+				"actionTitle": action.Title,
+			}).Infof("Clearing existing scheduled action from calendar")
+
+			entry.cancel()
+			entry.timer.Stop()
+		}
+	}
+
+	scheduleMap[action.ID] = existingTimers{
+		timers: make(map[time.Time]timerEntry),
+	}
+}
+
 func parseCalendarFile(action *config.Action, cfg *config.Config, ex *executor.Executor, filename string) {
+	clearExistingTimers(action)
+
 	filehelper.Touch(action.ExecOnCalendarFile, "calendar file")
 
 	log.WithFields(log.Fields{
@@ -61,12 +101,41 @@ func scheduleCalendarActions(entries []string, action *config.Action, cfg *confi
 			continue
 		}
 
-		until, _ := time.Parse(time.RFC3339, instant)
+		until, err := time.Parse(time.RFC3339, instant)
+
+		if err != nil {
+			log.WithFields(log.Fields{
+				"instant":     instant,
+				"actionTitle": action.Title,
+			}).Warnf("Invalid calendar entry, skipping: %v", err)
+			continue
+		}
 
 		go sleepUntil(ctx, until, action, cfg, ex)
 	}
 }
 
+func registerTimer(action *config.Action, instant time.Time, timer *time.Timer, cancel context.CancelFunc) {
+	scheduleMapMutex.Lock()
+	defer scheduleMapMutex.Unlock()
+
+	if _, exists := scheduleMap[action.ID]; !exists {
+		scheduleMap[action.ID] = existingTimers{
+			timers: make(map[time.Time]timerEntry),
+		}
+	}
+	scheduleMap[action.ID].timers[instant] = timerEntry{
+		timer:  timer,
+		cancel: cancel,
+	}
+}
+
+func unregisterTimer(action *config.Action, instant time.Time) {
+	scheduleMapMutex.Lock()
+	delete(scheduleMap[action.ID].timers, instant)
+	scheduleMapMutex.Unlock()
+}
+
 func sleepUntil(ctx context.Context, instant time.Time, action *config.Action, cfg *config.Config, ex *executor.Executor) {
 	if time.Now().After(instant) {
 		log.WithFields(log.Fields{
@@ -82,15 +151,21 @@ func sleepUntil(ctx context.Context, instant time.Time, action *config.Action, c
 		"actionTitle": action.Title,
 	}).Infof("Scheduling action on calendar")
 
+	childCtx, cancel := context.WithCancel(ctx)
 	timer := time.NewTimer(time.Until(instant))
 
+	registerTimer(action, instant, timer, cancel)
+
 	defer timer.Stop()
+	defer cancel()
 
 	select {
 	case <-timer.C:
+		unregisterTimer(action, instant)
 		exec(instant, action, cfg, ex)
 		return
-	case <-ctx.Done():
+	case <-childCtx.Done():
+		unregisterTimer(action, instant)
 		log.Infof("Cancelled scheduled action")
 		return
 	}