Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions eventrouter/echo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from http.server import BaseHTTPRequestHandler, HTTPServer

class Handler(BaseHTTPRequestHandler):
def do_POST(self):
length = int(self.headers.get('Content-Length'))
body = self.rfile.read(length)
print("=== RECEIVED POST ===")
print(body.decode('utf-8'))
print("======================")
self.send_response(200)
self.end_headers()

httpd = HTTPServer(("0.0.0.0", 9092), Handler)
print("Listening on :9092")
httpd.serve_forever()
10 changes: 4 additions & 6 deletions eventrouter/internal/router/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,19 @@ func (c *pusher) Handle(evt corev1.Event) {
return
}

if len(evt.ManagedFields) == 0 {
evt.ManagedFields = nil
}
out := minifyEvent(evt)

labels := evt.GetLabels()
labels := out.GetLabels()
if labels == nil {
labels = map[string]string{
keyCompositionID: compositionId,
}
} else {
labels[keyCompositionID] = compositionId
}
evt.SetLabels(labels)
out.SetLabels(labels)

c.notifyAll(all, evt)
c.notifyAll(all, out)
}

func (c *pusher) notifyAll(all map[string]v1alpha1.RegistrationSpec, evt corev1.Event) {
Expand Down
49 changes: 49 additions & 0 deletions eventrouter/internal/router/minify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package router

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func minifyEvent(e corev1.Event) corev1.Event {
out := corev1.Event{
TypeMeta: metav1.TypeMeta{
Kind: "Event",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: e.ObjectMeta.Name,
Namespace: e.ObjectMeta.Namespace,
Labels: e.ObjectMeta.Labels,
},
Reason: e.Reason,
Message: e.Message,
Type: e.Type,
}

// EventTime può rimanere
out.EventTime = e.EventTime

// Timestamp legacy azzerati
out.FirstTimestamp = metav1.Time{}
out.LastTimestamp = metav1.Time{}

// Source vuoto
out.Source = corev1.EventSource{}

// InvolvedObject leggero
out.InvolvedObject = corev1.ObjectReference{
Kind: e.InvolvedObject.Kind,
Namespace: e.InvolvedObject.Namespace,
Name: e.InvolvedObject.Name,
UID: e.InvolvedObject.UID,
}

// Rimuovi campi rumorosi
out.ReportingController = ""
out.ReportingInstance = ""
out.Action = ""
out.ManagedFields = nil

return out
}
3 changes: 2 additions & 1 deletion eventrouter/manifests/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ spec:
image: kind.local/eventrouter:latest
imagePullPolicy: Never
args:
- --namespace=demo-system
- --insecure=true
- --debug=true
- --v=6
- --v=4
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: false
Expand Down
13 changes: 13 additions & 0 deletions eventrouter/scripts/1.kind-up.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash


kind get kubeconfig >/dev/null 2>&1 || \
cat <<EOF | kind create cluster --config=-
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
extraPortMappings:
- containerPort: 9091 # porta del server echo nel pod
hostPort: 9091 # porta sul tuo Mac
EOF
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ kubectl apply -f manifests/sa.yaml
kubectl apply -f manifests/rbac.yaml
kubectl apply -f manifests/rbac-bind.yaml
kubectl apply -f manifests/deployment.yaml


kubectl apply -f testdata/registration.sample.yaml
25 changes: 25 additions & 0 deletions eventrouter/scripts/4.run-handler.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash


cat <<EOF > echo.py
from http.server import BaseHTTPRequestHandler, HTTPServer

class Handler(BaseHTTPRequestHandler):
def do_POST(self):
length = int(self.headers.get('Content-Length'))
body = self.rfile.read(length)
print("=== RECEIVED POST ===")
print(body.decode('utf-8'))
print("======================")
self.send_response(200)
self.end_headers()

httpd = HTTPServer(("0.0.0.0", 9092), Handler)
print("Listening on :9092")
httpd.serve_forever()
EOF

python3 echo.py

rm echo.py

3 changes: 0 additions & 3 deletions eventrouter/scripts/kind-up.sh

This file was deleted.

6 changes: 3 additions & 3 deletions eventrouter/testdata/registration.sample.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: eventrouter.krateo.io/v1alpha1
kind: Registration
metadata:
name: httpecho-registration
name: eventrouter-echo
spec:
serviceName: HTTP Echo
endpoint: http://127.0.0.1:9090/handle
serviceName: EventRouter Echo
endpoint: http://127.0.0.1:9092/handle
8 changes: 5 additions & 3 deletions eventsse/internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ type Options struct {
// Addresses of the etcd servers in the cluster, including port.
// Optional ([]string{"localhost:2379"} by default).
Endpoints []string

ClientTimeout time.Duration
}

// DefaultOptions is an Options object with default values.
Expand All @@ -201,7 +203,7 @@ func NewClient(options Options) (Store, error) {

config := clientv3.Config{
Endpoints: options.Endpoints,
DialTimeout: 2 * time.Second,
DialTimeout: 5 * time.Second,
//DialOptions: []grpc.DialOption{grpc.WithBlock()},
}

Expand All @@ -210,7 +212,7 @@ func NewClient(options Options) (Store, error) {
return result, err
}

ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
statusRes, err := cli.Status(ctxWithTimeout, options.Endpoints[0])
if err != nil {
Expand All @@ -220,7 +222,7 @@ func NewClient(options Options) (Store, error) {
}

result.c = cli
result.timeOut = defaultTimeout
result.timeOut = max(options.ClientTimeout, defaultTimeout)

return result, nil
}
16 changes: 10 additions & 6 deletions eventsse/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func main() {
limit := flag.Int("limit", env.Int("EVENTSSE_GET_LIMIT", defaultLimit),
"limits the number of results to return from 'Get' request")
endpoints := flag.String("etcd-servers", env.String("EVENTSSE_ETCD_SERVERS", "localhost:2379"), "etcd endpoints")
clientTimeout := flag.Duration("etcd-client-timeout",
env.Duration("EVENTSSE_ETCD_CLIENT_TIMEOUT", 300*time.Millisecond), "etcd client timeout")

flag.Usage = func() {
fmt.Fprintln(flag.CommandLine.Output(), "Flags:")
Expand Down Expand Up @@ -68,11 +70,12 @@ func main() {

if log.Debug().Enabled() {
evt := log.Debug().
Str("debug", fmt.Sprintf("%t", *debugOn)).
Str("port", fmt.Sprintf("%d", *port)).
Str("ttl", fmt.Sprintf("%d", *ttlSecs)).
Str("limit", fmt.Sprintf("%d", *limit)).
Str("etcd-endpoints", *endpoints)
Bool("debug", *debugOn).
Int("port", *port).
Int("ttl", *ttlSecs).
Int("limit", *limit).
Str("etcd-endpoints", *endpoints).
Dur("etcd-client-timeout", *clientTimeout)

if *dumpEnv {
evt = evt.Strs("env-vars", os.Environ())
Expand All @@ -82,7 +85,8 @@ func main() {
}

opts := store.Options{
Endpoints: strings.Split(*endpoints, ","),
Endpoints: strings.Split(*endpoints, ","),
ClientTimeout: *clientTimeout,
}
storage, err := store.NewClient(opts)
if err != nil {
Expand Down