Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ exclude = [
# All files now have type annotations! 🎉
#
# ============================================================================
# Section 2: Files that need strict typing issues fixed (131 files)
# Section 2: Files that need strict typing issues fixed (133 files)
# ============================================================================
# These files have some type hints but fail strict type checking due to
# incomplete annotations, Any usage, or other strict mode violations.
Expand Down Expand Up @@ -397,9 +397,11 @@ exclude = [
"^src/llama_stack/cli/stack/utils\\.py$",
"^src/llama_stack/cli/subcommand\\.py$",
"^src/llama_stack/cli/utils\\.py$",
# Providers - Inline (27 files)
# Providers - Inline (29 files)
"^src/llama_stack/providers/inline/batches/reference/__init__\\.py$",
"^src/llama_stack/providers/inline/batches/reference/batches\\.py$",
"^src/llama_stack/providers/inline/datasetio/localfs/__init__\\.py$",
"^src/llama_stack/providers/inline/datasetio/localfs/config\\.py$",
"^src/llama_stack/providers/inline/eval/builtin/__init__\\.py$",
"^src/llama_stack/providers/inline/eval/builtin/eval\\.py$",
"^src/llama_stack/providers/inline/file_processor/pypdf/__init__\\.py$",
Expand Down Expand Up @@ -514,7 +516,6 @@ exclude = [
"^src/llama_stack/core/routers/",
"^src/llama_stack/core/routing_tables/",
# Provider directories - Inline
"^src/llama_stack/providers/inline/datasetio/localfs/",
"^src/llama_stack/providers/inline/safety/code_scanner/",
"^src/llama_stack/providers/inline/safety/llama_guard/",
"^src/llama_stack/providers/inline/scoring/basic/",
Expand Down
24 changes: 12 additions & 12 deletions src/llama_stack/distributions/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def filter_empty_values(obj: Any) -> Any:
return None

if isinstance(obj, dict):
filtered = {}
filtered: dict[Any, Any] = {}
for key, value in obj.items():
# Special handling for specific fields
if key == "module" and isinstance(value, str) and value == "":
Expand All @@ -70,12 +70,12 @@ def filter_empty_values(obj: Any) -> Any:
return filtered

elif isinstance(obj, list):
filtered = []
filtered_list: list[Any] = []
for item in obj:
filtered_item = filter_empty_values(item)
if filtered_item is not None:
filtered.append(filtered_item)
return filtered
filtered_list.append(filtered_item)
return filtered_list

else:
# For all other types (including empty strings and dicts that aren't module/config),
Expand Down Expand Up @@ -227,12 +227,12 @@ def run_config(
if provider.provider_type not in provider_registry[api]:
raise ValueError(f"Unknown provider type: {provider.provider_type} for API: {api_str}")
provider_id = provider.provider_type.split("::")[-1]
config_class = provider_registry[api][provider.provider_type].config_class
assert config_class is not None, (
config_class_str = provider_registry[api][provider.provider_type].config_class
assert config_class_str is not None, (
f"No config class for provider type: {provider.provider_type} for API: {api_str}"
)

config_class = instantiate_class_type(config_class)
config_class = instantiate_class_type(config_class_str)
if hasattr(config_class, "sample_run_config"):
config = config_class.sample_run_config(__distro_dir__=f"~/.llama/distributions/{name}")
else:
Expand Down Expand Up @@ -353,14 +353,14 @@ def generate_markdown_docs(self) -> str:
providers_table += f"| {api} | {providers_str} |\n"

if self.template_path is not None:
template = self.template_path.read_text()
template_str = self.template_path.read_text()
comment = "<!-- This file was auto-generated by distro_codegen.py, please edit source -->\n"
orphantext = "---\norphan: true\n---\n"

if template.startswith(orphantext):
template = template.replace(orphantext, orphantext + comment)
if template_str.startswith(orphantext):
template_str = template_str.replace(orphantext, orphantext + comment)
else:
template = comment + template
template_str = comment + template_str

# Render template with rich-generated table
env = jinja2.Environment(
Expand All @@ -369,7 +369,7 @@ def generate_markdown_docs(self) -> str:
# NOTE: autoescape is required to prevent XSS attacks
autoescape=True,
)
template = env.from_string(template)
template = env.from_string(template_str)

default_models = []
if self.available_models_by_provider:
Expand Down
52 changes: 35 additions & 17 deletions src/llama_stack/providers/inline/datasetio/localfs/datasetio.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,39 @@
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any
from typing import TYPE_CHECKING, Any, cast

from llama_stack.core.storage.kvstore import kvstore_impl
from llama_stack.core.storage.kvstore import KVStore, kvstore_impl
from llama_stack.providers.utils.datasetio.url_utils import get_dataframe_from_uri
from llama_stack.providers.utils.pagination import paginate_records
from llama_stack_api import Dataset, DatasetIO, DatasetsProtocolPrivate, PaginatedResponse
from llama_stack_api.datasetio import AppendRowsParams, IterRowsRequest

if TYPE_CHECKING:
import pandas

from .config import LocalFSDatasetIOConfig

DATASETS_PREFIX = "localfs_datasets:"


class SimpleDatasetStore:
    """Minimal in-memory dataset store backed by a shared registry dict.

    The mapping is held by reference (not copied), so any datasets the
    provider registers after construction are immediately visible here.
    """

    def __init__(self, dataset_infos: dict[str, Dataset]) -> None:
        # Keep the caller's mapping itself so the store tracks live updates.
        self.dataset_infos = dataset_infos

    def get_dataset(self, dataset_id: str) -> Dataset:
        """Return the dataset registered under *dataset_id* (KeyError if absent)."""
        dataset = self.dataset_infos[dataset_id]
        return dataset


class PandasDataframeDataset:
"""Wraps a dataset definition with lazy pandas DataFrame loading."""

def __init__(self, dataset_def: Dataset, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.dataset_def = dataset_def
self.df = None
self.df: pandas.DataFrame | None = None

def __len__(self) -> int:
assert self.df is not None, "Dataset not loaded. Please call .load() first"
Expand All @@ -48,7 +62,8 @@ async def load(self) -> None:
raise ValueError(f"Unsupported dataset source type: {self.dataset_def.source.type}")

if self.df is None:
raise ValueError(f"Failed to load dataset from {self.dataset_def.url}")
source_type = self.dataset_def.source.type
raise ValueError(f"Failed to load dataset from source type: {source_type}")


class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
Expand All @@ -57,8 +72,9 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
def __init__(self, config: LocalFSDatasetIOConfig) -> None:
self.config = config
# local registry for keeping track of datasets within the provider
self.dataset_infos = {}
self.kvstore = None
self.dataset_infos: dict[str, Dataset] = {}
self.dataset_store = SimpleDatasetStore(self.dataset_infos)
self.kvstore: KVStore | None = None

async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.kvstore)
Expand All @@ -67,8 +83,8 @@ async def initialize(self) -> None:
end_key = f"{DATASETS_PREFIX}\xff"
stored_datasets = await self.kvstore.values_in_range(start_key, end_key)

for dataset in stored_datasets:
dataset = Dataset.model_validate_json(dataset)
for dataset_json in stored_datasets:
dataset = Dataset.model_validate_json(dataset_json)
self.dataset_infos[dataset.identifier] = dataset

async def shutdown(self) -> None: ...
Expand All @@ -77,6 +93,7 @@ async def register_dataset(
self,
dataset_def: Dataset,
) -> None:
assert self.kvstore is not None, "KVStore must be initialized"
# Store in kvstore
key = f"{DATASETS_PREFIX}{dataset_def.identifier}"
await self.kvstore.set(
Expand All @@ -86,29 +103,30 @@ async def register_dataset(
self.dataset_infos[dataset_def.identifier] = dataset_def

async def unregister_dataset(self, dataset_id: str) -> None:
assert self.kvstore is not None, "KVStore must be initialized"
key = f"{DATASETS_PREFIX}{dataset_id}"
await self.kvstore.delete(key=key)
del self.dataset_infos[dataset_id]

async def iterrows(
self,
dataset_id: str,
start_index: int | None = None,
limit: int | None = None,
request: IterRowsRequest,
) -> PaginatedResponse:
dataset_def = self.dataset_infos[dataset_id]
dataset_def = self.dataset_infos[request.dataset_id]
dataset_impl = PandasDataframeDataset(dataset_def)
await dataset_impl.load()

records = dataset_impl.df.to_dict("records")
return paginate_records(records, start_index, limit)
assert dataset_impl.df is not None, "Dataset DataFrame should be loaded"
records = cast(list[dict[str, Any]], dataset_impl.df.to_dict("records"))
return paginate_records(records, request.start_index, request.limit)

async def append_rows(self, dataset_id: str, rows: list[dict[str, Any]]) -> None:
async def append_rows(self, params: AppendRowsParams) -> None:
import pandas

dataset_def = self.dataset_infos[dataset_id]
dataset_def = self.dataset_infos[params.dataset_id]
dataset_impl = PandasDataframeDataset(dataset_def)
await dataset_impl.load()

new_rows_df = pandas.DataFrame(rows)
new_rows_df = pandas.DataFrame(params.rows)
assert dataset_impl.df is not None, "Dataset DataFrame should be loaded"
dataset_impl.df = pandas.concat([dataset_impl.df, new_rows_df], ignore_index=True)
Original file line number Diff line number Diff line change
Expand Up @@ -950,15 +950,24 @@ async def _run_background_response_loop(
extra_body: dict | None = None,
) -> None:
"""Inner loop for background response processing, separated for timeout wrapping."""
# Check if response was cancelled before starting
# Check if response was cancelled before starting and update to in_progress atomically
existing = await self.responses_store.get_response_object(response_id)
if existing.status == "cancelled":
logger.info("Background response was cancelled before processing started", response_id=response_id)
return

# Update status to in_progress
existing.status = "in_progress"
await self.responses_store.update_response_object(existing)
# Only update to in_progress if still queued (avoid overwriting cancelled status)
if existing.status == "queued":
existing.status = "in_progress"
await self.responses_store.update_response_object(existing)
else:
# If not queued (e.g., already cancelled), exit
logger.info(
"Background response no longer in queued state, exiting",
response_id=response_id,
status=existing.status,
)
return

# Process the response using existing streaming logic
stream_gen = self._create_streaming_response(
Expand Down
Loading