Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 73 additions & 61 deletions eventsse/internal/handlers/pub/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"os"
"time"

"github.com/krateoplatformops/eventsse/internal/labels"
"github.com/krateoplatformops/eventsse/internal/store"
Expand All @@ -14,82 +15,102 @@ import (
corev1 "k8s.io/api/core/v1"
)

const (
defaultHeartbeatInterval = 25 * time.Second
defaultEventQueueSize = 256
)

func SSE(cli clientv3.Watcher) http.Handler {
return &handler{
cli: cli,
}
return &handler{cli: cli}
}

var _ http.Handler = (*handler)(nil)

type handler struct {
cli clientv3.Watcher
}

// @title EventSSE API
// @version 1.0
// @description This the Krateo EventSSE server.
// @BasePath /

// Health godoc
// @Summary SSE Endpoint
// @Description Get available events notifications
// @ID notifications
// @Produce json
// @Success 200 {array} types.Event
// @Router /pub [get]
func (r *handler) ServeHTTP(wri http.ResponseWriter, req *http.Request) {
func (r *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()

// === CORS Preflight ===
// CORS
if req.Method == http.MethodOptions {
r.setCORSHeaders(wri)
wri.WriteHeader(http.StatusNoContent)
r.setCORSHeaders(w)
w.WriteHeader(http.StatusNoContent)
return
}
r.setCORSHeaders(w)

r.setCORSHeaders(wri)

// === SSE Headers ===
wri.Header().Set("Content-Type", "text/event-stream")
wri.Header().Set("Cache-Control", "no-cache")
wri.Header().Set("Connection", "keep-alive")
wri.Header().Set("X-Accel-Buffering", "no")
// SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")

log := zerolog.New(os.Stdout).With().
Str("service", "eventsse").
Timestamp().
Logger()

f, ok := wri.(http.Flusher)
flusher, ok := w.(http.Flusher)
if !ok {
log.Error().Msg("http.ResponseWriter does not implement http.Flusher")
http.Error(wri, "Streaming not supported", http.StatusInternalServerError)
http.Error(w, "Streaming not supported", http.StatusInternalServerError)
return
}

eventCh := make(chan string, defaultEventQueueSize)
defer close(eventCh)

go func() {
for evt := range eventCh {
_, err := w.Write([]byte(evt))
if err != nil {
log.Info().Msg("Client disconnected (write error)")
return
}
flusher.Flush()
}
}()

// Evento iniziale
initial := fmt.Sprintf("event: connection-established\nid: 88888888\ndata: %s\n\n", `{"info": "Ready to watch events"}`)
select {
case eventCh <- initial:
case <-ctx.Done():
return
}

fmt.Fprintln(wri, "event: connection-established")
fmt.Fprintln(wri, "id: 88888888")
fmt.Fprintf(wri, "data: %s\n\n", `{"info": "Ready to watch events"}`)
f.Flush()
heartbeat := time.NewTicker(defaultHeartbeatInterval)
defer heartbeat.Stop()

watchChan := r.cli.Watch(ctx, store.RootKey, clientv3.WithPrefix())

for {
select {
case <-ctx.Done():
log.Info().Msg("SSE client disconnected")
return

case <-heartbeat.C:
select {
case eventCh <- ": ping\n\n":
default:
log.Debug().Msg("Heartbeat skipped, client lento")
}

case watchResp, ok := <-watchChan:
if !ok {
log.Warn().Msg("Etcd watch channel closed")
return
}
if err := watchResp.Err(); err != nil {
log.Error().Err(err).Msg("Error from ETCD watch")
continue
}

for _, ev := range watchResp.Events {
key := string(ev.Kv.Key)
val := ev.Kv.Value
key := string(ev.Kv.Key)
if len(val) == 0 {
continue
}
Expand All @@ -101,48 +122,39 @@ func (r *handler) ServeHTTP(wri http.ResponseWriter, req *http.Request) {
}

cid := labels.CompositionID(&obj)
belongsToComposition := len(cid) > 0

eventName := "krateo"
if len(cid) > 0 {
if cid != "" {
eventName = cid
}

zle := log.Debug().
log.Debug().
Str("id", key).
Str("event", eventName).
Str("reason", obj.Reason).
Str("message", obj.Message).
Str("involvedObject.Name", obj.InvolvedObject.Name).
Str("involvedObject.Namespace", obj.InvolvedObject.Namespace)

if belongsToComposition {
zle.Str("event", cid)
} else {
zle.Str("event", "krateo")
}
zle.Msg("Sending SSE")
zle = nil
Str("involvedObject.Namespace", obj.InvolvedObject.Namespace).
Msg("Queueing SSE event")

fmt.Fprintf(wri, "event: %s\n", eventName)
fmt.Fprintf(wri, "id: %s\n", key)
fmt.Fprintf(wri, "data: %s\n\n", string(val))
f.Flush()
payload := fmt.Sprintf("event: %s\nid: %s\ndata: %s\n\n", eventName, key, string(val))

log.Debug().
Str("event", eventName).
Str("key", key).
Msg("SSE sent")
// invio non bloccante
select {
case eventCh <- payload:
default:
log.Warn().Str("event", eventName).Str("key", key).Msg("Dropping SSE event, client troppo lento")
}
}
}
}
}

// setCORSHeaders aggiunge header CORS generali
func (r *handler) setCORSHeaders(w http.ResponseWriter) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Accept, Authorization, Content-Type, X-Auth-Code, X-Krateo-TraceId")
w.Header().Set("Access-Control-Expose-Headers", "Link,Authorization,Content-Type")
w.Header().Set("Access-Control-Allow-Headers", "Authorization,Content-Type")
w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers",
"Accept, Authorization, Content-Type, X-Auth-Code, X-Krateo-TraceId")
w.Header().Set("Access-Control-Expose-Headers",
"Link, Authorization, Content-Type")
w.Header().Set("Access-Control-Allow-Credentials", "true")
}
5 changes: 2 additions & 3 deletions eventsse/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ import (
)

const (
serviceName = "eventsse"
defaultLimit = 100
fifoMultiplier = 10
serviceName = "eventsse"
defaultLimit = 100
)

func main() {
Expand Down