Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions client_tools/client_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,8 @@ def generate_models():
client_module_path.mkdir(parents=True, exist_ok=True)

model_code = AUTO_GEN_WARNING + '''from typing import Any, Dict, Optional
from twinkle_client.http import http_post
import time
from twinkle_client.http import http_get, http_post
from twinkle_client.types.model import (
CalculateLossResponse,
CalculateMetricResponse,
Expand Down Expand Up @@ -673,25 +674,49 @@ def upload_to_hub(
hub_model_id: str,
hub_token: Optional[str] = None,
async_upload: bool = True,
poll_interval: float = 5.0,
) -> None:
"""Upload model checkpoint to hub.

Submits the upload task to the server and polls for completion.
Blocks until the upload finishes or raises on failure.

Args:
checkpoint_dir: The directory path of the checkpoint to upload.
hub_model_id: The hub model id.
hub_token: The hub token (optional).
async_upload: Whether to use async upload (default: True).
async_upload: Deprecated, has no effect. The server always runs the
upload in the background and the client polls for completion.
poll_interval: Seconds between status poll requests (default: 5).
"""
response = http_post(
url=f'{self.server_url}/upload_to_hub',
json_data={
'checkpoint_dir': checkpoint_dir,
'hub_model_id': hub_model_id,
'hub_token': hub_token,
'async_upload': async_upload,
}
)
response.raise_for_status()
request_id = response.json().get('request_id')
if not request_id:
return
Comment thread
Yunnglin marked this conversation as resolved.

print(f'[upload_to_hub] Upload started (task {request_id}), waiting for completion...')
while True:
status_resp = http_get(url=f'{self.server_url}/upload_status/{request_id}')
status_resp.raise_for_status()
data = status_resp.json()
status = data.get('status', 'unknown')
if status == 'completed':
print(f'[upload_to_hub] Upload completed successfully.')
return
elif status == 'failed':
error = data.get('error', 'Unknown error')
raise RuntimeError(f'[upload_to_hub] Upload failed: {error}')
else:
print(f'[upload_to_hub] Status: {status}...')
time.sleep(poll_interval)
'''

# Write the model client file
Expand Down
4 changes: 4 additions & 0 deletions cookbook/client/server/megatron/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ print_info "日志输出到: $LOG_FILE"
echo ""

# Start the server and stream its log output in real time
touch "$LOG_FILE" # Pre-create the file so tail -f does not fail before the server writes anything
nohup python -m twinkle.server --config "$SERVER_CONFIG_FILE" > "$LOG_FILE" 2>&1 &
SERVER_PID=$!
print_success "Twinkle Server 已启动 (PID: $SERVER_PID)"

# Stream the log in real time (this blocks the current process)
tail -f "$LOG_FILE"
Comment thread
Yunnglin marked this conversation as resolved.
2 changes: 1 addition & 1 deletion cookbook/client/server/megatron/server_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ applications:
use_megatron: true # Use Megatron-LM backend
model_id: "ms://Qwen/Qwen3.6-35B-A3B" # ModelScope model identifier
max_length: 32000 # model max length
max_loras: 5 # model max loras
max_loras: 3 # model max loras
nproc_per_node: 4 # Number of GPU processes per node
device_group:
name: model
Expand Down
62 changes: 62 additions & 0 deletions cookbook/client/tinker/self_host/upload_to_hub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Tinker-Compatible Client - Upload Checkpoint to Hub Example
#
# This script demonstrates how to upload a Tinker checkpoint to ModelScope Hub.
# Tinker checkpoints use the same twinkle:// path format as Twinkle checkpoints,
# so the upload is handled identically via the Twinkle upload interface.
#
# How it works:
# 1. The server submits the upload as a background task and returns a
# request_id immediately, so the HTTP call never times out.
# 2. The client polls /upload_status/{request_id} every few seconds and
# blocks until the upload completes or raises on failure.
#
# Prerequisites:
# - Server must be running (see server.py / server_config.yaml)
# - A ModelScope API token with write access to the target repository

import dotenv

dotenv.load_dotenv('.env')

from twinkle import get_logger, init_twinkle_client
from twinkle_client.model import MultiLoraTransformersModel

logger = get_logger()

# ── Configuration ─────────────────────────────────────────────────────────────
base_model = 'Qwen/Qwen3.5-4B'
base_url = 'http://localhost:8000'
api_key = 'EMPTY_TOKEN' # token used for model training / server access

# Checkpoint to upload: the twinkle:// path returned by training_client.save_state(),
# e.g. 'twinkle://20260301_142318-Qwen_Qwen3-4B-199d2cdb/weights/my-lora-epoch-0'
tinker_path = 'twinkle://REPLACE_ME/weights/REPLACE_ME'

# ModelScope destination (must belong to your account)
hub_model_id = 'your_username/your-model-name'
hub_token = None # Set to your ModelScope API token, or None to use server default
# ── End of configuration ──────────────────────────────────────────────────────


