Python port of the Go orgdatacore library for organizational data management.
This library provides thread-safe access to organizational data including employees, teams, organizations, pillars, and team groups.
UV is a fast Python package installer.
# Install the package
uv pip install -e .
# With GCS support (recommended for production)
uv pip install -e ".[gcs]"
# Or use uv sync for development (installs dev dependencies)
uv sync
# With GCS support
uv sync --extra gcs

# Install from source
pip install -e .
# With GCS support (recommended for production)
pip install -e ".[gcs]"
# With development dependencies
pip install -e ".[dev]"

from orgdatacore import Service, GCSConfig
from orgdatacore.datasources import GCSDataSourceWithSDK
from datetime import timedelta
# Configure GCS data source
config = GCSConfig(
bucket="your-bucket",
object_path="path/to/org_data.json",
project_id="your-project",
check_interval=timedelta(minutes=5),
)
source = GCSDataSourceWithSDK(config)
# Option 1: Constructor injection (recommended for simple cases)
service = Service(data_source=source)
# Option 2: Lazy loading (matches Go API, good for deferred loading)
# service = Service()
# service.load_from_data_source(source)
# Query employees
employee = service.get_employee_by_uid("jsmith")
if employee:
    print(f"Found: {employee.full_name}")

The library supports pluggable data sources. Implement the DataSource interface for your storage backend:
from orgdatacore import Service
from orgdatacore.interface import DataSource
from typing import BinaryIO, Callable, Optional
from io import BytesIO
class S3DataSource(DataSource):
    """Example custom DataSource for AWS S3."""

    def __init__(self, bucket: str, key: str):
        """Remember which S3 object (bucket + key) holds the data."""
        self.bucket = bucket
        self.key = key

    def load(self) -> BinaryIO:
        """Download the object and return its bytes as an in-memory stream."""
        # Imported inside the method, so boto3 is needed only when load() runs.
        import boto3

        s3 = boto3.client('s3')
        response = s3.get_object(Bucket=self.bucket, Key=self.key)
        # Read the whole body up front so the caller gets a seekable buffer.
        return BytesIO(response['Body'].read())

    def watch(self, callback: Callable[[], Optional[Exception]]) -> Optional[Exception]:
        """Placeholder watcher: never calls ``callback`` and returns None."""
        # Implement watching logic (polling, S3 events, etc.)
        return None

    def __str__(self) -> str:
        """Return the object's location as an ``s3://bucket/key`` string."""
        return f"s3://{self.bucket}/{self.key}"
# Use your custom data source
service = Service()
source = S3DataSource("my-org-bucket", "data/org_data.json")
service.load_from_data_source(source)

The main class providing access to organizational data.
get_employee_by_uid(uid: str) -> Employee | None
get_employee_by_slack_id(slack_id: str) -> Employee | None
get_employee_by_github_id(github_id: str) -> Employee | None
get_manager_for_employee(uid: str) -> Employee | None
get_team_by_name(team_name: str) -> Team | None
get_org_by_name(org_name: str) -> Org | None
get_pillar_by_name(pillar_name: str) -> Pillar | None
get_team_group_by_name(team_group_name: str) -> TeamGroup | None

get_teams_for_uid(uid: str) -> list[str]
get_teams_for_slack_id(slack_id: str) -> list[str]
get_team_members(team_name: str) -> list[Employee]
is_employee_in_team(uid: str, team_name: str) -> bool
is_slack_user_in_team(slack_id: str, team_name: str) -> bool

is_employee_in_org(uid: str, org_name: str) -> bool
is_slack_user_in_org(slack_id: str, org_name: str) -> bool
get_user_organizations(slack_user_id: str) -> list[OrgInfo]

get_version() -> DataVersion
load_from_data_source(source: DataSource) -> None
start_data_source_watcher(source: DataSource) -> None

get_hierarchy_path(entity_name: str, entity_type: str) -> list[HierarchyPathEntry]
get_descendants_tree(entity_name: str) -> HierarchyNode | None

get_jira_projects() -> list[str]
get_jira_components(project: str) -> list[str]
get_teams_by_jira_project(project: str) -> list[JiraOwnerInfo]
get_teams_by_jira_component(project: str, component: str) -> list[JiraOwnerInfo]
get_jira_ownership_for_team(team_name: str) -> list[dict]

get_component_by_name(name: str) -> Component | None
get_all_components() -> list[Component]

get_all_employee_uids() -> list[str]
get_all_team_names() -> list[str]
get_all_org_names() -> list[str]
get_all_pillar_names() -> list[str]
get_all_team_group_names() -> list[str]

Implement this protocol for custom storage backends (no inheritance needed):
from typing import BinaryIO, Callable, Optional
class MyDataSource: # No inheritance needed!
def load(self) -> BinaryIO:
"""Return a file-like object containing JSON data."""
...
def watch(self, callback: Callable[[], Optional[Exception]]) -> Optional[Exception]:
"""Start watching for changes, call callback when data updates."""
...
def __str__(self) -> str:
"""Return a description of this data source."""
...

For production use with Google Cloud Storage (requires google-cloud-storage):
from orgdatacore import GCSConfig
from orgdatacore.datasources import GCSDataSourceWithSDK
from datetime import timedelta
config = GCSConfig(
bucket="your-bucket",
object_path="path/to/data.json",
project_id="your-project",
check_interval=timedelta(minutes=5),
)
source = GCSDataSourceWithSDK(config)

