فهرست منبع

chore: fix race conditions and little bugs

jamesread 2 هفته پیش
والد
کامیت
7bd6e77ae2

+ 1 - 1
docs/modules/ROOT/pages/args/templates.adoc

@@ -21,7 +21,7 @@ The `Json` template function encodes a value as a JSON string. Pipe a template v
 ----
 actions:
   - title: curl my knx thing
-    shell: curl --json https://knx.example.com/v1/group/global_on/write -d '{{ .Arguments | Json }}'
+    shell: curl --json '{{ .Arguments | Json }}' https://knx.example.com/v1/group/global_on/write
     entity: light
     arguments:
       - name: value

+ 1 - 1
frontend/resources/vue/router.js

@@ -174,7 +174,7 @@ router.beforeEach((to) => {
 
 // Navigation guard for authentication (if needed)
 router.beforeEach((to) => {
-  const isAuthenticated = window.isAuthenticated || true // Default to true for now
+  const isAuthenticated = window.isAuthenticated ?? false
 
   if (to.meta.requiresAuth && !isAuthenticated) {
     return '/login'

+ 0 - 7
frontend/resources/vue/views/LogsListView.vue

@@ -281,13 +281,6 @@ onUnmounted(() => {
     fetchTimer = null
   }
 })
-
-onUnmounted(() => {
-  if (fetchTimer) {
-    clearTimeout(fetchTimer)
-    fetchTimer = null
-  }
-})
 </script>
 
 <style scoped>

+ 10 - 0
frontend/vite.config.js

@@ -12,6 +12,16 @@ export default defineConfig({
     }),
     vue(),
   ],
+  build: {
+    rolldownOptions: {
+      onLog (level, log, defaultHandler) {
+        if (log.code === 'INVALID_ANNOTATION') {
+          return
+        }
+        defaultHandler(level, log)
+      },
+    },
+  },
   server: {
     proxy: {
       '/api': {

+ 1 - 1
integration-tests/lib/elements.js

@@ -101,7 +101,7 @@ export async function waitForExecutionComplete(timeoutMs = DEFAULT_UI_WAIT_MS) {
     try {
       const statusElement = await webdriver.findElement(executionDialogStatusBy)
       const statusText = await statusElement.getText()
-      return !statusText.includes('Still running')
+      return !statusText.includes('Still running') && !statusText.includes('Queued')
     } catch (e) {
       return false
     }

+ 30 - 10
integration-tests/tests/suggestionsBrowserKey/suggestionsBrowserKey.mjs

@@ -13,20 +13,40 @@ import {
   waitForExecutionComplete,
 } from '../../lib/elements.js'
 
+async function clickBackFromLogsPage() {
+  const goBackButtons = await webdriver.findElements(By.css('button[title="Go back"]'))
+  if (goBackButtons.length > 0) {
+    await goBackButtons[0].click()
+    return 'history'
+  }
+
+  const dashboardBackButtons = await webdriver.findElements(By.css('button[title^="Back to "]'))
+  if (dashboardBackButtons.length > 0) {
+    await dashboardBackButtons[0].click()
+    return 'dashboard'
+  }
+
+  throw new Error('No back button found on execution logs page')
+}
+
 async function ensureOnDashboard() {
   let url = await webdriver.getCurrentUrl()
 
   if (url.includes('/logs/')) {
-    const backButton = await webdriver.findElement(By.css('button[title="Go back"]'))
-    await backButton.click()
-    await webdriver.wait(
-      new Condition('wait for argument form after logs back', async () => {
-        const currentUrl = await webdriver.getCurrentUrl()
-        return currentUrl.includes('/argumentForm')
-      }),
-      DEFAULT_UI_WAIT_MS
-    )
-    url = await webdriver.getCurrentUrl()
+    const backType = await clickBackFromLogsPage()
+    if (backType === 'history') {
+      await webdriver.wait(
+        new Condition('wait for argument form after logs back', async () => {
+          const currentUrl = await webdriver.getCurrentUrl()
+          return currentUrl.includes('/argumentForm')
+        }),
+        DEFAULT_UI_WAIT_MS
+      )
+      url = await webdriver.getCurrentUrl()
+    } else {
+      await waitForDashboardLoaded()
+      url = await webdriver.getCurrentUrl()
+    }
   }
 
   if (url.includes('/argumentForm')) {

+ 34 - 7
service/internal/api/api.go

@@ -1023,6 +1023,7 @@ func (api *oliveTinAPI) sendEventStreamHeartbeats(client *streamingClient) {
 	defer close(client.heartbeatDone)
 
 	if !api.sendEventStreamHeartbeat(client) {
+		go api.removeClient(client)
 		return
 	}
 
@@ -1037,6 +1038,7 @@ func (api *oliveTinAPI) runEventStreamHeartbeatLoop(client *streamingClient, tic
 			return
 		}
 		if !api.sendEventStreamHeartbeat(client) {
+			go api.removeClient(client)
 			return
 		}
 	}
@@ -1582,18 +1584,43 @@ func (api *oliveTinAPI) restartActionLogEntry(executionTrackingId string) (*exec
 	return execReqLogEntry, nil
 }
 
+var (
+	executorListenersMu sync.Mutex
+	executorListeners   = map[*executor.Executor]*oliveTinAPI{}
+)
+
+// RegisterExecutorListener registers the API server as an executor listener during startup.
+// Call this before background goroutines that may trigger RebuildActionMap.
+func RegisterExecutorListener(ex *executor.Executor) {
+	ensureExecutorListener(ex)
+}
+
+func ensureExecutorListener(ex *executor.Executor) *oliveTinAPI {
+	executorListenersMu.Lock()
+	defer executorListenersMu.Unlock()
+
+	if server, ok := executorListeners[ex]; ok {
+		return server
+	}
+
+	server := newServer(ex)
+	executorListeners[ex] = server
+	return server
+}
+
 func newServer(ex *executor.Executor) *oliveTinAPI {
-	server := oliveTinAPI{}
-	server.cfg = ex.Cfg
-	server.executor = ex
-	server.streamingClients = make(map[*streamingClient]struct{})
+	server := &oliveTinAPI{
+		cfg:              ex.Cfg,
+		executor:         ex,
+		streamingClients: make(map[*streamingClient]struct{}),
+	}
 
-	ex.AddListener(&server)
-	return &server
+	ex.AddListener(server)
+	return server
 }
 
 func GetNewHandler(ex *executor.Executor) (string, http.Handler) {
-	server := newServer(ex)
+	server := ensureExecutorListener(ex)
 
 	jsonOpt := connectproto.WithJSON(
 		protojson.MarshalOptions{

+ 24 - 6
service/internal/executor/executor.go

@@ -69,7 +69,8 @@ type Executor struct {
 
 	Cfg *config.Config
 
-	listeners []listener
+	listeners   []listener
+	listenersMu sync.RWMutex
 
 	chainOfCommand []executorStepFunc
 
@@ -218,9 +219,19 @@ type listener interface {
 }
 
 func (e *Executor) AddListener(m listener) {
+	e.listenersMu.Lock()
+	defer e.listenersMu.Unlock()
 	e.listeners = append(e.listeners, m)
 }
 
+func (e *Executor) copyListeners() []listener {
+	e.listenersMu.RLock()
+	defer e.listenersMu.RUnlock()
+	out := make([]listener, len(e.listeners))
+	copy(out, e.listeners)
+	return out
+}
+
 // getPagingStartIndex calculates the starting index for log pagination.
 // Parameters:
 //
@@ -1052,13 +1063,13 @@ func stepLogFinish(req *ExecutionRequest) bool {
 }
 
 func notifyListenersFinished(req *ExecutionRequest) {
-	for _, listener := range req.executor.listeners {
+	for _, listener := range req.executor.copyListeners() {
 		listener.OnExecutionFinished(req.logEntry)
 	}
 }
 
 func notifyListenersStarted(req *ExecutionRequest) {
-	for _, listener := range req.executor.listeners {
+	for _, listener := range req.executor.copyListeners() {
 		listener.OnExecutionStarted(req.logEntry)
 	}
 }
@@ -1079,7 +1090,7 @@ type OutputStreamer struct {
 }
 
 func (ost *OutputStreamer) Write(o []byte) (n int, err error) {
-	for _, listener := range ost.Req.executor.listeners {
+	for _, listener := range ost.Req.executor.copyListeners() {
 		listener.OnOutputChunk(o, ost.Req.TrackingID)
 	}
 
@@ -1107,6 +1118,13 @@ func buildEnv(args map[string]string) []string {
 	return ret
 }
 
+func commandExitCode(cmd *exec.Cmd) int {
+	if cmd == nil || cmd.ProcessState == nil {
+		return -1
+	}
+	return cmd.ProcessState.ExitCode()
+}
+
 func stepExec(req *ExecutionRequest) bool {
 	ctx, cancel := newTimeoutContext(context.Background(), time.Duration(req.Binding.Action.Timeout)*time.Second, req.executor)
 	defer cancel()
@@ -1127,7 +1145,7 @@ func stepExec(req *ExecutionRequest) bool {
 	ctx.setProcess(cmd.Process)
 	waiterr := cmd.Wait()
 	req.mutateLogEntry(func(entry *InternalLogEntry) {
-		entry.ExitCode = int32(cmd.ProcessState.ExitCode())
+		entry.ExitCode = int32(commandExitCode(cmd))
 		entry.Output = streamer.String()
 	})
 
@@ -1218,7 +1236,7 @@ func stepExecAfter(req *ExecutionRequest) bool {
 	}
 
 	req.mutateLogEntry(func(entry *InternalLogEntry) {
-		entry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", cmd.ProcessState.ExitCode())
+		entry.Output += fmt.Sprintf("Your shellAfterCompleted exited with code %v\n", commandExitCode(cmd))
 		entry.Output += "OliveTin::shellAfterCompleted output complete\n"
 	})
 

+ 1 - 1
service/internal/executor/executor_actions.go

@@ -94,7 +94,7 @@ func (e *Executor) RebuildActionMap() {
 
 	e.MapActionBindingsLock.Unlock()
 
-	for _, l := range e.listeners {
+	for _, l := range e.copyListeners() {
 		l.OnActionMapRebuilt()
 	}
 }

+ 2 - 0
service/main.go

@@ -7,6 +7,7 @@ import (
 
 	log "github.com/sirupsen/logrus"
 
+	"github.com/OliveTin/OliveTin/internal/api"
 	"github.com/OliveTin/OliveTin/internal/auth"
 	"github.com/OliveTin/OliveTin/internal/entities"
 	"github.com/OliveTin/OliveTin/internal/executor"
@@ -264,6 +265,7 @@ func main() {
 	go onfileindir.WatchFilesInDirectory(cfg, executor)
 	go oncalendarfile.Schedule(cfg, executor)
 
+	api.RegisterExecutorListener(executor)
 	entities.AddListener(executor.RebuildActionMap)
 	go entities.SetupEntityFileWatchers(cfg)