|
|
@@ -58,18 +58,43 @@ func (api *oliveTinAPI) copyOfStreamingClients() []*streamingClient {
|
|
|
type streamingClient struct {
|
|
|
channel chan *apiv1.EventStreamResponse
|
|
|
AuthenticatedUser *authpublic.AuthenticatedUser
|
|
|
+ heartbeatStopOnce sync.Once
|
|
|
+ heartbeatStop chan struct{}
|
|
|
+ heartbeatDone chan struct{}
|
|
|
}
|
|
|
|
|
|
-// trySendEventToClient sends msg to the client's channel. Returns false if the channel is full (client should be removed).
|
|
|
+func (c *streamingClient) stopHeartbeat() {
|
|
|
+ if c.heartbeatStop == nil || c.heartbeatDone == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ c.heartbeatStopOnce.Do(func() {
|
|
|
+ close(c.heartbeatStop)
|
|
|
+ })
|
|
|
+ <-c.heartbeatDone
|
|
|
+}
|
|
|
+
|
|
|
+// trySendEventToClient sends msg to the client's channel. Returns false if the channel is full or closed.
|
|
|
func (api *oliveTinAPI) trySendEventToClient(client *streamingClient, msg *apiv1.EventStreamResponse) bool {
|
|
|
if client == nil || msg == nil {
|
|
|
return false
|
|
|
}
|
|
|
+ sent := sendToStreamingClientChannel(client.channel, msg)
|
|
|
+ if !sent {
|
|
|
+ log.Warnf("EventStream: client channel is full or closed, removing client")
|
|
|
+ }
|
|
|
+ return sent
|
|
|
+}
|
|
|
+
|
|
|
+func sendToStreamingClientChannel(ch chan *apiv1.EventStreamResponse, msg *apiv1.EventStreamResponse) (sent bool) {
|
|
|
+ defer func() {
|
|
|
+ if recover() != nil {
|
|
|
+ sent = false
|
|
|
+ }
|
|
|
+ }()
|
|
|
select {
|
|
|
- case client.channel <- msg:
|
|
|
+ case ch <- msg:
|
|
|
return true
|
|
|
default:
|
|
|
- log.Warnf("EventStream: client channel is full, removing client")
|
|
|
return false
|
|
|
}
|
|
|
}
|
|
|
@@ -933,6 +958,8 @@ func (api *oliveTinAPI) EventStream(ctx ctx.Context, req *connect.Request[apiv1.
|
|
|
client := &streamingClient{
|
|
|
channel: make(chan *apiv1.EventStreamResponse, 10), // Buffered channel to hold Events
|
|
|
AuthenticatedUser: user,
|
|
|
+ heartbeatStop: make(chan struct{}),
|
|
|
+ heartbeatDone: make(chan struct{}),
|
|
|
}
|
|
|
|
|
|
log.WithFields(log.Fields{
|
|
|
@@ -943,9 +970,7 @@ func (api *oliveTinAPI) EventStream(ctx ctx.Context, req *connect.Request[apiv1.
|
|
|
api.streamingClients[client] = struct{}{}
|
|
|
api.streamingClientsMutex.Unlock()
|
|
|
|
|
|
- heartbeatDone := make(chan struct{})
|
|
|
- defer close(heartbeatDone)
|
|
|
- go api.sendEventStreamHeartbeats(heartbeatDone, client)
|
|
|
+ go api.sendEventStreamHeartbeats(client)
|
|
|
|
|
|
// loop over client channel and send events to connectedClient
|
|
|
for msg := range client.channel {
|
|
|
@@ -963,12 +988,21 @@ func (api *oliveTinAPI) EventStream(ctx ctx.Context, req *connect.Request[apiv1.
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (api *oliveTinAPI) sendEventStreamHeartbeats(done <-chan struct{}, client *streamingClient) {
|
|
|
+func (api *oliveTinAPI) sendEventStreamHeartbeats(client *streamingClient) {
|
|
|
+ defer close(client.heartbeatDone)
|
|
|
+
|
|
|
+ if !api.sendEventStreamHeartbeat(client) {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
ticker := time.NewTicker(10 * time.Second)
|
|
|
defer ticker.Stop()
|
|
|
+ api.runEventStreamHeartbeatLoop(client, ticker)
|
|
|
+}
|
|
|
|
|
|
+func (api *oliveTinAPI) runEventStreamHeartbeatLoop(client *streamingClient, ticker *time.Ticker) {
|
|
|
for {
|
|
|
- if api.waitEventStreamHeartbeatOrDone(done, ticker) {
|
|
|
+ if api.waitEventStreamHeartbeatOrDone(client.heartbeatStop, ticker) {
|
|
|
return
|
|
|
}
|
|
|
if !api.sendEventStreamHeartbeat(client) {
|
|
|
@@ -1000,8 +1034,13 @@ func (api *oliveTinAPI) removeClient(clientToRemove *streamingClient) {
|
|
|
return
|
|
|
}
|
|
|
api.streamingClientsMutex.Lock()
|
|
|
+ if _, exists := api.streamingClients[clientToRemove]; !exists {
|
|
|
+ api.streamingClientsMutex.Unlock()
|
|
|
+ return
|
|
|
+ }
|
|
|
delete(api.streamingClients, clientToRemove)
|
|
|
api.streamingClientsMutex.Unlock()
|
|
|
+ clientToRemove.stopHeartbeat()
|
|
|
close(clientToRemove.channel)
|
|
|
}
|
|
|
|
|
|
@@ -1009,14 +1048,12 @@ func (api *oliveTinAPI) OnActionMapRebuilt() {
|
|
|
toRemove := []*streamingClient{}
|
|
|
|
|
|
for _, client := range api.copyOfStreamingClients() {
|
|
|
- select {
|
|
|
- case client.channel <- &apiv1.EventStreamResponse{
|
|
|
+ msg := &apiv1.EventStreamResponse{
|
|
|
Event: &apiv1.EventStreamResponse_ConfigChanged{
|
|
|
ConfigChanged: &apiv1.EventConfigChanged{},
|
|
|
},
|
|
|
- }:
|
|
|
- default:
|
|
|
- log.Warnf("EventStream: client channel is full, removing client")
|
|
|
+ }
|
|
|
+ if !api.trySendEventToClient(client, msg) {
|
|
|
toRemove = append(toRemove, client)
|
|
|
}
|
|
|
}
|