For asyncio-based applications (FastAPI, aiohttp, etc.), use AsyncService:
import asyncio
from orgdatacore import AsyncService, GCSConfig
from orgdatacore._async import AsyncGCSDataSource
async def main():
    """Load org data from GCS and run a few example async queries."""
    # Configure async GCS data source
    config = GCSConfig(
        bucket="your-bucket",
        object_path="path/to/org_data.json",
        project_id="your-project",
    )
    source = AsyncGCSDataSource(config)

    # Create and initialize async service
    service = AsyncService()
    await service.load_from_data_source(source)

    # Query employees (all methods are async)
    employee = await service.get_employee_by_uid("jsmith")
    if employee:
        print(f"Found: {employee.full_name}")

    # Check team membership
    is_member = await service.is_employee_in_team("jsmith", "platform-team")

    # Get hierarchy path
    path = await service.get_hierarchy_path("platform-team", "team")
    for entry in path:
        print(f" {entry.type}: {entry.name}")
asyncio.run(main())

import asyncio
from orgdatacore import AsyncService, GCSConfig
from orgdatacore._async import AsyncGCSDataSource
async def run_with_auto_reload():
    """Start the data source watcher and report the team count every minute.

    Loops until interrupted; the watcher is stopped on the way out.
    """
    config = GCSConfig(
        bucket="your-bucket",
        object_path="org_data.json",
        project_id="your-project",
    )
    source = AsyncGCSDataSource(config)
    service = AsyncService()

    # Start watcher - loads data and monitors for changes
    await service.start_data_source_watcher(source)
    try:
        # Service auto-reloads when data changes
        while True:
            teams = await service.get_all_team_names()
            print(f"Currently tracking {len(teams)} teams")
            await asyncio.sleep(60)
    finally:
        # Always stop the watcher, even if the loop is cancelled or raises.
        await service.stop_watcher()
asyncio.run(run_with_auto_reload())

The AsyncService has the same methods as Service, but all are async:
await get_employee_by_uid(uid) → Employee | None
await get_employee_by_email(email) → Employee | None
await get_employee_by_slack_id(slack_id) → Employee | None
await get_employee_by_github_id(github_id) → Employee | None
await get_manager_for_employee(uid) → Employee | None
await get_team_by_name(name) → Team | None
await get_org_by_name(name) → Org | None
await get_pillar_by_name(name) → Pillar | None
await get_team_group_by_name(name) → TeamGroup | None
await get_component_by_name(name) → Component | None

await get_teams_for_uid(uid) → list[str]
await get_teams_for_slack_id(slack_id) → list[str]
await get_team_members(team_name) → tuple[Employee, ...]
await get_org_members(org_name) → tuple[Employee, ...]
await is_employee_in_team(uid, team_name) → bool
await is_slack_user_in_team(slack_id, team_name) → bool
await is_employee_in_org(uid, org_name) → bool
await is_slack_user_in_org(slack_id, org_name) → bool

await get_hierarchy_path(entity_name, entity_type) → list[HierarchyPathEntry]
await get_descendants_tree(entity_name) → HierarchyNode | None
await get_user_organizations(uid) → tuple[OrgInfo, ...]

await get_jira_projects() → list[str]
await get_jira_components(project) → list[str]
await get_teams_by_jira_project(project) → list[JiraOwnerInfo]
await get_teams_by_jira_component(project, component) → list[JiraOwnerInfo]
await get_jira_ownership_for_team(team_name) → list[dict]

await get_all_employee_uids() → list[str]
await get_all_team_names() → list[str]
await get_all_org_names() → list[str]
await get_all_pillar_names() → list[str]
await get_all_team_group_names() → list[str]
await get_all_employees() → tuple[Employee, ...]
await get_all_teams() → tuple[Team, ...]
await get_all_orgs() → tuple[Org, ...]
await get_all_pillars() → tuple[Pillar, ...]
await get_all_team_groups() → tuple[TeamGroup, ...]
await get_all_components() → tuple[Component, ...]

await load_from_data_source(source) → None
await start_data_source_watcher(source) → None
await stop_watcher() → None
is_healthy() → bool (sync)
is_ready() → bool (sync)
get_version() → DataVersion (sync)

The Service class is thread-safe. All read operations can be performed concurrently, and data reloading is atomic.
The AsyncService class is asyncio-safe, using asyncio.Lock to protect data access during concurrent async operations.
# Set up development environment
uv sync
# Run tests
uv run pytest
# Run tests with coverage
uv run pytest --cov=orgdatacore --cov-report=html
# Type checking
uv run mypy orgdatacore
# Code formatting
uv run black orgdatacore tests
uv run isort orgdatacore tests
# Linting
uv run ruff check orgdatacore tests

# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run tests with coverage
pytest --cov=orgdatacore --cov-report=html
# Type checking
mypy orgdatacore
# Code formatting
black orgdatacore tests
isort orgdatacore tests

Run the GCS demo with real data:
# Make sure you're logged in
gcloud auth application-default login
# Install GCS support
pip install -e ".[gcs]"
# Run the demo
python examples/gcs_demo.py

Apache-2.0