From 707d375198392b4ab659dc12bbb9d313c987d493 Mon Sep 17 00:00:00 2001 From: Luca Sepe Date: Thu, 20 Nov 2025 11:34:33 +0100 Subject: [PATCH] feat: minify event data and expose etcd client timeout --- eventrouter/echo.py | 15 ++++++ eventrouter/internal/router/handler.go | 10 ++-- eventrouter/internal/router/minify.go | 49 +++++++++++++++++++ eventrouter/manifests/deployment.yaml | 3 +- eventrouter/scripts/1.kind-up.sh | 13 +++++ eventrouter/scripts/{build.sh => 2.build.sh} | 0 .../scripts/{install.sh => 3.install.sh} | 3 ++ eventrouter/scripts/4.run-handler.sh | 25 ++++++++++ eventrouter/scripts/kind-up.sh | 3 -- eventrouter/testdata/registration.sample.yaml | 6 +-- eventsse/internal/store/store.go | 8 +-- eventsse/main.go | 16 +++--- 12 files changed, 129 insertions(+), 22 deletions(-) create mode 100644 eventrouter/echo.py create mode 100644 eventrouter/internal/router/minify.go create mode 100755 eventrouter/scripts/1.kind-up.sh rename eventrouter/scripts/{build.sh => 2.build.sh} (100%) rename eventrouter/scripts/{install.sh => 3.install.sh} (85%) create mode 100755 eventrouter/scripts/4.run-handler.sh delete mode 100755 eventrouter/scripts/kind-up.sh diff --git a/eventrouter/echo.py b/eventrouter/echo.py new file mode 100644 index 0000000..f8ff63a --- /dev/null +++ b/eventrouter/echo.py @@ -0,0 +1,15 @@ +from http.server import BaseHTTPRequestHandler, HTTPServer + +class Handler(BaseHTTPRequestHandler): + def do_POST(self): + length = int(self.headers.get('Content-Length')) + body = self.rfile.read(length) + print("=== RECEIVED POST ===") + print(body.decode('utf-8')) + print("======================") + self.send_response(200) + self.end_headers() + +httpd = HTTPServer(("0.0.0.0", 9092), Handler) +print("Listening on :9092") +httpd.serve_forever() diff --git a/eventrouter/internal/router/handler.go b/eventrouter/internal/router/handler.go index 73e2216..3d2ba75 100644 --- a/eventrouter/internal/router/handler.go +++ b/eventrouter/internal/router/handler.go @@ -71,11 +71,9 @@ func (c *pusher) Handle(evt corev1.Event) { return } - if len(evt.ManagedFields) == 0 { - evt.ManagedFields = nil - } + out := minifyEvent(evt) - labels := evt.GetLabels() + labels := out.GetLabels() if labels == nil { labels = map[string]string{ keyCompositionID: compositionId, @@ -83,9 +81,9 @@ func (c *pusher) Handle(evt corev1.Event) { } else { labels[keyCompositionID] = compositionId } - evt.SetLabels(labels) + out.SetLabels(labels) - c.notifyAll(all, evt) + c.notifyAll(all, out) } func (c *pusher) notifyAll(all map[string]v1alpha1.RegistrationSpec, evt corev1.Event) { diff --git a/eventrouter/internal/router/minify.go b/eventrouter/internal/router/minify.go new file mode 100644 index 0000000..2e9e4e7 --- /dev/null +++ b/eventrouter/internal/router/minify.go @@ -0,0 +1,49 @@ +package router + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func minifyEvent(e corev1.Event) corev1.Event { + out := corev1.Event{ + TypeMeta: metav1.TypeMeta{ + Kind: "Event", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: e.ObjectMeta.Name, + Namespace: e.ObjectMeta.Namespace, + Labels: e.ObjectMeta.Labels, + }, + Reason: e.Reason, + Message: e.Message, + Type: e.Type, + } + + // EventTime può rimanere + out.EventTime = e.EventTime + + // Timestamp legacy azzerati + out.FirstTimestamp = metav1.Time{} + out.LastTimestamp = metav1.Time{} + + // Source vuoto + out.Source = corev1.EventSource{} + + // InvolvedObject leggero + out.InvolvedObject = corev1.ObjectReference{ + Kind: e.InvolvedObject.Kind, + Namespace: e.InvolvedObject.Namespace, + Name: e.InvolvedObject.Name, + UID: e.InvolvedObject.UID, + } + + // Rimuovi campi rumorosi + out.ReportingController = "" + out.ReportingInstance = "" + out.Action = "" + out.ManagedFields = nil + + return out +} diff --git a/eventrouter/manifests/deployment.yaml b/eventrouter/manifests/deployment.yaml index 8bcd9d2..982d8b2 100644 --- a/eventrouter/manifests/deployment.yaml +++ b/eventrouter/manifests/deployment.yaml @@ -23,9 +23,10 @@ spec: image: kind.local/eventrouter:latest imagePullPolicy: Never args: + - --namespace=demo-system - --insecure=true - --debug=true - - --v=6 + - --v=4 securityContext: allowPrivilegeEscalation: false readOnlyRootFilesystem: false diff --git a/eventrouter/scripts/1.kind-up.sh b/eventrouter/scripts/1.kind-up.sh new file mode 100755 index 0000000..b6c4849 --- /dev/null +++ b/eventrouter/scripts/1.kind-up.sh @@ -0,0 +1,13 @@ +#!/bin/bash + + +kind get kubeconfig >/dev/null 2>&1 || \ + cat < echo.py +from http.server import BaseHTTPRequestHandler, HTTPServer + +class Handler(BaseHTTPRequestHandler): + def do_POST(self): + length = int(self.headers.get('Content-Length')) + body = self.rfile.read(length) + print("=== RECEIVED POST ===") + print(body.decode('utf-8')) + print("======================") + self.send_response(200) + self.end_headers() + +httpd = HTTPServer(("0.0.0.0", 9092), Handler) +print("Listening on :9092") +httpd.serve_forever() +EOF + +python3 echo.py + +rm echo.py + diff --git a/eventrouter/scripts/kind-up.sh b/eventrouter/scripts/kind-up.sh deleted file mode 100755 index 4b35aa0..0000000 --- a/eventrouter/scripts/kind-up.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -kind get kubeconfig >/dev/null 2>&1 || kind create cluster diff --git a/eventrouter/testdata/registration.sample.yaml b/eventrouter/testdata/registration.sample.yaml index f29e748..ba38d1a 100644 --- a/eventrouter/testdata/registration.sample.yaml +++ b/eventrouter/testdata/registration.sample.yaml @@ -1,7 +1,7 @@ apiVersion: eventrouter.krateo.io/v1alpha1 kind: Registration metadata: - name: httpecho-registration + name: eventrouter-echo spec: - serviceName: HTTP Echo - endpoint: http://127.0.0.1:9090/handle \ No newline at end of file + serviceName: EventRouter Echo + endpoint: http://127.0.0.1:9092/handle \ No newline at end of file diff --git a/eventsse/internal/store/store.go b/eventsse/internal/store/store.go index 5a85110..5d14924 100644 --- a/eventsse/internal/store/store.go +++ b/eventsse/internal/store/store.go @@ -180,6 +180,8 @@ type Options struct { // Addresses of the etcd servers in the cluster, including port. // Optional ([]string{"localhost:2379"} by default). Endpoints []string + + ClientTimeout time.Duration } // DefaultOptions is an Options object with default values. @@ -201,7 +203,7 @@ func NewClient(options Options) (Store, error) { config := clientv3.Config{ Endpoints: options.Endpoints, - DialTimeout: 2 * time.Second, + DialTimeout: 5 * time.Second, //DialOptions: []grpc.DialOption{grpc.WithBlock()}, } @@ -210,7 +212,7 @@ func NewClient(options Options) (Store, error) { return result, err } - ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() statusRes, err := cli.Status(ctxWithTimeout, options.Endpoints[0]) if err != nil { @@ -220,7 +222,7 @@ func NewClient(options Options) (Store, error) { } result.c = cli - result.timeOut = defaultTimeout + result.timeOut = max(options.ClientTimeout, defaultTimeout) return result, nil } diff --git a/eventsse/main.go b/eventsse/main.go index 6a4f66d..be96710 100644 --- a/eventsse/main.go +++ b/eventsse/main.go @@ -40,6 +40,8 @@ func main() { limit := flag.Int("limit", env.Int("EVENTSSE_GET_LIMIT", defaultLimit), "limits the number of results to return from 'Get' request") endpoints := flag.String("etcd-servers", env.String("EVENTSSE_ETCD_SERVERS", "localhost:2379"), "etcd endpoints") + clientTimeout := flag.Duration("etcd-client-timeout", + env.Duration("EVENTSSE_ETCD_CLIENT_TIMEOUT", 300*time.Millisecond), "etcd client timeout") flag.Usage = func() { fmt.Fprintln(flag.CommandLine.Output(), "Flags:") @@ -68,11 +70,12 @@ func main() { if log.Debug().Enabled() { evt := log.Debug(). - Str("debug", fmt.Sprintf("%t", *debugOn)). - Str("port", fmt.Sprintf("%d", *port)). - Str("ttl", fmt.Sprintf("%d", *ttlSecs)). - Str("limit", fmt.Sprintf("%d", *limit)). - Str("etcd-endpoints", *endpoints) + Bool("debug", *debugOn). + Int("port", *port). + Int("ttl", *ttlSecs). + Int("limit", *limit). + Str("etcd-endpoints", *endpoints). + Dur("etcd-client-timeout", *clientTimeout) if *dumpEnv { evt = evt.Strs("env-vars", os.Environ()) @@ -82,7 +85,8 @@ func main() { } opts := store.Options{ - Endpoints: strings.Split(*endpoints, ","), + Endpoints: strings.Split(*endpoints, ","), + ClientTimeout: *clientTimeout, } storage, err := store.NewClient(opts) if err != nil {