def upload():
    """Upload a Tinker checkpoint (``twinkle://`` path) to ModelScope Hub.

    Initializes the Twinkle client, builds a model client handle, and then
    delegates to ``upload_to_hub``, which blocks (polling the server) until
    the background upload task completes or fails.
    """
    # Tinker checkpoints resolve through the same server-side checkpoint
    # manager as Twinkle ones, so a plain Twinkle client suffices here.
    init_twinkle_client(base_url=base_url, api_key=api_key)

    # No training state is required for an upload — the handle merely
    # identifies the base model on the server.
    client_model = MultiLoraTransformersModel(model_id=f'ms://{base_model}')

    logger.info(f'Uploading {tinker_path!r} → {hub_model_id!r} ...')
    # Blocks until the server reports completion; progress is printed to stdout.
    client_model.upload_to_hub(
        checkpoint_dir=tinker_path,
        hub_model_id=hub_model_id,
        hub_token=hub_token,
    )
    logger.info(f'Upload complete: https://modelscope.cn/models/{hub_model_id}')


if __name__ == '__main__':
    upload()
61 changes: 61 additions & 0 deletions cookbook/client/twinkle/self_host/upload_to_hub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Twinkle Client - Upload Checkpoint to Hub Example
#
# This script demonstrates how to upload a saved checkpoint to ModelScope Hub
# using the Twinkle client. No training is required: any existing checkpoint
# (obtained from a previous run via model.save()) can be uploaded directly.
#
# How it works:
# 1. The server submits the upload as a background task and returns a
# request_id immediately, so the HTTP call never times out.
# 2. The client polls /upload_status/{request_id} every few seconds and
# blocks until the upload completes or raises on failure.
#
# Prerequisites:
# - Server must be running (see server.py / server_config.yaml)
# - A ModelScope API token with write access to the target repository

import dotenv

dotenv.load_dotenv('.env')

from twinkle import get_logger, init_twinkle_client
from twinkle_client.model import MultiLoraTransformersModel

logger = get_logger()

# ── Configuration ─────────────────────────────────────────────────────────────
base_model = 'Qwen/Qwen3.5-4B'
base_url = 'http://localhost:8000'
api_key = 'EMPTY_TOKEN' # token used for model training / server access

# Checkpoint to upload: either a twinkle:// path or a local directory path.
# Example twinkle:// path (from model.save()):
# 'twinkle://20260410_131831-Qwen_Qwen3_5-4B-85279a20/weights/my-checkpoint'
twinkle_path = 'twinkle://REPLACE_ME/weights/REPLACE_ME'

# ModelScope destination (must belong to your account)
hub_model_id = 'your_username/your-model-name'
hub_token = None # Set to your ModelScope API token, or None to use server default
# ── End of configuration ──────────────────────────────────────────────────────


def upload():
    """Upload a saved Twinkle checkpoint to ModelScope Hub.

    Initializes the Twinkle client, builds a model client handle, and then
    delegates to ``upload_to_hub``, which blocks (polling the server) until
    the background upload task completes or fails.
    """
    # A plain client connection is all that is needed — no training session.
    init_twinkle_client(base_url=base_url, api_key=api_key)

    # The model handle only identifies the base model on the server; no
    # training state is required for an upload.
    client_model = MultiLoraTransformersModel(model_id=f'ms://{base_model}')

    logger.info(f'Uploading {twinkle_path!r} → {hub_model_id!r} ...')
    # Blocks until the server reports completion; progress is printed to stdout.
    client_model.upload_to_hub(
        checkpoint_dir=twinkle_path,
        hub_model_id=hub_model_id,
        hub_token=hub_token,
    )
    logger.info(f'Upload complete: https://modelscope.cn/models/{hub_model_id}')


if __name__ == '__main__':
    upload()
