Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 75 additions & 42 deletions examples/create_workspace_with_landscape.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,59 +3,92 @@

from codesphere import CodesphereSDK
from codesphere.resources.workspace import WorkspaceCreate
from codesphere.resources.workspace.landscape import ProfileBuilder, ProfileConfig
from codesphere.resources.workspace.landscape import (
PipelineStage,
PipelineState,
ProfileBuilder,
)
from codesphere.resources.workspace.logs import LogStage

TEAM_ID = 123 # Replace with your actual team ID
TEAM_ID = 123


async def get_plan_id(sdk: CodesphereSDK, plan_name: str = "Micro") -> int:
plans = await sdk.metadata.list_plans()
plan = next((p for p in plans if p.title == plan_name and not p.deprecated), None)
if not plan:
raise ValueError(f"Plan '{plan_name}' not found")
return plan.id
async def main():
async with CodesphereSDK() as sdk:
plans = await sdk.metadata.list_plans()
plan = next((p for p in plans if p.title == "Micro" and not p.deprecated), None)
if not plan:
raise ValueError("Micro plan not found")

workspace_name = f"pipeline-demo-{int(time.time())}"

print(f"Creating workspace '{workspace_name}'...")
workspace = await sdk.workspaces.create(
WorkspaceCreate(plan_id=plan.id, team_id=TEAM_ID, name=workspace_name)
)
print(f"✓ Workspace created (ID: {workspace.id})")

print("Waiting for workspace to start...")
await workspace.wait_until_running(timeout=300.0, poll_interval=5.0)
print("✓ Workspace is running\n")

profile = (
ProfileBuilder()
.prepare()
.add_step("echo 'Installing dependencies...' && sleep 2")
.add_step("echo 'Setup complete!' && sleep 1")
.done()
.add_reactive_service("web")
.plan(plan.id)
.add_step(
'for i in $(seq 1 20); do echo "[$i] Processing request..."; sleep 1; done'
)
.add_port(3000, public=True)
.add_path("/", port=3000)
.replicas(1)
.done()
.build()
)

def build_web_profile(plan_id: int) -> ProfileConfig:
"""Build a simple web service landscape profile."""
return (
ProfileBuilder()
.prepare()
.add_step("npm install", name="Install dependencies")
.done()
.add_reactive_service("web")
.plan(plan_id)
.add_step("npm start")
.add_port(3000, public=True)
.add_path("/", port=3000)
.replicas(1)
.env("NODE_ENV", "production")
.build()
)
print("Deploying landscape profile...")
await workspace.landscape.save_profile("production", profile)
await workspace.landscape.deploy(profile="production")
print("✓ Profile deployed\n")

print("--- Prepare Stage ---")
await workspace.landscape.start_stage(
PipelineStage.PREPARE, profile="production"
)
prepare_status = await workspace.landscape.wait_for_stage(
PipelineStage.PREPARE, timeout=60.0
)

async def create_workspace(sdk: CodesphereSDK, plan_id: int, name: str):
workspace = await sdk.workspaces.create(
WorkspaceCreate(plan_id=plan_id, team_id=TEAM_ID, name=name)
)
await workspace.wait_until_running(timeout=300.0, poll_interval=5.0)
return workspace
for status in prepare_status:
icon = "✓" if status.state == PipelineState.SUCCESS else "✗"
print(f"{icon} {status.server}: {status.state.value}")

print("\nPrepare logs:")
for step in range(len(prepare_status[0].steps)):
logs = await workspace.logs.collect(
stage=LogStage.PREPARE, step=step, timeout=5.0
)
for entry in logs:
if entry.get_text():
print(f" {entry.get_text().strip()}")

async def deploy_landscape(workspace, profile: dict, profile_name: str = "production"):
await workspace.landscape.save_profile(profile_name, profile)
await workspace.landscape.deploy(profile=profile_name)
print("Deployment started!")
print("\n--- Run Stage ---")
await workspace.landscape.start_stage(PipelineStage.RUN, profile="production")
print("Started run stage\n")

