|
|
@@ -14,6 +14,7 @@ import (
|
|
|
|
|
|
"fmt"
|
|
|
"net/http"
|
|
|
+ "sync"
|
|
|
|
|
|
acl "github.com/OliveTin/OliveTin/internal/acl"
|
|
|
auth "github.com/OliveTin/OliveTin/internal/auth"
|
|
|
@@ -28,10 +29,26 @@ type oliveTinAPI struct {
|
|
|
executor *executor.Executor
|
|
|
cfg *config.Config
|
|
|
|
|
|
- connectedClients []*connectedClients
|
|
|
+ // streamingClients is a set of currently connected clients.
|
|
|
+ // The empty struct value models set semantics (keys only) and keeps add/remove O(1).
|
|
|
+ // We use a map for efficient membership and deletion; ordering is not required.
|
|
|
+ streamingClients map[*streamingClient]struct{}
|
|
|
+ streamingClientsMutex sync.RWMutex
|
|
|
}
|
|
|
|
|
|
-type connectedClients struct {
|
|
|
+// This is used to avoid race conditions when iterating over the connectedClients map.
|
|
|
+// and holds the lock for as minimal time as possible to avoid blocking the API for too long.
|
|
|
+func (api *oliveTinAPI) copyOfStreamingClients() []*streamingClient {
|
|
|
+ api.streamingClientsMutex.RLock()
|
|
|
+ defer api.streamingClientsMutex.RUnlock()
|
|
|
+ clients := make([]*streamingClient, 0, len(api.streamingClients))
|
|
|
+ for client := range api.streamingClients {
|
|
|
+ clients = append(clients, client)
|
|
|
+ }
|
|
|
+ return clients
|
|
|
+}
|
|
|
+
|
|
|
+type streamingClient struct {
|
|
|
channel chan *apiv1.EventStreamResponse
|
|
|
AuthenticatedUser *acl.AuthenticatedUser
|
|
|
}
|
|
|
@@ -365,6 +382,10 @@ func (api *oliveTinAPI) GetActionBinding(ctx ctx.Context, req *connect.Request[a
|
|
|
|
|
|
binding := api.executor.FindBindingByID(req.Msg.BindingId)
|
|
|
|
|
|
+ if binding == nil {
|
|
|
+ return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("action with ID %s not found", req.Msg.BindingId))
|
|
|
+ }
|
|
|
+
|
|
|
return connect.NewResponse(&apiv1.GetActionBindingResponse{
|
|
|
Action: buildAction(binding, &DashboardRenderRequest{
|
|
|
cfg: api.cfg,
|
|
|
@@ -436,9 +457,14 @@ func (api *oliveTinAPI) GetLogs(ctx ctx.Context, req *connect.Request[apiv1.GetL
|
|
|
logEntries, pagingResult := api.executor.GetLogTrackingIds(req.Msg.StartOffset, api.cfg.LogHistoryPageSize)
|
|
|
|
|
|
for _, logEntry := range logEntries {
|
|
|
+ // Skip if binding is nil or action is nil
|
|
|
+ if logEntry.Binding == nil || logEntry.Binding.Action == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
action := logEntry.Binding.Action
|
|
|
|
|
|
- if action == nil || acl.IsAllowedLogs(api.cfg, user, action) {
|
|
|
+ if acl.IsAllowedLogs(api.cfg, user, action) {
|
|
|
pbLogEntry := api.internalLogEntryToPb(logEntry, user)
|
|
|
|
|
|
ret.Logs = append(ret.Logs, pbLogEntry)
|
|
|
@@ -453,6 +479,75 @@ func (api *oliveTinAPI) GetLogs(ctx ctx.Context, req *connect.Request[apiv1.GetL
|
|
|
return connect.NewResponse(ret), nil
|
|
|
}
|
|
|
|
|
|
+func (api *oliveTinAPI) GetActionLogs(ctx ctx.Context, req *connect.Request[apiv1.GetActionLogsRequest]) (*connect.Response[apiv1.GetActionLogsResponse], error) {
|
|
|
+ user := acl.UserFromContext(ctx, req, api.cfg)
|
|
|
+
|
|
|
+ if err := api.checkDashboardAccess(user); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ ret := &apiv1.GetActionLogsResponse{}
|
|
|
+
|
|
|
+ logs := api.executor.GetLogsByActionId(req.Msg.ActionId)
|
|
|
+
|
|
|
+ // Apply ACL filtering
|
|
|
+ filteredLogs := make([]*executor.InternalLogEntry, 0)
|
|
|
+ for _, logEntry := range logs {
|
|
|
+ // Skip if binding is nil or action is nil
|
|
|
+ if logEntry.Binding == nil || logEntry.Binding.Action == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ action := logEntry.Binding.Action
|
|
|
+ if acl.IsAllowedLogs(api.cfg, user, action) {
|
|
|
+ filteredLogs = append(filteredLogs, logEntry)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Pagination
|
|
|
+ totalCount := int64(len(filteredLogs))
|
|
|
+ pageSize := api.cfg.LogHistoryPageSize
|
|
|
+ startOffset := req.Msg.StartOffset
|
|
|
+
|
|
|
+ // Validate and clamp offset to prevent out-of-bounds access
|
|
|
+ if startOffset < 0 {
|
|
|
+ startOffset = 0
|
|
|
+ }
|
|
|
+
|
|
|
+ // If offset is beyond available data, return empty result with correct metadata
|
|
|
+ if startOffset >= totalCount {
|
|
|
+ ret.CountRemaining = 0
|
|
|
+ ret.PageSize = pageSize
|
|
|
+ ret.TotalCount = totalCount
|
|
|
+ ret.StartOffset = startOffset
|
|
|
+ return connect.NewResponse(ret), nil
|
|
|
+ }
|
|
|
+
|
|
|
+ startIdx := startOffset
|
|
|
+ endIdx := startOffset + pageSize
|
|
|
+ if endIdx > totalCount {
|
|
|
+ endIdx = totalCount
|
|
|
+ }
|
|
|
+
|
|
|
+ logEntries := filteredLogs[startIdx:endIdx]
|
|
|
+ countRemaining := totalCount - endIdx
|
|
|
+ if countRemaining < 0 {
|
|
|
+ countRemaining = 0
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, logEntry := range logEntries {
|
|
|
+ pbLogEntry := api.internalLogEntryToPb(logEntry, user)
|
|
|
+ ret.Logs = append(ret.Logs, pbLogEntry)
|
|
|
+ }
|
|
|
+
|
|
|
+ ret.CountRemaining = countRemaining
|
|
|
+ ret.PageSize = pageSize
|
|
|
+ ret.TotalCount = totalCount
|
|
|
+ ret.StartOffset = startOffset
|
|
|
+
|
|
|
+ return connect.NewResponse(ret), nil
|
|
|
+}
|
|
|
+
|
|
|
/*
|
|
|
This function is ONLY a helper for the UI - the arguments are validated properly
|
|
|
on the StartAction -> Executor chain. This is here basically to provide helpful
|
|
|
@@ -565,20 +660,25 @@ func (api *oliveTinAPI) EventStream(ctx ctx.Context, req *connect.Request[apiv1.
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- client := &connectedClients{
|
|
|
+ client := &streamingClient{
|
|
|
channel: make(chan *apiv1.EventStreamResponse, 10), // Buffered channel to hold Events
|
|
|
AuthenticatedUser: user,
|
|
|
}
|
|
|
|
|
|
log.Infof("EventStream: client connected: %v", client.AuthenticatedUser.Username)
|
|
|
|
|
|
- api.connectedClients = append(api.connectedClients, client)
|
|
|
+ api.streamingClientsMutex.Lock()
|
|
|
+ api.streamingClients[client] = struct{}{}
|
|
|
+ api.streamingClientsMutex.Unlock()
|
|
|
|
|
|
// loop over client channel and send events to connectedClient
|
|
|
for msg := range client.channel {
|
|
|
log.Debugf("Sending event to client: %v", msg)
|
|
|
if err := srv.Send(msg); err != nil {
|
|
|
log.Errorf("Error sending event to client: %v", err)
|
|
|
+ // Remove disconnected client from the list
|
|
|
+ api.removeClient(client)
|
|
|
+ break
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -587,8 +687,16 @@ func (api *oliveTinAPI) EventStream(ctx ctx.Context, req *connect.Request[apiv1.
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+func (api *oliveTinAPI) removeClient(clientToRemove *streamingClient) {
|
|
|
+ api.streamingClientsMutex.Lock()
|
|
|
+ delete(api.streamingClients, clientToRemove)
|
|
|
+ api.streamingClientsMutex.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
func (api *oliveTinAPI) OnActionMapRebuilt() {
|
|
|
- for _, client := range api.connectedClients {
|
|
|
+ toRemove := []*streamingClient{}
|
|
|
+
|
|
|
+ for _, client := range api.copyOfStreamingClients() {
|
|
|
select {
|
|
|
case client.channel <- &apiv1.EventStreamResponse{
|
|
|
Event: &apiv1.EventStreamResponse_ConfigChanged{
|
|
|
@@ -596,13 +704,20 @@ func (api *oliveTinAPI) OnActionMapRebuilt() {
|
|
|
},
|
|
|
}:
|
|
|
default:
|
|
|
- log.Warnf("EventStream: client channel is full, dropping message")
|
|
|
+ log.Warnf("EventStream: client channel is full, removing client")
|
|
|
+ toRemove = append(toRemove, client)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ for _, client := range toRemove {
|
|
|
+ api.removeClient(client)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (api *oliveTinAPI) OnExecutionStarted(ex *executor.InternalLogEntry) {
|
|
|
- for _, client := range api.connectedClients {
|
|
|
+ toRemove := []*streamingClient{}
|
|
|
+
|
|
|
+ for _, client := range api.copyOfStreamingClients() {
|
|
|
select {
|
|
|
case client.channel <- &apiv1.EventStreamResponse{
|
|
|
Event: &apiv1.EventStreamResponse_ExecutionStarted{
|
|
|
@@ -612,13 +727,20 @@ func (api *oliveTinAPI) OnExecutionStarted(ex *executor.InternalLogEntry) {
|
|
|
},
|
|
|
}:
|
|
|
default:
|
|
|
- log.Warnf("EventStream: client channel is full, dropping message")
|
|
|
+ log.Warnf("EventStream: client channel is full, removing client")
|
|
|
+ toRemove = append(toRemove, client)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ for _, client := range toRemove {
|
|
|
+ api.removeClient(client)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (api *oliveTinAPI) OnExecutionFinished(ex *executor.InternalLogEntry) {
|
|
|
- for _, client := range api.connectedClients {
|
|
|
+ toRemove := []*streamingClient{}
|
|
|
+
|
|
|
+ for _, client := range api.copyOfStreamingClients() {
|
|
|
select {
|
|
|
case client.channel <- &apiv1.EventStreamResponse{
|
|
|
Event: &apiv1.EventStreamResponse_ExecutionFinished{
|
|
|
@@ -628,9 +750,14 @@ func (api *oliveTinAPI) OnExecutionFinished(ex *executor.InternalLogEntry) {
|
|
|
},
|
|
|
}:
|
|
|
default:
|
|
|
- log.Warnf("EventStream: client channel is full, dropping message")
|
|
|
+ log.Warnf("EventStream: client channel is full, removing client")
|
|
|
+ toRemove = append(toRemove, client)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ for _, client := range toRemove {
|
|
|
+ api.removeClient(client)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (api *oliveTinAPI) GetDiagnostics(ctx ctx.Context, req *connect.Request[apiv1.GetDiagnosticsRequest]) (*connect.Response[apiv1.GetDiagnosticsResponse], error) {
|
|
|
@@ -734,7 +861,9 @@ func buildAdditionalLinks(links []*config.NavigationLink) []*apiv1.AdditionalLin
|
|
|
}
|
|
|
|
|
|
func (api *oliveTinAPI) OnOutputChunk(content []byte, executionTrackingId string) {
|
|
|
- for _, client := range api.connectedClients {
|
|
|
+ toRemove := []*streamingClient{}
|
|
|
+
|
|
|
+ for _, client := range api.copyOfStreamingClients() {
|
|
|
select {
|
|
|
case client.channel <- &apiv1.EventStreamResponse{
|
|
|
Event: &apiv1.EventStreamResponse_OutputChunk{
|
|
|
@@ -745,9 +874,14 @@ func (api *oliveTinAPI) OnOutputChunk(content []byte, executionTrackingId string
|
|
|
},
|
|
|
}:
|
|
|
default:
|
|
|
- log.Warnf("EventStream: client channel is full, dropping message")
|
|
|
+ log.Warnf("EventStream: client channel is full, removing client")
|
|
|
+ toRemove = append(toRemove, client)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ for _, client := range toRemove {
|
|
|
+ api.removeClient(client)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (api *oliveTinAPI) GetEntities(ctx ctx.Context, req *connect.Request[apiv1.GetEntitiesRequest]) (*connect.Response[apiv1.GetEntitiesResponse], error) {
|
|
|
@@ -864,6 +998,7 @@ func newServer(ex *executor.Executor) *oliveTinAPI {
|
|
|
server := oliveTinAPI{}
|
|
|
server.cfg = ex.Cfg
|
|
|
server.executor = ex
|
|
|
+ server.streamingClients = make(map[*streamingClient]struct{})
|
|
|
|
|
|
ex.AddListener(&server)
|
|
|
return &server
|