|
|
@@ -365,6 +365,10 @@ func (api *oliveTinAPI) GetActionBinding(ctx ctx.Context, req *connect.Request[a
|
|
|
|
|
|
binding := api.executor.FindBindingByID(req.Msg.BindingId)
|
|
|
|
|
|
+ if binding == nil {
|
|
|
+ return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("action with ID %s not found", req.Msg.BindingId))
|
|
|
+ }
|
|
|
+
|
|
|
return connect.NewResponse(&apiv1.GetActionBindingResponse{
|
|
|
Action: buildAction(binding, &DashboardRenderRequest{
|
|
|
cfg: api.cfg,
|
|
|
@@ -436,9 +440,14 @@ func (api *oliveTinAPI) GetLogs(ctx ctx.Context, req *connect.Request[apiv1.GetL
|
|
|
logEntries, pagingResult := api.executor.GetLogTrackingIds(req.Msg.StartOffset, api.cfg.LogHistoryPageSize)
|
|
|
|
|
|
for _, logEntry := range logEntries {
|
|
|
+ // Skip if binding is nil or action is nil
|
|
|
+ if logEntry.Binding == nil || logEntry.Binding.Action == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
action := logEntry.Binding.Action
|
|
|
|
|
|
- if action == nil || acl.IsAllowedLogs(api.cfg, user, action) {
|
|
|
+ if acl.IsAllowedLogs(api.cfg, user, action) {
|
|
|
pbLogEntry := api.internalLogEntryToPb(logEntry, user)
|
|
|
|
|
|
ret.Logs = append(ret.Logs, pbLogEntry)
|
|
|
@@ -453,6 +462,61 @@ func (api *oliveTinAPI) GetLogs(ctx ctx.Context, req *connect.Request[apiv1.GetL
|
|
|
return connect.NewResponse(ret), nil
|
|
|
}
|
|
|
|
|
|
+func (api *oliveTinAPI) GetActionLogs(ctx ctx.Context, req *connect.Request[apiv1.GetActionLogsRequest]) (*connect.Response[apiv1.GetActionLogsResponse], error) {
|
|
|
+ user := acl.UserFromContext(ctx, req, api.cfg)
|
|
|
+
|
|
|
+ if err := api.checkDashboardAccess(user); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ ret := &apiv1.GetActionLogsResponse{}
|
|
|
+
|
|
|
+ logs := api.executor.GetLogsByActionId(req.Msg.ActionId)
|
|
|
+
|
|
|
+ // Apply ACL filtering
|
|
|
+ filteredLogs := make([]*executor.InternalLogEntry, 0)
|
|
|
+ for _, logEntry := range logs {
|
|
|
+ // Skip if binding is nil or action is nil
|
|
|
+ if logEntry.Binding == nil || logEntry.Binding.Action == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ action := logEntry.Binding.Action
|
|
|
+ if acl.IsAllowedLogs(api.cfg, user, action) {
|
|
|
+ filteredLogs = append(filteredLogs, logEntry)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Pagination
|
|
|
+ totalCount := int64(len(filteredLogs))
|
|
|
+ pageSize := api.cfg.LogHistoryPageSize
|
|
|
+ startOffset := req.Msg.StartOffset
|
|
|
+
|
|
|
+ startIdx := startOffset
|
|
|
+ endIdx := startOffset + pageSize
|
|
|
+ if endIdx > totalCount {
|
|
|
+ endIdx = totalCount
|
|
|
+ }
|
|
|
+
|
|
|
+ logEntries := filteredLogs[startIdx:endIdx]
|
|
|
+ countRemaining := totalCount - endIdx
|
|
|
+ if countRemaining < 0 {
|
|
|
+ countRemaining = 0
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, logEntry := range logEntries {
|
|
|
+ pbLogEntry := api.internalLogEntryToPb(logEntry, user)
|
|
|
+ ret.Logs = append(ret.Logs, pbLogEntry)
|
|
|
+ }
|
|
|
+
|
|
|
+ ret.CountRemaining = countRemaining
|
|
|
+ ret.PageSize = pageSize
|
|
|
+ ret.TotalCount = totalCount
|
|
|
+ ret.StartOffset = startOffset
|
|
|
+
|
|
|
+ return connect.NewResponse(ret), nil
|
|
|
+}
|
|
|
+
|
|
|
/*
|
|
|
This function is ONLY a helper for the UI - the arguments are validated properly
|
|
|
on the StartAction -> Executor chain. This is here basically to provide helpful
|
|
|
@@ -579,6 +643,9 @@ func (api *oliveTinAPI) EventStream(ctx ctx.Context, req *connect.Request[apiv1.
|
|
|
log.Debugf("Sending event to client: %v", msg)
|
|
|
if err := srv.Send(msg); err != nil {
|
|
|
log.Errorf("Error sending event to client: %v", err)
|
|
|
+ // Remove disconnected client from the list
|
|
|
+ api.removeClient(client)
|
|
|
+ break
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -587,7 +654,21 @@ func (api *oliveTinAPI) EventStream(ctx ctx.Context, req *connect.Request[apiv1.
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+func (api *oliveTinAPI) removeClient(clientToRemove *connectedClients) {
|
|
|
+ api.connectedClients = func() []*connectedClients {
|
|
|
+ var filtered []*connectedClients
|
|
|
+ for _, client := range api.connectedClients {
|
|
|
+ if client != clientToRemove {
|
|
|
+ filtered = append(filtered, client)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return filtered
|
|
|
+ }()
|
|
|
+}
|
|
|
+
|
|
|
func (api *oliveTinAPI) OnActionMapRebuilt() {
|
|
|
+ toRemove := []*connectedClients{}
|
|
|
+
|
|
|
for _, client := range api.connectedClients {
|
|
|
select {
|
|
|
case client.channel <- &apiv1.EventStreamResponse{
|
|
|
@@ -596,12 +677,19 @@ func (api *oliveTinAPI) OnActionMapRebuilt() {
|
|
|
},
|
|
|
}:
|
|
|
default:
|
|
|
- log.Warnf("EventStream: client channel is full, dropping message")
|
|
|
+ log.Warnf("EventStream: client channel is full, removing client")
|
|
|
+ toRemove = append(toRemove, client)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ for _, client := range toRemove {
|
|
|
+ api.removeClient(client)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (api *oliveTinAPI) OnExecutionStarted(ex *executor.InternalLogEntry) {
|
|
|
+ toRemove := []*connectedClients{}
|
|
|
+
|
|
|
for _, client := range api.connectedClients {
|
|
|
select {
|
|
|
case client.channel <- &apiv1.EventStreamResponse{
|
|
|
@@ -612,12 +700,19 @@ func (api *oliveTinAPI) OnExecutionStarted(ex *executor.InternalLogEntry) {
|
|
|
},
|
|
|
}:
|
|
|
default:
|
|
|
- log.Warnf("EventStream: client channel is full, dropping message")
|
|
|
+ log.Warnf("EventStream: client channel is full, removing client")
|
|
|
+ toRemove = append(toRemove, client)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ for _, client := range toRemove {
|
|
|
+ api.removeClient(client)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (api *oliveTinAPI) OnExecutionFinished(ex *executor.InternalLogEntry) {
|
|
|
+ toRemove := []*connectedClients{}
|
|
|
+
|
|
|
for _, client := range api.connectedClients {
|
|
|
select {
|
|
|
case client.channel <- &apiv1.EventStreamResponse{
|
|
|
@@ -628,9 +723,14 @@ func (api *oliveTinAPI) OnExecutionFinished(ex *executor.InternalLogEntry) {
|
|
|
},
|
|
|
}:
|
|
|
default:
|
|
|
- log.Warnf("EventStream: client channel is full, dropping message")
|
|
|
+ log.Warnf("EventStream: client channel is full, removing client")
|
|
|
+ toRemove = append(toRemove, client)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ for _, client := range toRemove {
|
|
|
+ api.removeClient(client)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (api *oliveTinAPI) GetDiagnostics(ctx ctx.Context, req *connect.Request[apiv1.GetDiagnosticsRequest]) (*connect.Response[apiv1.GetDiagnosticsResponse], error) {
|
|
|
@@ -734,6 +834,8 @@ func buildAdditionalLinks(links []*config.NavigationLink) []*apiv1.AdditionalLin
|
|
|
}
|
|
|
|
|
|
func (api *oliveTinAPI) OnOutputChunk(content []byte, executionTrackingId string) {
|
|
|
+ toRemove := []*connectedClients{}
|
|
|
+
|
|
|
for _, client := range api.connectedClients {
|
|
|
select {
|
|
|
case client.channel <- &apiv1.EventStreamResponse{
|
|
|
@@ -745,9 +847,14 @@ func (api *oliveTinAPI) OnOutputChunk(content []byte, executionTrackingId string
|
|
|
},
|
|
|
}:
|
|
|
default:
|
|
|
- log.Warnf("EventStream: client channel is full, dropping message")
|
|
|
+ log.Warnf("EventStream: client channel is full, removing client")
|
|
|
+ toRemove = append(toRemove, client)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ for _, client := range toRemove {
|
|
|
+ api.removeClient(client)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (api *oliveTinAPI) GetEntities(ctx ctx.Context, req *connect.Request[apiv1.GetEntitiesRequest]) (*connect.Response[apiv1.GetEntitiesResponse], error) {
|