print("Streaming logs from 'web' service (using context manager):")
count = 0
async with workspace.logs.open_server_stream(step=0, server="web") as stream:
async for entry in stream:
if entry.get_text():
print(f" {entry.get_text().strip()}")
count += 1

async def main():
async with CodesphereSDK() as sdk:
plan_id = await get_plan_id(sdk)
workspace = await create_workspace(
sdk, plan_id, f"landscape-demo-{int(time.time())}"
)
profile = build_web_profile(plan_id)
await deploy_landscape(workspace, profile)
print(f"\n✓ Stream ended ({count} log entries)")


if __name__ == "__main__":
Expand Down
8 changes: 5 additions & 3 deletions src/codesphere/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from .base import ResourceBase
from .operations import APIOperation, AsyncCallable
from .handler import _APIOperationExecutor, APIRequestHandler
from .base import CamelModel, ResourceBase
from .handler import APIRequestHandler, _APIOperationExecutor
from .operations import APIOperation, AsyncCallable, StreamOperation

__all__ = [
"CamelModel",
"ResourceBase",
"APIOperation",
"_APIOperationExecutor",
"APIRequestHandler",
"AsyncCallable",
"StreamOperation",
]
14 changes: 11 additions & 3 deletions src/codesphere/core/operations.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
from typing import Callable, Awaitable, Generic, Optional, Type, TypeAlias, TypeVar

from pydantic import BaseModel
from typing import Awaitable, Callable, Generic, Optional, Type, TypeAlias, TypeVar

from pydantic import BaseModel, ConfigDict

_T = TypeVar("_T")
ResponseT = TypeVar("ResponseT")
InputT = TypeVar("InputT")
EntryT = TypeVar("EntryT")

AsyncCallable: TypeAlias = Callable[[], Awaitable[_T]]


class APIOperation(BaseModel, Generic[ResponseT, InputT]):
model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)

method: str
endpoint_template: str
response_model: Type[ResponseT]
input_model: Optional[Type[InputT]] = None


class StreamOperation(BaseModel, Generic[EntryT]):
model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
endpoint_template: str
entry_model: Type[EntryT]
6 changes: 6 additions & 0 deletions src/codesphere/resources/workspace/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .git import GitHead, WorkspaceGitManager
from .logs import LogEntry, LogProblem, LogStage, LogStream, WorkspaceLogManager
from .resources import WorkspacesResource
from .schemas import (
CommandInput,
Expand All @@ -19,4 +20,9 @@
"CommandOutput",
"WorkspaceGitManager",
"GitHead",
"LogStream",
"WorkspaceLogManager",
"LogEntry",
"LogProblem",
"LogStage",
]
10 changes: 10 additions & 0 deletions src/codesphere/resources/workspace/landscape/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
ManagedServiceConfig,
NetworkConfig,
PathConfig,
PipelineStage,
PipelineState,
PipelineStatus,
PipelineStatusList,
PortConfig,
Profile,
ProfileBuilder,
Expand All @@ -12,6 +16,7 @@
ReactiveServiceConfig,
StageConfig,
Step,
StepStatus,
)

__all__ = [
Expand All @@ -28,4 +33,9 @@
"NetworkConfig",
"PortConfig",
"PathConfig",
"PipelineStage",
"PipelineState",
"PipelineStatus",
"PipelineStatusList",
"StepStatus",
]
104 changes: 103 additions & 1 deletion src/codesphere/resources/workspace/landscape/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import logging
import re
from typing import TYPE_CHECKING, Dict, List, Optional, Union
Expand All @@ -10,10 +11,20 @@
from .operations import (
_DEPLOY_OP,
_DEPLOY_WITH_PROFILE_OP,
_GET_PIPELINE_STATUS_OP,
_SCALE_OP,
_START_PIPELINE_STAGE_OP,
_START_PIPELINE_STAGE_WITH_PROFILE_OP,
_STOP_PIPELINE_STAGE_OP,
_TEARDOWN_OP,
)
from .schemas import Profile, ProfileConfig
from .schemas import (
PipelineStage,
PipelineState,
PipelineStatusList,
Profile,
ProfileConfig,
)

