|
|
@@ -60,6 +60,20 @@ type streamingClient struct {
|
|
|
AuthenticatedUser *authpublic.AuthenticatedUser
|
|
|
}
|
|
|
|
|
|
+// trySendEventToClient sends msg to the client's channel. Returns false if the channel is full (client should be removed).
|
|
|
+func (api *oliveTinAPI) trySendEventToClient(client *streamingClient, msg *apiv1.EventStreamResponse) bool {
|
|
|
+ if client == nil || msg == nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ select {
|
|
|
+ case client.channel <- msg:
|
|
|
+ return true
|
|
|
+ default:
|
|
|
+ log.Warnf("EventStream: client channel is full, removing client")
|
|
|
+ return false
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (api *oliveTinAPI) KillAction(ctx ctx.Context, req *connect.Request[apiv1.KillActionRequest]) (*connect.Response[apiv1.KillActionResponse], error) {
|
|
|
ret := &apiv1.KillActionResponse{
|
|
|
ExecutionTrackingId: req.Msg.ExecutionTrackingId,
|
|
|
@@ -589,9 +603,20 @@ func isValidLogEntry(e *executor.InternalLogEntry) bool {
|
|
|
|
|
|
// isLogEntryAllowed checks if a log entry is allowed to be viewed by the user.
|
|
|
func (api *oliveTinAPI) isLogEntryAllowed(e *executor.InternalLogEntry, user *authpublic.AuthenticatedUser) bool {
|
|
|
+ if user == nil || !isValidLogEntry(e) {
|
|
|
+ return false
|
|
|
+ }
|
|
|
return acl.IsAllowedLogs(api.cfg, user, e.Binding.Action)
|
|
|
}
|
|
|
|
|
|
+// mayViewExecutionEvent returns whether the user is allowed to receive this execution event (for EventStream ACL).
|
|
|
+func (api *oliveTinAPI) mayViewExecutionEvent(entry *executor.InternalLogEntry, user *authpublic.AuthenticatedUser) bool {
|
|
|
+ if user == nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return isValidLogEntry(entry) && api.isLogEntryAllowed(entry, user)
|
|
|
+}
|
|
|
+
|
|
|
// buildEmptyPageResponse creates a response for an empty page.
|
|
|
func buildEmptyPageResponse(page pageInfo) *apiv1.GetActionLogsResponse {
|
|
|
return &apiv1.GetActionLogsResponse{
|
|
|
@@ -886,6 +911,9 @@ func (api *oliveTinAPI) EventStream(ctx ctx.Context, req *connect.Request[apiv1.
|
|
|
}
|
|
|
|
|
|
func (api *oliveTinAPI) removeClient(clientToRemove *streamingClient) {
|
|
|
+ if clientToRemove == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
api.streamingClientsMutex.Lock()
|
|
|
delete(api.streamingClients, clientToRemove)
|
|
|
api.streamingClientsMutex.Unlock()
|
|
|
@@ -915,50 +943,62 @@ func (api *oliveTinAPI) OnActionMapRebuilt() {
|
|
|
|
|
|
func (api *oliveTinAPI) OnExecutionStarted(ex *executor.InternalLogEntry) {
|
|
|
toRemove := []*streamingClient{}
|
|
|
-
|
|
|
for _, client := range api.copyOfStreamingClients() {
|
|
|
- select {
|
|
|
- case client.channel <- &apiv1.EventStreamResponse{
|
|
|
- Event: &apiv1.EventStreamResponse_ExecutionStarted{
|
|
|
- ExecutionStarted: &apiv1.EventExecutionStarted{
|
|
|
- LogEntry: api.internalLogEntryToPb(ex, client.AuthenticatedUser),
|
|
|
- },
|
|
|
- },
|
|
|
- }:
|
|
|
- default:
|
|
|
- log.Warnf("EventStream: client channel is full, removing client")
|
|
|
- toRemove = append(toRemove, client)
|
|
|
- }
|
|
|
+ api.maybeSendExecutionStarted(client, ex, &toRemove)
|
|
|
}
|
|
|
-
|
|
|
for _, client := range toRemove {
|
|
|
api.removeClient(client)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (api *oliveTinAPI) maybeSendExecutionStarted(client *streamingClient, ex *executor.InternalLogEntry, toRemove *[]*streamingClient) {
|
|
|
+ if client == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if !api.mayViewExecutionEvent(ex, client.AuthenticatedUser) {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ msg := &apiv1.EventStreamResponse{
|
|
|
+ Event: &apiv1.EventStreamResponse_ExecutionStarted{
|
|
|
+ ExecutionStarted: &apiv1.EventExecutionStarted{
|
|
|
+ LogEntry: api.internalLogEntryToPb(ex, client.AuthenticatedUser),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ }
|
|
|
+ if !api.trySendEventToClient(client, msg) {
|
|
|
+ *toRemove = append(*toRemove, client)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (api *oliveTinAPI) OnExecutionFinished(ile *executor.InternalLogEntry) {
|
|
|
toRemove := []*streamingClient{}
|
|
|
-
|
|
|
for _, client := range api.copyOfStreamingClients() {
|
|
|
- select {
|
|
|
- case client.channel <- &apiv1.EventStreamResponse{
|
|
|
- Event: &apiv1.EventStreamResponse_ExecutionFinished{
|
|
|
- ExecutionFinished: &apiv1.EventExecutionFinished{
|
|
|
- LogEntry: api.internalLogEntryToPb(ile, client.AuthenticatedUser),
|
|
|
- },
|
|
|
- },
|
|
|
- }:
|
|
|
- default:
|
|
|
- log.Warnf("EventStream: client channel is full, removing client")
|
|
|
- toRemove = append(toRemove, client)
|
|
|
- }
|
|
|
+ api.maybeSendExecutionFinished(client, ile, &toRemove)
|
|
|
}
|
|
|
-
|
|
|
for _, client := range toRemove {
|
|
|
api.removeClient(client)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (api *oliveTinAPI) maybeSendExecutionFinished(client *streamingClient, ile *executor.InternalLogEntry, toRemove *[]*streamingClient) {
|
|
|
+ if client == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if !api.mayViewExecutionEvent(ile, client.AuthenticatedUser) {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ msg := &apiv1.EventStreamResponse{
|
|
|
+ Event: &apiv1.EventStreamResponse_ExecutionFinished{
|
|
|
+ ExecutionFinished: &apiv1.EventExecutionFinished{
|
|
|
+ LogEntry: api.internalLogEntryToPb(ile, client.AuthenticatedUser),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ }
|
|
|
+ if !api.trySendEventToClient(client, msg) {
|
|
|
+ *toRemove = append(*toRemove, client)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (api *oliveTinAPI) GetDiagnostics(ctx ctx.Context, req *connect.Request[apiv1.GetDiagnosticsRequest]) (*connect.Response[apiv1.GetDiagnosticsResponse], error) {
|
|
|
user := auth.UserFromApiCall(ctx, req, api.cfg)
|
|
|
if err := api.checkDashboardAccess(user); err != nil {
|
|
|
@@ -1128,29 +1168,47 @@ func buildAdditionalLinks(links []*config.NavigationLink) []*apiv1.AdditionalLin
|
|
|
}
|
|
|
|
|
|
func (api *oliveTinAPI) OnOutputChunk(content []byte, executionTrackingId string) {
|
|
|
+ entry := api.getValidLogEntryForStreaming(executionTrackingId)
|
|
|
+ if entry == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ msg := &apiv1.EventStreamResponse{
|
|
|
+ Event: &apiv1.EventStreamResponse_OutputChunk{
|
|
|
+ OutputChunk: &apiv1.EventOutputChunk{
|
|
|
+ Output: string(content),
|
|
|
+ ExecutionTrackingId: executionTrackingId,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ }
|
|
|
toRemove := []*streamingClient{}
|
|
|
-
|
|
|
for _, client := range api.copyOfStreamingClients() {
|
|
|
- select {
|
|
|
- case client.channel <- &apiv1.EventStreamResponse{
|
|
|
- Event: &apiv1.EventStreamResponse_OutputChunk{
|
|
|
- OutputChunk: &apiv1.EventOutputChunk{
|
|
|
- Output: string(content),
|
|
|
- ExecutionTrackingId: executionTrackingId,
|
|
|
- },
|
|
|
- },
|
|
|
- }:
|
|
|
- default:
|
|
|
- log.Warnf("EventStream: client channel is full, removing client")
|
|
|
- toRemove = append(toRemove, client)
|
|
|
- }
|
|
|
+ api.maybeSendOutputChunk(client, entry, msg, &toRemove)
|
|
|
}
|
|
|
-
|
|
|
for _, client := range toRemove {
|
|
|
api.removeClient(client)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (api *oliveTinAPI) getValidLogEntryForStreaming(executionTrackingId string) *executor.InternalLogEntry {
|
|
|
+ entry, ok := api.executor.GetLog(executionTrackingId)
|
|
|
+ if !ok || !isValidLogEntry(entry) {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return entry
|
|
|
+}
|
|
|
+
|
|
|
+func (api *oliveTinAPI) maybeSendOutputChunk(client *streamingClient, entry *executor.InternalLogEntry, msg *apiv1.EventStreamResponse, toRemove *[]*streamingClient) {
|
|
|
+ if client == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if !api.mayViewExecutionEvent(entry, client.AuthenticatedUser) {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if !api.trySendEventToClient(client, msg) {
|
|
|
+ *toRemove = append(*toRemove, client)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (api *oliveTinAPI) GetEntities(ctx ctx.Context, req *connect.Request[apiv1.GetEntitiesRequest]) (*connect.Response[apiv1.GetEntitiesResponse], error) {
|
|
|
user := auth.UserFromApiCall(ctx, req, api.cfg)
|
|
|
|