diff --git a/docs/docusaurus.config.ts b/docs/docusaurus.config.ts index dc8ee2b..105e09c 100644 --- a/docs/docusaurus.config.ts +++ b/docs/docusaurus.config.ts @@ -76,7 +76,7 @@ const config: Config = { {to: '/docs/sdk/python', label: 'Python', position: 'left'}, {to: '/docs/sdk/rust', label: 'Rust', position: 'left'}, {to: '/docs/intro', label: 'Documentation', position: 'left'}, - {to: '/blog', label: 'Blog', position: 'left'}, + // {to: '/blog', label: 'Blog', position: 'left'}, ], }, prism: { diff --git a/docs/src/components/GitHubStats/index.tsx b/docs/src/components/GitHubStats/index.tsx new file mode 100644 index 0000000..c33ea13 --- /dev/null +++ b/docs/src/components/GitHubStats/index.tsx @@ -0,0 +1,106 @@ +import React, { useState, useEffect, useCallback } from 'react'; +import styles from './styles.module.css'; + +const OWNER = 'vectorlessflow'; +const REPO = 'vectorless'; + +function formatNumber(num: number | null): string { + if (num === null) return '—'; + return num.toLocaleString(); +} + +async function fetchRepoBasics(): Promise { + const resp = await fetch(`https://api.github.com/repos/${OWNER}/${REPO}`, { + headers: { Accept: 'application/vnd.github.v3+json' }, + }); + if (!resp.ok) throw new Error(`HTTP ${resp.status}`); + const data = await resp.json(); + return data.stargazers_count ?? 0; +} + +async function fetchOpenIssues(): Promise { + const resp = await fetch( + `https://api.github.com/search/issues?q=repo:${OWNER}/${REPO}+type:issue+state:open&per_page=1`, + { headers: { Accept: 'application/vnd.github.v3+json' } }, + ); + if (!resp.ok) throw new Error(`Issues: ${resp.status}`); + const data = await resp.json(); + return data.total_count ?? 0; +} + +async function fetchOpenPRs(): Promise { + const resp = await fetch( + `https://api.github.com/search/issues?q=repo:${OWNER}/${REPO}+type:pr+state:open&per_page=1`, + { headers: { Accept: 'application/vnd.github.v3+json' } }, + ); + if (!resp.ok) throw new Error(`PR: ${resp.status}`); + const data = await resp.json(); + return data.total_count ?? 0; +} + +export default function GitHubStats(): React.ReactElement { + const [stars, setStars] = useState(null); + const [issues, setIssues] = useState(null); + const [prs, setPrs] = useState(null); + const [error, setError] = useState(false); + + const load = useCallback(async () => { + setStars(null); + setIssues(null); + setPrs(null); + setError(false); + try { + const [s, i, p] = await Promise.all([fetchRepoBasics(), fetchOpenIssues(), fetchOpenPRs()]); + setStars(s); + setIssues(i); + setPrs(p); + } catch { + setError(true); + } + }, []); + + useEffect(() => { + load(); + const t = setInterval(load, 300000); + return () => clearInterval(t); + }, [load]); + + return ( +
window.open('https://github.com/vectorlessflow/vectorless', '_blank', 'noopener,noreferrer')} + role="link" + tabIndex={0} + onKeyDown={(e) => { if (e.key === 'Enter') window.open('https://github.com/vectorlessflow/vectorless', '_blank', 'noopener,noreferrer'); }}> +
+
+
+ + Stars +
+
+ {error ? '—' : formatNumber(stars)} +
+
+
+
+ + Issues +
+
+ {error ? '—' : formatNumber(issues)} +
+
+
+
+ + PRs +
+
+ {error ? '—' : formatNumber(prs)} +
+
+
+
+ ); +} diff --git a/docs/src/components/GitHubStats/styles.module.css b/docs/src/components/GitHubStats/styles.module.css new file mode 100644 index 0000000..f97ece0 --- /dev/null +++ b/docs/src/components/GitHubStats/styles.module.css @@ -0,0 +1,61 @@ +.widget { + background: var(--bg-offset); + backdrop-filter: blur(12px); + border-radius: 1.2rem; + border: 1px solid var(--border); + padding: 0.9rem 1.2rem; + width: 130px; + min-width: 130px; + box-sizing: border-box; + transition: all 0.2s ease; + box-shadow: 0 8px 20px -6px rgba(0, 0, 0, 0.1); + cursor: pointer; +} + +.widget:hover { + border-color: var(--primary); +} + +.statList { + display: flex; + flex-direction: column; + gap: 0.5rem; +} + +.statRow { + display: flex; + align-items: center; + justify-content: space-between; + gap: 1rem; + padding: 0.4rem 0; +} + +.statLabel { + display: flex; + align-items: center; + gap: 0.5rem; + font-weight: 500; + font-size: 0.8rem; + color: var(--text-light); + white-space: nowrap; + flex-shrink: 0; +} + +.statNumber { + font-family: 'JetBrains Mono', monospace; + font-size: 0.9rem; + font-weight: 700; + letter-spacing: -0.3px; + color: var(--text); + white-space: nowrap; + margin-left: auto; +} + +.errorText { color: #F87171; } + +@media (max-width: 880px) { + .widget { + width: 100%; + min-width: unset; + } +} diff --git a/docs/src/css/custom.css b/docs/src/css/custom.css index 179c95c..e808cee 100644 --- a/docs/src/css/custom.css +++ b/docs/src/css/custom.css @@ -72,6 +72,21 @@ padding: 0 !important; } +nav.navbar, +.navbar--fixed-top { + border-bottom: none !important; + box-shadow: none !important; +} + +.navbar::after, +.navbar::before { + display: none !important; +} + +.navbar-sidebar { + background-color: var(--bg) !important; +} + .navbar__inner { height: 68px !important; max-width: 1280px; diff --git a/docs/src/pages/index.module.css b/docs/src/pages/index.module.css index acbe8e1..233e81e 100644 --- a/docs/src/pages/index.module.css +++ b/docs/src/pages/index.module.css @@ -9,19 +9,24 @@ padding: 0; height: calc(100vh - 68px); overflow: hidden; + position: relative; display: flex; align-items: center; justify-content: center; background-color: var(--bg); - background-image: - linear-gradient(rgba(175, 120, 139, 0.06) 1px, transparent 1px), - linear-gradient(90deg, rgba(175, 120, 139, 0.06) 1px, transparent 1px); - background-size: 48px 48px; font-family: 'Space Grotesk', sans-serif; color: var(--text); line-height: 1.5; } +/* ===== Stats widget (top-right corner) ===== */ +.statsCorner { + position: absolute; + top: 20px; + right: 32px; + z-index: 2; +} + /* ===== Hero Container ===== */ .hero { max-width: 1280px; diff --git a/docs/src/pages/index.tsx b/docs/src/pages/index.tsx index 4626d46..e9863ec 100644 --- a/docs/src/pages/index.tsx +++ b/docs/src/pages/index.tsx @@ -2,6 +2,7 @@ import type {ReactNode} from 'react'; import useDocusaurusContext from '@docusaurus/useDocusaurusContext'; import Layout from '@theme/Layout'; import Link from '@docusaurus/Link'; +import GitHubStats from '@site/src/components/GitHubStats'; import styles from './index.module.css'; @@ -34,6 +35,9 @@ function HamsterIcon({size = 14}: {size?: number}) { function HomepageHeader() { return (
+
+ +
{/* Left: Brand + Features */}
diff --git a/docs/src/theme/Navbar/index.tsx b/docs/src/theme/Navbar/index.tsx index 3acdbca..666e74f 100644 --- a/docs/src/theme/Navbar/index.tsx +++ b/docs/src/theme/Navbar/index.tsx @@ -5,7 +5,6 @@ import NavbarItem from '@theme/NavbarItem'; import NavbarMobileSidebarToggle from '@theme/Navbar/MobileSidebar/Toggle'; import useBaseUrl from '@docusaurus/useBaseUrl'; import Link from '@docusaurus/Link'; -import GitHubStar from '../../components/GitHubStar'; import type {Props as NavbarItemConfig} from '@theme/NavbarItem'; import styles from './styles.module.css'; @@ -61,9 +60,6 @@ export default function Navbar(): React.ReactElement { {centerItems.map((item, i) => )}
-
- -
diff --git a/docs/src/theme/Navbar/styles.module.css b/docs/src/theme/Navbar/styles.module.css index a2265d3..9e352f5 100644 --- a/docs/src/theme/Navbar/styles.module.css +++ b/docs/src/theme/Navbar/styles.module.css @@ -36,14 +36,10 @@ font-weight: 600; font-family: 'Inter', 'Libre Franklin', -apple-system, BlinkMacSystemFont, sans-serif; letter-spacing: -0.02em; - color: #111827; + color: var(--text); line-height: 1; } -[data-theme='dark'] .logo { - color: #D0D6E4; -} - /* Center: navigation links */ .navbarCenter { position: absolute; @@ -57,19 +53,16 @@ .navbarCenter :global(.navbar__link) { font-size: 0.875rem; font-weight: 400; - color: #374151; + color: var(--text-light); padding: 0; text-decoration: none; transition: opacity 0.15s ease; } -[data-theme='dark'] .navbarCenter :global(.navbar__link) { - color: #C8D0E0; -} - .navbarCenter :global(.navbar__link:hover) { opacity: 0.7; text-decoration: none; + color: var(--primary); } .navbarCenter :global(.navbar__link--active) { @@ -80,7 +73,7 @@ opacity: 1; } -/* Right: github star */ +/* Right: theme toggle */ .navbarRight { display: flex; align-items: center; @@ -90,11 +83,6 @@ padding-right: 24px; } -.githubStarWrapper { - display: flex; - align-items: center; -} - /* Theme toggle */ .themeToggle { display: inline-flex; @@ -105,7 +93,7 @@ border-radius: 8px; border: 1px solid var(--border); background: transparent; - color: #374151; + color: var(--text-light); cursor: pointer; transition: all 0.15s; } @@ -115,15 +103,6 @@ color: var(--primary); } -[data-theme='dark'] .themeToggle { - color: #C8D0E0; -} - -[data-theme='dark'] .themeToggle:hover { - border-color: var(--primary); - color: var(--primary); -} - @media (max-width: 996px) { .navbarBrand { padding-left: 16px; diff --git a/python/src/engine.rs b/python/src/engine.rs index 1f8ae87..2392456 100644 --- a/python/src/engine.rs +++ b/python/src/engine.rs @@ -18,6 +18,7 @@ use super::error::to_py_err; use super::graph::PyDocumentGraph; use super::metrics::PyMetricsReport; use super::results::{PyIndexResult, PyQueryResult}; +use super::streaming::PyStreamingQuery; // ============================================================ // Engine async helpers (named functions to avoid FnOnce HRTB issue) @@ -58,6 +59,11 @@ async fn run_get_graph(engine: Arc) -> PyResult> Ok(graph.map(|g| PyDocumentGraph { inner: g })) } +async fn run_query_stream(engine: Arc, ctx: QueryContext) -> PyResult { + let rx = engine.query_stream(ctx).await.map_err(to_py_err)?; + Ok(PyStreamingQuery::new(rx)) +} + fn run_metrics_report(engine: Arc) -> PyMetricsReport { PyMetricsReport { inner: engine.metrics_report(), @@ -186,6 +192,29 @@ impl PyEngine { future_into_py(py, run_query(engine, query_ctx)) } + /// Query documents with streaming progress events. + /// + /// Returns a StreamingQuery async iterator that yields real-time + /// retrieval events as dicts with a ``"type"`` key. + /// + /// Args: + /// ctx: QueryContext with query text and scope. + /// + /// Returns: + /// StreamingQuery async iterator. + /// + /// Raises: + /// VectorlessError: If query setup fails. + fn query_stream<'py>( + &self, + py: Python<'py>, + ctx: &PyQueryContext, + ) -> PyResult> { + let engine = Arc::clone(&self.inner); + let query_ctx = ctx.inner.clone(); + future_into_py(py, run_query_stream(engine, query_ctx)) + } + /// List all indexed documents. /// /// Returns: diff --git a/python/src/lib.rs b/python/src/lib.rs index a3951cd..d17c583 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -13,6 +13,7 @@ mod error; mod graph; mod metrics; mod results; +mod streaming; use config::PyConfig; use context::{PyIndexContext, PyIndexOptions, PyQueryContext}; @@ -25,6 +26,7 @@ use results::{ PyEvidenceItem, PyFailedItem, PyIndexItem, PyIndexMetrics, PyIndexResult, PyQueryMetrics, PyQueryResult, PyQueryResultItem, }; +use streaming::PyStreamingQuery; /// Vectorless - Reasoning-native document intelligence engine. /// @@ -60,6 +62,7 @@ fn _vectorless(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add("__version__", env!("CARGO_PKG_VERSION"))?; diff --git a/python/src/streaming.rs b/python/src/streaming.rs new file mode 100644 index 0000000..b8d028d --- /dev/null +++ b/python/src/streaming.rs @@ -0,0 +1,159 @@ +// Copyright (c) 2026 vectorless developers +// SPDX-License-Identifier: Apache-2.0 + +//! PyO3 streaming query wrapper. +//! +//! Bridges Rust's `mpsc::Receiver` to a Python async iterator, +//! yielding real-time retrieval progress events as dicts. + +use pyo3::exceptions::PyStopAsyncIteration; +use pyo3::prelude::*; +use pyo3::types::PyDict; +use pyo3_async_runtimes::tokio::future_into_py; +use std::sync::Arc; +use tokio::sync::{mpsc, Mutex}; + +use ::vectorless::retrieval::{RetrieveEvent, SufficiencyLevel}; + +/// Convert a `RetrieveEvent` into a Python dict with a `"type"` key. +fn event_to_dict(event: RetrieveEvent, py: Python<'_>) -> PyObject { + let dict = PyDict::new(py); + match event { + RetrieveEvent::Started { query, strategy } => { + dict.set_item("type", "started").unwrap(); + dict.set_item("query", query).unwrap(); + dict.set_item("strategy", strategy).unwrap(); + } + RetrieveEvent::StageCompleted { stage, elapsed_ms } => { + dict.set_item("type", "stage_completed").unwrap(); + dict.set_item("stage", stage).unwrap(); + dict.set_item("elapsed_ms", elapsed_ms).unwrap(); + } + RetrieveEvent::NodeVisited { + node_id, + title, + score, + } => { + dict.set_item("type", "node_visited").unwrap(); + dict.set_item("node_id", node_id).unwrap(); + dict.set_item("title", title).unwrap(); + dict.set_item("score", score).unwrap(); + } + RetrieveEvent::ContentFound { + node_id, + title, + preview, + score, + } => { + dict.set_item("type", "content_found").unwrap(); + dict.set_item("node_id", node_id).unwrap(); + dict.set_item("title", title).unwrap(); + dict.set_item("preview", preview).unwrap(); + dict.set_item("score", score).unwrap(); + } + RetrieveEvent::Backtracking { from, to, reason } => { + dict.set_item("type", "backtracking").unwrap(); + dict.set_item("from", from).unwrap(); + dict.set_item("to", to).unwrap(); + dict.set_item("reason", reason).unwrap(); + } + RetrieveEvent::SufficiencyCheck { level, tokens } => { + let level_str = match level { + SufficiencyLevel::Sufficient => "sufficient", + SufficiencyLevel::PartialSufficient => "partial_sufficient", + SufficiencyLevel::Insufficient => "insufficient", + }; + dict.set_item("type", "sufficiency_check").unwrap(); + dict.set_item("level", level_str).unwrap(); + dict.set_item("tokens", tokens).unwrap(); + } + RetrieveEvent::Completed { response } => { + dict.set_item("type", "completed").unwrap(); + dict.set_item("confidence", response.confidence).unwrap(); + dict.set_item("is_sufficient", response.is_sufficient).unwrap(); + dict.set_item("strategy_used", response.strategy_used).unwrap(); + dict.set_item("tokens_used", response.tokens_used).unwrap(); + dict.set_item("content", response.content).unwrap(); + + let results: Vec = response + .results + .into_iter() + .map(|r| { + let rd = PyDict::new(py); + rd.set_item("node_id", &r.node_id).unwrap(); + rd.set_item("title", &r.title).unwrap(); + rd.set_item("content", &r.content).unwrap(); + rd.set_item("score", r.score).unwrap(); + rd.set_item("depth", r.depth).unwrap(); + rd.into() + }) + .collect(); + dict.set_item("results", results).unwrap(); + } + RetrieveEvent::Error { message } => { + dict.set_item("type", "error").unwrap(); + dict.set_item("message", message).unwrap(); + } + } + dict.into() +} + +/// Python-facing async iterator over streaming retrieval events. +/// +/// Usage:: +/// +/// stream = await engine.query_stream(ctx) +/// async for event in stream: +/// print(event["type"]) +#[pyclass(name = "StreamingQuery")] +pub struct PyStreamingQuery { + rx: Arc>>>, +} + +impl PyStreamingQuery { + pub fn new(rx: mpsc::Receiver) -> Self { + Self { + rx: Arc::new(Mutex::new(Some(rx))), + } + } +} + +#[pymethods] +impl PyStreamingQuery { + fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } + + fn __anext__<'py>(&self, py: Python<'py>) -> PyResult> { + let rx = Arc::clone(&self.rx); + future_into_py(py, async move { + let mut guard = rx.lock().await; + match guard.as_mut() { + None => Err(PyStopAsyncIteration::new_err("stream exhausted")), + Some(receiver) => match receiver.recv().await { + Some(event) => { + let is_terminal = matches!( + &event, + RetrieveEvent::Completed { .. } | RetrieveEvent::Error { .. } + ); + if is_terminal { + *guard = None; + } + // Convert to Python dict — safe because future_into_py + // ensures we're on a thread that can acquire the GIL. + let obj = Python::with_gil(|py| event_to_dict(event, py)); + Ok(obj) + } + None => { + *guard = None; + Err(PyStopAsyncIteration::new_err("stream closed")) + } + }, + } + }) + } + + fn __repr__(&self) -> String { + "StreamingQuery(...)".to_string() + } +} diff --git a/python/vectorless/__init__.py b/python/vectorless/__init__.py index 5855916..d2ff88d 100644 --- a/python/vectorless/__init__.py +++ b/python/vectorless/__init__.py @@ -14,8 +14,10 @@ # High-level API (recommended) from vectorless.session import Session +from vectorless.sync_session import SyncSession from vectorless.config import EngineConfig, load_config, load_config_from_env, load_config_from_file from vectorless.events import EventEmitter +from vectorless.streaming import StreamingQueryResult from vectorless.types import ( DocumentGraphWrapper, EdgeEvidence, @@ -38,6 +40,7 @@ __all__ = [ # Primary API "Session", + "SyncSession", # Configuration "EngineConfig", "load_config", @@ -45,6 +48,8 @@ "load_config_from_file", # Events "EventEmitter", + # Streaming + "StreamingQueryResult", # Result types "QueryResponse", "QueryResult", diff --git a/python/vectorless/_async_utils.py b/python/vectorless/_async_utils.py new file mode 100644 index 0000000..0d093bc --- /dev/null +++ b/python/vectorless/_async_utils.py @@ -0,0 +1,29 @@ +"""Shared async utilities for sync/async context bridging.""" + +from __future__ import annotations + +import asyncio +import concurrent.futures +from typing import Coroutine, TypeVar + +T = TypeVar("T") + + +def run_async(coro: Coroutine[object, object, T]) -> T: + """Run an async coroutine synchronously. + + Handles both pure-script and Jupyter (existing event loop) contexts. + + - No running event loop: uses ``asyncio.run()``. + - Running event loop (Jupyter): runs the coroutine in a new thread + with its own event loop, then waits for the result. + """ + try: + asyncio.get_running_loop() + except RuntimeError: + return asyncio.run(coro) + + # Running loop exists — offload to a new thread. + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + future = pool.submit(asyncio.run, coro) + return future.result() diff --git a/python/vectorless/_compat/langchain.py b/python/vectorless/_compat/langchain.py index f277d96..21a6720 100644 --- a/python/vectorless/_compat/langchain.py +++ b/python/vectorless/_compat/langchain.py @@ -2,13 +2,13 @@ from __future__ import annotations -import asyncio from typing import Any, List, Optional from langchain_core.callbacks import CallbackManagerForRetrieverRun from langchain_core.documents import Document from langchain_core.retrievers import BaseRetriever +from vectorless._async_utils import run_async from vectorless.session import Session @@ -30,6 +30,13 @@ class VectorlessRetriever(BaseRetriever): ) docs = retriever.invoke("What is the revenue?") + + Or with an existing Session (avoids re-initializing the engine):: + + from vectorless import Session + + session = Session(api_key="sk-...", model="gpt-4o") + retriever = VectorlessRetriever(session=session, doc_ids=["doc-123"]) """ api_key: str = "" @@ -38,16 +45,20 @@ class VectorlessRetriever(BaseRetriever): doc_ids: List[str] = [] top_k: int = 3 workspace_scope: bool = False + session: Optional[Session] = None class Config: arbitrary_types_allowed = True - def _build_session(self) -> Session: - return Session( - api_key=self.api_key or None, - model=self.model or None, - endpoint=self.endpoint or None, - ) + def _get_session(self) -> Session: + """Get or lazily create a cached Session instance.""" + if self.session is None: + self.session = Session( + api_key=self.api_key or None, + model=self.model or None, + endpoint=self.endpoint or None, + ) + return self.session def _get_relevant_documents( self, @@ -56,8 +67,8 @@ def _get_relevant_documents( run_manager: Optional[CallbackManagerForRetrieverRun] = None, ) -> List[Document]: """Synchronous retrieval.""" - session = self._build_session() - response = asyncio.run( + session = self._get_session() + response = run_async( session.ask( query, doc_ids=self.doc_ids if self.doc_ids else None, @@ -73,7 +84,7 @@ async def _aget_relevant_documents( run_manager: Optional[CallbackManagerForRetrieverRun] = None, ) -> List[Document]: """Async retrieval.""" - session = self._build_session() + session = self._get_session() response = await session.ask( query, doc_ids=self.doc_ids if self.doc_ids else None, diff --git a/python/vectorless/_compat/llamaindex.py b/python/vectorless/_compat/llamaindex.py index f0cdcc1..0de9630 100644 --- a/python/vectorless/_compat/llamaindex.py +++ b/python/vectorless/_compat/llamaindex.py @@ -2,9 +2,9 @@ from __future__ import annotations -import asyncio from typing import Any, List, Optional +from vectorless._async_utils import run_async from vectorless.session import Session @@ -35,19 +35,23 @@ def __init__( doc_ids: Optional[List[str]] = None, top_k: int = 3, workspace_scope: bool = False, + session: Optional[Session] = None, ) -> None: - self._session = Session( - api_key=api_key or None, - model=model or None, - endpoint=endpoint or None, - ) + if session is not None: + self._session = session + else: + self._session = Session( + api_key=api_key or None, + model=model or None, + endpoint=endpoint or None, + ) self._doc_ids = doc_ids or [] self._top_k = top_k self._workspace_scope = workspace_scope def retrieve(self, query: str) -> List[Any]: """Synchronous retrieval, returns LlamaIndex NodeWithScore objects.""" - response = asyncio.run(self._query(query)) + response = run_async(self._query(query)) return self._to_nodes(response) async def aretrieve(self, query: str) -> List[Any]: diff --git a/python/vectorless/_core.py b/python/vectorless/_core.py index 967bcff..c83d089 100644 --- a/python/vectorless/_core.py +++ b/python/vectorless/_core.py @@ -22,6 +22,7 @@ QueryMetrics, QueryResult, QueryResultItem, + StreamingQuery, VectorlessError, WeightedKeyword, __version__, @@ -46,6 +47,7 @@ "QueryMetrics", "QueryResult", "QueryResultItem", + "StreamingQuery", "VectorlessError", "WeightedKeyword", "__version__", diff --git a/python/vectorless/session.py b/python/vectorless/session.py index 5a31c5b..0cbdc72 100644 --- a/python/vectorless/session.py +++ b/python/vectorless/session.py @@ -280,15 +280,33 @@ async def ask( async def query_stream( self, question: str, - **kwargs: Any, + *, + doc_ids: Optional[List[str]] = None, + workspace_scope: bool = False, + timeout_secs: Optional[int] = None, ) -> StreamingQueryResult: """Stream query progress as an async iterator. - Note: Currently wraps ``ask()`` and yields synthetic events. - Real-time streaming requires Rust-side ``query_stream()`` exposure. + Yields real-time events from the retrieval pipeline. + Terminal events are ``'completed'`` (with results) or ``'error'``. + + Usage:: + + stream = await session.query_stream("What is the revenue?") + async for event in stream: + print(event["type"], event) + result = stream.result """ - response = await self.ask(question, **kwargs) - return StreamingQueryResult(response) + ctx = QueryContext(question) + if doc_ids is not None: + ctx = ctx.with_doc_ids(doc_ids) + elif workspace_scope: + ctx = ctx.with_workspace() + if timeout_secs is not None: + ctx = ctx.with_timeout_secs(timeout_secs) + + raw_stream = await self._engine.query_stream(ctx) + return StreamingQueryResult(raw_stream) # ── Document Management ─────────────────────────────────── diff --git a/python/vectorless/streaming.py b/python/vectorless/streaming.py index d3109ef..9f01661 100644 --- a/python/vectorless/streaming.py +++ b/python/vectorless/streaming.py @@ -1,30 +1,30 @@ -"""Streaming query compatibility layer. +"""Streaming query results backed by real-time Rust streaming events. -Provides an async iterator interface for queries. Currently wraps the -synchronous ``query()`` and yields synthetic progress events. Real-time -streaming requires exposing ``query_stream()`` from Rust via PyO3. +Wraps the PyO3 ``StreamingQuery`` async iterator and builds a +``QueryResponse`` from the terminal ``completed`` event. """ from __future__ import annotations -from typing import AsyncIterator, Dict, List, Optional +from typing import Any, AsyncIterator, Dict, List, Optional -from vectorless.types.results import QueryResponse +from vectorless.types.results import QueryResponse, QueryResult class StreamingQueryResult: - """Async iterator for query progress events. + """Async iterator for real-time query progress events. Usage:: stream = await session.query_stream("What is the revenue?") async for event in stream: - print(event) - result = stream.result + print(event["type"], event) + result = stream.result # Available after iteration completes """ - def __init__(self, response: QueryResponse) -> None: - self._response = response + def __init__(self, raw_stream: Any) -> None: + self._stream = raw_stream # PyStreamingQuery from Rust + self._result: Optional[QueryResponse] = None self._consumed = False def __aiter__(self) -> AsyncIterator[Dict]: @@ -35,43 +35,41 @@ async def _iterate(self) -> AsyncIterator[Dict]: return self._consumed = True - # Synthetic events from the final result - yield {"type": "started", "message": "Query started"} - - for i, item in enumerate(self._response.items): - yield { - "type": "candidate_found", - "doc_id": item.doc_id, - "score": item.score, - "confidence": item.confidence, - "index": i, - } - - for j, evidence in enumerate(item.evidence): - yield { - "type": "evidence", - "doc_id": item.doc_id, - "evidence_title": evidence.title, - "evidence_path": evidence.path, - "content_length": len(evidence.content), - "index": j, - } - - if self._response.has_failures(): - for failed in self._response.failed: - yield { - "type": "error", - "source": failed.source, - "error": failed.error, - } - - yield { - "type": "completed", - "total_results": len(self._response.items), - "total_failures": len(self._response.failed), - } + completed_event: Optional[Dict] = None + + async for event in self._stream: + event_type = event.get("type", "") + + yield event + + if event_type in ("completed", "error"): + if event_type == "completed": + completed_event = event + break # Terminal events end the stream + + if completed_event is not None: + self._result = self._build_response(completed_event) + + @staticmethod + def _build_response(event: Dict) -> QueryResponse: + """Build a QueryResponse from the completed event dict.""" + items: List[QueryResult] = [] + for r in event.get("results", []): + node_id = r.get("node_id") + items.append( + QueryResult( + doc_id=node_id or "", + content=r.get("content") or "", + score=r.get("score", 0.0), + confidence=event.get("confidence", 0.0), + node_ids=[node_id] if node_id else [], + evidence=[], + metrics=None, + ) + ) + return QueryResponse(items=items, failed=[]) @property def result(self) -> Optional[QueryResponse]: """Final result, available after iteration completes.""" - return self._response if self._consumed else None + return self._result if self._consumed else None diff --git a/python/vectorless/sync_session.py b/python/vectorless/sync_session.py new file mode 100644 index 0000000..bb772d1 --- /dev/null +++ b/python/vectorless/sync_session.py @@ -0,0 +1,196 @@ +"""Synchronous Vectorless Session API. + +``SyncSession`` provides the same API as ``Session`` but with synchronous +methods — no ``async``/``await`` required. Works in scripts, Jupyter +notebooks, and any synchronous Python context. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, List, Optional, Union + +from vectorless._async_utils import run_async +from vectorless.config import EngineConfig, load_config_from_env, load_config_from_file +from vectorless.events import EventEmitter +from vectorless.session import Session +from vectorless.streaming import StreamingQueryResult +from vectorless.types.graph import DocumentGraphWrapper +from vectorless.types.results import IndexResultWrapper, QueryResponse + + +class SyncSession: + """Synchronous Vectorless session. + + Same API as ``Session`` but all methods are blocking (no async/await). + Works in Jupyter notebooks, scripts, and synchronous contexts. + + Usage:: + + from vectorless import SyncSession + + session = SyncSession(api_key="sk-...", model="gpt-4o") + result = session.index(path="./report.pdf") + answer = session.ask("What is the Q4 revenue?", doc_ids=[result.doc_id]) + print(answer.single().content) + """ + + def __init__( + self, + api_key: Optional[str] = None, + model: Optional[str] = None, + endpoint: Optional[str] = None, + config: Optional[EngineConfig] = None, + config_file: Optional[Union[str, Path]] = None, + events: Optional[EventEmitter] = None, + ) -> None: + self._session = Session( + api_key=api_key, + model=model, + endpoint=endpoint, + config=config, + config_file=config_file, + events=events, + ) + + @classmethod + def from_env(cls, events: Optional[EventEmitter] = None) -> "SyncSession": + """Create a SyncSession from environment variables.""" + config = load_config_from_env() + return cls(config=config, events=events) + + @classmethod + def from_config_file( + cls, + path: Union[str, Path], + events: Optional[EventEmitter] = None, + ) -> "SyncSession": + """Create a SyncSession from a TOML config file.""" + config = load_config_from_file(Path(path)) + return cls(config=config, events=events) + + # ── Indexing ────────────────────────────────────────────── + + def index( + self, + path: Optional[Union[str, Path]] = None, + paths: Optional[List[Union[str, Path]]] = None, + directory: Optional[Union[str, Path]] = None, + content: Optional[str] = None, + bytes_data: Optional[bytes] = None, + format: str = "markdown", + name: Optional[str] = None, + mode: str = "default", + force: bool = False, + ) -> IndexResultWrapper: + """Index a document (synchronous). + + Exactly one source must be provided: path, paths, directory, + content, or bytes_data. + """ + return run_async( + self._session.index( + path=path, + paths=paths, + directory=directory, + content=content, + bytes_data=bytes_data, + format=format, + name=name, + mode=mode, + force=force, + ) + ) + + def index_batch( + self, + paths: List[Union[str, Path]], + *, + mode: str = "default", + jobs: int = 1, + force: bool = False, + progress: bool = True, + ) -> List[IndexResultWrapper]: + """Index multiple files with optional concurrency (synchronous).""" + return run_async( + self._session.index_batch( + paths, + mode=mode, + jobs=jobs, + force=force, + progress=progress, + ) + ) + + # ── Querying ────────────────────────────────────────────── + + def ask( + self, + question: str, + *, + doc_ids: Optional[List[str]] = None, + workspace_scope: bool = False, + timeout_secs: Optional[int] = None, + ) -> QueryResponse: + """Ask a question and get results with source attribution (synchronous).""" + return run_async( + self._session.ask( + question, + doc_ids=doc_ids, + workspace_scope=workspace_scope, + timeout_secs=timeout_secs, + ) + ) + + def query_stream( + self, + question: str, + **kwargs: Any, + ) -> StreamingQueryResult: + """Start a streaming query (synchronous). + + Returns a ``StreamingQueryResult`` that is consumed as an async + iterator. For fully synchronous queries, use ``ask()`` instead. + """ + return run_async(self._session.query_stream(question, **kwargs)) + + # ── Document Management ─────────────────────────────────── + + def list_documents(self) -> list: + """List all indexed documents.""" + return run_async(self._session.list_documents()) + + def remove_document(self, doc_id: str) -> bool: + """Remove a document by ID.""" + return run_async(self._session.remove_document(doc_id)) + + def document_exists(self, doc_id: str) -> bool: + """Check if a document exists.""" + return run_async(self._session.document_exists(doc_id)) + + def clear_all(self) -> int: + """Remove all indexed documents. Returns count removed.""" + return run_async(self._session.clear_all()) + + # ── Graph ───────────────────────────────────────────────── + + def get_graph(self) -> Optional[DocumentGraphWrapper]: + """Get the cross-document relationship graph.""" + return run_async(self._session.get_graph()) + + # ── Metrics ─────────────────────────────────────────────── + + def metrics_report(self) -> Any: + """Get a comprehensive metrics report.""" + return self._session.metrics_report() + + # ── Context Manager ─────────────────────────────────────── + + def __enter__(self) -> "SyncSession": + return self + + def __exit__(self, *args: Any) -> None: + pass + + def __repr__(self) -> str: + return f"SyncSession({self._session!r})" diff --git a/rust/src/lib.rs b/rust/src/lib.rs index e9aa745..853e34e 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -85,6 +85,9 @@ pub use events::{EventEmitter, IndexEvent, QueryEvent, WorkspaceEvent}; // Metrics pub use metrics::{IndexMetrics, LlmMetricsReport, MetricsReport, RetrievalMetricsReport}; +// Retrieval (streaming) +pub use retrieval::RetrieveEvent; + // Errors pub use error::{Error, Result}; diff --git a/rust/src/retrieval/mod.rs b/rust/src/retrieval/mod.rs index e2454d4..e35b519 100644 --- a/rust/src/retrieval/mod.rs +++ b/rust/src/retrieval/mod.rs @@ -24,5 +24,5 @@ pub mod postprocessor; pub mod stream; mod types; -pub use stream::RetrieveEventReceiver; +pub use stream::{RetrieveEvent, RetrieveEventReceiver}; pub use types::*;