if TYPE_CHECKING:
from ..schemas import CommandOutput
Expand Down Expand Up @@ -95,3 +106,94 @@ async def teardown(self) -> None:

async def scale(self, services: Dict[str, int]) -> None:
await self._execute_operation(_SCALE_OP, data=services)

async def start_stage(
self,
stage: Union[PipelineStage, str],
profile: Optional[str] = None,
) -> None:
if isinstance(stage, PipelineStage):
stage = stage.value

if profile is not None:
_validate_profile_name(profile)
await self._execute_operation(
_START_PIPELINE_STAGE_WITH_PROFILE_OP, stage=stage, profile=profile
)
else:
await self._execute_operation(_START_PIPELINE_STAGE_OP, stage=stage)

async def stop_stage(self, stage: Union[PipelineStage, str]) -> None:
if isinstance(stage, PipelineStage):
stage = stage.value

await self._execute_operation(_STOP_PIPELINE_STAGE_OP, stage=stage)

async def get_stage_status(
self, stage: Union[PipelineStage, str]
) -> PipelineStatusList:
if isinstance(stage, PipelineStage):
stage = stage.value

return await self._execute_operation(_GET_PIPELINE_STATUS_OP, stage=stage)

async def wait_for_stage(
self,
stage: Union[PipelineStage, str],
*,
timeout: float = 300.0,
poll_interval: float = 5.0,
server: Optional[str] = None,
) -> PipelineStatusList:
if poll_interval <= 0:
raise ValueError("poll_interval must be greater than 0")

stage_name = stage.value if isinstance(stage, PipelineStage) else stage
elapsed = 0.0

while elapsed < timeout:
status_list = await self.get_stage_status(stage)

relevant_statuses = []
for s in status_list:
if server is not None:
if s.server == server:
relevant_statuses.append(s)
else:
if s.steps:
relevant_statuses.append(s)
elif s.state != PipelineState.WAITING:
relevant_statuses.append(s)

if not relevant_statuses:
log.debug(
"Pipeline stage '%s': no servers with steps yet, waiting...",
stage_name,
)
await asyncio.sleep(poll_interval)
elapsed += poll_interval
continue

all_completed = all(
s.state
in (PipelineState.SUCCESS, PipelineState.FAILURE, PipelineState.ABORTED)
for s in relevant_statuses
)

if all_completed:
log.debug("Pipeline stage '%s' completed.", stage_name)
return PipelineStatusList(root=relevant_statuses)

states = [f"{s.server}={s.state.value}" for s in relevant_statuses]
log.debug(
"Pipeline stage '%s' status: %s (elapsed: %.1fs)",
stage_name,
", ".join(states),
elapsed,
)
await asyncio.sleep(poll_interval)
elapsed += poll_interval

raise TimeoutError(
f"Pipeline stage '{stage_name}' did not complete within {timeout} seconds."
)
25 changes: 25 additions & 0 deletions src/codesphere/resources/workspace/landscape/operations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from ....core.operations import APIOperation
from .schemas import PipelineStatusList

_DEPLOY_OP = APIOperation(
method="POST",
Expand All @@ -23,3 +24,27 @@
endpoint_template="/workspaces/{id}/landscape/scale",
response_model=type(None),
)

_START_PIPELINE_STAGE_OP = APIOperation(
method="POST",
endpoint_template="/workspaces/{id}/pipeline/{stage}/start",
response_model=type(None),
)

_START_PIPELINE_STAGE_WITH_PROFILE_OP = APIOperation(
method="POST",
endpoint_template="/workspaces/{id}/pipeline/{stage}/start/{profile}",
response_model=type(None),
)

_STOP_PIPELINE_STAGE_OP = APIOperation(
method="POST",
endpoint_template="/workspaces/{id}/pipeline/{stage}/stop",
response_model=type(None),
)

_GET_PIPELINE_STATUS_OP = APIOperation(
method="GET",
endpoint_template="/workspaces/{id}/pipeline/{stage}",
response_model=PipelineStatusList,
)
Loading