diff --git a/eventsse/internal/handlers/pub/pub.go b/eventsse/internal/handlers/pub/pub.go index a5b06e9..b2500a5 100644 --- a/eventsse/internal/handlers/pub/pub.go +++ b/eventsse/internal/handlers/pub/pub.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" "os" + "time" "github.com/krateoplatformops/eventsse/internal/labels" "github.com/krateoplatformops/eventsse/internal/store" @@ -14,82 +15,102 @@ import ( corev1 "k8s.io/api/core/v1" ) +const ( + defaultHeartbeatInterval = 25 * time.Second + defaultEventQueueSize = 256 +) + func SSE(cli clientv3.Watcher) http.Handler { - return &handler{ - cli: cli, - } + return &handler{cli: cli} } -var _ http.Handler = (*handler)(nil) - type handler struct { cli clientv3.Watcher } -// @title EventSSE API -// @version 1.0 -// @description This the Krateo EventSSE server. -// @BasePath / - -// Health godoc -// @Summary SSE Endpoint -// @Description Get available events notifications -// @ID notifications -// @Produce json -// @Success 200 {array} types.Event -// @Router /pub [get] -func (r *handler) ServeHTTP(wri http.ResponseWriter, req *http.Request) { +func (r *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() - // === CORS Preflight === + // CORS if req.Method == http.MethodOptions { - r.setCORSHeaders(wri) - wri.WriteHeader(http.StatusNoContent) + r.setCORSHeaders(w) + w.WriteHeader(http.StatusNoContent) return } + r.setCORSHeaders(w) - r.setCORSHeaders(wri) - - // === SSE Headers === - wri.Header().Set("Content-Type", "text/event-stream") - wri.Header().Set("Cache-Control", "no-cache") - wri.Header().Set("Connection", "keep-alive") - wri.Header().Set("X-Accel-Buffering", "no") + // SSE headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") log := zerolog.New(os.Stdout).With(). Str("service", "eventsse"). Timestamp(). Logger() - f, ok := wri.(http.Flusher) + flusher, ok := w.(http.Flusher) if !ok { log.Error().Msg("http.ResponseWriter does not implement http.Flusher") - http.Error(wri, "Streaming not supported", http.StatusInternalServerError) + http.Error(w, "Streaming not supported", http.StatusInternalServerError) + return + } + + eventCh := make(chan string, defaultEventQueueSize) + defer close(eventCh) + + go func() { + for evt := range eventCh { + _, err := w.Write([]byte(evt)) + if err != nil { + log.Info().Msg("Client disconnected (write error)") + return + } + flusher.Flush() + } + }() + + // Evento iniziale + initial := fmt.Sprintf("event: connection-established\nid: 88888888\ndata: %s\n\n", `{"info": "Ready to watch events"}`) + select { + case eventCh <- initial: + case <-ctx.Done(): return } - fmt.Fprintln(wri, "event: connection-established") - fmt.Fprintln(wri, "id: 88888888") - fmt.Fprintf(wri, "data: %s\n\n", `{"info": "Ready to watch events"}`) - f.Flush() + heartbeat := time.NewTicker(defaultHeartbeatInterval) + defer heartbeat.Stop() watchChan := r.cli.Watch(ctx, store.RootKey, clientv3.WithPrefix()) + for { select { case <-ctx.Done(): log.Info().Msg("SSE client disconnected") return + case <-heartbeat.C: + select { + case eventCh <- ": ping\n\n": + default: + log.Debug().Msg("Heartbeat skipped, client lento") + } + case watchResp, ok := <-watchChan: if !ok { log.Warn().Msg("Etcd watch channel closed") return } + if err := watchResp.Err(); err != nil { + log.Error().Err(err).Msg("Error from ETCD watch") + continue + } for _, ev := range watchResp.Events { - key := string(ev.Kv.Key) val := ev.Kv.Value + key := string(ev.Kv.Key) if len(val) == 0 { continue } @@ -101,48 +122,39 @@ func (r *handler) ServeHTTP(wri http.ResponseWriter, req *http.Request) { } cid := labels.CompositionID(&obj) - belongsToComposition := len(cid) > 0 - eventName := "krateo" - if len(cid) > 0 { + if cid != "" { eventName = cid } - zle := log.Debug(). + log.Debug(). Str("id", key). + Str("event", eventName). Str("reason", obj.Reason). Str("message", obj.Message). Str("involvedObject.Name", obj.InvolvedObject.Name). - Str("involvedObject.Namespace", obj.InvolvedObject.Namespace) - - if belongsToComposition { - zle.Str("event", cid) - } else { - zle.Str("event", "krateo") - } - zle.Msg("Sending SSE") - zle = nil + Str("involvedObject.Namespace", obj.InvolvedObject.Namespace). + Msg("Queueing SSE event") - fmt.Fprintf(wri, "event: %s\n", eventName) - fmt.Fprintf(wri, "id: %s\n", key) - fmt.Fprintf(wri, "data: %s\n\n", string(val)) - f.Flush() + payload := fmt.Sprintf("event: %s\nid: %s\ndata: %s\n\n", eventName, key, string(val)) - log.Debug(). - Str("event", eventName). - Str("key", key). - Msg("SSE sent") + // invio non bloccante + select { + case eventCh <- payload: + default: + log.Warn().Str("event", eventName).Str("key", key).Msg("Dropping SSE event, client troppo lento") + } } } } } -// setCORSHeaders aggiunge header CORS generali func (r *handler) setCORSHeaders(w http.ResponseWriter) { w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") - w.Header().Set("Access-Control-Allow-Headers", "Accept, Authorization, Content-Type, X-Auth-Code, X-Krateo-TraceId") - w.Header().Set("Access-Control-Expose-Headers", "Link,Authorization,Content-Type") - w.Header().Set("Access-Control-Allow-Headers", "Authorization,Content-Type") + w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", + "Accept, Authorization, Content-Type, X-Auth-Code, X-Krateo-TraceId") + w.Header().Set("Access-Control-Expose-Headers", + "Link, Authorization, Content-Type") w.Header().Set("Access-Control-Allow-Credentials", "true") } diff --git a/eventsse/main.go b/eventsse/main.go index bf0f597..6a4f66d 100644 --- a/eventsse/main.go +++ b/eventsse/main.go @@ -28,9 +28,8 @@ import ( ) const ( - serviceName = "eventsse" - defaultLimit = 100 - fifoMultiplier = 10 + serviceName = "eventsse" + defaultLimit = 100 ) func main() {