-
Notifications
You must be signed in to change notification settings - Fork 54
[kernel-1116] browser events add s2 storage support #235
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
archandatta
wants to merge
71
commits into
main
Choose a base branch
from
archand/kernel-1116/browser-events/s2
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
71 commits
Select commit
Hold shift + click to select a range
ab738a1
feat: add CDP pipeline, cdpmonitor, and wire into API service
archandatta a7b1714
refactor: rename Pipeline to CaptureSession and delete pipeline.go
archandatta 04b440d
review: fix request context leak in StartCapture, add missing categor…
archandatta 86ef8f5
review: clean up syntax
archandatta 4de35ba
review: move CategoryFor to cdpmonitor package
archandatta 4ebdcf5
review: internalize ring buffer and file writer in CaptureSession con…
archandatta 33efe9a
review: write logs under /var/log/kernel and ensure dir exists
archandatta 6c6f3c9
review: add capture config to /events/start and OpenAPI spec
archandatta ade2f4b
review: fix lifecycle context, stop-before-reset ordering, seq reset,…
archandatta 3f9ada7
fix: oapi version
archandatta ace55a5
fix: Shutdown cancels context outside lock, racing with StartCapture
archandatta be34cc5
review: validate DetailLevel with generated Valid
archandatta 8294a54
fix: reset ring buffer on session restart to unstrand existing readers
archandatta 12261d5
chore: remove dead categoryFor function
archandatta 4ff30ce
review: guard zero-capacity ring buffer and fix reader reset after bu…
archandatta 1c8ae0a
review: use t.TempDir in test helper, map-based ValidCategory, avoid …
archandatta 67eefa7
review: add captureConfigFrom and StartCapture/StopCapture handler tests
archandatta c6f509f
feat: refactor events API to resource-style capture sessions
archandatta 34aef8e
review: update file writer to be internal to the package
archandatta edf7cab
review: tighten to `Write(filename string, data []byte) error`
archandatta 57871d5
review: update panic -> error
archandatta 2358564
review: update oapi and remove detail level
archandatta 37a155e
review: remove url
archandatta 8f24624
chore: restore server/api on branch
archandatta 147ca41
review: harden captureConfigFromOAPI and clarify stop comment
archandatta 95e7b6e
review: unexport ringBuffer and drop AllCategories wrapper
archandatta 3b8c0a2
review: replace event producers with cdp monitor in stop description
archandatta 97b8374
remove test line
archandatta ddbaabc
review: update uuid to cuid2
archandatta 0210a6e
feat: add cdpmonitor foundation — types, util, computed state machine…
archandatta 42999af
self review
archandatta b151535
review: cursor feedback
archandatta 869e368
[kernel-1116] CDP monitor core (#214)
archandatta 150a306
review: update types and sensitive interaction data
archandatta b77e7e2
feat: add two-layer CDP decode, protocol-faithful types, then monitor…
archandatta f6258b6
review: clean up monitor health and types
archandatta 915009c
review: add chromium version
archandatta 52c6f87
review: remove dead code
archandatta 277084e
fix injection script
archandatta 635449d
review: remove sensitive data from inject
archandatta d4585b5
review
archandatta ca628ef
review: sensitive data audit interaction.js
archandatta bf8026a
review: reconnect failure leaks goroutines and deadlocks Stop
archandatta f3c1441
review: add ctx to monitor and update comment
archandatta b536299
review: lift lifeMu to dispatch level to make ctx handling explicit
archandatta e977007
review: add readme for cdp monitor
archandatta e8eb642
feat: capturesession: add Active(), publishLocked helper, session_end…
archandatta 37b79ee
feat: add PublishEvent and StreamEvents handlers
archandatta ff15e66
review: address feedback
archandatta 6bf578e
review: update comment
archandatta c266e78
review: add events lifecycle test
archandatta 3f95cff
review: add documentation
archandatta b61d0f1
chore: rename event type constants to drop Type prefix
archandatta 121e5ae
feat: add S2 durable event storage library
archandatta c4a9ae1
feat: wire S2 storage writer into api service
archandatta 98b54de
review: rename storage fields and clean up writer tests
archandatta 609bbb1
chore: update headless configs
archandatta 9d1885b
fix: bound ack goroutine lifetimes and close the wg.Add/Wait race in …
archandatta a78dc6b
fix: drain SessionEnded to storage before evicting s2 producer
archandatta d86369f
fix: run shutdown phases in parallel and bound storageWriter.Close wi…
archandatta 36cc25b
fix: suppress spurious storage_error on shutdown and surface ring ove…
archandatta 9927d41
fix: remove streams:list probe and drop ctx from NewS2Storage; degrad…
archandatta 0766996
fix: make s2 Append synchronous — wait for ack before returning
archandatta 93cd63f
review: add missing test coverage for overflow, slow-append shutdown,…
archandatta 2e4c65c
review: use EventsDropped for ring overflow events and add batcher tu…
archandatta 503f4bd
review: document nil storageWriter contract and session cleanup ordering
archandatta f9f31b3
review: remove unnecessary sessionID local var in StopCaptureSession
archandatta 6871eac
fix: block EventsStorageError in PublishEvent reserved type check
archandatta 2e638ae
fix: remove overflow PublishUnfiltered to eliminate cpu-spinning feed…
archandatta a54a688
fix: remove unused storageWriter field and parameter from ApiService
archandatta 8b34d4f
fix: stop capture session before cancelling writer so SessionEnded re…
archandatta File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 1 addition & 1 deletion
2
images/chromium-headful/supervisor/services/kernel-images-api.conf
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 1 addition & 1 deletion
2
images/chromium-headless/image/supervisor/services/kernel-images-api.conf
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,7 +10,6 @@ import ( | |
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/kernel/kernel-images/server/lib/capturesession" | ||
| "github.com/kernel/kernel-images/server/lib/cdpmonitor" | ||
| "github.com/kernel/kernel-images/server/lib/devtoolsproxy" | ||
| "github.com/kernel/kernel-images/server/lib/events" | ||
|
|
@@ -82,8 +81,7 @@ type ApiService struct { | |
| xvfbResizeMu sync.Mutex | ||
|
|
||
| // CDP event pipeline and cdpMonitor. | ||
| eventStream *events.EventStream | ||
| captureSession *capturesession.CaptureSession | ||
| captureSession *events.CaptureSession | ||
| cdpMonitor cdpMonitorController | ||
| monitorMu sync.Mutex | ||
| lifecycleCtx context.Context | ||
|
|
@@ -92,14 +90,14 @@ type ApiService struct { | |
|
|
||
| var _ oapi.StrictServerInterface = (*ApiService)(nil) | ||
|
|
||
| // New constructs an ApiService. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove |
||
| func New( | ||
| recordManager recorder.RecordManager, | ||
| factory recorder.FFmpegRecorderFactory, | ||
| upstreamMgr *devtoolsproxy.UpstreamManager, | ||
| stz scaletozero.Controller, | ||
| nekoAuthClient *nekoclient.AuthClient, | ||
| captureSession *capturesession.CaptureSession, | ||
| eventStream *events.EventStream, | ||
| captureSession *events.CaptureSession, | ||
| displayNum int, | ||
| ) (*ApiService, error) { | ||
| switch { | ||
|
|
@@ -113,24 +111,21 @@ func New( | |
| return nil, fmt.Errorf("nekoAuthClient cannot be nil") | ||
| case captureSession == nil: | ||
| return nil, fmt.Errorf("captureSession cannot be nil") | ||
| case eventStream == nil: | ||
| return nil, fmt.Errorf("eventStream cannot be nil") | ||
| } | ||
|
|
||
| mon := cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum, slog.Default()) | ||
| mon := cdpmonitor.New(upstreamMgr, func(ev events.Event) { captureSession.Publish(ev) }, displayNum, slog.Default()) | ||
| ctx, cancel := context.WithCancel(context.Background()) | ||
|
|
||
| return &ApiService{ | ||
| recordManager: recordManager, | ||
| factory: factory, | ||
| recordManager: recordManager, | ||
| factory: factory, | ||
| defaultRecorderID: "default", | ||
| watches: make(map[string]*fsWatch), | ||
| procs: make(map[string]*processHandle), | ||
| upstreamMgr: upstreamMgr, | ||
| stz: stz, | ||
| nekoAuthClient: nekoAuthClient, | ||
| policy: &policy.Policy{}, | ||
| eventStream: eventStream, | ||
| captureSession: captureSession, | ||
| cdpMonitor: mon, | ||
| lifecycleCtx: ctx, | ||
|
|
@@ -358,6 +353,7 @@ func (s *ApiService) Shutdown(ctx context.Context) error { | |
| s.lifecycleCancel() | ||
| s.cdpMonitor.Stop() | ||
| s.captureSession.Stop() | ||
| _ = s.captureSession.Close() | ||
| s.monitorMu.Unlock() | ||
| return s.recordManager.StopAll(ctx) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,7 +8,6 @@ import ( | |
| "github.com/nrednav/cuid2" | ||
| oapi "github.com/kernel/kernel-images/server/lib/oapi" | ||
|
|
||
| "github.com/kernel/kernel-images/server/lib/capturesession" | ||
| "github.com/kernel/kernel-images/server/lib/events" | ||
| "github.com/kernel/kernel-images/server/lib/logger" | ||
| ) | ||
|
|
@@ -88,6 +87,9 @@ func (s *ApiService) StopCaptureSession(_ context.Context, _ oapi.StopCaptureSes | |
| // tear down asynchronously, leaving IsRunning briefly true. | ||
| resp := s.buildSessionResponse() | ||
| resp.Status = oapi.CaptureSessionStatusStopped | ||
| // Session cleanup (Remove on the S2 producer) happens automatically in | ||
| // EventsStorageWriter.Run when it processes the SessionEnded event, ensuring | ||
| // all pending writes are flushed before the producer is torn down. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this comment feels out of place. remove or move closer to the things it's talking about? |
||
| s.captureSession.Stop() | ||
|
|
||
| return oapi.StopCaptureSession200JSONResponse(resp), nil | ||
|
|
@@ -120,26 +122,26 @@ func (s *ApiService) buildSessionResponse() oapi.CaptureSession { | |
| } | ||
|
|
||
| // captureConfigFrom converts the optional StartCaptureSessionRequest body | ||
| // into a capturesession.CaptureConfig. | ||
| func captureConfigFrom(body *oapi.StartCaptureSessionRequest) (capturesession.CaptureConfig, error) { | ||
| // into an events.CaptureConfig. | ||
| func captureConfigFrom(body *oapi.StartCaptureSessionRequest) (events.CaptureConfig, error) { | ||
| if body == nil { | ||
| return capturesession.CaptureConfig{}, nil | ||
| return events.CaptureConfig{}, nil | ||
| } | ||
| return captureConfigFromOAPI(body.Config) | ||
| } | ||
|
|
||
| // captureConfigFromOAPI converts an oapi.CaptureConfig to capturesession.CaptureConfig. | ||
| func captureConfigFromOAPI(cfg *oapi.CaptureConfig) (capturesession.CaptureConfig, error) { | ||
| // captureConfigFromOAPI converts an oapi.CaptureConfig to events.CaptureConfig. | ||
| func captureConfigFromOAPI(cfg *oapi.CaptureConfig) (events.CaptureConfig, error) { | ||
| if cfg == nil || cfg.Categories == nil { | ||
| return capturesession.CaptureConfig{}, nil | ||
| return events.CaptureConfig{}, nil | ||
| } | ||
| out := capturesession.CaptureConfig{ | ||
| out := events.CaptureConfig{ | ||
| Categories: make([]events.EventCategory, 0, len(*cfg.Categories)), | ||
| } | ||
| for _, c := range *cfg.Categories { | ||
| cat := events.EventCategory(c) | ||
| if !events.ValidCategory(cat) { | ||
| return capturesession.CaptureConfig{}, fmt.Errorf("unknown category: %q", c) | ||
| return events.CaptureConfig{}, fmt.Errorf("unknown category: %q", c) | ||
| } | ||
| out.Categories = append(out.Categories, cat) | ||
| } | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should there also be an s2 stream config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stream names are derived dynamically from the CaptureSessionID at runtime, so one S2 stream is created per capture session within the configured basin
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will make it hard for, e.g. the API to know what stream to read when it wants to stream telemetry for a browser. It also is at odds with "one global event stream" that the cdp stuff publishes into. I would make this a constant that is injected on startup