26 changes: 16 additions & 10 deletions src/twinkle/hub/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,17 +374,23 @@ def push_to_hub(cls,
ignore_patterns = []
if revision is None or revision == 'main':
revision = 'master'
result = push_to_hub(
repo_id,
folder_path,
token or cls.ms_token,
private,
commit_message=commit_message,
ignore_file_pattern=ignore_patterns,
revision=revision,
tag=path_in_repo)
try:
result = push_to_hub(
repo_id,
folder_path,
token or cls.ms_token,
private,
commit_message=commit_message,
ignore_file_pattern=ignore_patterns,
revision=revision,
tag=path_in_repo)
except Exception as exc:
raise RuntimeError(f'ModelScope push_to_hub raised an exception '
f'(repo_id={repo_id!r}, folder_path={folder_path!r}): {exc}') from exc
if not result:
raise Exception('Failed to push to hub')
raise RuntimeError(f'ModelScope push_to_hub returned a falsy result '
f'(repo_id={repo_id!r}, folder_path={folder_path!r}). '
f'This usually indicates an invalid/expired token or insufficient write permission.')

@classmethod
def load_dataset(cls,
Expand Down
1 change: 1 addition & 0 deletions src/twinkle/model/megatron/megatron.py
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ def set_template(self, template_cls: Union[Template, Type[Template], str], **kwa
"""
adapter_name = kwargs.pop('adapter_name', self._get_default_group())
optimizer_config = self.optimizer_group[adapter_name]
kwargs['model_id'] = self.tokenizer_id
optimizer_config.template = construct_class(template_cls, Template, twinkle.template, **kwargs)

@remote_function(dispatch='all')
Expand Down
37 changes: 32 additions & 5 deletions src/twinkle/server/model/twinkle_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"""
from __future__ import annotations

import asyncio
import torch
import traceback
from fastapi import Depends, FastAPI, HTTPException, Request
Expand Down Expand Up @@ -347,12 +348,12 @@ async def _task():

await run_task(self.schedule_task_and_wait(_task, task_type='load'))

@app.post('/twinkle/upload_to_hub')
@app.post('/twinkle/upload_to_hub', response_model=types.UploadToHubResponse)
async def upload_to_hub(
request: Request,
body: types.UploadToHubRequest,
self: ModelManagement = Depends(self_fn),
) -> None:
) -> types.UploadToHubResponse:
token = await self._on_request_start(request)

async def _task():
Expand All @@ -370,13 +371,39 @@ async def _task():
checkpoint_manager.get_ckpt_dir(model_id=model_id_to_load, checkpoint_id=checkpoint_id))
else:
checkpoint_dir = body.checkpoint_dir
self.model.upload_to_hub(
# Run blocking upload in thread pool so the event loop is not blocked.
# async_upload is intentionally ignored here: the task queue + client polling
# already provide the fire-and-forget / wait semantics without holding the
# HTTP connection open for the full duration of the upload.
await asyncio.to_thread(
self.model.upload_to_hub,
checkpoint_dir=checkpoint_dir,
hub_model_id=body.hub_model_id,
hub_token=body.hub_token or token,
async_upload=body.async_upload)
async_upload=False,
)

future_ref = await self.schedule_task(_task, task_type='upload_to_hub')
request_id = future_ref.get('request_id')
if request_id is None:
raise HTTPException(status_code=500, detail=f'Upload task scheduling failed: {future_ref}')
return types.UploadToHubResponse(request_id=request_id)

await run_task(self.schedule_task_and_wait(_task, task_type='upload_to_hub'))
@app.get('/twinkle/upload_status/{request_id}', response_model=types.UploadStatusResponse)
async def upload_status(
    request: Request,
    request_id: str,
    self: ModelManagement = Depends(self_fn),
) -> types.UploadStatusResponse:
    """Return the current status of a background upload task.

    The ``request_id`` is the one returned by ``/twinkle/upload_to_hub``.
    Responds 404 when the task id is unknown; the ``error`` field is
    populated only when the task has failed.
    """
    await self._on_request_start(request)
    record = await self.state.get_future(request_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f'Upload task not found: {request_id}')
    status = record.get('status', 'unknown')
    error = None
    if status == 'failed':
        # 'result' may be present but None; `or {}` guards against that so
        # a failed task without a result payload still yields a message.
        error = (record.get('result') or {}).get('error', 'Unknown error')
    return types.UploadStatusResponse(request_id=request_id, status=status, error=error)

@app.post('/twinkle/add_adapter_to_model', response_model=types.AddAdapterResponse)
async def add_adapter_to_model(
Expand Down
13 changes: 13 additions & 0 deletions src/twinkle_client/dataset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ def check(self, **kwargs):
return response.json()["result"]


def cast_column(self, column: str, decode: bool = True):
    """Forward a ``cast_column`` call to the remote dataset processor.

    Posts to the generic ``/call`` endpoint with the column name and the
    ``decode`` flag. Presumably mirrors ``datasets.Dataset.cast_column``
    on the server side — confirm against the server handler.

    Args:
        column: Name of the column to cast.
        decode: Decode flag forwarded to the server (default: True).

    Returns:
        The ``result`` field of the server's JSON response.

    Raises:
        requests.HTTPError: If the server responds with an error status.
    """
    response = http_post(
        url=f'{self.server_url}/call',
        json_data={
            'processor_id': self.processor_id,
            'function': 'cast_column',
            # Pass arguments directly — the original `**{...}` unpacking
            # was a redundant no-op.
            'column': column,
            'decode': decode,
        }
    )
    response.raise_for_status()
    return response.json()["result"]


def map(self, preprocess_func: Union[Preprocessor, Callable, str, Type[Preprocessor]], dataset_meta: DatasetMeta = None, init_args: Dict[str, Any] = None, **kwargs):
response = http_post(
url=f'{self.server_url}/call',
Expand Down
Loading
Loading