Skip to content
21 changes: 21 additions & 0 deletions clients/aws-sdk-lex-runtime-v2/tests/integration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from smithy_aws_core.identity import EnvironmentCredentialsResolver

from aws_sdk_lex_runtime_v2.client import LexRuntimeV2Client
from aws_sdk_lex_runtime_v2.config import Config

BOT_ALIAS_ID = "TSTALIASID"
LOCALE_ID = "en_US"
Comment thread
Alan4506 marked this conversation as resolved.
REGION = "us-east-1"


def create_lex_client(region: str) -> LexRuntimeV2Client:
return LexRuntimeV2Client(
config=Config(
endpoint_uri=f"https://runtime-v2-lex.{region}.amazonaws.com",
region=region,
aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
)
)
158 changes: 158 additions & 0 deletions clients/aws-sdk-lex-runtime-v2/tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""Pytest fixtures for Lex Runtime V2 integration tests.

Creates and tears down a Lex V2 bot with a Greeting intent once per
test session. All integration tests receive the bot_id via the
``lex_bot`` fixture.
"""

import json
import uuid
from typing import Any

import boto3
import pytest

from . import LOCALE_ID, REGION

# Tags applied to all resources so orphaned resources from interrupted
# test runs can be discovered and cleaned up.
_TAGS = [{"Key": "Purpose", "Value": "IntegTest"}]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Non-blocking]

Since the idea behind this is to have a single tag that can be applied to as many integ test related resources as possible, this would be better suited for smithy-aws-core and we can import it.

Since that would require a PR to smithy-aws-core and a publish, can we just make a backlog item for that and come back?



def _create_lex_bot(
iam_client: Any, lex_client: Any, sts_client: Any, role_name: str, bot_name: str
) -> str:
"""Create a Lex V2 bot with a Greeting intent.

Args:
iam_client: A boto3 IAM client.
lex_client: A boto3 lexv2-models client.
sts_client: A boto3 STS client.
role_name: The name of the IAM role to create for the bot.
bot_name: The name of the Lex bot to create.

Returns:
The bot ID.
"""
account_id = sts_client.get_caller_identity()["Account"]
role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"

# Create IAM role for the bot
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "lexv2.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Tags=_TAGS,
)

# Create bot
response = lex_client.create_bot(
botName=bot_name,
roleArn=role_arn,
dataPrivacy={"childDirected": False},
# 5-minute idle timeout is sufficient for integration tests.
idleSessionTTLInSeconds=300,
botTags={t["Key"]: t["Value"] for t in _TAGS},
)
bot_id = response["botId"]
lex_client.get_waiter("bot_available").wait(botId=bot_id)

# Create locale
lex_client.create_bot_locale(
botId=bot_id,
botVersion="DRAFT",
localeId=LOCALE_ID,
# Required field. Confidence threshold (0-1) that determines when Lex
# inserts AMAZON.FallbackIntent into the interpretations list.
# 0.40 is a reasonable value for a simple test bot.
nluIntentConfidenceThreshold=0.40,
)
lex_client.get_waiter("bot_locale_created").wait(
botId=bot_id, botVersion="DRAFT", localeId=LOCALE_ID
)

# Create intent
lex_client.create_intent(
intentName="Greeting",
botId=bot_id,
botVersion="DRAFT",
localeId=LOCALE_ID,
sampleUtterances=[
{"utterance": "Hello"},
{"utterance": "Hi"},
{"utterance": "Hey"},
],
intentClosingSetting={
"closingResponse": {
"messageGroups": [
{
"message": {
"plainTextMessage": {"value": "Hello! How can I help you?"}
}
}
]
},
"active": True,
},
)

# Build locale
lex_client.build_bot_locale(botId=bot_id, botVersion="DRAFT", localeId=LOCALE_ID)
lex_client.get_waiter("bot_locale_built").wait(
botId=bot_id, botVersion="DRAFT", localeId=LOCALE_ID
)

return bot_id


def _delete_lex_bot(
iam_client: Any, lex_client: Any, role_name: str, bot_id: str | None
) -> None:
"""Delete a Lex V2 bot and its associated IAM role.

Args:
iam_client: A boto3 IAM client.
lex_client: A boto3 lexv2-models client.
role_name: The name of the IAM role to delete.
bot_id: The bot ID to delete, or None if creation failed.
"""
if bot_id:
lex_client.delete_bot(botId=bot_id, skipResourceInUseCheck=True)

try:
iam_client.delete_role(RoleName=role_name)
except iam_client.exceptions.NoSuchEntityException:
pass


@pytest.fixture(scope="session")
def lex_bot():
"""Create a Lex bot for the test session and delete it after."""
unique_suffix = uuid.uuid4().hex[:16]
role_name = f"integ-test-lex-runtime-v2-role-{unique_suffix}"
bot_name = f"integ-test-lex-runtime-v2-bot-{unique_suffix}"

iam_client = boto3.client("iam")
lex_client = boto3.client("lexv2-models", region_name=REGION)
sts_client = boto3.client("sts")

bot_id = None
try:
bot_id = _create_lex_bot(
iam_client, lex_client, sts_client, role_name, bot_name
)
yield bot_id
finally:
_delete_lex_bot(iam_client, lex_client, role_name, bot_id)
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""Test bidirectional streaming event stream handling."""

import asyncio
import uuid

from smithy_core.aio.eventstream import DuplexEventStream

