- Init git repo – `git init --initial-branch=main && gh repo create`.
- `pyproject.toml` – minimal project metadata, `uv` backend, Python ≥ 3.14.
- `uv pip install --group dev` – add dev deps: `ruff`, `pytest`, `pytest-asyncio`, `mypy`, `pre-commit`.
- Pre-commit hooks – formatters & linter.
- CI – GitHub Actions workflow running `pytest` on every PR.
- Pydantic V2 models reflecting YAML keys.
- `load_config(path) -> dict[str, FeedConfig]` (implemented via Pydantic Settings).
- Unit tests using a fixture YAML.
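A minimal sketch of the loader's shape, using plain Pydantic plus PyYAML for brevity (the task itself calls for Pydantic Settings); the top-level `feeds` key and the `FeedConfig` fields shown are illustrative assumptions:

```python
from pathlib import Path

import yaml  # PyYAML
from pydantic import BaseModel


class FeedConfig(BaseModel):
    """One feed block from the YAML config; fields here are illustrative."""

    url: str
    yt_args: list[str] = []
    keep_last: int | None = None


def load_config(path: Path) -> dict[str, FeedConfig]:
    """Validate each feed mapping in the YAML file into a FeedConfig."""
    raw = yaml.safe_load(path.read_text())
    return {feed_id: FeedConfig(**cfg) for feed_id, cfg in raw["feeds"].items()}
```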
- CRUD helpers:
  - `add_download`
  - `update_status`
  - `next_queued_downloads`
  - `get_download_by_id`
  - `get_errors`
  - `get_downloads_to_prune_by_keep_last`
  - `get_downloads_to_prune_by_since`
  - `delete_downloads`
- Tests with a tmp in-memory DB.
- Tests to make sure DB access is optimized (e.g., uses indexes; see the test sketch below).
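One way to test that DB access uses indexes is to assert on SQLite's `EXPLAIN QUERY PLAN` output; the table and index names below are hypothetical:

```python
import sqlite3


def uses_index(conn: sqlite3.Connection, sql: str, params: tuple = ()) -> bool:
    """True if SQLite's query plan reports an index for the given statement."""
    plan = conn.execute(f"EXPLAIN QUERY PLAN {sql}", params).fetchall()
    # The last column of each plan row is the human-readable detail string.
    return any(
        "USING INDEX" in row[-1] or "USING COVERING INDEX" in row[-1] for row in plan
    )


def test_next_queued_downloads_uses_index():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE downloads (feed TEXT, id TEXT, status TEXT, PRIMARY KEY (feed, id))"
    )
    conn.execute("CREATE INDEX idx_downloads_feed_status ON downloads (feed, status)")
    assert uses_index(
        conn, "SELECT * FROM downloads WHERE feed = ? AND status = ?", ("f1", "queued")
    )
```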
- Abstraction seam: encapsulate the base directory so future S3/GCS back-ends can subclass.
- Implement `save_download_file(feed, file_name, data_stream) -> Path` (atomic write; sketched below).
- Implement `delete_download_file(feed, file_name) -> bool`.
- Implement `download_exists(feed, file_name) -> bool`.
- Implement `get_download_stream(feed, file_name) -> IO[bytes]`.
- Ensure directory hygiene: base download and feed directories exist (covered by `save_download_file`).
- Write unit tests (tmp dir fixtures).
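A sketch of the atomic-write contract, assuming a local-filesystem backend: stream to a temp file in the same directory, fsync, then `os.replace()` into place so readers never observe a partial file:

```python
import os
import tempfile
from pathlib import Path
from typing import IO


def save_download_file(base_dir: Path, feed: str, file_name: str, data_stream: IO[bytes]) -> Path:
    """Atomically persist a stream under base_dir/feed/file_name."""
    feed_dir = base_dir / feed
    feed_dir.mkdir(parents=True, exist_ok=True)  # directory hygiene
    final_path = feed_dir / file_name
    # Temp file in the same directory => same filesystem => atomic rename.
    fd, tmp_name = tempfile.mkstemp(dir=feed_dir)
    try:
        with os.fdopen(fd, "wb") as tmp:
            while chunk := data_stream.read(65536):
                tmp.write(chunk)
            tmp.flush()
            os.fsync(tmp.fileno())
        os.replace(tmp_name, final_path)  # atomic on POSIX
    except BaseException:
        os.unlink(tmp_name)
        raise
    return final_path
```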
This section details the components that manage the lifecycle of downloads, from discovery to storage and pruning. They are organized into a `data_coordinator` module and related services/wrappers.
- `from_row(cls, db_row: sqlite3.Row) -> Download` class method for mapping.
- Create `ytdlp_wrapper.py` and class `YtdlpWrapper`.
- `YtdlpWrapper.fetch_metadata(feed_id: str, url: str, yt_cli_args: list[str]) -> list[Download]`: Fetches metadata for all downloads at the given URL using yt-dlp's metadata extraction capabilities.
- `YtdlpWrapper.download_media_to_file(download: Download, yt_cli_args: list[str], download_target_dir: Path) -> Path`:
  - Purpose: Downloads media (video or audio) for the given entry to a specified directory, handling potential merges (e.g., video + audio) via `yt-dlp` and FFmpeg.
  - Arguments:
    - `download`: Metadata of the entry to download, used for naming and context.
    - `yt_cli_args`: List of command-line arguments for `yt-dlp` (e.g., format selection from feed config).
    - `download_target_dir`: The base directory where the feed-specific subfolder and media file will be created.
  - Returns: `Path` to the successfully downloaded media file.
- Sandbox test using a Creative-Commons short video, covered by integration tests using real URLs.
- Unit tests for `YtdlpWrapper` (metadata call sketched below).
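The underlying metadata call might look like the following sketch using yt-dlp's Python API (`extract_flat: "in_playlist"` keeps playlist scans cheap); the mapping into `Download` objects and the pass-through of `yt_cli_args` are omitted here:

```python
from yt_dlp import YoutubeDL


def fetch_raw_metadata(url: str) -> list[dict]:
    """Extract metadata without downloading; playlists come back as 'entries'."""
    opts = {"skip_download": True, "extract_flat": "in_playlist", "quiet": True}
    with YoutubeDL(opts) as ydl:
        info = ydl.extract_info(url, download=False)
    return list(info.get("entries") or [info])
```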
- Implement a global logging framework.
- Constructor accepts `DatabaseManager`, `YtdlpWrapper`.
- `enqueue_new_downloads(feed_config: FeedConfig) -> int`:
  - Phase 1: Re-fetch metadata for existing DB entries with status 'upcoming'; update those that are now VOD to 'queued'.
  - Phase 2: Fetch metadata for the latest N videos via `YtdlpWrapper.fetch_metadata()` (classification helper sketched below).
    - For each download not in the DB:
      - If VOD (`live_status == 'not_live'` and `is_live == False`), insert with status 'queued'.
      - If live or scheduled (`live_status == 'upcoming'` or `is_live == True`), insert with status 'upcoming'.
    - For each existing 'upcoming' entry now VOD, update the status to 'queued'.
  - Returns the count of newly enqueued or transitioned-to-queued downloads.
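The VOD/live classification above collapses into a small helper; in this sketch anything not clearly a finished VOD is treated as upcoming:

```python
def initial_status(info: dict) -> str:
    """Map yt-dlp live-ness fields to an initial status, per the rules above."""
    if info.get("live_status") == "not_live" and not info.get("is_live"):
        return "queued"    # finished VOD: ready to download now
    return "upcoming"      # live or scheduled: re-check on a later enqueue pass
```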
- Preprocess CLI args once so they aren't re-parsed every time.
- Handle cookies differently? They need to be included even at the discovery stage.
  - Just include all args for all stages, including discovery; the resulting user-facing limitations need to be outlined in the docs.
- The DB layer should not leak any details about SQLite -- it should abstract all of that away.
  - For example, remove all references to `sqlite3.Row` and `sqlite3.Error`.
- Retries should apply more widely, and with enough failures should transition the download to the error state.
  - Maybe `db.py` needs a `bump_error_count` fn that handles this -- bumps the count until it becomes too high, then marks the download as an error.
- Unit tests for `Enqueuer` with mocked dependencies.
- A couple of integration tests.
- Refactor Download Status Management (State Machine Implementation)
  - Phase 1: Database Layer (`db.py`)
    - Remove the existing `DatabaseManager.update_status` method.
    - Implement `DatabaseManager.mark_as_queued_from_upcoming(feed: str, id: str) -> None` (see the guarded-UPDATE sketch after Phase 3):
      - Checks that the current status is in fact `UPCOMING`.
      - Sets `status = QUEUED`.
      - Preserves `retries` and `last_error`.
    - Implement `DatabaseManager.requeue_download(feed: str, id: str) -> None`:
      - Add a note that this will happen due to:
        - manually requeueing an ERROR'd download,
        - manually requeueing in order to get the latest version of a download (i.e., it was previously DOWNLOADED),
        - un-SKIPping a video.
        - Don't implement this as logic, just as a note in the docstring.
      - Sets `status = QUEUED`.
      - Sets `retries = 0`, `last_error = NULL`.
    - Implement `DatabaseManager.mark_as_downloaded(feed: str, id: str) -> None`:
      - Sets `status = DOWNLOADED`.
      - Sets `retries = 0`, `last_error = NULL`.
    - Implement `DatabaseManager.skip_download(feed: str, id: str) -> None`:
      - Sets `status = SKIPPED`.
      - Preserves `retries` and `last_error`.
      - Raises `DownloadNotFoundError` or `DatabaseOperationError` on failure.
    - Implement `DatabaseManager.unskip_download(feed_id: str, download_id: str) -> DownloadStatus`:
      - Checks that the download is currently `SKIPPED`.
      - Calls `requeue_download(feed_id, download_id)`.
      - Returns `DownloadStatus.QUEUED`.
      - Raises `DownloadNotFoundError` or `DatabaseOperationError` on failure.
    - Implement `DatabaseManager.archive_download(feed: str, id: str) -> None`:
      - Sets `status = ARCHIVED`.
      - Preserves `retries` and `last_error`.
      - Raises `DownloadNotFoundError` or `DatabaseOperationError` on failure.
    - Modify `DatabaseManager.get_download_by_id(feed: str, id: str) -> Download`:
      - Change the return type from `Download | None` to `Download`.
      - Raises `DownloadNotFoundError` if not found.
      - Raises `DatabaseOperationError` for other DB issues.
      - Raises `ValueError` if row parsing fails.
    - Verify `DatabaseManager.upsert_download` correctly handles the initial setting of `UPCOMING` or `QUEUED` status based on the input `Download` object, ensuring `retries` and `last_error` are appropriate for new items.
    - Verify `DatabaseManager.bump_retries` remains the sole mechanism for incrementing retries and transitioning to `ERROR` status (and handles `DownloadNotFoundError` from `get_download_by_id`).
  - Phase 2: Service Layer Updates
    - `Enqueuer` (`data_coordinator/enqueuer.py`):
      - Update `_process_single_download` to correctly handle `DownloadNotFoundError` from `get_download_by_id`.
      - Refactor `_update_download_status_in_db` (or remove it) and its call sites (`_update_status_to_queued_if_vod`, `_handle_existing_fetched_download`):
        - When an `UPCOMING` download becomes a VOD, call `download_db.mark_as_queued_from_upcoming`. Adapt to its new signature (returns `None`, raises exceptions).
        - For other status changes previously handled by `_update_download_status_in_db` (e.g., an existing `ERROR` record being re-processed from the feed and needing to be `QUEUED`), evaluate whether `download_db.requeue_download` should be used or whether the current `upsert_download` logic in `_handle_existing_fetched_download` is sufficient.
      - Ensure `bump_retries` calls remain correct for metadata fetch failures.
    - `Pruner` (`data_coordinator/pruner.py`):
      - When pruning items, call `download_db.archive_download`. Adapt to its new signature (returns `None`, raises exceptions). File deletion logic is already handled correctly by `Pruner` before this step.
  - Phase 3: Test Updates
    - `tests/anypod/db/test_db.py`:
      - Remove tests for the old `download_db.update_status`.
      - Add/update comprehensive unit tests for all new/modified `download_db.mark_as_*`, `download_db.requeue_*`, `download_db.unskip_download`, and `download_db.get_download_by_id` methods, including exception checking.
      - Ensure tests for `upsert_download` cover setting the initial `UPCOMING` and `QUEUED` states.
      - Ensure tests for `bump_retries` are still valid and cover its role, especially `DownloadNotFoundError` handling.
    - `tests/anypod/data_coordinator/test_enqueuer.py`:
      - Update mocks and assertions for `download_db.get_download_by_id` to reflect the new exception-raising behavior.
      - Update mocks for status update calls to the new `download_db.mark_as_queued_from_upcoming` or `download_db.requeue_download` methods. Verify correct arguments and exception handling.
      - Verify `upsert_download` is called with correctly statused `Download` objects.
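One way to implement the guarded transitions is to encode the status precondition in the `UPDATE`'s `WHERE` clause, so check-and-update is a single atomic statement. A sketch for `mark_as_queued_from_upcoming`, written against raw `sqlite3` for clarity even though the real layer hides SQLite details:

```python
import sqlite3


class DownloadNotFoundError(Exception): ...
class DatabaseOperationError(Exception): ...


def mark_as_queued_from_upcoming(conn: sqlite3.Connection, feed: str, id: str) -> None:
    """UPCOMING -> QUEUED, preserving retries and last_error."""
    try:
        cur = conn.execute(
            "UPDATE downloads SET status = 'QUEUED' "
            "WHERE feed = ? AND id = ? AND status = 'UPCOMING'",
            (feed, id),
        )
    except sqlite3.Error as e:
        raise DatabaseOperationError("status update failed") from e
    if cur.rowcount == 0:
        # This sketch conflates "row missing" and "status not UPCOMING";
        # the real method would distinguish the two before raising.
        raise DownloadNotFoundError(f"{feed}/{id} not found in UPCOMING state")
```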
- Address various TODOs throughout the codebase.
- Constructor accepts `DatabaseManager`, `FileManager`, `YtdlpWrapper`.
- `download_queued(feed_id: str, feed_config: FeedConfig, limit: int = -1) -> tuple[int, int]`: (success_count, failure_count)
  - Gets queued `Download` objects via `DatabaseManager.get_downloads_by_status`.
  - For each `Download`:
    - Call `YtdlpWrapper.download_media_to_file(download, yt_cli_args)`.
      - Generate the final file_name (e.g., using `download.title` and `updated_metadata['ext']`).
      - Call `FileManager.save_download_file(feed_config.name, final_file_name, source_file_path=completed_file_path)`.
        - (Note: `FileManager.save_download_file` will need to implement moving a file from `source_file_path` to its final managed location.)
      - Update DB: status to 'downloaded', store the final path from `FileManager`, update `ext` and `filesize` from `updated_metadata`.
    - On failure:
      - Update DB: status to 'error', log the error, increment retries.
    - Ensure cleanup of the source file regardless of success/failure of the individual download.
- Unit tests for `Downloader` (service) with mocked dependencies.
- Debug mode for Enqueuer.
- Use the old implementation for reference, but prepare for largely a full rewrite.
- Constructor accepts `DatabaseManager`, `FileManager`.
- `prune_feed_downloads(feed_id: str, keep_last: int | None, prune_before_date: datetime | None) -> tuple[int, int]`: (archived_count, files_deleted_count)
  - Uses `DatabaseManager` to get candidates.
  - Uses `FileManager.delete_download_file()` to delete each download's file.
  - Uses `DatabaseManager.archive_download()` to archive.
- Unit tests for `Pruner` with mocked dependencies.
- Split database classes: Refactor `src/anypod/db/db.py` into separate modules:
  - `DownloadDatabase` class for download-level operations (keep existing methods)
  - `FeedDatabase` class for feed-level operations (new functionality)
- Feed table schema & operations (illustrative DDL sketched below):
  - Create a `feeds` table with schema: `last_sync_attempt`, `last_successful_sync`, `consecutive_failures`, `last_error`, `is_enabled`, `title`, `subtitle`, `description`, `language`, `author`, `image_url`, `source_type`, `total_downloads`, `downloads_since_last_rss`, `last_rss_generation`
    - Add `created_at` and `updated_at` with defaults
  - Implement feed CRUD operations in `FeedDatabase`:
    - Add a `FeedNotFoundError` exception (similar to `DownloadNotFoundError`)
    - `upsert_feed(feed: Feed) -> None` - insert or update a feed record, handling None timestamps to allow database defaults
    - `get_feed_by_id(feed_id: str) -> Feed` - retrieve a specific feed by ID, raise `FeedNotFoundError` if not found
    - `get_feeds(enabled: bool | None = None) -> list[Feed]` - get all feeds, or filter by enabled status if provided
    - `mark_sync_success(feed_id: str) -> None` - set `last_successful_sync` to the current timestamp, reset `consecutive_failures` to 0, clear `last_error`
    - `mark_sync_failure(feed_id: str, error_message: str) -> None` - set `last_failed_sync` to the current timestamp, increment `consecutive_failures`, set `last_error`
    - `mark_rss_generated(feed_id: str, new_downloads_count: int) -> None` - set `last_rss_generation` to the current timestamp, increment `total_downloads` by `new_downloads_count`, set `downloads_since_last_rss` to `new_downloads_count`
    - `set_feed_enabled(feed_id: str, enabled: bool) -> None` - set `is_enabled` to the provided value
    - `update_feed_metadata(feed_id: str, *, title: str | None = None, subtitle: str | None = None, description: str | None = None, language: str | None = None, author: str | None = None, image_url: str | None = None) -> None` - update feed metadata fields; only updates provided (non-None) fields; no-op if all are None
- Download table enhancements:
  - Add field: `quality_info`
  - Add fields: `discovered_at` and `updated_at`, potentially `downloaded_at`, maintained with SQLite triggers
  - Update `DownloadDatabase` methods to handle the new fields
  - Update all places that create/modify downloads to populate the new fields (`Enqueuer`, `Downloader`, etc.)
- Config and model updates:
  - Rename `FeedMetadata` to `FeedMetadataOverrides` in `feed_config.py`
  - Add an `enabled` field to `FeedConfig`
- Feed metadata synchronization:
  - Compare `FeedMetadataOverrides` from config with the stored feed metadata in the DB
  - Update the DB when config overrides change
  - Modify `YtdlpWrapper` to make a best-effort extraction of non-overridden `FeedMetadataOverrides` fields
  - Mark fields for best-effort extraction when overrides are removed
- Unit tests for both `DownloadDatabase` and `FeedDatabase`
- On pruning, also update the `total_downloads` value
- Ensure there aren't any read/modify/write loops that aren't protected by a transaction
- Create a `data_coordinator/types/` folder with `__init__.py` and `processing_results.py`
- Create a `ProcessingResults` dataclass with counts, error tracking, status, and timing
- Add an `archive_feed()` method to the `Pruner` class (sets `is_enabled=False`)
- Constructor accepts `Enqueuer`, `Downloader`, `Pruner`, `RSSFeedGenerator`, `FeedDatabase`
- `process_feed(feed_id: str, feed_config: FeedConfig) -> ProcessingResults`:
  - Calculate `fetch_since_date` from `feed.last_successful_sync` (NOT `feed_config.since`)
  - Execute phases in sequence: enqueue → download → prune → RSS generation (sketched below)
  - Inline error handling with graceful degradation between phases
  - Update `last_successful_sync` or `last_failed_sync` based on the outcome
  - Return a comprehensive `ProcessingResults` with all counts and errors
- Update `data_coordinator/__init__.py` to export `DataCoordinator`
- Integration tests for `DataCoordinator` focusing on the full `process_feed` flow
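A sketch of the phase sequencing with graceful degradation: each phase is attempted in order, and a failure is recorded rather than aborting the run. All names here are illustrative stand-ins:

```python
from collections.abc import Callable
from dataclasses import dataclass, field


@dataclass
class ProcessingResults:
    """Illustrative shape only; the real dataclass tracks counts, errors, timing."""

    counts: dict[str, int] = field(default_factory=dict)
    errors: list[str] = field(default_factory=list)


def run_phases(phases: dict[str, Callable[[], int]]) -> ProcessingResults:
    """Run enqueue -> download -> prune -> rss in order; a failing phase is
    recorded but does not prevent the later phases from running."""
    results = ProcessingResults()
    for name, phase in phases.items():
        try:
            results.counts[name] = phase()
        except Exception as e:  # graceful degradation between phases
            results.errors.append(f"{name}: {e}")
    return results
```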
- Implement discrepancy detection logic:
  - Find DB entries with `DOWNLOADED` status but no corresponding download file.
  - Find download files on disk with no corresponding `DOWNLOADED` DB entry.
  - (Optional) Automated resolution strategies or reporting for discrepancies.
- Unit tests for discrepancy detection logic.
- Determine if a read/write lock for the in-memory feed XML cache is needed for concurrency (see the sketch below)
- Add new fields to `Download`
  - This will also involve potentially changing how values are updated, since some (like title) might change down the line, so we should try to store the most recent value
- Implement `generate_feed_xml(feed_id)` to write to the in-memory XML after acquiring the write lock
- Implement `get_feed_xml(feed_id)` for HTTP handlers to read from the in-memory XML after acquiring the read lock
- Write unit tests to verify enclosure URLs and MIME types in generated feeds
- Figure out how to bring in the host URL.
- Duration should be an int.
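On the read/write lock question: Python's stdlib has no readers-writer lock, and since cache reads and writes are short, a plain mutex may be all that's needed. A sketch:

```python
import threading


class FeedXmlCache:
    """In-memory XML cache guarded by a mutex. If profiling ever shows reader
    contention, this is the seam where a readers-writer lock would slot in."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._xml: dict[str, bytes] = {}

    def set_feed_xml(self, feed_id: str, xml: bytes) -> None:
        with self._lock:  # the "write lock"
            self._xml[feed_id] = xml

    def get_feed_xml(self, feed_id: str) -> bytes:
        with self._lock:  # the "read lock"; raises KeyError for unknown feeds
            return self._xml[feed_id]
```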
- PathManager Implementation – Create a centralized path/URL coordination class (sketched below):
  - Single source of truth for file system paths and URLs based on feed_id + download_id
  - Consistent 1:1 mapping between network paths and file paths
  - Methods for feed directories, RSS URLs, and media file paths/URLs
  - Google-style docstrings with proper Args/Returns/Raises sections
- `FileManager` refactor
- `Pruner` refactor
- `RSSFeedGenerator` refactor
- Tests refactor
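A sketch of the `PathManager` surface; the directory layout and URL scheme shown are assumptions, the point being that every path method has a mirror-image URL method:

```python
from pathlib import Path


class PathManager:
    """Single source of truth mapping feed_id/download_id to paths and URLs."""

    def __init__(self, base_data_dir: Path, base_url: str) -> None:
        self._base_data_dir = base_data_dir
        self._base_url = base_url.rstrip("/")

    def feed_media_dir(self, feed_id: str) -> Path:
        return self._base_data_dir / "media" / feed_id

    def media_file_path(self, feed_id: str, download_id: str, ext: str) -> Path:
        return self.feed_media_dir(feed_id) / f"{download_id}.{ext}"

    def media_file_url(self, feed_id: str, download_id: str, ext: str) -> str:
        # Mirrors media_file_path 1:1 so the URL <-> path mapping stays consistent.
        return f"{self._base_url}/media/{feed_id}/{download_id}.{ext}"

    def feed_url(self, feed_id: str) -> str:
        return f"{self._base_url}/feeds/{feed_id}.xml"
```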
- Core scheduler implementation (job registration sketched below):
  - Add `apscheduler` to dependencies in `pyproject.toml`
  - Create a type-safe APScheduler wrapper (`apscheduler_core.py`)
  - Use APScheduler with `AsyncIOScheduler` for async support
  - Schedule jobs based on feed cron expressions from config
  - Manage the job lifecycle (add/remove/pause/resume)
  - Handle graceful shutdown with proper job cleanup
  - Each feed gets its own job with a unique ID (the feed ID)
  - Job-level error handling with proper exception chaining
  - Direct DataCoordinator integration (no separate worker)
  - Context ID injection for log correlation
  - Invalid cron expression validation with `SchedulerError`
  - Remove explicit references to monkeypatch
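A sketch of the per-feed job registration with APScheduler; `SchedulerError` is the assumed exception name from the task above:

```python
from collections.abc import Callable

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger


class SchedulerError(Exception):
    """Raised for invalid cron expressions (name assumed from the task list)."""


def schedule_feed(
    scheduler: AsyncIOScheduler,
    feed_id: str,
    cron_expr: str,
    process_feed: Callable[[str], object],  # e.g. DataCoordinator.process_feed
) -> None:
    """Register one job per feed, keyed by the feed ID."""
    try:
        trigger = CronTrigger.from_crontab(cron_expr)
    except ValueError as e:
        raise SchedulerError(f"invalid cron for {feed_id!r}: {cron_expr!r}") from e
    scheduler.add_job(
        process_feed, trigger, id=feed_id, args=[feed_id], replace_existing=True
    )
```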
- Date Window Calculation Logic (`DataCoordinator`):
  - Replace `_calculate_fetch_until_date` with day-aligned logic
  - `fetch_since_date` should still be `last_successful_sync`
  - `fetch_until_date` should just be `now()`; let's simplify this logic -- no 2 * cron tick or anything. We can remove that from `coordinator.py` and `debug_enqueuer.py`.
    - That may mean that most of the time these values will fall on the same day. That's fine; we will dedup results later.
  - Update `last_successful_sync` to `fetch_until_date` to ensure full coverage (it was previously `now()`)
  - Enhanced logging: log both the high-resolution calculated window and the day-aligned yt-dlp window while in the context of `ytdlp_wrapper`
- Deduplication Enforcement:
  - Verify `Enqueuer` properly handles duplicate video IDs across multiple day fetches
  - If whatever is in the DB is identical to what we retrieved, don't update (an update would bump `updated_at`)
    - It is possible that some metadata has updated (e.g., the uploader might have changed the description), so check for that and update if needed
  - Add deduplication tests: the same video appearing in multiple day windows
    - Verify no updates occurred (`updated_at` is unchanged)
    - Verify deduplication works when the same video appears in multiple day windows
- Documentation Updates:
  - Update method docstrings: document the day-aligned window strategy clearly
  - Update DESIGN_DOC.md: add a section explaining yt-dlp's day-level precision limitation
- State Reconciler Alignment:
  - Update `since` parameter handling: it should only be a `date`, not a `datetime`
  - When `since` changes, use day-aligned logic for requeuing archived downloads
  - Ensure consistency between enqueue windows and retention policy windows
- Tie the Feed table's `total_downloads` to the Download table with triggers
- When downloading an individual file that is out of range, we get an incomplete response back from yt-dlp, which causes internal errors
- Startup reconciliation implementation:
  - Compare config feeds with database feeds
  - Handle new feeds: insert into the DB and set an initial `last_successful_sync`
  - Handle removed feeds: mark as disabled in the DB (set `is_enabled=False`)
  - Handle changed feeds: update metadata and configuration
  - Ensure every active feed has a valid `last_successful_sync` before scheduling
  - Evaluate what would happen if it fails midway through. Would simply restarting get back to a correct state?
  - Time-box the sync window -- it currently only has a start time, but it will also need an end time
- Detect and apply changes to:
  - `enabled`: Update the feed's `is_enabled` in the database, add/remove it from the scheduler, and trigger an initial sync on false -> true
    - `last_successful_sync` does not need to be optional, as it is set proactively on new feed creation
  - `url`: Update the existing feed's `source_url`, reset `consecutive_failures` to 0, clear `last_error`, and reset `last_successful_sync` as if it were a new feed; keep the download history
  - `since` expansion (earlier date): Query archived downloads with `published` >= the new `since`, change status from ARCHIVED to QUEUED (will redownload)
    - Modify `get_downloads_by_status` to allow filtering by date so we don't retrieve the entire DB
    - Also consider storing these values (`since` and `keep_last`) in the Feed DB so we only query the DB if there's a change
    - Modify `requeue_download` -> `requeue_downloads`, which can take a variadic list and batch-modify
    - Modify the pydantic handling of `since` to accept JUST a day, and then derive the TZ from tiered sources (see the sketch after this list):
      - from the `since` value itself, if included
      - from a TZ env var
      - from the system clock (the user would have had to override `/etc/localtime`)
  - `since` contraction (later date): Mark downloads with `published` < the new `since` for archival on the next prune cycle
  - `keep_last` increase: Query archived downloads ordered by `published` DESC, restore up to (new_keep_last - current_count) from ARCHIVED to QUEUED (will redownload)
    - Modify `count_downloads_by_status` to accept multiple possible statuses and return all of them
  - `keep_last` decrease: No immediate action -- it will apply naturally on the next prune cycle
  - `metadata` changes: Update the feed table immediately (title, subtitle, description, language, author, image_url, categories, explicit), trigger RSS regeneration
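The tiered timezone resolution for a day-only `since` might look like this sketch (tier order: the value itself, then the `TZ` env var, then the system clock):

```python
import os
from datetime import datetime
from zoneinfo import ZoneInfo


def resolve_since(value: str) -> datetime:
    """Parse `since`; a bare day becomes midnight with a timezone resolved
    from tiered sources, as described above."""
    dt = datetime.fromisoformat(value)   # accepts "2024-01-02" or full datetimes
    if dt.tzinfo is not None:            # tier 1: offset included in the value
        return dt
    if tz_name := os.environ.get("TZ"):  # tier 2: TZ env var
        return dt.replace(tzinfo=ZoneInfo(tz_name))
    return dt.astimezone()               # tier 3: system-local timezone
```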
- Unit tests for scheduler with mocked jobs
- Unit tests for state reconciler covering:
- New feed addition
- Feed removal
- Feed configuration changes
- Metadata override changes
- Integration tests for full startup sequence
- Tests for graceful shutdown handling
- Main service orchestration:
  - Initialize all components (databases, services)
  - Run the state reconciler on startup
  - Start the scheduler with reconciled feeds
  - Perform an initial sync for all feeds to populate RSS
  - Keep the service running until a shutdown signal
- Change `path_manager` to automatically assume the tmp and media dirs -- it should only need the base dir
  - Also, we should divide into a data dir and config files (`cookies.txt` and `config_path`), so we can separate those out to be Docker-friendly
- Also, it seems a little excessive to add `PathManager` to `Enqueuer` and `Downloader` JUST so they can retrieve the cookie
  - Especially because they've duplicated the cookie-retrieval logic -- this needs to be centralized somewhere else
  - The ytdlp impl looks fine, though
- Optimize the discover/metadata/download loop to cut down on calls to yt-dlp
  - It looks like we are able to retrieve full video detail when querying a playlist without the `--flat-playlist` option
    - The jury's still out on channels, but maybe?
  - Maybe we can pre-emptively classify these when they are added, store the type in the DB, and pick the optimal way to retrieve based on that classification
  - Future optimization: we could fetch detailed metadata in one call (86 fields vs 21), but it's 10x slower -- out of scope
  - Not sure we need `ReferenceType` anymore; `SourceType` might be good enough
    - I think we can get rid of the DISCOVERY type
  - Get rid of `set_source_specific_ytdlp_options`
- Cut down on excessive logs
- Use the shared conftest for more fixtures
- Make the db a folder instead of a file -- SQLite creates `.db-wal` files in the same folder.
Context/Goals: Convert anypod from sync to async to enable cancellable long-running operations (especially yt-dlp calls). Currently, yt-dlp operations block and can't be interrupted. The async conversion will wrap yt-dlp in subprocess calls that can be properly cancelled, and ripple async throughout the codebase. Key insight: keep CLI args as `list[str]` instead of converting to a dict, eliminating the complex dict→CLI conversion.
Implementation Tasks:
- CLI Args Strategy: Remove the dict conversion in `feed_config.py` -- keep `yt_args` as `list[str]` throughout the pipeline
- YtdlpCore Async: Implement subprocess calls with `--dump-single-json --flat-playlist` for metadata; parse the JSON into `YtdlpInfo`
- Cancellation: Proper subprocess cleanup (`proc.kill()` + `await proc.wait()` on `CancelledError`) -- see the sketch after this list
- Isolate yt-dlp CLI args into YtdlpCore
  - Consistent naming for the ydl vs ytdlp fns
  - Also separate out the classes into different files
- Remove unused YtdlpCore methods: `parse_options()`, `set_date_range()`, `set_playlist_limit()`, `set_cookies()`
- Conversion Order: YtdlpCore → YtdlpWrapper → Enqueuer/Downloader/Pruner → DataCoordinator → StateReconciler
- Consider if RSSFeedGenerator needs async updates (probably minimal since it's mostly CPU-bound)
  - The answer is no, for now at least
- Implement graceful shutdown handling -- it hard-crashes on ctrl-c right now
  - This includes during init, when we're not in APScheduler yet (maybe we should be?)
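The cancellation pattern for the yt-dlp subprocess, sketched with `asyncio` (the `--dump-single-json --flat-playlist` flags are the ones named above; the error shape is illustrative):

```python
import asyncio
import json


async def ytdlp_dump_json(url: str, extra_args: list[str]) -> dict:
    """Run yt-dlp as a cancellable subprocess and parse its JSON output."""
    proc = await asyncio.create_subprocess_exec(
        "yt-dlp", "--dump-single-json", "--flat-playlist", *extra_args, url,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await proc.communicate()
    except asyncio.CancelledError:
        proc.kill()        # don't let the child outlive the cancelled task
        await proc.wait()  # reap it before propagating the cancellation
        raise
    if proc.returncode != 0:
        raise RuntimeError(f"yt-dlp failed: {stderr.decode(errors='replace')}")
    return json.loads(stdout)
```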
- Phase 1: Environment Setup & Dependencies
  - Add `sqlalchemy[asyncio]`, `sqlmodel`, `aiosqlite`, and `alembic` to `pyproject.toml` using `uv add`.
- Phase 2: Refactor Models to SQLModel
  - Convert the data models in `src/anypod/db/types/` (`download.py`, `feed.py`) to inherit from `SQLModel`, marking them with `table=True`.
  - Use `sqlmodel.Field` to define primary keys, indexes, and other constraints.
  - Define the one-to-many relationship between `Feed` and `Download` using `sqlmodel.Relationship`.
  - Integrate enum types into the SQLModels:
    - For `Feed.source_type`, declare `sa_column=Column(Enum(SourceType))`.
    - For `Download.status`, declare `sa_column=Column(Enum(DownloadStatus))`.
    - Remove the legacy `register_adapter` calls.
- Phase 3: Implement the Asynchronous Core (sketched below)
  - Create `src/anypod/db/sqlalchemy_core.py` to centralize database connectivity.
  - Implement `create_async_engine` using the `sqlite+aiosqlite` dialect and `QueuePool` (default).
  - Create an `async_session_maker` for producing `AsyncSession` instances.
  - Implement a `session()` async generator for dependency injection.
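A minimal sketch of the Phase 3 core (`sqlalchemy_core.py`); the database path is a placeholder:

```python
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

# aiosqlite dialect; the default pooling behavior applies
engine = create_async_engine("sqlite+aiosqlite:///data/anypod.db")

async_session_maker = async_sessionmaker(engine, expire_on_commit=False)


async def session() -> AsyncGenerator[AsyncSession, None]:
    """Async generator for dependency injection (e.g., FastAPI Depends)."""
    async with async_session_maker() as s:
        yield s
```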
- Phase 4: Refactor Data Access Logic
  - Convert all methods in `src/anypod/db/feed_db.py` and `src/anypod/db/download_db.py` to `async def`.
  - Refactor methods to accept an `AsyncSession` parameter instead of using a shared instance.
  - Replace `sqlite-utils` calls (`upsert`, `rows_where`, `get`) with SQLAlchemy ORM operations (`session.add`, `session.execute(select(...))`).
  - Propagate the `async` keyword up the call stack through the `data_coordinator` and `schedule` modules.
- Phase 5: Database Migrations with Alembic
  - Initialize Alembic with `alembic init -t async migrations`.
  - Configure `alembic.ini` with the `sqlalchemy.url` for the async driver.
  - Configure `migrations/env.py` to use `SQLModel.metadata` as the `target_metadata`.
  - Generate an initial migration script: `alembic revision --autogenerate -m "Initial schema from SQLModels"`.
  - Replace database triggers (`create_trigger`) with Alembic-managed versions.
  - Review and apply the initial migration: `alembic upgrade head`.
- Add the `aiofiles` dependency and convert FileManager to use async file operations
  - Dependencies: Add `aiofiles` for async file operations
  - File Operations: Use `aiofiles.os` to replace path operations
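A sketch of the async file operations, assuming `aiofiles.os.path` is available (aiofiles ≥ 22.1):

```python
from collections.abc import AsyncIterator

import aiofiles
import aiofiles.os


async def delete_download_file(path: str) -> bool:
    """Async delete; returns False if the file was already gone."""
    if not await aiofiles.os.path.exists(path):
        return False
    await aiofiles.os.remove(path)
    return True


async def read_chunks(path: str, size: int = 65536) -> AsyncIterator[bytes]:
    """Stream a file without blocking the event loop."""
    async with aiofiles.open(path, "rb") as f:
        while chunk := await f.read(size):
            yield chunk
```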
- After reconciliation, trigger an immediate sync:
  - Process all enabled feeds to populate RSS
  - Ensure RSS feeds are available before the HTTP server starts
  - Handle failures gracefully without blocking startup, unless the config is wrong -- that should cause failure until fixed
- How do we break out the API and static serving? Different ports? For security reasons, we need to expose static content but not the APIs.
  - Went with different ports
- Create a new HTTP server module at `src/anypod/server/`
  - `__init__.py` - server module exports
  - `app.py` - FastAPI application factory
  - `dependencies.py` - dependency injection setup
  - `models/` - Pydantic request/response models
  - `routers/` - API route handlers organized by domain
- First, check on getting the full file path in there:
  - `https://i.ytimg.com/pl_c/PL8mG-RkN2uTw7PhlnAr4pZZz2QubIbujH/studio_square_thumbnail.jpg` isn't enough; you need the extra `?sqp=CNnJ9cQG-oaymwEICOADEOADSFqi85f_AwYIwe77sQY%3D&rs=AOn4CLB5y7iZmQcD8vHcdJ4WtzLCK_wOuQ` on the end
- Download and host images locally
- Add `fastapi` and `uvicorn` to dependencies in `pyproject.toml`
- Create a FastAPI app factory with proper dependency injection
- Set up CORS, logging, and error-handling middleware
  - Ensure logging also includes the contextvar
- Configure OpenAPI documentation with proper metadata
- `FeedResponse` - feed data for API responses
- `FeedCreateRequest` / `FeedUpdateRequest` - feed modification requests
- `DownloadResponse` - download data for API responses
- `PaginatedResponse[T]` - generic paginated response wrapper
- `StatsResponse` - system and feed statistics
- `ErrorResponse` - standardized error responses
- `feeds.py` - all feed management endpoints
  - `GET /api/feeds` - list all feeds with pagination, filtering, and sorting
  - `POST /api/feeds` - create a new feed; writes to the config file
  - `GET /api/feeds/{feed_id}` - get detailed feed information
  - `PUT /api/feeds/{feed_id}` - update feed configuration by modifying the config file
  - `DELETE /api/feeds/{feed_id}` - disables the feed and archives all downloads
  - `POST /api/feeds/{feed_id}/enable` - enable feed processing
  - `POST /api/feeds/{feed_id}/disable` - disable feed processing
  - `POST /api/feeds/{feed_id}/sync` - trigger a manual sync/processing
  - `GET /api/feeds/valid` - validate a feed config before writing it to the config file
- `downloads.py` - download management endpoints
  - `GET /api/feeds/{feed_id}/downloads` - list downloads for a feed (paginated, filtered)
  - `GET /api/feeds/{feed_id}/downloads/{download_id}` - get specific download details
  - `POST /api/feeds/{feed_id}/downloads/{download_id}/retry` - retry a failed download
  - `POST /api/feeds/{feed_id}/downloads/{download_id}/skip` - mark a download as skipped
  - `POST /api/feeds/{feed_id}/downloads/{download_id}/unskip` - remove skip status
  - `DELETE /api/feeds/{feed_id}/downloads/{download_id}` - archive the download and delete its file
- `stats.py` - statistics and monitoring endpoints
  - `GET /api/feeds/{feed_id}/stats` - detailed feed statistics
  - `GET /api/stats/summary` - system-wide statistics summary, including storage
- `health.py` - health check endpoints
  - `GET /api/health` - application health check
- `static.py` - content delivery endpoints (one route sketched below)
  - `GET /feeds` - list all RSS feeds in the directory
  - `GET /feeds/{feed_id}.xml` - RSS feed XML
  - `GET /media` - list all feeds in the directory
  - `GET /media/{feed_id}` - list all files for a feed in the directory
  - `GET /media/{feed_id}/{filename}.{ext}` - media file download
  - `GET /thumbnails` - list all feeds in the directory
  - `GET /thumbnails/{feed_id}` - list all thumbnails for a feed in the directory
  - `GET /thumbnails/{feed_id}/{filename}.{ext}` - thumbnail images
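A sketch of one `static.py` route, serving pre-generated RSS from the in-memory cache; the module-level dict is a stand-in for the real dependency-injected generator:

```python
from fastapi import APIRouter, HTTPException, Response

# Stand-in for the RSS generator's in-memory cache described earlier.
feed_xml_cache: dict[str, bytes] = {}

router = APIRouter()


@router.get("/feeds/{feed_id}.xml")
def get_feed_xml(feed_id: str) -> Response:
    """Serve the pre-generated RSS XML for one feed."""
    xml = feed_xml_cache.get(feed_id)
    if xml is None:
        raise HTTPException(status_code=404, detail=f"unknown feed: {feed_id}")
    return Response(content=xml, media_type="application/rss+xml")
```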
- Unit tests with `TestClient` for all API endpoints
- Integration tests with actual database operations
- Maybe enforce that feed IDs must match the same regex as in `validation.py`
- Create a service layer to bridge the HTTP API with the existing `DataCoordinator`
- Extend `FeedDatabase`/`DownloadDatabase` with new query methods for API needs
- Add config file read/write utilities for feed CRUD operations
- Implement proper error mapping from domain exceptions to HTTP responses
- Pagination: Implement cursor-based or offset-based pagination
- Filtering: Add query parameters for status, date ranges, search
- Sorting: Support multiple sort fields and directions
- Validation: Comprehensive request validation using Pydantic
- Error Handling: Consistent error responses with proper HTTP status codes
- Configure server host/port via environment variables
- Ensure proper graceful shutdown handling
- Entry in `default.py` to start `uvicorn`
- Comprehensive OpenAPI documentation
- Example requests/responses for all endpoints
- `python -m anypod` parses flags: `--ignore-startup-errors`, `--retry-failed`, `--log-level`.
- Docstrings and `argparse` help messages.
- Evaluate logged statements and make sure that only relevant things get logged.
- When using `--retry-failed`, we should also include a date so that we disregard VERY old failures (see the sketch below).
  - Errors will be common because live videos may be deleted and re-uploaded as regular VODs.
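The CLI surface might parse as follows; `--retry-failed-since` is a hypothetical name for the proposed date cutoff:

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """CLI sketch from the flags above."""
    parser = argparse.ArgumentParser(prog="anypod")
    parser.add_argument("--ignore-startup-errors", action="store_true",
                        help="continue startup even if some feeds fail to initialize")
    parser.add_argument("--retry-failed", action="store_true",
                        help="requeue downloads currently in the ERROR state")
    parser.add_argument("--retry-failed-since", metavar="DATE",  # hypothetical flag
                        help="only retry failures newer than this date (proposed)")
    parser.add_argument("--log-level", default="INFO",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR"])
    return parser
```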
- Write the README
  - Outline the limitations of passing through yt-dlp flags -- which ones do you have to avoid using?
  - Look up some well-established open source projects and follow their documentation style
- `Dockerfile` (debian:bookworm-slim, default root, overridable UID/GID).
- `.dockerignore` to exclude tests, .git, caches.
- Set up a dev env with containers.
- GH Action `release-yt-dlp.yaml`: on a new yt-dlp tag → rebuild, test, draft release.
- GH Action `deps-bump.yaml`: weekly minor-bump PR; require manual approval for major.
  - Done with dependabot instead.
- Make override enum settings case-insensitive (e.g., they currently require exactly EPISODIC or SERIAL)
- Create an HTTP endpoint to reset ERROR-status videos
  - Create a top-level HTTP endpoint to reset ERROR-status videos across all feeds
- Is it possible to introduce tier-based filtering for Patreon posts?
- Support an "after-download metadata refresh" workflow: allow a download to be flagged for delayed re-processing so we can re-sync title/description/duration later without requeueing the media. Provide CLI/API controls plus scheduler hooks to re-pull metadata (e.g., for YouTube description edits) while keeping the original enclosure stable.
- Delete a download from a feed?
- Optional (additional?) yt args override in the request
- Endpoint for manually triggering a feed
When all boxes are checked, you'll be able to run:

```sh
docker run \
  -v $(pwd)/config:/config \
  -v $(pwd)/data:/data \
  -p 8000:8000 \
  ghcr.io/thurstonsand/anypod:dev
```

…and subscribe to http://localhost:8000/feeds/this_american_life.xml in your podcast player.