Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 67 additions & 6 deletions python/rcs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,87 @@
"""Robot control stack python bindings."""

import io
import os
import zipfile
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from typing import Any

import numpy as np
import requests
from rcs._core import __version__, common

from rcs import camera, envs, hand, sim

GITHUB_ASSET_ARCHIVE_URL = "https://github.com/RobotControlStack/robot-control-stack/archive/refs/tags/{tag}.zip"

def _rcs_prefix() -> str:
env_prefix = os.environ.get("RCS_PREFIX")

def download_assets(
version: str,
prefix: Path,
github_asset_archive_url: str,
assets_dirname: str = "assets",
) -> None:
tag = f"v{version.split('+', maxsplit=1)[0]}"
response = requests.get(github_asset_archive_url.format(tag=tag), timeout=60)
response.raise_for_status()

found_assets = False
with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
for member in archive.infolist():
parts = PurePosixPath(member.filename).parts
if len(parts) < 2 or parts[1] != assets_dirname:
continue

found_assets = True
target = prefix.joinpath(*parts[1:])
if member.is_dir():
target.mkdir(parents=True, exist_ok=True)
else:
target.parent.mkdir(parents=True, exist_ok=True)
target.write_bytes(archive.read(member))

if not found_assets:
msg = f"Could not find {assets_dirname} in the downloaded GitHub archive."
raise FileNotFoundError(msg)


def get_prefix(
env_prefix: str | None,
editable_prefix: Path,
default_prefix: Path,
version: str,
github_asset_archive_url: str,
assets_dirname: str = "assets",
download_fn: Callable[[str, Path, str, str], None] = download_assets,
) -> str:
if env_prefix:
return os.path.abspath(env_prefix)
return os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
prefix = Path(env_prefix).expanduser().resolve()
else:
editable_prefix = editable_prefix.resolve()
if (editable_prefix / assets_dirname).is_dir():
return str(editable_prefix)
prefix = default_prefix.resolve()

assets_dir = prefix / assets_dirname
if not assets_dir.is_dir():
prefix.mkdir(parents=True, exist_ok=True)
print(f"Assets not found at {assets_dir}, downloading them now.")
download_fn(version, prefix, github_asset_archive_url, assets_dirname)

return str(prefix)


RCS_PREFIX = _rcs_prefix()
RCS_PREFIX = get_prefix(
os.environ.get("RCS_PREFIX"),
Path(__file__).resolve().parents[2],
Path.home() / ".rcs",
__version__,
GITHUB_ASSET_ARCHIVE_URL,
)


# TODO: assets must be "downloaded" first time this is imported
@dataclass(kw_only=True)
class RobotMetaConfig:

Expand Down
Loading