from aws_sdk_lex_runtime_v2.models import (
StartConversationInput,
StartConversationRequestEventStream,
StartConversationRequestEventStreamConfigurationEvent,
StartConversationRequestEventStreamTextInputEvent,
StartConversationRequestEventStreamDisconnectionEvent,
StartConversationResponseEventStream,
StartConversationResponseEventStreamHeartbeatEvent,
StartConversationResponseEventStreamIntentResultEvent,
StartConversationResponseEventStreamTextResponseEvent,
StartConversationResponseEventStreamTranscriptEvent,
StartConversationOutput,
ConfigurationEvent,
TextInputEvent,
DisconnectionEvent,
)
from . import BOT_ALIAS_ID, LOCALE_ID, REGION, create_lex_client


async def _send_events(
stream: DuplexEventStream[
StartConversationRequestEventStream,
StartConversationResponseEventStream,
StartConversationOutput,
],
) -> None:
"""Send configuration, text input, and disconnection events."""
input_stream = stream.input_stream

await input_stream.send(
StartConversationRequestEventStreamConfigurationEvent(
value=ConfigurationEvent(response_content_type="text/plain; charset=utf-8")
)
)

await input_stream.send(
StartConversationRequestEventStreamTextInputEvent(
value=TextInputEvent(text="Hello")
)
)

await asyncio.sleep(3)

await input_stream.send(
StartConversationRequestEventStreamDisconnectionEvent(
value=DisconnectionEvent()
)
)

await input_stream.close()


async def _receive_events(
stream: DuplexEventStream[
StartConversationRequestEventStream,
StartConversationResponseEventStream,
StartConversationOutput,
],
) -> tuple[bool, bool, bool, bool]:
"""Receive and collect output from the stream.

Returns:
Tuple of (got_transcript, got_intent_result, got_text_response, got_heartbeat)
"""
got_transcript = False
got_intent_result = False
got_text_response = False
got_heartbeat = False

_, output_stream = await stream.await_output()
if output_stream is None:
return got_transcript, got_intent_result, got_text_response, got_heartbeat

async for event in output_stream:
if isinstance(event, StartConversationResponseEventStreamTranscriptEvent):
got_transcript = True
assert event.value.event_id is not None
assert event.value.transcript == "Hello"
elif isinstance(event, StartConversationResponseEventStreamIntentResultEvent):
got_intent_result = True
assert event.value.event_id is not None
assert event.value.input_mode == "Text"
assert event.value.session_id is not None
assert event.value.session_state is not None
assert event.value.session_state.intent is not None
assert event.value.session_state.intent.name == "Greeting"
assert event.value.session_state.intent.state == "Fulfilled"
assert event.value.interpretations is not None
assert len(event.value.interpretations) == 2
interps_by_name = {
i.intent.name: i for i in event.value.interpretations if i.intent
}
assert "Greeting" in interps_by_name
assert "FallbackIntent" in interps_by_name
assert interps_by_name["Greeting"].nlu_confidence is not None
assert interps_by_name["Greeting"].nlu_confidence.score == 1.0
elif isinstance(event, StartConversationResponseEventStreamTextResponseEvent):
got_text_response = True
assert event.value.event_id is not None
assert event.value.messages is not None
assert len(event.value.messages) == 1
msg = event.value.messages[0]
assert msg.content_type == "PlainText"
assert msg.content == "Hello! How can I help you?"
elif isinstance(event, StartConversationResponseEventStreamHeartbeatEvent):
got_heartbeat = True
assert event.value.event_id is not None
else:
raise RuntimeError(
f"Received unexpected event type in stream: {type(event).__name__}"
)

return got_transcript, got_intent_result, got_text_response, got_heartbeat


async def test_start_conversation(lex_bot: str) -> None:
"""Test bidirectional streaming StartConversation operation."""
client = create_lex_client(REGION)

stream = await client.start_conversation(
input=StartConversationInput(
bot_id=lex_bot,
bot_alias_id=BOT_ALIAS_ID,
locale_id=LOCALE_ID,
session_id=str(uuid.uuid4()),
conversation_mode="TEXT",
)
)

results = await asyncio.gather(_send_events(stream), _receive_events(stream))

got_transcript, got_intent_result, got_text_response, got_heartbeat = results[1]
assert got_transcript, "Expected to receive a TranscriptEvent"
assert got_intent_result, "Expected to receive an IntentResultEvent"
assert got_text_response, "Expected to receive a TextResponseEvent"
assert got_heartbeat, "Expected to receive a HeartbeatEvent"
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""Test non-streaming output type handling."""

import uuid

from aws_sdk_lex_runtime_v2.models import RecognizeTextInput, RecognizeTextOutput
from . import BOT_ALIAS_ID, LOCALE_ID, REGION, create_lex_client


async def test_recognize_text(lex_bot: str) -> None:
"""Test non-streaming RecognizeText operation."""
client = create_lex_client(REGION)
response = await client.recognize_text(
input=RecognizeTextInput(
bot_id=lex_bot,
bot_alias_id=BOT_ALIAS_ID,
locale_id=LOCALE_ID,
session_id=str(uuid.uuid4()),
text="Hello",
)
)

assert isinstance(response, RecognizeTextOutput)
assert response.session_id is not None

# Verify messages
assert response.messages is not None
assert len(response.messages) == 1
msg = response.messages[0]
assert msg.content_type == "PlainText"
assert msg.content == "Hello! How can I help you?"

# Verify session state
assert response.session_state is not None
assert response.session_state.intent is not None
assert response.session_state.intent.name == "Greeting"
assert response.session_state.intent.state == "Fulfilled"

# Verify interpretations
assert response.interpretations is not None
assert len(response.interpretations) == 2
interps_by_name = {i.intent.name: i for i in response.interpretations if i.intent}
assert "Greeting" in interps_by_name
assert "FallbackIntent" in interps_by_name
assert interps_by_name["Greeting"].nlu_confidence is not None
assert interps_by_name["Greeting"].nlu_confidence.score == 1.0