From 21ba579c06520ccd4ab5a673812b65af52f78bd9 Mon Sep 17 00:00:00 2001 From: Francesco Renzi Date: Wed, 8 Apr 2026 08:53:41 -0700 Subject: [PATCH 01/12] Add WS bridge over DAP TCP server --- src/Runner.Worker/Dap/DapDebugger.cs | 40 +- src/Runner.Worker/Dap/WebSocketDapBridge.cs | 812 ++++++++++++++++++++ src/Test/L0/Worker/DapDebuggerL0.cs | 143 +++- src/Test/L0/Worker/WebSocketDapBridgeL0.cs | 237 ++++++ 4 files changed, 1228 insertions(+), 4 deletions(-) create mode 100644 src/Runner.Worker/Dap/WebSocketDapBridge.cs create mode 100644 src/Test/L0/Worker/WebSocketDapBridgeL0.cs diff --git a/src/Runner.Worker/Dap/DapDebugger.cs b/src/Runner.Worker/Dap/DapDebugger.cs index 99b61e1b1a2..a7afa598814 100644 --- a/src/Runner.Worker/Dap/DapDebugger.cs +++ b/src/Runner.Worker/Dap/DapDebugger.cs @@ -66,6 +66,7 @@ public sealed class DapDebugger : RunnerService, IDapDebugger // Dev Tunnel relay host for remote debugging private TunnelRelayTunnelHost _tunnelRelayHost; + private WebSocketDapBridge _webSocketBridge; // Cancellation source for the connection loop, cancelled in StopAsync // so AcceptTcpClientAsync unblocks cleanly without relying on listener disposal. @@ -74,6 +75,10 @@ public sealed class DapDebugger : RunnerService, IDapDebugger // When true, skip tunnel relay startup (unit tests only) internal bool SkipTunnelRelay { get; set; } + // When true, skip the public websocket bridge and expose the raw DAP + // listener directly on the configured tunnel port (unit tests only). + internal bool SkipWebSocketBridge { get; set; } + // Synchronization for step execution private TaskCompletionSource _commandTcs; private readonly object _stateLock = new object(); @@ -108,6 +113,7 @@ public sealed class DapDebugger : RunnerService, IDapDebugger _state == DapSessionState.Running; internal DapSessionState State => _state; + internal int InternalDapPort => (_listener?.LocalEndpoint as IPEndPoint)?.Port ?? 0; public override void Initialize(IHostContext hostContext) { @@ -133,9 +139,22 @@ public async Task StartAsync(IExecutionContext jobContext) _jobContext = jobContext; _readyTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _listener = new TcpListener(IPAddress.Loopback, debuggerConfig.Tunnel.Port); + var dapPort = SkipWebSocketBridge ? debuggerConfig.Tunnel.Port : 0; + _listener = new TcpListener(IPAddress.Loopback, dapPort); _listener.Start(); - Trace.Info($"DAP debugger listening on {_listener.LocalEndpoint}"); + if (SkipWebSocketBridge) + { + Trace.Info($"DAP debugger listening on {_listener.LocalEndpoint}"); + } + else + { + Trace.Info($"Internal DAP debugger listening on {_listener.LocalEndpoint}"); + _webSocketBridge = new WebSocketDapBridge( + HostContext.GetTrace("DapWebSocketBridge"), + debuggerConfig.Tunnel.Port, + InternalDapPort); + _webSocketBridge.Start(); + } // Start Dev Tunnel relay so remote clients reach the local DAP port. // The relay is torn down explicitly in StopAsync (after the DAP session @@ -274,6 +293,22 @@ public async Task StopAsync() _tunnelRelayHost = null; } + if (_webSocketBridge != null) + { + Trace.Info("Stopping WebSocket DAP bridge"); + var disposeTask = _webSocketBridge.DisposeAsync().AsTask(); + if (await Task.WhenAny(disposeTask, Task.Delay(5_000)) != disposeTask) + { + Trace.Warning("WebSocket DAP bridge dispose timed out after 5s"); + } + else + { + Trace.Info("WebSocket DAP bridge stopped"); + } + + _webSocketBridge = null; + } + CleanupConnection(); // Cancel the connection loop first so AcceptTcpClientAsync unblocks @@ -315,6 +350,7 @@ public async Task StopAsync() _connectionLoopTask = null; _loopCts?.Dispose(); _loopCts = null; + _webSocketBridge = null; } public async Task OnStepStartingAsync(IStep step) diff --git a/src/Runner.Worker/Dap/WebSocketDapBridge.cs b/src/Runner.Worker/Dap/WebSocketDapBridge.cs new file mode 100644 index 00000000000..2596c184028 --- /dev/null +++ b/src/Runner.Worker/Dap/WebSocketDapBridge.cs @@ -0,0 +1,812 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Net.WebSockets; +using System.Security.Cryptography; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using GitHub.Runner.Common; + +namespace GitHub.Runner.Worker.Dap +{ + internal sealed class WebSocketDapBridge : IAsyncDisposable + { + internal enum IncomingStreamPrefixKind + { + Unknown, + HttpWebSocketUpgrade, + PreUpgradedWebSocket, + WebSocketReservedBits, + Http2Preface, + TlsClientHello, + } + + private const int _bufferSize = 32 * 1024; + private const int _maxHeaderLineLength = 8 * 1024; + private const int _defaultMaxInboundMessageSize = 10 * 1024 * 1024; // 10 MB + private static readonly TimeSpan _keepAliveInterval = TimeSpan.FromSeconds(30); + private static readonly TimeSpan _closeTimeout = TimeSpan.FromSeconds(5); + private const string _webSocketAcceptMagic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + + private readonly Tracing _trace; + private readonly int _listenPort; + private readonly int _targetPort; + + private TcpListener _listener; + private CancellationTokenSource _loopCts; + private Task _acceptLoopTask; + + // Overridable for unit tests to avoid allocating 10 MB payloads. + internal int MaxInboundMessageSize { get; set; } = _defaultMaxInboundMessageSize; + + public WebSocketDapBridge(Tracing trace, int listenPort, int targetPort) + { + _trace = trace ?? throw new ArgumentNullException(nameof(trace)); + _listenPort = listenPort; + _targetPort = targetPort; + } + + public void Start() + { + if (_listener != null) + { + throw new InvalidOperationException("WebSocket DAP bridge already started."); + } + + _listener = new TcpListener(IPAddress.Loopback, _listenPort); + _listener.Start(); + _loopCts = new CancellationTokenSource(); + _acceptLoopTask = AcceptLoopAsync(_loopCts.Token); + + _trace.Info($"WebSocket DAP bridge listening on {_listener.LocalEndpoint} -> 127.0.0.1:{_targetPort}"); + } + + public async ValueTask DisposeAsync() + { + try + { + _loopCts?.Cancel(); + } + catch + { + // best effort during shutdown + } + + try + { + _listener?.Stop(); + } + catch + { + // best effort during shutdown + } + + if (_acceptLoopTask != null) + { + try + { + await _acceptLoopTask; + } + catch (OperationCanceledException) + { + // expected on shutdown + } + } + + _loopCts?.Dispose(); + _loopCts = null; + _listener = null; + _acceptLoopTask = null; + } + + private async Task AcceptLoopAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + TcpClient client = null; + try + { + client = await _listener.AcceptTcpClientAsync(cancellationToken); + client.NoDelay = true; + await HandleClientAsync(client, cancellationToken); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + client?.Dispose(); + _trace.Warning($"WebSocket DAP bridge connection error ({ex.GetType().Name})"); + _trace.Error(ex); + } + } + + _trace.Info("WebSocket DAP bridge accept loop ended"); + } + + private async Task HandleClientAsync(TcpClient incomingClient, CancellationToken cancellationToken) + { + using (incomingClient) + using (var incomingStream = incomingClient.GetStream()) + { + _trace.Info($"WebSocket DAP bridge accepted client {incomingClient.Client.RemoteEndPoint}"); + + var webSocket = await AcceptWebSocketAsync(incomingStream, cancellationToken); + if (webSocket == null) + { + return; + } + + using (webSocket) + using (var dapClient = new TcpClient()) + { + dapClient.NoDelay = true; + await dapClient.ConnectAsync(IPAddress.Loopback, _targetPort, cancellationToken); + + using (var dapStream = dapClient.GetStream()) + using (var sessionCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) + { + var proxyToken = sessionCts.Token; + var wsToTcpTask = PumpWebSocketToTcpAsync(webSocket, dapStream, proxyToken); + var tcpToWsTask = PumpTcpToWebSocketAsync(dapStream, webSocket, proxyToken); + + var completedTask = await Task.WhenAny(wsToTcpTask, tcpToWsTask); + sessionCts.Cancel(); + + try + { + await completedTask; + } + catch (OperationCanceledException) when (proxyToken.IsCancellationRequested) + { + // expected during shutdown + } + + try + { + await Task.WhenAll(wsToTcpTask, tcpToWsTask); + } + catch (OperationCanceledException) when (proxyToken.IsCancellationRequested) + { + // expected during shutdown + } + catch (IOException) + { + // peer disconnected while unwinding + } + catch (WebSocketException) + { + // peer disconnected while unwinding + } + } + + await CloseWebSocketAsync(webSocket); + } + } + } + + private async Task AcceptWebSocketAsync(NetworkStream stream, CancellationToken cancellationToken) + { + var initialBytes = await ReadInitialBytesAsync(stream, cancellationToken); + if (initialBytes == null || initialBytes.Length == 0) + { + return null; + } + + var prefixKind = ClassifyIncomingStreamPrefix(initialBytes); + if (prefixKind == IncomingStreamPrefixKind.PreUpgradedWebSocket) + { + _trace.Info($"Treating incoming tunnel stream as an already-upgraded websocket connection ({DescribeInitialBytes(initialBytes)})"); + return WebSocket.CreateFromStream( + new ReplayableStream(stream, initialBytes), + isServer: true, + subProtocol: null, + keepAliveInterval: _keepAliveInterval); + } + + if (prefixKind != IncomingStreamPrefixKind.HttpWebSocketUpgrade) + { + _trace.Warning($"Unsupported debugger tunnel stream prefix ({prefixKind}): {DescribeInitialBytes(initialBytes)}"); + return null; + } + + var handshakeStream = new ReplayableStream(stream, initialBytes); + var requestLine = await ReadLineAsync(handshakeStream, cancellationToken); + if (string.IsNullOrEmpty(requestLine)) + { + return null; + } + + var headers = new Dictionary(StringComparer.OrdinalIgnoreCase); + while (true) + { + var line = await ReadLineAsync(handshakeStream, cancellationToken); + if (line == null) + { + return null; + } + + if (line.Length == 0) + { + break; + } + + var separatorIndex = line.IndexOf(':'); + if (separatorIndex <= 0) + { + await WriteHttpErrorAsync(stream, HttpStatusCode.BadRequest, "Invalid HTTP header.", cancellationToken); + return null; + } + + var headerName = line.Substring(0, separatorIndex).Trim(); + var headerValue = line.Substring(separatorIndex + 1).Trim(); + + if (headers.TryGetValue(headerName, out var existingValue)) + { + headers[headerName] = $"{existingValue}, {headerValue}"; + } + else + { + headers[headerName] = headerValue; + } + } + + if (!IsValidWebSocketRequest(requestLine, headers)) + { + _trace.Info($"Rejected non-websocket request: {requestLine}"); + await WriteHttpErrorAsync(stream, HttpStatusCode.BadRequest, "Expected a websocket upgrade request.", cancellationToken); + return null; + } + + var webSocketKey = headers["Sec-WebSocket-Key"]; + var acceptValue = ComputeAcceptValue(webSocketKey); + var responseBytes = Encoding.ASCII.GetBytes( + "HTTP/1.1 101 Switching Protocols\r\n" + + "Connection: Upgrade\r\n" + + "Upgrade: websocket\r\n" + + $"Sec-WebSocket-Accept: {acceptValue}\r\n" + + "\r\n"); + + await handshakeStream.WriteAsync(responseBytes, 0, responseBytes.Length, cancellationToken); + await handshakeStream.FlushAsync(cancellationToken); + + _trace.Info("WebSocket DAP bridge completed websocket handshake"); + return WebSocket.CreateFromStream(handshakeStream, isServer: true, subProtocol: null, keepAliveInterval: _keepAliveInterval); + } + + private async Task PumpWebSocketToTcpAsync(WebSocket source, NetworkStream destination, CancellationToken cancellationToken) + { + var buffer = new byte[_bufferSize]; + + while (!cancellationToken.IsCancellationRequested) + { + using (var messageStream = new MemoryStream()) + { + WebSocketReceiveResult result; + do + { + result = await source.ReceiveAsync(new ArraySegment(buffer), cancellationToken); + if (result.MessageType == WebSocketMessageType.Close) + { + return; + } + + if (result.MessageType != WebSocketMessageType.Binary && + result.MessageType != WebSocketMessageType.Text) + { + break; + } + + if (result.Count > 0) + { + if (messageStream.Length + result.Count > MaxInboundMessageSize) + { + _trace.Warning($"WebSocket message exceeds maximum allowed size of {MaxInboundMessageSize} bytes, closing connection"); + await source.CloseAsync( + WebSocketCloseStatus.MessageTooBig, + $"Message exceeds {MaxInboundMessageSize} byte limit", + CancellationToken.None); + return; + } + + messageStream.Write(buffer, 0, result.Count); + } + } + while (!result.EndOfMessage); + + if (result.MessageType != WebSocketMessageType.Binary && + result.MessageType != WebSocketMessageType.Text) + { + continue; + } + + var messageBytes = messageStream.ToArray(); + if (messageBytes.Length == 0) + { + continue; + } + + var contentLengthHeader = Encoding.ASCII.GetBytes($"Content-Length: {messageBytes.Length}\r\n\r\n"); + await destination.WriteAsync(contentLengthHeader, 0, contentLengthHeader.Length, cancellationToken); + await destination.WriteAsync(messageBytes, 0, messageBytes.Length, cancellationToken); + await destination.FlushAsync(cancellationToken); + } + } + } + + private static async Task PumpTcpToWebSocketAsync(NetworkStream source, WebSocket destination, CancellationToken cancellationToken) + { + var readBuffer = new byte[_bufferSize]; + var dapBuffer = new List(); + + while (!cancellationToken.IsCancellationRequested) + { + var bytesRead = await source.ReadAsync(readBuffer, 0, readBuffer.Length, cancellationToken); + if (bytesRead == 0) + { + break; + } + + for (int i = 0; i < bytesRead; i++) + { + dapBuffer.Add(readBuffer[i]); + } + + while (TryParseDapMessage(dapBuffer, out var messageBody)) + { + await destination.SendAsync( + new ArraySegment(messageBody), + WebSocketMessageType.Text, + endOfMessage: true, + cancellationToken); + } + } + } + + private static bool TryParseDapMessage(List buffer, out byte[] messageBody) + { + messageBody = null; + + var headerEndMarker = new byte[] { (byte)'\r', (byte)'\n', (byte)'\r', (byte)'\n' }; + var headerEndIndex = FindSequence(buffer, headerEndMarker); + if (headerEndIndex == -1) + { + return false; + } + + var headerBytes = buffer.GetRange(0, headerEndIndex).ToArray(); + var headerText = Encoding.ASCII.GetString(headerBytes); + + var contentLength = -1; + foreach (var line in headerText.Split(new[] { "\r\n" }, StringSplitOptions.RemoveEmptyEntries)) + { + if (line.StartsWith("Content-Length:", StringComparison.OrdinalIgnoreCase)) + { + var valueStart = line.IndexOf(':') + 1; + if (int.TryParse(line.Substring(valueStart).Trim(), out var parsedLength)) + { + contentLength = parsedLength; + break; + } + } + } + + if (contentLength < 0) + { + buffer.RemoveRange(0, headerEndIndex + 4); + return false; + } + + var messageStart = headerEndIndex + 4; + var messageEnd = messageStart + contentLength; + + if (buffer.Count < messageEnd) + { + return false; + } + + messageBody = buffer.GetRange(messageStart, contentLength).ToArray(); + buffer.RemoveRange(0, messageEnd); + return true; + } + + private static int FindSequence(List buffer, byte[] sequence) + { + if (buffer.Count < sequence.Length) + { + return -1; + } + + for (int i = 0; i <= buffer.Count - sequence.Length; i++) + { + var match = true; + for (int j = 0; j < sequence.Length; j++) + { + if (buffer[i + j] != sequence[j]) + { + match = false; + break; + } + } + + if (match) + { + return i; + } + } + + return -1; + } + + private static bool IsValidWebSocketRequest(string requestLine, IDictionary headers) + { + if (string.IsNullOrWhiteSpace(requestLine)) + { + return false; + } + + var requestLineParts = requestLine.Split(' '); + if (requestLineParts.Length < 3 || !string.Equals(requestLineParts[0], "GET", StringComparison.OrdinalIgnoreCase)) + { + return false; + } + + return HeaderContainsToken(headers, "Connection", "Upgrade") && + HeaderContainsToken(headers, "Upgrade", "websocket") && + headers.ContainsKey("Sec-WebSocket-Key"); + } + + private static bool HeaderContainsToken(IDictionary headers, string headerName, string expectedToken) + { + if (!headers.TryGetValue(headerName, out var headerValue) || string.IsNullOrWhiteSpace(headerValue)) + { + return false; + } + + return headerValue + .Split(',') + .Select(token => token.Trim()) + .Any(token => string.Equals(token, expectedToken, StringComparison.OrdinalIgnoreCase)); + } + + private static string ComputeAcceptValue(string webSocketKey) + { + using (var sha1 = SHA1.Create()) + { + var inputBytes = Encoding.ASCII.GetBytes($"{webSocketKey}{_webSocketAcceptMagic}"); + var hashBytes = sha1.ComputeHash(inputBytes); + return Convert.ToBase64String(hashBytes); + } + } + + private static async Task ReadLineAsync(Stream stream, CancellationToken cancellationToken) + { + var lineBuilder = new StringBuilder(); + var buffer = new byte[1]; + var previousWasCarriageReturn = false; + + while (true) + { + var bytesRead = await stream.ReadAsync(buffer, 0, 1, cancellationToken); + if (bytesRead == 0) + { + return lineBuilder.Length > 0 ? lineBuilder.ToString() : null; + } + + var currentChar = (char)buffer[0]; + if (currentChar == '\n' && previousWasCarriageReturn) + { + if (lineBuilder.Length > 0 && lineBuilder[lineBuilder.Length - 1] == '\r') + { + lineBuilder.Length--; + } + + return lineBuilder.ToString(); + } + + previousWasCarriageReturn = currentChar == '\r'; + lineBuilder.Append(currentChar); + + if (lineBuilder.Length > _maxHeaderLineLength) + { + throw new InvalidDataException($"HTTP header line exceeds maximum length of {_maxHeaderLineLength}"); + } + } + } + + private static async Task ReadInitialBytesAsync(NetworkStream stream, CancellationToken cancellationToken) + { + var buffer = new byte[4]; + var totalRead = 0; + + while (totalRead < buffer.Length) + { + var bytesRead = await stream.ReadAsync(buffer, totalRead, buffer.Length - totalRead, cancellationToken); + if (bytesRead == 0) + { + break; + } + + totalRead += bytesRead; + } + + if (totalRead == 0) + { + return Array.Empty(); + } + + if (totalRead == buffer.Length) + { + return buffer; + } + + var initialBytes = new byte[totalRead]; + Array.Copy(buffer, initialBytes, totalRead); + return initialBytes; + } + + internal static IncomingStreamPrefixKind ClassifyIncomingStreamPrefix(byte[] initialBytes) + { + if (LooksLikeHttpUpgrade(initialBytes)) + { + return IncomingStreamPrefixKind.HttpWebSocketUpgrade; + } + + if (LooksLikeHttp2Preface(initialBytes)) + { + return IncomingStreamPrefixKind.Http2Preface; + } + + if (LooksLikeTlsClientHello(initialBytes)) + { + return IncomingStreamPrefixKind.TlsClientHello; + } + + if (LooksLikeWebSocketFramePrefix(initialBytes, requireReservedBitsClear: false)) + { + return HasReservedBitsSet(initialBytes[0]) + ? IncomingStreamPrefixKind.WebSocketReservedBits + : IncomingStreamPrefixKind.PreUpgradedWebSocket; + } + + return IncomingStreamPrefixKind.Unknown; + } + + internal static string DescribeInitialBytes(byte[] initialBytes) + { + if (initialBytes == null || initialBytes.Length == 0) + { + return "no bytes read"; + } + + var hex = BitConverter.ToString(initialBytes); + var ascii = new string(initialBytes.Select(value => value >= 32 && value <= 126 ? (char)value : '.').ToArray()); + return $"hex={hex}, ascii=\"{ascii}\""; + } + + private static bool LooksLikeHttpUpgrade(byte[] initialBytes) + { + if (initialBytes == null || initialBytes.Length < 4) + { + return false; + } + + return initialBytes[0] == (byte)'G' && + initialBytes[1] == (byte)'E' && + initialBytes[2] == (byte)'T' && + initialBytes[3] == (byte)' '; + } + + private static bool LooksLikeHttp2Preface(byte[] initialBytes) + { + if (initialBytes == null || initialBytes.Length < 4) + { + return false; + } + + return initialBytes[0] == (byte)'P' && + initialBytes[1] == (byte)'R' && + initialBytes[2] == (byte)'I' && + initialBytes[3] == (byte)' '; + } + + private static bool LooksLikeTlsClientHello(byte[] initialBytes) + { + if (initialBytes == null || initialBytes.Length < 3) + { + return false; + } + + return initialBytes[0] == 0x16 && + initialBytes[1] == 0x03 && + initialBytes[2] >= 0x00 && + initialBytes[2] <= 0x04; + } + + private static bool LooksLikeWebSocketFramePrefix(byte[] initialBytes, bool requireReservedBitsClear) + { + if (initialBytes == null || initialBytes.Length < 2) + { + return false; + } + + var firstByte = initialBytes[0]; + var secondByte = initialBytes[1]; + var opcode = firstByte & 0x0F; + var isMasked = (secondByte & 0x80) != 0; + + if (!isMasked || !IsSupportedWebSocketOpcode(opcode)) + { + return false; + } + + return !requireReservedBitsClear || !HasReservedBitsSet(firstByte); + } + + private static bool HasReservedBitsSet(byte firstByte) + { + return (firstByte & 0x70) != 0; + } + + private static bool IsSupportedWebSocketOpcode(int opcode) + { + switch (opcode) + { + case 0x0: + case 0x1: + case 0x2: + case 0x8: + case 0x9: + case 0xA: + return true; + default: + return false; + } + } + + private static async Task WriteHttpErrorAsync( + NetworkStream stream, + HttpStatusCode statusCode, + string message, + CancellationToken cancellationToken) + { + var bodyBytes = Encoding.UTF8.GetBytes(message); + var responseBytes = Encoding.ASCII.GetBytes( + $"HTTP/1.1 {(int)statusCode} {statusCode}\r\n" + + "Connection: close\r\n" + + "Content-Type: text/plain; charset=utf-8\r\n" + + $"Content-Length: {bodyBytes.Length}\r\n" + + "Sec-WebSocket-Version: 13\r\n" + + "\r\n"); + + await stream.WriteAsync(responseBytes, 0, responseBytes.Length, cancellationToken); + await stream.WriteAsync(bodyBytes, 0, bodyBytes.Length, cancellationToken); + await stream.FlushAsync(cancellationToken); + } + + private static async Task CloseWebSocketAsync(WebSocket webSocket) + { + if (webSocket == null) + { + return; + } + + if (webSocket.State != WebSocketState.Open && + webSocket.State != WebSocketState.CloseReceived) + { + return; + } + + try + { + using var cts = new CancellationTokenSource(_closeTimeout); + await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cts.Token); + } + catch (OperationCanceledException) + { + // Graceful close timed out, abort the connection. + webSocket.Abort(); + } + catch (WebSocketException) + { + // Peer already disconnected. + } + } + + private sealed class ReplayableStream : Stream + { + private readonly Stream _innerStream; + private readonly byte[] _prefixBytes; + private int _prefixOffset; + + public ReplayableStream(Stream innerStream, byte[] prefixBytes) + { + _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream)); + _prefixBytes = prefixBytes ?? Array.Empty(); + } + + public override bool CanRead => _innerStream.CanRead; + public override bool CanSeek => false; + public override bool CanWrite => _innerStream.CanWrite; + public override long Length => throw new NotSupportedException(); + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override void Flush() => _innerStream.Flush(); + + public override Task FlushAsync(CancellationToken cancellationToken) => _innerStream.FlushAsync(cancellationToken); + + public override int Read(byte[] buffer, int offset, int count) + { + if (TryReadPrefix(buffer, offset, count, out var bytesRead)) + { + return bytesRead; + } + + return _innerStream.Read(buffer, offset, count); + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + if (TryReadPrefix(buffer, offset, count, out var bytesRead)) + { + return bytesRead; + } + + return await _innerStream.ReadAsync(buffer, offset, count, cancellationToken); + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + if (_prefixOffset < _prefixBytes.Length) + { + var bytesToCopy = Math.Min(buffer.Length, _prefixBytes.Length - _prefixOffset); + new ReadOnlySpan(_prefixBytes, _prefixOffset, bytesToCopy).CopyTo(buffer.Span); + _prefixOffset += bytesToCopy; + return bytesToCopy; + } + + return await _innerStream.ReadAsync(buffer, cancellationToken); + } + + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + + public override void SetLength(long value) => throw new NotSupportedException(); + + public override void Write(byte[] buffer, int offset, int count) => _innerStream.Write(buffer, offset, count); + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => + _innerStream.WriteAsync(buffer, offset, count, cancellationToken); + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => + _innerStream.WriteAsync(buffer, cancellationToken); + + private bool TryReadPrefix(byte[] buffer, int offset, int count, out int bytesRead) + { + if (_prefixOffset >= _prefixBytes.Length) + { + bytesRead = 0; + return false; + } + + bytesRead = Math.Min(count, _prefixBytes.Length - _prefixOffset); + Array.Copy(_prefixBytes, _prefixOffset, buffer, offset, bytesRead); + _prefixOffset += bytesRead; + return true; + } + } + } +} diff --git a/src/Test/L0/Worker/DapDebuggerL0.cs b/src/Test/L0/Worker/DapDebuggerL0.cs index 3b7487dd934..6f4bdadd47e 100644 --- a/src/Test/L0/Worker/DapDebuggerL0.cs +++ b/src/Test/L0/Worker/DapDebuggerL0.cs @@ -1,7 +1,8 @@ -using System; +using System; using System.IO; using System.Net; using System.Net.Sockets; +using System.Net.WebSockets; using System.Runtime.CompilerServices; using System.Text; using System.Threading; @@ -20,12 +21,13 @@ public sealed class DapDebuggerL0 private const string TunnelConnectTimeoutVariable = "ACTIONS_RUNNER_DAP_TUNNEL_CONNECT_TIMEOUT_SECONDS"; private DapDebugger _debugger; - private TestHostContext CreateTestContext([CallerMemberName] string testName = "") + private TestHostContext CreateTestContext(bool enableWebSocketBridge = false, [CallerMemberName] string testName = "") { var hc = new TestHostContext(this, testName); _debugger = new DapDebugger(); _debugger.Initialize(hc); _debugger.SkipTunnelRelay = true; + _debugger.SkipWebSocketBridge = !enableWebSocketBridge; return hc; } @@ -71,6 +73,13 @@ private static async Task ConnectClientAsync(int port) return client; } + private static async Task ConnectWebSocketClientAsync(int port) + { + var client = new ClientWebSocket(); + await client.ConnectAsync(new Uri($"ws://127.0.0.1:{port}/"), CancellationToken.None); + return client; + } + private static async Task SendRequestAsync(NetworkStream stream, Request request) { var json = JsonConvert.SerializeObject(request); @@ -83,6 +92,14 @@ private static async Task SendRequestAsync(NetworkStream stream, Request request await stream.FlushAsync(); } + private static async Task SendRequestAsync(WebSocket client, Request request) + { + var json = JsonConvert.SerializeObject(request); + var body = Encoding.UTF8.GetBytes(json); + + await client.SendAsync(new ArraySegment(body), WebSocketMessageType.Text, endOfMessage: true, CancellationToken.None); + } + /// /// Reads a single DAP-framed message from a stream with a timeout. /// Parses the Content-Length header, reads exactly that many bytes, @@ -141,6 +158,52 @@ private static async Task ReadDapMessageAsync(NetworkStream stream, Time return Encoding.UTF8.GetString(body); } + private static async Task ReadWebSocketDataUntilAsync(WebSocket client, TimeSpan timeout, params string[] expectedFragments) + { + using var cts = new CancellationTokenSource(timeout); + var buffer = new byte[4096]; + var allMessages = new StringBuilder(); + + while (true) + { + using var messageStream = new MemoryStream(); + WebSocketReceiveResult result; + do + { + result = await client.ReceiveAsync(new ArraySegment(buffer), cts.Token); + if (result.MessageType == WebSocketMessageType.Close) + { + throw new EndOfStreamException("WebSocket closed before expected DAP messages were received."); + } + + if (result.Count > 0) + { + messageStream.Write(buffer, 0, result.Count); + } + } + while (!result.EndOfMessage); + + var messageText = Encoding.UTF8.GetString(messageStream.ToArray()); + allMessages.Append(messageText); + + var text = allMessages.ToString(); + var containsAllFragments = true; + foreach (var fragment in expectedFragments) + { + if (!text.Contains(fragment, StringComparison.Ordinal)) + { + containsAllFragments = false; + break; + } + } + + if (containsAllFragments) + { + return text; + } + } + } + private static Mock CreateJobContextWithTunnel(CancellationToken cancellationToken, ushort port, string jobName = null) { var tunnel = new GitHub.DistributedTask.Pipelines.DebuggerTunnelInfo @@ -208,6 +271,82 @@ public async Task StartAsyncUsesPortFromTunnelConfig() } } + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Worker")] + public async Task StartAsyncWithWebSocketBridgeAcceptsInitializeOverWebSocket() + { + using (CreateTestContext(enableWebSocketBridge: true)) + { + var port = GetFreePort(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var jobContext = CreateJobContextWithTunnel(cts.Token, port); + await _debugger.StartAsync(jobContext.Object); + + Assert.NotEqual(0, _debugger.InternalDapPort); + Assert.NotEqual(port, _debugger.InternalDapPort); + + using var client = await ConnectWebSocketClientAsync(port); + await SendRequestAsync(client, new Request + { + Seq = 1, + Type = "request", + Command = "initialize" + }); + + var response = await ReadWebSocketDataUntilAsync( + client, + TimeSpan.FromSeconds(5), + "\"type\":\"response\"", + "\"command\":\"initialize\"", + "\"event\":\"initialized\""); + + Assert.Contains("\"success\":true", response); + await _debugger.StopAsync(); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Worker")] + public async Task StartAsyncWithWebSocketBridgeAcceptsPreUpgradedWebSocketStream() + { + using (CreateTestContext(enableWebSocketBridge: true)) + { + var port = GetFreePort(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var jobContext = CreateJobContextWithTunnel(cts.Token, port); + await _debugger.StartAsync(jobContext.Object); + + Assert.NotEqual(0, _debugger.InternalDapPort); + Assert.NotEqual(port, _debugger.InternalDapPort); + + using var tcpClient = await ConnectClientAsync(port); + using var webSocket = WebSocket.CreateFromStream( + tcpClient.GetStream(), + isServer: false, + subProtocol: null, + keepAliveInterval: TimeSpan.FromSeconds(30)); + + await SendRequestAsync(webSocket, new Request + { + Seq = 1, + Type = "request", + Command = "initialize" + }); + + var response = await ReadWebSocketDataUntilAsync( + webSocket, + TimeSpan.FromSeconds(5), + "\"type\":\"response\"", + "\"command\":\"initialize\"", + "\"event\":\"initialized\""); + + Assert.Contains("\"success\":true", response); + await _debugger.StopAsync(); + } + } + [Fact] [Trait("Level", "L0")] [Trait("Category", "Worker")] diff --git a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs new file mode 100644 index 00000000000..f57ebcbf46f --- /dev/null +++ b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs @@ -0,0 +1,237 @@ +using System; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Net.WebSockets; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using GitHub.Runner.Common; +using GitHub.Runner.Worker.Dap; +using Xunit; + +namespace GitHub.Runner.Common.Tests.Worker +{ + public sealed class WebSocketDapBridgeL0 + { + private TestHostContext CreateTestContext([CallerMemberName] string testName = "") + { + return new TestHostContext(this, testName); + } + + private static ushort GetFreePort() + { + using var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + return (ushort)((IPEndPoint)listener.LocalEndpoint).Port; + } + + private static async Task ReadWebSocketMessageAsync(ClientWebSocket client, TimeSpan timeout) + { + using var cts = new CancellationTokenSource(timeout); + using var buffer = new MemoryStream(); + var receiveBuffer = new byte[1024]; + + while (true) + { + var result = await client.ReceiveAsync(new ArraySegment(receiveBuffer), cts.Token); + if (result.MessageType == WebSocketMessageType.Close) + { + throw new EndOfStreamException("WebSocket closed unexpectedly."); + } + + if (result.Count > 0) + { + buffer.Write(receiveBuffer, 0, result.Count); + } + + if (result.EndOfMessage) + { + return buffer.ToArray(); + } + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Worker")] + public async Task BridgeForwardsWebSocketFramesToTcpAndBack() + { + using var hc = CreateTestContext(); + using var targetListener = new TcpListener(IPAddress.Loopback, 0); + targetListener.Start(); + + var targetPort = ((IPEndPoint)targetListener.LocalEndpoint).Port; + var bridgePort = GetFreePort(); + + await using var bridge = new WebSocketDapBridge(hc.GetTrace("DapWebSocketBridge"), bridgePort, targetPort); + bridge.Start(); + + var echoTask = Task.Run(async () => + { + using var targetClient = await targetListener.AcceptTcpClientAsync(); + using var stream = targetClient.GetStream(); + + var headerBuilder = new StringBuilder(); + var buffer = new byte[1]; + var contentLength = -1; + + while (true) + { + var bytesRead = await stream.ReadAsync(buffer, 0, 1); + if (bytesRead == 0) break; + + headerBuilder.Append((char)buffer[0]); + var headers = headerBuilder.ToString(); + if (headers.EndsWith("\r\n\r\n", StringComparison.Ordinal)) + { + foreach (var line in headers.Split(new[] { "\r\n" }, StringSplitOptions.RemoveEmptyEntries)) + { + if (line.StartsWith("Content-Length: ", StringComparison.OrdinalIgnoreCase)) + { + contentLength = int.Parse(line.Substring("Content-Length: ".Length).Trim()); + } + } + break; + } + } + + var body = new byte[contentLength]; + var totalRead = 0; + while (totalRead < contentLength) + { + var bytesRead = await stream.ReadAsync(body, totalRead, contentLength - totalRead); + if (bytesRead == 0) break; + totalRead += bytesRead; + } + + var header = $"Content-Length: {body.Length}\r\n\r\n"; + var headerBytes = Encoding.ASCII.GetBytes(header); + await stream.WriteAsync(headerBytes, 0, headerBytes.Length); + await stream.WriteAsync(body, 0, body.Length); + await stream.FlushAsync(); + }); + + using var client = new ClientWebSocket(); + await client.ConnectAsync(new Uri($"ws://127.0.0.1:{bridgePort}/"), CancellationToken.None); + + var dapMessage = "{\"type\":\"request\",\"seq\":1,\"command\":\"initialize\"}"; + var payload = Encoding.UTF8.GetBytes(dapMessage); + await client.SendAsync(new ArraySegment(payload), WebSocketMessageType.Text, endOfMessage: true, CancellationToken.None); + + var echoed = await ReadWebSocketMessageAsync(client, TimeSpan.FromSeconds(5)); + Assert.Equal(payload, echoed); + + await echoTask; + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Worker")] + public async Task BridgeRejectsNonWebSocketRequests() + { + using var hc = CreateTestContext(); + var bridgePort = GetFreePort(); + + await using var bridge = new WebSocketDapBridge(hc.GetTrace("DapWebSocketBridge"), bridgePort, GetFreePort()); + bridge.Start(); + + using var client = new TcpClient(); + await client.ConnectAsync(IPAddress.Loopback, bridgePort); + using var stream = client.GetStream(); + + var request = Encoding.ASCII.GetBytes( + "GET / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "\r\n"); + await stream.WriteAsync(request, 0, request.Length); + await stream.FlushAsync(); + + var responseBuffer = new byte[1024]; + var bytesRead = await stream.ReadAsync(responseBuffer, 0, responseBuffer.Length); + var response = Encoding.ASCII.GetString(responseBuffer, 0, bytesRead); + + Assert.Contains("400 BadRequest", response); + Assert.Contains("Expected a websocket upgrade request.", response); + } + + [Theory] + [Trait("Level", "L0")] + [Trait("Category", "Worker")] + [InlineData(new byte[] { (byte)'G', (byte)'E', (byte)'T', (byte)' ' }, 1)] + [InlineData(new byte[] { 0x81, 0x85, 0x00, 0x00 }, 2)] + [InlineData(new byte[] { 0xC1, 0x85, 0x00, 0x00 }, 3)] + [InlineData(new byte[] { (byte)'P', (byte)'R', (byte)'I', (byte)' ' }, 4)] + [InlineData(new byte[] { 0x16, 0x03, 0x03, 0x01 }, 5)] + [InlineData(new byte[] { (byte)'B', (byte)'A', (byte)'D', (byte)'!' }, 0)] + public void ClassifyIncomingStreamPrefixDetectsExpectedProtocols(byte[] initialBytes, int expectedKind) + { + var actualKind = WebSocketDapBridge.ClassifyIncomingStreamPrefix(initialBytes); + Assert.Equal((WebSocketDapBridge.IncomingStreamPrefixKind)expectedKind, actualKind); + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Worker")] + public async Task BridgeRejectsOversizedWebSocketMessage() + { + using var hc = CreateTestContext(); + using var targetListener = new TcpListener(IPAddress.Loopback, 0); + targetListener.Start(); + + var targetPort = ((IPEndPoint)targetListener.LocalEndpoint).Port; + var bridgePort = GetFreePort(); + + await using var bridge = new WebSocketDapBridge(hc.GetTrace("DapWebSocketBridge"), bridgePort, targetPort); + bridge.MaxInboundMessageSize = 64; // artificially small limit for testing + bridge.Start(); + + using var client = new ClientWebSocket(); + await client.ConnectAsync(new Uri($"ws://127.0.0.1:{bridgePort}/"), CancellationToken.None); + + // Send a message that exceeds the 64-byte limit + var oversizedPayload = new byte[128]; + Array.Fill(oversizedPayload, (byte)'X'); + await client.SendAsync( + new ArraySegment(oversizedPayload), + WebSocketMessageType.Text, + endOfMessage: true, + CancellationToken.None); + + // The bridge should close the connection with MessageTooBig + var receiveBuffer = new byte[256]; + var result = await client.ReceiveAsync( + new ArraySegment(receiveBuffer), + new CancellationTokenSource(TimeSpan.FromSeconds(5)).Token); + + Assert.Equal(WebSocketMessageType.Close, result.MessageType); + Assert.Equal(WebSocketCloseStatus.MessageTooBig, client.CloseStatus); + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Worker")] + public async Task BridgeDisposeCompletesWhenPeerDoesNotCloseGracefully() + { + using var hc = CreateTestContext(); + using var targetListener = new TcpListener(IPAddress.Loopback, 0); + targetListener.Start(); + + var targetPort = ((IPEndPoint)targetListener.LocalEndpoint).Port; + var bridgePort = GetFreePort(); + + var bridge = new WebSocketDapBridge(hc.GetTrace("DapWebSocketBridge"), bridgePort, targetPort); + bridge.Start(); + + // Connect a raw TCP client but never perform WebSocket close handshake + using var rawClient = new TcpClient(); + await rawClient.ConnectAsync(IPAddress.Loopback, bridgePort); + + // Dispose should complete within a bounded time, not hang + var disposeTask = bridge.DisposeAsync().AsTask(); + var completed = await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(15))); + Assert.True(completed == disposeTask, "Bridge dispose should complete within the timeout, not hang on a non-cooperative peer"); + } + } +} From ac658858544a12880cd6246dc15f0d91f60e3f5f Mon Sep 17 00:00:00 2001 From: Francesco Renzi Date: Wed, 8 Apr 2026 09:41:31 -0700 Subject: [PATCH 02/12] fix windows --- src/Test/L0/Worker/WebSocketDapBridgeL0.cs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs index f57ebcbf46f..0e49c186512 100644 --- a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs +++ b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs @@ -148,9 +148,17 @@ public async Task BridgeRejectsNonWebSocketRequests() await stream.WriteAsync(request, 0, request.Length); await stream.FlushAsync(); + // Read until the server closes the connection (Connection: close). + // A single ReadAsync may return a partial response on some platforms. + using var ms = new MemoryStream(); var responseBuffer = new byte[1024]; - var bytesRead = await stream.ReadAsync(responseBuffer, 0, responseBuffer.Length); - var response = Encoding.ASCII.GetString(responseBuffer, 0, bytesRead); + int bytesRead; + while ((bytesRead = await stream.ReadAsync(responseBuffer, 0, responseBuffer.Length)) > 0) + { + ms.Write(responseBuffer, 0, bytesRead); + } + + var response = Encoding.ASCII.GetString(ms.ToArray()); Assert.Contains("400 BadRequest", response); Assert.Contains("Expected a websocket upgrade request.", response); From b5690868109b0d850224d93c9d469c509052d833 Mon Sep 17 00:00:00 2001 From: Francesco Renzi Date: Thu, 9 Apr 2026 17:58:58 +0100 Subject: [PATCH 03/12] Update src/Runner.Worker/Dap/WebSocketDapBridge.cs Co-authored-by: Tingluo Huang --- src/Runner.Worker/Dap/WebSocketDapBridge.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Runner.Worker/Dap/WebSocketDapBridge.cs b/src/Runner.Worker/Dap/WebSocketDapBridge.cs index 2596c184028..1cd5d1dd27b 100644 --- a/src/Runner.Worker/Dap/WebSocketDapBridge.cs +++ b/src/Runner.Worker/Dap/WebSocketDapBridge.cs @@ -125,7 +125,7 @@ private async Task AcceptLoopAsync(CancellationToken cancellationToken) catch (Exception ex) { client?.Dispose(); - _trace.Warning($"WebSocket DAP bridge connection error ({ex.GetType().Name})"); + _trace.Error($"WebSocket DAP bridge connection error"); _trace.Error(ex); } } From cfbedd5d95d9e2b6d9d2dcbd2f6d3fdd867ea74c Mon Sep 17 00:00:00 2001 From: Francesco Renzi Date: Thu, 9 Apr 2026 10:02:47 -0700 Subject: [PATCH 04/12] feedback --- src/Runner.Worker/Dap/WebSocketDapBridge.cs | 56 +++++++++++++-------- src/Test/L0/Worker/WebSocketDapBridgeL0.cs | 3 +- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/src/Runner.Worker/Dap/WebSocketDapBridge.cs b/src/Runner.Worker/Dap/WebSocketDapBridge.cs index 1cd5d1dd27b..cdea9efa14e 100644 --- a/src/Runner.Worker/Dap/WebSocketDapBridge.cs +++ b/src/Runner.Worker/Dap/WebSocketDapBridge.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.IO; using System.Linq; @@ -30,6 +30,7 @@ internal enum IncomingStreamPrefixKind private const int _defaultMaxInboundMessageSize = 10 * 1024 * 1024; // 10 MB private static readonly TimeSpan _keepAliveInterval = TimeSpan.FromSeconds(30); private static readonly TimeSpan _closeTimeout = TimeSpan.FromSeconds(5); + private static readonly TimeSpan _handshakeTimeout = TimeSpan.FromSeconds(10); private const string _webSocketAcceptMagic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; private readonly Tracing _trace; @@ -67,22 +68,15 @@ public void Start() public async ValueTask DisposeAsync() { - try - { - _loopCts?.Cancel(); - } - catch - { - // best effort during shutdown - } + _loopCts?.Cancel(); try { _listener?.Stop(); } - catch + catch (Exception ex) { - // best effort during shutdown + _trace.Warning($"Error stopping listener during shutdown ({ex.GetType().Name})"); } if (_acceptLoopTask != null) @@ -118,16 +112,16 @@ private async Task AcceptLoopAsync(CancellationToken cancellationToken) { break; } - catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested) - { - break; - } catch (Exception ex) { client?.Dispose(); _trace.Error($"WebSocket DAP bridge connection error"); _trace.Error(ex); } + finally + { + client?.Dispose(); + } } _trace.Info("WebSocket DAP bridge accept loop ended"); @@ -135,12 +129,24 @@ private async Task AcceptLoopAsync(CancellationToken cancellationToken) private async Task HandleClientAsync(TcpClient incomingClient, CancellationToken cancellationToken) { - using (incomingClient) using (var incomingStream = incomingClient.GetStream()) { _trace.Info($"WebSocket DAP bridge accepted client {incomingClient.Client.RemoteEndPoint}"); - var webSocket = await AcceptWebSocketAsync(incomingStream, cancellationToken); + WebSocket webSocket; + using (var handshakeCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) + { + handshakeCts.CancelAfter(_handshakeTimeout); + try + { + webSocket = await AcceptWebSocketAsync(incomingStream, handshakeCts.Token); + } + catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) + { + _trace.Warning("WebSocket handshake timed out"); + return; + } + } if (webSocket == null) { return; @@ -262,11 +268,20 @@ private async Task AcceptWebSocketAsync(NetworkStream stream, Cancell if (!IsValidWebSocketRequest(requestLine, headers)) { - _trace.Info($"Rejected non-websocket request: {requestLine}"); + var method = requestLine.Split(' ')[0]; + _trace.Info($"Rejected non-websocket request (method={method})"); await WriteHttpErrorAsync(stream, HttpStatusCode.BadRequest, "Expected a websocket upgrade request.", cancellationToken); return null; } + if (!headers.TryGetValue("Sec-WebSocket-Version", out var webSocketVersion) || + !string.Equals(webSocketVersion.Trim(), "13", StringComparison.Ordinal)) + { + _trace.Warning("Rejected WebSocket request with unsupported version"); + await WriteHttpErrorAsync(stream, (HttpStatusCode)426, "Unsupported WebSocket version. Expected: 13.", cancellationToken); + return null; + } + var webSocketKey = headers["Sec-WebSocket-Key"]; var acceptValue = ComputeAcceptValue(webSocketKey); var responseBytes = Encoding.ASCII.GetBytes( @@ -356,10 +371,7 @@ private static async Task PumpTcpToWebSocketAsync(NetworkStream source, WebSocke break; } - for (int i = 0; i < bytesRead; i++) - { - dapBuffer.Add(readBuffer[i]); - } + dapBuffer.AddRange(new ArraySegment(readBuffer, 0, bytesRead)); while (TryParseDapMessage(dapBuffer, out var messageBody)) { diff --git a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs index 0e49c186512..dc4950f06f3 100644 --- a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs +++ b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs @@ -209,9 +209,10 @@ await client.SendAsync( // The bridge should close the connection with MessageTooBig var receiveBuffer = new byte[256]; + using var receiveCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var result = await client.ReceiveAsync( new ArraySegment(receiveBuffer), - new CancellationTokenSource(TimeSpan.FromSeconds(5)).Token); + receiveCts.Token); Assert.Equal(WebSocketMessageType.Close, result.MessageType); Assert.Equal(WebSocketCloseStatus.MessageTooBig, client.CloseStatus); From 7783c982546b18723636272cad0bf0e9ff04c490 Mon Sep 17 00:00:00 2001 From: Francesco Renzi Date: Fri, 10 Apr 2026 01:57:28 -0700 Subject: [PATCH 05/12] feedback --- src/Runner.Worker/Dap/DapDebugger.cs | 13 +++--- src/Runner.Worker/Dap/WebSocketDapBridge.cs | 48 +++++++++++---------- src/Test/L0/Worker/WebSocketDapBridgeL0.cs | 47 ++++++++++++++++---- 3 files changed, 70 insertions(+), 38 deletions(-) diff --git a/src/Runner.Worker/Dap/DapDebugger.cs b/src/Runner.Worker/Dap/DapDebugger.cs index a7afa598814..69e850a02bd 100644 --- a/src/Runner.Worker/Dap/DapDebugger.cs +++ b/src/Runner.Worker/Dap/DapDebugger.cs @@ -149,10 +149,9 @@ public async Task StartAsync(IExecutionContext jobContext) else { Trace.Info($"Internal DAP debugger listening on {_listener.LocalEndpoint}"); - _webSocketBridge = new WebSocketDapBridge( - HostContext.GetTrace("DapWebSocketBridge"), - debuggerConfig.Tunnel.Port, - InternalDapPort); + _webSocketBridge = new WebSocketDapBridge(); + _webSocketBridge.Initialize(HostContext); + _webSocketBridge.Configure(debuggerConfig.Tunnel.Port, InternalDapPort); _webSocketBridge.Start(); } @@ -296,10 +295,10 @@ public async Task StopAsync() if (_webSocketBridge != null) { Trace.Info("Stopping WebSocket DAP bridge"); - var disposeTask = _webSocketBridge.DisposeAsync().AsTask(); - if (await Task.WhenAny(disposeTask, Task.Delay(5_000)) != disposeTask) + var shutdownTask = _webSocketBridge.ShutdownAsync(); + if (await Task.WhenAny(shutdownTask, Task.Delay(5_000)) != shutdownTask) { - Trace.Warning("WebSocket DAP bridge dispose timed out after 5s"); + Trace.Warning("WebSocket DAP bridge shutdown timed out after 5s"); } else { diff --git a/src/Runner.Worker/Dap/WebSocketDapBridge.cs b/src/Runner.Worker/Dap/WebSocketDapBridge.cs index cdea9efa14e..d5dfb3679a7 100644 --- a/src/Runner.Worker/Dap/WebSocketDapBridge.cs +++ b/src/Runner.Worker/Dap/WebSocketDapBridge.cs @@ -13,7 +13,7 @@ namespace GitHub.Runner.Worker.Dap { - internal sealed class WebSocketDapBridge : IAsyncDisposable + internal sealed class WebSocketDapBridge : RunnerService { internal enum IncomingStreamPrefixKind { @@ -33,26 +33,30 @@ internal enum IncomingStreamPrefixKind private static readonly TimeSpan _handshakeTimeout = TimeSpan.FromSeconds(10); private const string _webSocketAcceptMagic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; - private readonly Tracing _trace; - private readonly int _listenPort; - private readonly int _targetPort; + private int _listenPort; + private int _targetPort; + private bool _configured; private TcpListener _listener; private CancellationTokenSource _loopCts; private Task _acceptLoopTask; - // Overridable for unit tests to avoid allocating 10 MB payloads. - internal int MaxInboundMessageSize { get; set; } = _defaultMaxInboundMessageSize; + public int MaxInboundMessageSize { get; set; } = _defaultMaxInboundMessageSize; - public WebSocketDapBridge(Tracing trace, int listenPort, int targetPort) + public void Configure(int listenPort, int targetPort) { - _trace = trace ?? throw new ArgumentNullException(nameof(trace)); _listenPort = listenPort; _targetPort = targetPort; + _configured = true; } public void Start() { + if (!_configured) + { + throw new InvalidOperationException("Configure must be called before Start."); + } + if (_listener != null) { throw new InvalidOperationException("WebSocket DAP bridge already started."); @@ -63,10 +67,10 @@ public void Start() _loopCts = new CancellationTokenSource(); _acceptLoopTask = AcceptLoopAsync(_loopCts.Token); - _trace.Info($"WebSocket DAP bridge listening on {_listener.LocalEndpoint} -> 127.0.0.1:{_targetPort}"); + Trace.Info($"WebSocket DAP bridge listening on {_listener.LocalEndpoint} -> 127.0.0.1:{_targetPort}"); } - public async ValueTask DisposeAsync() + public async Task ShutdownAsync() { _loopCts?.Cancel(); @@ -76,7 +80,7 @@ public async ValueTask DisposeAsync() } catch (Exception ex) { - _trace.Warning($"Error stopping listener during shutdown ({ex.GetType().Name})"); + Trace.Warning($"Error stopping listener during shutdown ({ex.GetType().Name})"); } if (_acceptLoopTask != null) @@ -115,8 +119,8 @@ private async Task AcceptLoopAsync(CancellationToken cancellationToken) catch (Exception ex) { client?.Dispose(); - _trace.Error($"WebSocket DAP bridge connection error"); - _trace.Error(ex); + Trace.Error($"WebSocket DAP bridge connection error"); + Trace.Error(ex); } finally { @@ -124,14 +128,14 @@ private async Task AcceptLoopAsync(CancellationToken cancellationToken) } } - _trace.Info("WebSocket DAP bridge accept loop ended"); + Trace.Info("WebSocket DAP bridge accept loop ended"); } private async Task HandleClientAsync(TcpClient incomingClient, CancellationToken cancellationToken) { using (var incomingStream = incomingClient.GetStream()) { - _trace.Info($"WebSocket DAP bridge accepted client {incomingClient.Client.RemoteEndPoint}"); + Trace.Info($"WebSocket DAP bridge accepted client {incomingClient.Client.RemoteEndPoint}"); WebSocket webSocket; using (var handshakeCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) @@ -143,7 +147,7 @@ private async Task HandleClientAsync(TcpClient incomingClient, CancellationToken } catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) { - _trace.Warning("WebSocket handshake timed out"); + Trace.Warning("WebSocket handshake timed out"); return; } } @@ -211,7 +215,7 @@ private async Task AcceptWebSocketAsync(NetworkStream stream, Cancell var prefixKind = ClassifyIncomingStreamPrefix(initialBytes); if (prefixKind == IncomingStreamPrefixKind.PreUpgradedWebSocket) { - _trace.Info($"Treating incoming tunnel stream as an already-upgraded websocket connection ({DescribeInitialBytes(initialBytes)})"); + Trace.Info($"Treating incoming tunnel stream as an already-upgraded websocket connection ({DescribeInitialBytes(initialBytes)})"); return WebSocket.CreateFromStream( new ReplayableStream(stream, initialBytes), isServer: true, @@ -221,7 +225,7 @@ private async Task AcceptWebSocketAsync(NetworkStream stream, Cancell if (prefixKind != IncomingStreamPrefixKind.HttpWebSocketUpgrade) { - _trace.Warning($"Unsupported debugger tunnel stream prefix ({prefixKind}): {DescribeInitialBytes(initialBytes)}"); + Trace.Warning($"Unsupported debugger tunnel stream prefix ({prefixKind}): {DescribeInitialBytes(initialBytes)}"); return null; } @@ -269,7 +273,7 @@ private async Task AcceptWebSocketAsync(NetworkStream stream, Cancell if (!IsValidWebSocketRequest(requestLine, headers)) { var method = requestLine.Split(' ')[0]; - _trace.Info($"Rejected non-websocket request (method={method})"); + Trace.Info($"Rejected non-websocket request (method={method})"); await WriteHttpErrorAsync(stream, HttpStatusCode.BadRequest, "Expected a websocket upgrade request.", cancellationToken); return null; } @@ -277,7 +281,7 @@ private async Task AcceptWebSocketAsync(NetworkStream stream, Cancell if (!headers.TryGetValue("Sec-WebSocket-Version", out var webSocketVersion) || !string.Equals(webSocketVersion.Trim(), "13", StringComparison.Ordinal)) { - _trace.Warning("Rejected WebSocket request with unsupported version"); + Trace.Warning("Rejected WebSocket request with unsupported version"); await WriteHttpErrorAsync(stream, (HttpStatusCode)426, "Unsupported WebSocket version. Expected: 13.", cancellationToken); return null; } @@ -294,7 +298,7 @@ private async Task AcceptWebSocketAsync(NetworkStream stream, Cancell await handshakeStream.WriteAsync(responseBytes, 0, responseBytes.Length, cancellationToken); await handshakeStream.FlushAsync(cancellationToken); - _trace.Info("WebSocket DAP bridge completed websocket handshake"); + Trace.Info("WebSocket DAP bridge completed websocket handshake"); return WebSocket.CreateFromStream(handshakeStream, isServer: true, subProtocol: null, keepAliveInterval: _keepAliveInterval); } @@ -325,7 +329,7 @@ private async Task PumpWebSocketToTcpAsync(WebSocket source, NetworkStream desti { if (messageStream.Length + result.Count > MaxInboundMessageSize) { - _trace.Warning($"WebSocket message exceeds maximum allowed size of {MaxInboundMessageSize} bytes, closing connection"); + Trace.Warning($"WebSocket message exceeds maximum allowed size of {MaxInboundMessageSize} bytes, closing connection"); await source.CloseAsync( WebSocketCloseStatus.MessageTooBig, $"Message exceeds {MaxInboundMessageSize} byte limit", diff --git a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs index dc4950f06f3..b1186d67de6 100644 --- a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs +++ b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs @@ -65,9 +65,13 @@ public async Task BridgeForwardsWebSocketFramesToTcpAndBack() var targetPort = ((IPEndPoint)targetListener.LocalEndpoint).Port; var bridgePort = GetFreePort(); - await using var bridge = new WebSocketDapBridge(hc.GetTrace("DapWebSocketBridge"), bridgePort, targetPort); + var bridge = new WebSocketDapBridge(); + bridge.Initialize(hc); + bridge.Configure(bridgePort, targetPort); bridge.Start(); + try + { var echoTask = Task.Run(async () => { using var targetClient = await targetListener.AcceptTcpClientAsync(); @@ -124,6 +128,11 @@ public async Task BridgeForwardsWebSocketFramesToTcpAndBack() Assert.Equal(payload, echoed); await echoTask; + } + finally + { + await bridge.ShutdownAsync(); + } } [Fact] @@ -134,9 +143,13 @@ public async Task BridgeRejectsNonWebSocketRequests() using var hc = CreateTestContext(); var bridgePort = GetFreePort(); - await using var bridge = new WebSocketDapBridge(hc.GetTrace("DapWebSocketBridge"), bridgePort, GetFreePort()); + var bridge = new WebSocketDapBridge(); + bridge.Initialize(hc); + bridge.Configure(bridgePort, GetFreePort()); bridge.Start(); + try + { using var client = new TcpClient(); await client.ConnectAsync(IPAddress.Loopback, bridgePort); using var stream = client.GetStream(); @@ -162,6 +175,11 @@ public async Task BridgeRejectsNonWebSocketRequests() Assert.Contains("400 BadRequest", response); Assert.Contains("Expected a websocket upgrade request.", response); + } + finally + { + await bridge.ShutdownAsync(); + } } [Theory] @@ -191,10 +209,14 @@ public async Task BridgeRejectsOversizedWebSocketMessage() var targetPort = ((IPEndPoint)targetListener.LocalEndpoint).Port; var bridgePort = GetFreePort(); - await using var bridge = new WebSocketDapBridge(hc.GetTrace("DapWebSocketBridge"), bridgePort, targetPort); + var bridge = new WebSocketDapBridge(); + bridge.Initialize(hc); + bridge.Configure(bridgePort, targetPort); bridge.MaxInboundMessageSize = 64; // artificially small limit for testing bridge.Start(); + try + { using var client = new ClientWebSocket(); await client.ConnectAsync(new Uri($"ws://127.0.0.1:{bridgePort}/"), CancellationToken.None); @@ -216,12 +238,17 @@ await client.SendAsync( Assert.Equal(WebSocketMessageType.Close, result.MessageType); Assert.Equal(WebSocketCloseStatus.MessageTooBig, client.CloseStatus); + } + finally + { + await bridge.ShutdownAsync(); + } } [Fact] [Trait("Level", "L0")] [Trait("Category", "Worker")] - public async Task BridgeDisposeCompletesWhenPeerDoesNotCloseGracefully() + public async Task BridgeShutdownCompletesWhenPeerDoesNotCloseGracefully() { using var hc = CreateTestContext(); using var targetListener = new TcpListener(IPAddress.Loopback, 0); @@ -230,17 +257,19 @@ public async Task BridgeDisposeCompletesWhenPeerDoesNotCloseGracefully() var targetPort = ((IPEndPoint)targetListener.LocalEndpoint).Port; var bridgePort = GetFreePort(); - var bridge = new WebSocketDapBridge(hc.GetTrace("DapWebSocketBridge"), bridgePort, targetPort); + var bridge = new WebSocketDapBridge(); + bridge.Initialize(hc); + bridge.Configure(bridgePort, targetPort); bridge.Start(); // Connect a raw TCP client but never perform WebSocket close handshake using var rawClient = new TcpClient(); await rawClient.ConnectAsync(IPAddress.Loopback, bridgePort); - // Dispose should complete within a bounded time, not hang - var disposeTask = bridge.DisposeAsync().AsTask(); - var completed = await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(15))); - Assert.True(completed == disposeTask, "Bridge dispose should complete within the timeout, not hang on a non-cooperative peer"); + // Shutdown should complete within a bounded time, not hang + var shutdownTask = bridge.ShutdownAsync(); + var completed = await Task.WhenAny(shutdownTask, Task.Delay(TimeSpan.FromSeconds(15))); + Assert.True(completed == shutdownTask, "Bridge shutdown should complete within the timeout, not hang on a non-cooperative peer"); } } } From 7eab609cd65dae991860d17a5a1ad6953b7faee5 Mon Sep 17 00:00:00 2001 From: Francesco Renzi Date: Fri, 10 Apr 2026 07:04:39 -0700 Subject: [PATCH 06/12] PR feedback --- src/Runner.Worker/Dap/WebSocketDapBridge.cs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/src/Runner.Worker/Dap/WebSocketDapBridge.cs b/src/Runner.Worker/Dap/WebSocketDapBridge.cs index d5dfb3679a7..6f10f2ac837 100644 --- a/src/Runner.Worker/Dap/WebSocketDapBridge.cs +++ b/src/Runner.Worker/Dap/WebSocketDapBridge.cs @@ -169,17 +169,10 @@ private async Task HandleClientAsync(TcpClient incomingClient, CancellationToken var wsToTcpTask = PumpWebSocketToTcpAsync(webSocket, dapStream, proxyToken); var tcpToWsTask = PumpTcpToWebSocketAsync(dapStream, webSocket, proxyToken); - var completedTask = await Task.WhenAny(wsToTcpTask, tcpToWsTask); + await Task.WhenAny(wsToTcpTask, tcpToWsTask); sessionCts.Cancel(); - try - { - await completedTask; - } - catch (OperationCanceledException) when (proxyToken.IsCancellationRequested) - { - // expected during shutdown - } + await CloseWebSocketAsync(webSocket); try { @@ -198,8 +191,6 @@ private async Task HandleClientAsync(TcpClient incomingClient, CancellationToken // peer disconnected while unwinding } } - - await CloseWebSocketAsync(webSocket); } } } From bce1071d1405023f1187e7d2b981b0df1320ecc7 Mon Sep 17 00:00:00 2001 From: Francesco Renzi Date: Fri, 10 Apr 2026 13:52:38 -0700 Subject: [PATCH 07/12] feedback --- src/Runner.Worker/Dap/DapDebugger.cs | 3 ++ src/Runner.Worker/Dap/WebSocketDapBridge.cs | 44 +++++++++++++++++++-- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/src/Runner.Worker/Dap/DapDebugger.cs b/src/Runner.Worker/Dap/DapDebugger.cs index 69e850a02bd..de189866028 100644 --- a/src/Runner.Worker/Dap/DapDebugger.cs +++ b/src/Runner.Worker/Dap/DapDebugger.cs @@ -299,6 +299,9 @@ public async Task StopAsync() if (await Task.WhenAny(shutdownTask, Task.Delay(5_000)) != shutdownTask) { Trace.Warning("WebSocket DAP bridge shutdown timed out after 5s"); + _ = shutdownTask.ContinueWith( + t => Trace.Error($"WebSocket DAP bridge shutdown faulted: {t.Exception?.GetBaseException().Message}"), + TaskContinuationOptions.OnlyOnFaulted); } else { diff --git a/src/Runner.Worker/Dap/WebSocketDapBridge.cs b/src/Runner.Worker/Dap/WebSocketDapBridge.cs index 6f10f2ac837..d9f10e5b33c 100644 --- a/src/Runner.Worker/Dap/WebSocketDapBridge.cs +++ b/src/Runner.Worker/Dap/WebSocketDapBridge.cs @@ -32,6 +32,8 @@ internal enum IncomingStreamPrefixKind private static readonly TimeSpan _closeTimeout = TimeSpan.FromSeconds(5); private static readonly TimeSpan _handshakeTimeout = TimeSpan.FromSeconds(10); private const string _webSocketAcceptMagic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + private const int _maxHeaderCount = 64; + private static readonly byte[] _headerEndMarker = new byte[] { (byte)'\r', (byte)'\n', (byte)'\r', (byte)'\n' }; private int _listenPort; private int _targetPort; @@ -190,6 +192,10 @@ private async Task HandleClientAsync(TcpClient incomingClient, CancellationToken { // peer disconnected while unwinding } + catch (InvalidOperationException ex) + { + Trace.Warning($"DAP protocol error: {ex.Message}"); + } } } } @@ -230,6 +236,13 @@ private async Task AcceptWebSocketAsync(NetworkStream stream, Cancell var headers = new Dictionary(StringComparer.OrdinalIgnoreCase); while (true) { + if (headers.Count >= _maxHeaderCount) + { + Trace.Warning($"Rejected WebSocket request with too many headers (>{_maxHeaderCount})"); + await WriteHttpErrorAsync(stream, HttpStatusCode.BadRequest, "Too many headers.", cancellationToken); + return null; + } + var line = await ReadLineAsync(handshakeStream, cancellationToken); if (line == null) { @@ -278,6 +291,13 @@ private async Task AcceptWebSocketAsync(NetworkStream stream, Cancell } var webSocketKey = headers["Sec-WebSocket-Key"]; + if (!IsValidWebSocketKey(webSocketKey)) + { + Trace.Warning("Rejected WebSocket request with invalid Sec-WebSocket-Key"); + await WriteHttpErrorAsync(stream, HttpStatusCode.BadRequest, "Invalid Sec-WebSocket-Key.", cancellationToken); + return null; + } + var acceptValue = ComputeAcceptValue(webSocketKey); var responseBytes = Encoding.ASCII.GetBytes( "HTTP/1.1 101 Switching Protocols\r\n" + @@ -383,8 +403,7 @@ private static bool TryParseDapMessage(List buffer, out byte[] messageBody { messageBody = null; - var headerEndMarker = new byte[] { (byte)'\r', (byte)'\n', (byte)'\r', (byte)'\n' }; - var headerEndIndex = FindSequence(buffer, headerEndMarker); + var headerEndIndex = FindSequence(buffer, _headerEndMarker); if (headerEndIndex == -1) { return false; @@ -409,8 +428,7 @@ private static bool TryParseDapMessage(List buffer, out byte[] messageBody if (contentLength < 0) { - buffer.RemoveRange(0, headerEndIndex + 4); - return false; + throw new InvalidOperationException("DAP message missing or unparseable Content-Length header; tearing down session."); } var messageStart = headerEndIndex + 4; @@ -495,6 +513,24 @@ private static string ComputeAcceptValue(string webSocketKey) } } + private static bool IsValidWebSocketKey(string key) + { + if (string.IsNullOrEmpty(key) || key.IndexOfAny(new[] { '\r', '\n' }) >= 0) + { + return false; + } + + try + { + var decoded = Convert.FromBase64String(key); + return decoded.Length == 16; + } + catch (FormatException) + { + return false; + } + } + private static async Task ReadLineAsync(Stream stream, CancellationToken cancellationToken) { var lineBuilder = new StringBuilder(); From c76d48ba4ff26e576cfaf4d0ab41bbd0c54f060c Mon Sep 17 00:00:00 2001 From: Francesco Renzi Date: Mon, 13 Apr 2026 13:28:14 +0100 Subject: [PATCH 08/12] Update src/Runner.Worker/Dap/WebSocketDapBridge.cs Co-authored-by: Tingluo Huang --- src/Runner.Worker/Dap/WebSocketDapBridge.cs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/Runner.Worker/Dap/WebSocketDapBridge.cs b/src/Runner.Worker/Dap/WebSocketDapBridge.cs index d9f10e5b33c..5e28c7990b3 100644 --- a/src/Runner.Worker/Dap/WebSocketDapBridge.cs +++ b/src/Runner.Worker/Dap/WebSocketDapBridge.cs @@ -184,17 +184,9 @@ private async Task HandleClientAsync(TcpClient incomingClient, CancellationToken { // expected during shutdown } - catch (IOException) + catch (Exception ex) { - // peer disconnected while unwinding - } - catch (WebSocketException) - { - // peer disconnected while unwinding - } - catch (InvalidOperationException ex) - { - Trace.Warning($"DAP protocol error: {ex.Message}"); + Trace.Warning($"DAP protocol error: {ex}"); } } } From dadaed246a9d15b04457db9cf2d56771326d908b Mon Sep 17 00:00:00 2001 From: Francesco Renzi Date: Mon, 13 Apr 2026 06:01:44 -0700 Subject: [PATCH 09/12] bridge interface --- src/Runner.Worker/Dap/DapDebugger.cs | 8 +++----- src/Runner.Worker/Dap/IWebSocketDapBridge.cs | 12 ++++++++++++ src/Runner.Worker/Dap/WebSocketDapBridge.cs | 20 +++++--------------- src/Test/L0/Worker/DapDebuggerL0.cs | 7 +++++++ src/Test/L0/Worker/WebSocketDapBridgeL0.cs | 12 ++++-------- 5 files changed, 31 insertions(+), 28 deletions(-) create mode 100644 src/Runner.Worker/Dap/IWebSocketDapBridge.cs diff --git a/src/Runner.Worker/Dap/DapDebugger.cs b/src/Runner.Worker/Dap/DapDebugger.cs index de189866028..89879e2ff2c 100644 --- a/src/Runner.Worker/Dap/DapDebugger.cs +++ b/src/Runner.Worker/Dap/DapDebugger.cs @@ -66,7 +66,7 @@ public sealed class DapDebugger : RunnerService, IDapDebugger // Dev Tunnel relay host for remote debugging private TunnelRelayTunnelHost _tunnelRelayHost; - private WebSocketDapBridge _webSocketBridge; + private IWebSocketDapBridge _webSocketBridge; // Cancellation source for the connection loop, cancelled in StopAsync // so AcceptTcpClientAsync unblocks cleanly without relying on listener disposal. @@ -149,10 +149,8 @@ public async Task StartAsync(IExecutionContext jobContext) else { Trace.Info($"Internal DAP debugger listening on {_listener.LocalEndpoint}"); - _webSocketBridge = new WebSocketDapBridge(); - _webSocketBridge.Initialize(HostContext); - _webSocketBridge.Configure(debuggerConfig.Tunnel.Port, InternalDapPort); - _webSocketBridge.Start(); + _webSocketBridge = HostContext.CreateService(); + _webSocketBridge.Start(debuggerConfig.Tunnel.Port, InternalDapPort); } // Start Dev Tunnel relay so remote clients reach the local DAP port. diff --git a/src/Runner.Worker/Dap/IWebSocketDapBridge.cs b/src/Runner.Worker/Dap/IWebSocketDapBridge.cs new file mode 100644 index 00000000000..a468aa54a82 --- /dev/null +++ b/src/Runner.Worker/Dap/IWebSocketDapBridge.cs @@ -0,0 +1,12 @@ +using System.Threading.Tasks; +using GitHub.Runner.Common; + +namespace GitHub.Runner.Worker.Dap +{ + [ServiceLocator(Default = typeof(WebSocketDapBridge))] + public interface IWebSocketDapBridge : IRunnerService + { + void Start(int listenPort, int targetPort); + Task ShutdownAsync(); + } +} diff --git a/src/Runner.Worker/Dap/WebSocketDapBridge.cs b/src/Runner.Worker/Dap/WebSocketDapBridge.cs index 5e28c7990b3..dd84f1a51df 100644 --- a/src/Runner.Worker/Dap/WebSocketDapBridge.cs +++ b/src/Runner.Worker/Dap/WebSocketDapBridge.cs @@ -13,7 +13,7 @@ namespace GitHub.Runner.Worker.Dap { - internal sealed class WebSocketDapBridge : RunnerService + internal sealed class WebSocketDapBridge : RunnerService, IWebSocketDapBridge { internal enum IncomingStreamPrefixKind { @@ -37,7 +37,6 @@ internal enum IncomingStreamPrefixKind private int _listenPort; private int _targetPort; - private bool _configured; private TcpListener _listener; private CancellationTokenSource _loopCts; @@ -45,25 +44,16 @@ internal enum IncomingStreamPrefixKind public int MaxInboundMessageSize { get; set; } = _defaultMaxInboundMessageSize; - public void Configure(int listenPort, int targetPort) + public void Start(int listenPort, int targetPort) { - _listenPort = listenPort; - _targetPort = targetPort; - _configured = true; - } - - public void Start() - { - if (!_configured) - { - throw new InvalidOperationException("Configure must be called before Start."); - } - if (_listener != null) { throw new InvalidOperationException("WebSocket DAP bridge already started."); } + _listenPort = listenPort; + _targetPort = targetPort; + _listener = new TcpListener(IPAddress.Loopback, _listenPort); _listener.Start(); _loopCts = new CancellationTokenSource(); diff --git a/src/Test/L0/Worker/DapDebuggerL0.cs b/src/Test/L0/Worker/DapDebuggerL0.cs index 6f4bdadd47e..6e6c4f0095d 100644 --- a/src/Test/L0/Worker/DapDebuggerL0.cs +++ b/src/Test/L0/Worker/DapDebuggerL0.cs @@ -28,6 +28,13 @@ private TestHostContext CreateTestContext(bool enableWebSocketBridge = false, [C _debugger.Initialize(hc); _debugger.SkipTunnelRelay = true; _debugger.SkipWebSocketBridge = !enableWebSocketBridge; + if (enableWebSocketBridge) + { + var bridge = new WebSocketDapBridge(); + bridge.Initialize(hc); + hc.EnqueueInstance(bridge); + } + return hc; } diff --git a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs index b1186d67de6..f48878c13f4 100644 --- a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs +++ b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs @@ -67,8 +67,7 @@ public async Task BridgeForwardsWebSocketFramesToTcpAndBack() var bridge = new WebSocketDapBridge(); bridge.Initialize(hc); - bridge.Configure(bridgePort, targetPort); - bridge.Start(); + bridge.Start(bridgePort, targetPort); try { @@ -145,8 +144,7 @@ public async Task BridgeRejectsNonWebSocketRequests() var bridge = new WebSocketDapBridge(); bridge.Initialize(hc); - bridge.Configure(bridgePort, GetFreePort()); - bridge.Start(); + bridge.Start(bridgePort, GetFreePort()); try { @@ -211,9 +209,8 @@ public async Task BridgeRejectsOversizedWebSocketMessage() var bridge = new WebSocketDapBridge(); bridge.Initialize(hc); - bridge.Configure(bridgePort, targetPort); bridge.MaxInboundMessageSize = 64; // artificially small limit for testing - bridge.Start(); + bridge.Start(bridgePort, targetPort); try { @@ -259,8 +256,7 @@ public async Task BridgeShutdownCompletesWhenPeerDoesNotCloseGracefully() var bridge = new WebSocketDapBridge(); bridge.Initialize(hc); - bridge.Configure(bridgePort, targetPort); - bridge.Start(); + bridge.Start(bridgePort, targetPort); // Connect a raw TCP client but never perform WebSocket close handshake using var rawClient = new TcpClient(); From 9fafef98122a9dd1bf7dfa2e097b934b2c632674 Mon Sep 17 00:00:00 2001 From: Francesco Renzi Date: Mon, 13 Apr 2026 07:22:48 -0700 Subject: [PATCH 10/12] test fix --- src/Runner.Worker/Dap/DapDebugger.cs | 1 + src/Runner.Worker/Dap/IWebSocketDapBridge.cs | 1 + src/Runner.Worker/Dap/WebSocketDapBridge.cs | 2 ++ src/Test/L0/Worker/DapDebuggerL0.cs | 18 ++++++++------- src/Test/L0/Worker/WebSocketDapBridgeL0.cs | 23 +++++++------------- 5 files changed, 22 insertions(+), 23 deletions(-) diff --git a/src/Runner.Worker/Dap/DapDebugger.cs b/src/Runner.Worker/Dap/DapDebugger.cs index 89879e2ff2c..eea723f08e8 100644 --- a/src/Runner.Worker/Dap/DapDebugger.cs +++ b/src/Runner.Worker/Dap/DapDebugger.cs @@ -114,6 +114,7 @@ public sealed class DapDebugger : RunnerService, IDapDebugger internal DapSessionState State => _state; internal int InternalDapPort => (_listener?.LocalEndpoint as IPEndPoint)?.Port ?? 0; + internal int BridgeListenPort => _webSocketBridge?.ListenPort ?? 0; public override void Initialize(IHostContext hostContext) { diff --git a/src/Runner.Worker/Dap/IWebSocketDapBridge.cs b/src/Runner.Worker/Dap/IWebSocketDapBridge.cs index a468aa54a82..20e9feab957 100644 --- a/src/Runner.Worker/Dap/IWebSocketDapBridge.cs +++ b/src/Runner.Worker/Dap/IWebSocketDapBridge.cs @@ -6,6 +6,7 @@ namespace GitHub.Runner.Worker.Dap [ServiceLocator(Default = typeof(WebSocketDapBridge))] public interface IWebSocketDapBridge : IRunnerService { + int ListenPort { get; } void Start(int listenPort, int targetPort); Task ShutdownAsync(); } diff --git a/src/Runner.Worker/Dap/WebSocketDapBridge.cs b/src/Runner.Worker/Dap/WebSocketDapBridge.cs index dd84f1a51df..d94709edab6 100644 --- a/src/Runner.Worker/Dap/WebSocketDapBridge.cs +++ b/src/Runner.Worker/Dap/WebSocketDapBridge.cs @@ -44,6 +44,8 @@ internal enum IncomingStreamPrefixKind public int MaxInboundMessageSize { get; set; } = _defaultMaxInboundMessageSize; + public int ListenPort => (_listener?.LocalEndpoint as IPEndPoint)?.Port ?? 0; + public void Start(int listenPort, int targetPort) { if (_listener != null) diff --git a/src/Test/L0/Worker/DapDebuggerL0.cs b/src/Test/L0/Worker/DapDebuggerL0.cs index 6e6c4f0095d..3df1f53e868 100644 --- a/src/Test/L0/Worker/DapDebuggerL0.cs +++ b/src/Test/L0/Worker/DapDebuggerL0.cs @@ -285,15 +285,16 @@ public async Task StartAsyncWithWebSocketBridgeAcceptsInitializeOverWebSocket() { using (CreateTestContext(enableWebSocketBridge: true)) { - var port = GetFreePort(); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var jobContext = CreateJobContextWithTunnel(cts.Token, port); + var jobContext = CreateJobContextWithTunnel(cts.Token, GetFreePort()); await _debugger.StartAsync(jobContext.Object); + var bridgePort = _debugger.BridgeListenPort; Assert.NotEqual(0, _debugger.InternalDapPort); - Assert.NotEqual(port, _debugger.InternalDapPort); + Assert.NotEqual(0, bridgePort); + Assert.NotEqual(bridgePort, _debugger.InternalDapPort); - using var client = await ConnectWebSocketClientAsync(port); + using var client = await ConnectWebSocketClientAsync(bridgePort); await SendRequestAsync(client, new Request { Seq = 1, @@ -320,15 +321,16 @@ public async Task StartAsyncWithWebSocketBridgeAcceptsPreUpgradedWebSocketStream { using (CreateTestContext(enableWebSocketBridge: true)) { - var port = GetFreePort(); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var jobContext = CreateJobContextWithTunnel(cts.Token, port); + var jobContext = CreateJobContextWithTunnel(cts.Token, GetFreePort()); await _debugger.StartAsync(jobContext.Object); + var bridgePort = _debugger.BridgeListenPort; Assert.NotEqual(0, _debugger.InternalDapPort); - Assert.NotEqual(port, _debugger.InternalDapPort); + Assert.NotEqual(0, bridgePort); + Assert.NotEqual(bridgePort, _debugger.InternalDapPort); - using var tcpClient = await ConnectClientAsync(port); + using var tcpClient = await ConnectClientAsync(bridgePort); using var webSocket = WebSocket.CreateFromStream( tcpClient.GetStream(), isServer: false, diff --git a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs index f48878c13f4..e6367acf403 100644 --- a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs +++ b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs @@ -20,13 +20,6 @@ private TestHostContext CreateTestContext([CallerMemberName] string testName = " return new TestHostContext(this, testName); } - private static ushort GetFreePort() - { - using var listener = new TcpListener(IPAddress.Loopback, 0); - listener.Start(); - return (ushort)((IPEndPoint)listener.LocalEndpoint).Port; - } - private static async Task ReadWebSocketMessageAsync(ClientWebSocket client, TimeSpan timeout) { using var cts = new CancellationTokenSource(timeout); @@ -63,11 +56,11 @@ public async Task BridgeForwardsWebSocketFramesToTcpAndBack() targetListener.Start(); var targetPort = ((IPEndPoint)targetListener.LocalEndpoint).Port; - var bridgePort = GetFreePort(); var bridge = new WebSocketDapBridge(); bridge.Initialize(hc); - bridge.Start(bridgePort, targetPort); + bridge.Start(0, targetPort); + var bridgePort = bridge.ListenPort; try { @@ -140,11 +133,11 @@ public async Task BridgeForwardsWebSocketFramesToTcpAndBack() public async Task BridgeRejectsNonWebSocketRequests() { using var hc = CreateTestContext(); - var bridgePort = GetFreePort(); var bridge = new WebSocketDapBridge(); bridge.Initialize(hc); - bridge.Start(bridgePort, GetFreePort()); + bridge.Start(0, 0); + var bridgePort = bridge.ListenPort; try { @@ -205,12 +198,12 @@ public async Task BridgeRejectsOversizedWebSocketMessage() targetListener.Start(); var targetPort = ((IPEndPoint)targetListener.LocalEndpoint).Port; - var bridgePort = GetFreePort(); var bridge = new WebSocketDapBridge(); bridge.Initialize(hc); bridge.MaxInboundMessageSize = 64; // artificially small limit for testing - bridge.Start(bridgePort, targetPort); + bridge.Start(0, targetPort); + var bridgePort = bridge.ListenPort; try { @@ -252,11 +245,11 @@ public async Task BridgeShutdownCompletesWhenPeerDoesNotCloseGracefully() targetListener.Start(); var targetPort = ((IPEndPoint)targetListener.LocalEndpoint).Port; - var bridgePort = GetFreePort(); var bridge = new WebSocketDapBridge(); bridge.Initialize(hc); - bridge.Start(bridgePort, targetPort); + bridge.Start(0, targetPort); + var bridgePort = bridge.ListenPort; // Connect a raw TCP client but never perform WebSocket close handshake using var rawClient = new TcpClient(); From ff0d4e03db57e2fd49cbf7bab688f7b6cc063408 Mon Sep 17 00:00:00 2001 From: Francesco Renzi Date: Mon, 13 Apr 2026 09:30:21 -0700 Subject: [PATCH 11/12] tests --- src/Runner.Worker/Dap/DapDebugger.cs | 1 - src/Runner.Worker/Dap/IWebSocketDapBridge.cs | 1 - src/Runner.Worker/Dap/WebSocketDapBridge.cs | 2 +- src/Test/L0/Worker/DapDebuggerL0.cs | 35 +++++++++++++++++--- src/Test/L0/Worker/WebSocketDapBridgeL0.cs | 2 ++ 5 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/Runner.Worker/Dap/DapDebugger.cs b/src/Runner.Worker/Dap/DapDebugger.cs index eea723f08e8..89879e2ff2c 100644 --- a/src/Runner.Worker/Dap/DapDebugger.cs +++ b/src/Runner.Worker/Dap/DapDebugger.cs @@ -114,7 +114,6 @@ public sealed class DapDebugger : RunnerService, IDapDebugger internal DapSessionState State => _state; internal int InternalDapPort => (_listener?.LocalEndpoint as IPEndPoint)?.Port ?? 0; - internal int BridgeListenPort => _webSocketBridge?.ListenPort ?? 0; public override void Initialize(IHostContext hostContext) { diff --git a/src/Runner.Worker/Dap/IWebSocketDapBridge.cs b/src/Runner.Worker/Dap/IWebSocketDapBridge.cs index 20e9feab957..a468aa54a82 100644 --- a/src/Runner.Worker/Dap/IWebSocketDapBridge.cs +++ b/src/Runner.Worker/Dap/IWebSocketDapBridge.cs @@ -6,7 +6,6 @@ namespace GitHub.Runner.Worker.Dap [ServiceLocator(Default = typeof(WebSocketDapBridge))] public interface IWebSocketDapBridge : IRunnerService { - int ListenPort { get; } void Start(int listenPort, int targetPort); Task ShutdownAsync(); } diff --git a/src/Runner.Worker/Dap/WebSocketDapBridge.cs b/src/Runner.Worker/Dap/WebSocketDapBridge.cs index d94709edab6..e2f8dc47370 100644 --- a/src/Runner.Worker/Dap/WebSocketDapBridge.cs +++ b/src/Runner.Worker/Dap/WebSocketDapBridge.cs @@ -44,7 +44,7 @@ internal enum IncomingStreamPrefixKind public int MaxInboundMessageSize { get; set; } = _defaultMaxInboundMessageSize; - public int ListenPort => (_listener?.LocalEndpoint as IPEndPoint)?.Port ?? 0; + internal int ListenPort => (_listener?.LocalEndpoint as IPEndPoint)?.Port ?? 0; public void Start(int listenPort, int targetPort) { diff --git a/src/Test/L0/Worker/DapDebuggerL0.cs b/src/Test/L0/Worker/DapDebuggerL0.cs index 3df1f53e868..9454b1a3a67 100644 --- a/src/Test/L0/Worker/DapDebuggerL0.cs +++ b/src/Test/L0/Worker/DapDebuggerL0.cs @@ -20,19 +20,43 @@ public sealed class DapDebuggerL0 private const string TimeoutEnvironmentVariable = "ACTIONS_RUNNER_DAP_CONNECTION_TIMEOUT"; private const string TunnelConnectTimeoutVariable = "ACTIONS_RUNNER_DAP_TUNNEL_CONNECT_TIMEOUT_SECONDS"; private DapDebugger _debugger; + private TestWebSocketDapBridge _testWebSocketBridge; + + private sealed class TestWebSocketDapBridge : RunnerService, IWebSocketDapBridge + { + private readonly WebSocketDapBridge _inner = new WebSocketDapBridge(); + + public int ListenPort => _inner.ListenPort; + + public override void Initialize(IHostContext hostContext) + { + base.Initialize(hostContext); + _inner.Initialize(hostContext); + } + + public void Start(int listenPort, int targetPort) + { + _inner.Start(0, targetPort); + } + + public Task ShutdownAsync() + { + return _inner.ShutdownAsync(); + } + } private TestHostContext CreateTestContext(bool enableWebSocketBridge = false, [CallerMemberName] string testName = "") { var hc = new TestHostContext(this, testName); _debugger = new DapDebugger(); + _testWebSocketBridge = null; _debugger.Initialize(hc); _debugger.SkipTunnelRelay = true; _debugger.SkipWebSocketBridge = !enableWebSocketBridge; if (enableWebSocketBridge) { - var bridge = new WebSocketDapBridge(); - bridge.Initialize(hc); - hc.EnqueueInstance(bridge); + _testWebSocketBridge = new TestWebSocketDapBridge(); + hc.EnqueueInstance(_testWebSocketBridge); } return hc; @@ -83,6 +107,7 @@ private static async Task ConnectClientAsync(int port) private static async Task ConnectWebSocketClientAsync(int port) { var client = new ClientWebSocket(); + client.Options.Proxy = null; await client.ConnectAsync(new Uri($"ws://127.0.0.1:{port}/"), CancellationToken.None); return client; } @@ -289,7 +314,7 @@ public async Task StartAsyncWithWebSocketBridgeAcceptsInitializeOverWebSocket() var jobContext = CreateJobContextWithTunnel(cts.Token, GetFreePort()); await _debugger.StartAsync(jobContext.Object); - var bridgePort = _debugger.BridgeListenPort; + var bridgePort = _testWebSocketBridge.ListenPort; Assert.NotEqual(0, _debugger.InternalDapPort); Assert.NotEqual(0, bridgePort); Assert.NotEqual(bridgePort, _debugger.InternalDapPort); @@ -325,7 +350,7 @@ public async Task StartAsyncWithWebSocketBridgeAcceptsPreUpgradedWebSocketStream var jobContext = CreateJobContextWithTunnel(cts.Token, GetFreePort()); await _debugger.StartAsync(jobContext.Object); - var bridgePort = _debugger.BridgeListenPort; + var bridgePort = _testWebSocketBridge.ListenPort; Assert.NotEqual(0, _debugger.InternalDapPort); Assert.NotEqual(0, bridgePort); Assert.NotEqual(bridgePort, _debugger.InternalDapPort); diff --git a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs index e6367acf403..d4210dfcbc7 100644 --- a/src/Test/L0/Worker/WebSocketDapBridgeL0.cs +++ b/src/Test/L0/Worker/WebSocketDapBridgeL0.cs @@ -110,6 +110,7 @@ public async Task BridgeForwardsWebSocketFramesToTcpAndBack() }); using var client = new ClientWebSocket(); + client.Options.Proxy = null; await client.ConnectAsync(new Uri($"ws://127.0.0.1:{bridgePort}/"), CancellationToken.None); var dapMessage = "{\"type\":\"request\",\"seq\":1,\"command\":\"initialize\"}"; @@ -208,6 +209,7 @@ public async Task BridgeRejectsOversizedWebSocketMessage() try { using var client = new ClientWebSocket(); + client.Options.Proxy = null; await client.ConnectAsync(new Uri($"ws://127.0.0.1:{bridgePort}/"), CancellationToken.None); // Send a message that exceeds the 64-byte limit From cc3ed692907533ed18fbd76477d3b083156c0ddd Mon Sep 17 00:00:00 2001 From: Francesco Renzi Date: Fri, 17 Apr 2026 13:08:08 +0100 Subject: [PATCH 12/12] Apply suggestions from code review Co-authored-by: Tingluo Huang --- src/Runner.Worker/Dap/WebSocketDapBridge.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Runner.Worker/Dap/WebSocketDapBridge.cs b/src/Runner.Worker/Dap/WebSocketDapBridge.cs index e2f8dc47370..4aad4bdb9a6 100644 --- a/src/Runner.Worker/Dap/WebSocketDapBridge.cs +++ b/src/Runner.Worker/Dap/WebSocketDapBridge.cs @@ -218,7 +218,7 @@ private async Task AcceptWebSocketAsync(NetworkStream stream, Cancell } var headers = new Dictionary(StringComparer.OrdinalIgnoreCase); - while (true) + while (!cancellationToken.IsCancellationRequested) { if (headers.Count >= _maxHeaderCount) { @@ -335,7 +335,7 @@ await source.CloseAsync( messageStream.Write(buffer, 0, result.Count); } } - while (!result.EndOfMessage); + while (!result.EndOfMessage && !cancellationToken.IsCancellationRequested); if (result.MessageType != WebSocketMessageType.Binary && result.MessageType != WebSocketMessageType.Text)