diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index 7d0b570ba..442c9c61c 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -78,6 +78,7 @@ pin-project-lite = { workspace = true } tracing-appender = "0.2" tracing-capture = "0.1.0" tracing-subscriber = { version = "0.3", features = ["registry"] } +tracing-test = "0.2" [package.metadata.docs.rs] all-features = true diff --git a/lambda-runtime/src/layers/api_client.rs b/lambda-runtime/src/layers/api_client.rs index 7113ee0a0..df9b864db 100644 --- a/lambda-runtime/src/layers/api_client.rs +++ b/lambda-runtime/src/layers/api_client.rs @@ -85,10 +85,230 @@ where .boxed(); self.set(RuntimeApiClientFuture::Second(next_fut)); } - Err(err) => break Err(err), + Err(err) => { + log_or_print!( + tracing: tracing::error!(error = ?err, "failed to build Lambda Runtime API request"), + fallback: eprintln!("failed to build Lambda Runtime API request: {err:?}") + ); + break Err(err); + } + }, + RuntimeApiClientFutureProj::Second(fut) => match ready!(fut.poll(cx)) { + Ok(resp) if !resp.status().is_success() => { + let status = resp.status(); + + // TODO + // we should consume the response body of the call in order to give a more specific message. + // https://github.com/aws/aws-lambda-rust-runtime/issues/1110 + + log_or_print!( + tracing: tracing::error!(status = %status, "Lambda Runtime API returned non-200 response"), + fallback: eprintln!("Lambda Runtime API returned non-200 response: status={status}") + ); + + // Adding more information on top of 410 Gone, to make it more clear since we cannot access the body of the message + if status == 410 { + log_or_print!( + tracing: tracing::error!("Lambda function timeout!"), + fallback: eprintln!("Lambda function timeout!") + ); + } + + // Return Ok to maintain existing contract - runtime continues despite API errors + break Ok(()); + } + Ok(_) => break Ok(()), + Err(err) => { + log_or_print!( + tracing: tracing::error!(error = ?err, "Lambda Runtime API request failed"), + fallback: eprintln!("Lambda Runtime API request failed: {err:?}") + ); + break Err(err); + } }, - RuntimeApiClientFutureProj::Second(fut) => break ready!(fut.poll(cx)).map(|_| ()), } }) } } + +#[cfg(test)] +mod tests { + use super::*; + use http::StatusCode; + use http_body_util::Full; + use hyper::body::Bytes; + use lambda_runtime_api_client::body::Body; + use std::convert::Infallible; + use tokio::net::TcpListener; + use tracing_test::traced_test; + + async fn start_mock_server(status: StatusCode) -> String { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let url = format!("http://{}", addr); + + tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + let io = hyper_util::rt::TokioIo::new(stream); + + let service = hyper::service::service_fn(move |_req| async move { + Ok::<_, Infallible>( + http::Response::builder() + .status(status) + .body(Full::new(Bytes::from("test response"))) + .unwrap(), + ) + }); + + let _ = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new()) + .serve_connection(io, service) + .await; + }); + + // Give the server a moment to start + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + url + } + + #[tokio::test] + #[traced_test] + async fn test_successful_response() { + let url = start_mock_server(StatusCode::OK).await; + let client = Arc::new( + lambda_runtime_api_client::Client::builder() + .with_endpoint(url.parse().unwrap()) + .build() + .unwrap(), + ); + + let request_fut = + async { Ok::<_, BoxError>(http::Request::builder().uri("/test").body(Body::empty()).unwrap()) }; + + let future = RuntimeApiClientFuture::First(request_fut, client); + let result = future.await; + + assert!(result.is_ok()); + // No error logs should be present + assert!(!logs_contain("Lambda Runtime API returned non-200 response")); + } + + #[tokio::test] + #[traced_test] + async fn test_410_timeout_error() { + let url = start_mock_server(StatusCode::GONE).await; + let client = Arc::new( + lambda_runtime_api_client::Client::builder() + .with_endpoint(url.parse().unwrap()) + .build() + .unwrap(), + ); + + let request_fut = + async { Ok::<_, BoxError>(http::Request::builder().uri("/test").body(Body::empty()).unwrap()) }; + + let future = RuntimeApiClientFuture::First(request_fut, client); + let result = future.await; + + // Returns Ok to maintain contract, but logs the error + assert!(result.is_ok()); + + // Verify the error was logged + assert!(logs_contain("Lambda Runtime API returned non-200 response")); + assert!(logs_contain("Lambda function timeout!")); + } + + #[tokio::test] + #[traced_test] + async fn test_500_error() { + let url = start_mock_server(StatusCode::INTERNAL_SERVER_ERROR).await; + let client = Arc::new( + lambda_runtime_api_client::Client::builder() + .with_endpoint(url.parse().unwrap()) + .build() + .unwrap(), + ); + + let request_fut = + async { Ok::<_, BoxError>(http::Request::builder().uri("/test").body(Body::empty()).unwrap()) }; + + let future = RuntimeApiClientFuture::First(request_fut, client); + let result = future.await; + + // Returns Ok to maintain contract, but logs the error + assert!(result.is_ok()); + + // Verify the error was logged with status code + assert!(logs_contain("Lambda Runtime API returned non-200 response")); + } + + #[tokio::test] + #[traced_test] + async fn test_404_error() { + let url = start_mock_server(StatusCode::NOT_FOUND).await; + let client = Arc::new( + lambda_runtime_api_client::Client::builder() + .with_endpoint(url.parse().unwrap()) + .build() + .unwrap(), + ); + + let request_fut = + async { Ok::<_, BoxError>(http::Request::builder().uri("/test").body(Body::empty()).unwrap()) }; + + let future = RuntimeApiClientFuture::First(request_fut, client); + let result = future.await; + + // Returns Ok to maintain contract, but logs the error + assert!(result.is_ok()); + + // Verify the error was logged + assert!(logs_contain("Lambda Runtime API returned non-200 response")); + } + + #[tokio::test] + #[traced_test] + async fn test_request_build_error() { + let client = Arc::new( + lambda_runtime_api_client::Client::builder() + .with_endpoint("http://localhost:9001".parse().unwrap()) + .build() + .unwrap(), + ); + + let request_fut = async { Err::, BoxError>("Request build error".into()) }; + + let future = RuntimeApiClientFuture::First(request_fut, client); + let result = future.await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err.to_string().contains("Request build error")); + + // Verify the error was logged + assert!(logs_contain("failed to build Lambda Runtime API request")); + } + + #[tokio::test] + #[traced_test] + async fn test_network_error() { + // Use an invalid endpoint that will fail to connect + let client = Arc::new( + lambda_runtime_api_client::Client::builder() + .with_endpoint("http://127.0.0.1:1".parse().unwrap()) // Port 1 should be unreachable + .build() + .unwrap(), + ); + + let request_fut = + async { Ok::<_, BoxError>(http::Request::builder().uri("/test").body(Body::empty()).unwrap()) }; + + let future = RuntimeApiClientFuture::First(request_fut, client); + let result = future.await; + + // Network errors should propagate as Err + assert!(result.is_err()); + + // Verify the error was logged + assert!(logs_contain("Lambda Runtime API request failed")); + } +} diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index 148050877..8755a130e 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -19,6 +19,9 @@ use tokio_stream::Stream; use tower::util::ServiceFn; pub use tower::{self, service_fn, Service}; +#[macro_use] +mod macros; + /// Diagnostic utilities to convert Rust types into Lambda Error types. pub mod diagnostic; pub use diagnostic::Diagnostic; diff --git a/lambda-runtime/src/macros.rs b/lambda-runtime/src/macros.rs new file mode 100644 index 000000000..8c531470d --- /dev/null +++ b/lambda-runtime/src/macros.rs @@ -0,0 +1,10 @@ +// Logs using tracing `error!` if a dispatcher is set, otherwise falls back to `eprintln!`. +macro_rules! log_or_print { + (tracing: $tracing_expr:expr, fallback: $fallback_expr:expr) => { + if tracing::dispatcher::has_been_set() { + $tracing_expr; + } else { + $fallback_expr; + } + }; +} diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index 191b55ae3..755377ae6 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -363,13 +363,12 @@ where /// unset. pub async fn run(self) -> Result<(), BoxError> { if let Some(raw) = concurrency_env_value() { - if tracing::dispatcher::has_been_set() { - tracing::warn!( + log_or_print!( + tracing: tracing::warn!( "AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but the concurrency-tokio feature is not enabled; running sequentially", - ); - } else { - eprintln!("AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but the concurrency-tokio feature is not enabled; running sequentially"); - } + ), + fallback: eprintln!("AWS_LAMBDA_MAX_CONCURRENCY is set to '{raw}', but the concurrency-tokio feature is not enabled; running sequentially") + ); } let incoming = incoming(&self.client); Self::run_with_incoming(self.service, self.config, incoming).await @@ -938,6 +937,8 @@ mod endpoint_tests { #[tokio::test] #[cfg(feature = "concurrency-tokio")] + #[traced_test] + #[cfg(feature = "tokio-concurrent-runtime")] async fn test_concurrent_structured_logging_isolation() -> Result<(), Error> { use std::collections::HashSet; use tracing::info;