Version 1.0.0 - Python 3.11+ - BSD-3-Clause
- Introduction
- Installation
- Quickstart
- Core concepts
- CLI reference
- Caching
- Parallel execution
- Watch mode
- Remote cache
- TUI
- Configuration
- Architecture
- Recipes and patterns
- FAQ and troubleshooting
- Appendix A: public API
- Appendix B: glossary
ntask is a Python-native task runner built around four properties:
- Python-native. Tasks are plain Python functions. No DSL, no YAML, no shell-string-with-substitution. Type annotations drive CLI argument parsing automatically.
- Explicit. Every dependency is declared. Nothing runs unless the DAG asks it to.
- File-based. Configuration lives in
tasks.pyandpyproject.toml. There's no server, no daemon. - Cache-correct. Caching is content-addressed, not timestamp-based. A file checked out, restored, or rebuilt with identical content always hashes the same.
Make re-runs everything unless you carefully maintain .PHONY targets and mtime stamps. just has no dependency graph. Invoke (Fabric's task runner) works for simple tasks but has no DAG and no caching. doit has caching but uses a verbose dict-based task format. Bazel is hermetic and expensive to set up.
ntask occupies the space between those tools: smarter than Make for the cases that matter (lint, typecheck, test, build) without Bazel's setup cost.
A @cached task's key includes the resolved keys of all its upstream dependencies. If task A depends on task B, and B's inputs change, B gets a new key, which feeds into A's key. A then misses even when its own inputs are unchanged.
You get this for free. Declare your deps; propagation is automatic.
| Feature | ntask | make | just | invoke | doit | poe |
|---|---|---|---|---|---|---|
| Typed Python tasks | yes | no | no | partial | no | partial |
| Content-hash caching | yes | partial | no | no | partial | no |
| Transitive cache-key propagation | yes | no | no | no | partial | no |
| Remote cache (S3/GCS/HTTP) | yes | no | no | no | no | no |
| DAG dependency resolution | yes | yes | no | partial | yes | no |
| Type-hint to CLI args | yes | no | partial | partial | no | yes |
| Parallel DAG execution | yes | yes | no | no | yes | no |
| Live DAG TUI | yes | no | no | no | no | no |
| Zero daemon | yes | yes | yes | yes | yes | yes |
pip install ntaskInstalls the ntask console script and the ntask Python package.
Requirements: Python 3.11 or newer. Tested on 3.11, 3.12, and 3.13.
pip install ntask[s3] # boto3 for S3 remote cache
pip install ntask[gcs] # google-cloud-storage for GCS remote cache
pip install ntask[all] # bothThe local-fs and http remote backends use only the standard library and need no extra.
| Package | Minimum | Role |
|---|---|---|
rich |
13.7 | Terminal rendering |
xxhash |
3.4 | Content hashing (xxh3_128) |
pathspec |
0.12 | Gitignore-style globs |
anyio |
4.3 | Structured async concurrency |
watchfiles |
0.22 | OS-level FS events for watch mode |
textual |
0.80 | TUI framework |
ntask --version
# ntask 1.0.0tasks.py:
from ntask import task, cached, depends, shell
@task
def install():
"""Install dev dependencies."""
shell('pip install -e ".[dev]"')
@task
@cached(inputs=["src/**/*.py", "tests/**/*.py"])
def test():
"""Run the test suite."""
shell("pytest -q")
@task
@cached(inputs=["src/**/*.py"])
def lint():
"""Lint with ruff."""
shell("ruff check src/")
@task
def check():
"""All quality checks."""
depends(lint, test)ntask --list
ntask test
ntask check # lint + test
ntask check -j # all CPU cores
ntask watch test # rerun on input changeThe first run executes everything. The second ntask check finds matching cache entries for lint and test and skips both:
o lint cached (3a8f9c2d)
o test cached (4b7e1a82)
+ check (0.00s)
Edit a source file and run again - only the affected tasks rerun. Run ntask --why test to see exactly what changed.
Registers a function in the global task registry. Usable bare or as a factory:
from ntask import task
@task
def build():
...
@task(deps=[build], parallel=True)
def publish():
...| Parameter | Type | Default | Description |
|---|---|---|---|
deps |
list[Callable | str] |
[] |
Tasks (or fqn strings) that must complete first. |
parallel |
bool |
True |
If False, this task runs alone (see §7.3). |
concurrency |
int | None |
None |
Removed in 1.0. Has no effect. |
Decorated functions keep their original signature and remain callable from tests or other code.
Discovery walks upward from the current working directory looking for tasks.py or a tasks/ package (with __init__.py). The first match wins. Importing the module executes top-level decorators which populate the registry.
FQN. A bare task build has fqn "build". A method inside a @group has fqn "<group>.<method>".
Type hints become CLI args. ntask inspects the function signature and builds an argparse subparser for the task automatically:
@task
def test(pattern: str = "", verbose: bool = False):
flags = "-v" if verbose else ""
k = f"-k {pattern}" if pattern else ""
shell(f"pytest {flags} {k}")ntask test --pattern=auth --verbose| Hint | CLI |
|---|---|
s: str = "" |
--s VALUE |
n: int = 0 |
--n VALUE |
flag: bool = False |
--flag |
flag: bool = True |
--no-flag |
m: Literal["a","b"] |
choice from {a,b} |
m: SomeEnum |
choice from enum values |
p: Path |
coerced via Path.expanduser |
xs: list[str] |
--xs A B C (zero or more) |
Missing default makes the parameter positional.
Opts a task into content-hash caching. Stack with @task; either order works.
@task
@cached(inputs=["src/**/*.py"], env=["CI"], outputs=["dist/"])
def build():
shell("python -m build")| Parameter | Type | Default | Description |
|---|---|---|---|
inputs |
list[str] |
[] |
Gitignore-style globs; matched files are content-hashed. |
env |
list[str] |
[] |
Env var names included in the key. |
outputs |
list[str] |
[] |
Globs captured to the local store after success; restored on hit. |
propagate |
bool |
True |
Include upstream cache keys in this key. |
strict |
bool |
True |
Include Python version, platform tag, and body hash. |
See §6 for the full cache contract.
Class decorator that namespaces methods under a common prefix:
from ntask import task, group, shell
@group("docs")
class Docs:
@task
def build():
shell("mkdocs build")
@task
def serve():
shell("mkdocs serve")Registers docs.build and docs.serve. Methods take no self. Group names must be non-empty and must not contain .. Nesting groups isn't supported.
Cross-group dependencies work via @task(deps=[Other.method, ...]) - the class attribute resolves through the underlying __ntask_task__ marker - or via fqn strings.
Declares dependencies inside a task body.
@task
def check():
depends(lint, test)Two roles:
- Static (import time): an AST visitor reads
depends(name)calls and adds DAG edges before any task runs. Bare names that match registered fqns are picked up; dotted attributes and string fqns are not. - Runtime: when execution reaches the call,
depends()is a no-op (the static analysis already wired things up).
For cross-group references, prefer @task(deps=[Api.lint, "web.test"]) over depends().
Subprocess helper.
from ntask import shell, ShellResult
# String - runs through /bin/sh -c (POSIX) or cmd.exe /c (Windows)
shell("ruff check src/")
# List - direct exec, no shell interpolation
shell(["pytest", "-v", "-k", pattern])
# Capture
result: ShellResult = shell("git rev-parse HEAD", capture=True)
print(result.stdout.strip())
# Tolerate failure
result = shell("optional-tool", check=False)
if not result.ok:
print("skipping")
# Override cwd / env
shell("make", cwd="/path/to/sub", env={"CC": "gcc"})def shell(
cmd: str | list[str],
*,
check: bool = True,
capture: bool = False,
cwd: str | Path | None = None,
env: dict[str, str] | None = None,
) -> ShellResult | None: ...ShellResult fields: returncode: int, stdout: str, stderr: str, duration: float, plus ok: bool (property).
Execution modes chosen automatically:
- Capture (
capture=True) -subprocess.runwithcapture_output=True. No streaming, no prefix. - Log-file (TUI active) - stdout+stderr piped to
.ntask/logs/<run-id>/<fqn>.log. Nothing on screen. - Prefixed streaming (parallel, no TUI) - lines piped through threads that prefix
[fqn]before flushing. - Default streaming -
subprocess.runinheriting stdout/stderr.
check=True (default) raises ShellError on non-zero exit.
NtaskError
+- DiscoveryError tasks.py / tasks/ not found or import failed
+- CycleError circular dependency in the DAG
+- ShellError shell() exited non-zero
from ntask import NtaskError, CycleError, DiscoveryError, ShellErrorCycleError has .cycle: list[str] and renders as cycle in task graph: a -> b -> a.
ShellError has .returncode: int and .cmd: str | list[str].
All interactions go through the ntask console script.
| Flag | Description |
|---|---|
-l, --list |
List all registered tasks with docstring summaries. |
--graph [TASK] |
Render the DAG. With a task, only the reachable subgraph. |
--graph-format {ascii,mermaid,dot} |
Output format for --graph. |
--dry-run |
Print the task plan without running. |
--no-cache |
Don't read or write the cache. |
--offline |
Skip the remote cache; use local only. |
--force TASK |
Force-run TASK (bypass its cache). Repeatable. |
--why TASK |
Explain the last cache decision for TASK. |
-j [N], --jobs [N] |
Concurrency cap. Bare -j uses os.cpu_count(). |
--keep-going |
Continue independent tasks after a failure. |
-v, --verbose |
Stream all output. |
-q, --quiet |
Suppress cache-hit lines. |
--no-color |
Disable ANSI colour. |
--no-tui |
Use the line-based renderer. |
--version |
Print version and exit. |
-h, --help |
Print help. |
Run a task and its dependencies. Args are parsed from the function signature.
ntask test --pattern=auth --verbose
ntask release --version=1.2.0Lists all tasks with their docstring summaries and a * mark on cached tasks. Tasks inside a @group appear under a sub-table.
Tasks
+---------+---+----------------------------------------+
| build | * | Build the wheel. |
| check | | All quality checks. |
| install | | Install dev dependencies. |
| lint | * | Lint with ruff. |
| test | * | Run the test suite. |
+---------+---+----------------------------------------+
Use --graph for dependency arrows.
Renders the dependency tree.
ntask --graph check
ntask --graph --graph-format mermaid > docs/dag.md
ntask --graph --graph-format dot | dot -Tsvg > dag.svg$ ntask --dry-run check
would run:
lint
test
check
Computes the current cache key, loads the prior entry, and diffs them.
$ ntask --why test
Task: test
Last cached: 2026-04-22 10:43:12 (2h 15m ago)
Cache key: 3a8f9c2d...
Duration: 4.21s
If you ran `test` now:
x MISS - 2 changes since last cache
Changes:
- src/auth.py modified
- BUILD_ENV changed
Inputs:
Declared globs: src/**/*.py, tests/**/*.py
Files matched: 87 (1 changed)
Environment:
BUILD_ENV = 'prod' (changed)
--why reports local workspace deltas against the named task's prior breakdown. It doesn't simulate the full transitive graph.
Bypass the cache for a specific task. The task still runs and writes a new cache entry.
ntask --force test check # force test; lint cached as usualntask clean # remove .ntask/cache/ (output blobs survive)
ntask clean --all # remove the entire .ntask/ directorySee §8.
| Code | Meaning |
|---|---|
| 0 | All tasks succeeded. |
| 1 | At least one task failed. |
| 2 | Usage / discovery error. |
| 130 | Interrupted (Ctrl-C). |
The full contract is in caching.md. This section gives the working summary.
xxh3_128 is a non-cryptographic 128-bit hash. About 10x faster than SHA-256 on modern hardware, with collision probability negligible at the file counts a build pipeline produces. ntask's threat model is accidental collision, not adversarial.
Every @cached task's key is xxh3_128 over a length-prefixed concatenation:
- Format version (
ntask/v1) - Task fqn
- Body hash (if
strict=True) - Python version (if
strict=True) - Platform tag (if
strict=True) - Env values -
NAME=valueper declared env name, sorted; unset is<unset> - Input patterns - sorted globs
- Input manifest digest - hash of
(path, content-digest, mode)triples - Upstream keys - resolved keys of direct deps in declaration order (skipped if
propagate=False)
inspect.getsource -> ast.parse(dedent) -> strip docstrings ->
ast.dump(annotate_fields=False, include_attributes=False) -> xxh3_128
Consequences:
- Reformatting (ruff format / black) doesn't invalidate.
- Adding or editing docstrings doesn't invalidate.
- Comment changes don't invalidate.
- Real logic changes do invalidate.
If B depends on A, A's resolved key is part of B's key. A's miss propagates to B. This composes through the entire DAG.
@cached(propagate=False) opts out for tasks whose output is genuinely independent of upstream output.
strict=True (default) ties the key to body, Python version, and platform tag.
strict=False replaces those three with placeholders so cache entries persist across refactors, Python upgrades, and OS/arch changes. Use only for pure data transformations where you actively want cross-platform sharing.
pathspec gitwildmatch mode:
src/**/*.py- all.pyrecursively undersrc/*.toml- any.tomlat the root!src/generated/**/*.py- re-modify a previously-included match (same as.gitignore). Order matters: include first, then negate.
.gitignore is loaded from the project root and applied automatically. Files are sorted before hashing - filesystem ordering is irrelevant.
After success, declared outputs are captured into .ntask/outputs/<outputs-hash>/. The outputs-hash is itself a hash over (path, content-digest) pairs.
On a hit, files are hard-linked (POSIX) or copied (Windows / cross-fs) back to the workspace.
Two implications:
- Hard-linked files share inode and mtime with the captured copy. Tools that compare mtimes may behave differently than after a fresh run.
- Output tarballs for remote upload are constructed in memory. Outputs in the gigabyte+ range may pressure RAM during push.
.ntask/
+-- cache/
| +-- <fqn>/
| +-- <key>.json CacheEntry with breakdown
+-- outputs/
| +-- <outputs-hash>/ content-addressed output tree
+-- logs/
+-- <run-id>/ per-run TUI log dir
+-- lint.log
+-- test.log
ntask clean removes cache/ only. ntask clean --all removes the whole .ntask/. Log directories accumulate; clean manually with rm -rf .ntask/logs/.
Every miss produces a MissReport of MissItem(kind, detail) entries.
kind |
Meaning |
|---|---|
first-run |
No prior entry. |
input-modified |
A matched file's content or mode changed. |
input-added |
A new file appeared in the matched set. |
input-removed |
A previously-matched file was deleted. |
env-changed |
An env var's value changed. |
env-added |
An env var appeared. |
env-removed |
An env var disappeared. |
body-changed |
The structural AST changed. |
upstream-invalidated |
An upstream dep's key changed. |
python-changed |
Python version differs. |
platform-changed |
Platform tag differs. |
Order: input-* first (alphabetical by path, kinds interleaved), then env-* (alphabetical), then body-changed, then upstream-invalidated, then python-changed/platform-changed. first-run is always alone.
Inline rendering shows the first item:
x test: cache miss (input-modified: src/auth.py (+2 more))
--why shows the full report plus the prior breakdown.
| Command | Effect |
|---|---|
ntask --no-cache |
Disable all cache reads/writes for this invocation. |
ntask --force lint |
Bypass lint's cache lookup; still write after. |
ntask clean |
Wipe cache entries (outputs survive). |
ntask clean --all |
Wipe the entire .ntask/. |
Sequential by default. Opt in per invocation.
ntask check -j # concurrency = os.cpu_count()
ntask check -j 4 # cap at 4
ntask check --jobs 2 # same as -j 2-j is a global cap implemented as an anyio.CapacityLimiter. DAG ordering is preserved: a task waits for its dependencies; the limiter adds a "slot must be free" condition on top.
When concurrency is greater than 1, shell() reads the per-task _current_line_prefix context var (set by the executor) and routes process output through threaded pumps that prefix [fqn] per line:
[lint] checking src/
[test] pytest starting
[lint] src/auth.py: ok
[test] collected 87 items
[lint] All checks passed.
Lines from the same task are atomic; lines from different tasks may interleave. With concurrency 1 (default) or shell(capture=True), no prefix is added.
For a release step, a database migration, anything that mutates shared state:
@task(parallel=False)
def migrate():
shell("alembic upgrade head")Two invariants:
- Before an exclusive task starts, all running normal tasks drain to zero.
- While an exclusive task runs, no other task starts.
Implemented via an internal _ParallelCoordinator with anyio events. parallel=False is a DAG-wide property; even under -j 16, the exclusive task runs alone.
Under -j > 1, shell() sets stdin=DEVNULL. Tasks that prompt for input will receive EOF. If a task genuinely needs a TTY (pdb, interactive prompt), run sequentially (-j 1).
ntask watch test
ntask watch test --pattern=auth
ntask watch lint -j 4Reruns a @cached task whenever any of its declared input files changes on disk.
- The target must be
@cached(inputs=[...]). Watch uses the inputs glob as the watch set. Non-cached targets exit with code 2. - On startup, watch runs the task once even if the cache would have been a hit (gives you fresh output).
- After the initial run, the watch loop awaits OS-level FS events from
watchfiles. Each batch is filtered throughinputsglobs and.gitignore. - Relevant change: clear screen, print status header (
Watching: <patterns>/Target: <fqn> (<change-detail>)), rerun. - Changes during a run are coalesced into one queued rerun; no cancel-and-restart.
- One target per
ntask watchinvocation. - Editing
tasks.pydoesn't reload. Ctrl-C and restart to pick up new task definitions. - The TUI doesn't activate during watch (line-based renderer is used).
- A delete-and-replace where the new file has the same content hash isn't detected as a change (rare edge case).
Ctrl-C exits cleanly with code 0.
Share cache hits across machines.
# pyproject.toml
[tool.ntask.remote_cache]
type = "s3"
bucket = "my-team-cache"
prefix = "myproject/"No code changes needed. Remote is transparent: local first, then remote, then execute.
For each @cached task:
- Compute the key.
- Check local (
.ntask/cache/<fqn>/<key>.json). Hit -> restore outputs, skip. - Check remote (unless
--offline). Hit -> download entry + outputs, mirror to local, restore. - Execute.
- Write local entry.
- Push entry + output tarball to remote (unless
--offline).
Local filesystem (shared NFS, Docker volume, single-host multi-user):
[tool.ntask.remote_cache]
type = "local-fs"
path = "/mnt/shared/cache"HTTP (any server with PUT enabled - plain GET / PUT / HEAD over stdlib urllib):
[tool.ntask.remote_cache]
type = "http"
url = "https://cache.example.com/ntask"
auth_header = "Bearer $CACHE_TOKEN" # $VAR expanded at load time$VAR references in any string field are expanded via os.path.expandvars at config load time. No WebDAV verbs are used; nginx with ngx_http_dav_module, Caddy, S3 presigned-URL fronts, or any plain HTTP PUT-capable server work.
S3 (pip install ntask[s3]):
[tool.ntask.remote_cache]
type = "s3"
bucket = "my-team-cache"
prefix = "ntask/"Credentials follow boto3's standard chain (env vars, ~/.aws/credentials, IAM, SSO).
S3-compatibles (MinIO, R2, B2):
[tool.ntask.remote_cache]
type = "s3"
bucket = "builds"
endpoint_url = "https://s3.example.r2.cloudflarestorage.com"GCS (pip install ntask[gcs]):
[tool.ntask.remote_cache]
type = "gcs"
bucket = "my-team-cache"
prefix = "ntask/"Credentials: gcloud auth application-default login or GOOGLE_APPLICATION_CREDENTIALS.
| Key pattern | Content |
|---|---|
<prefix>entries/<fqn>/<key>.json |
JSON-serialised CacheEntry. |
<prefix>outputs/<outputs-hash>.tar.gz |
Deterministic tar.gz of output files. |
The tarball is created with sorted entries, mtime=0, uid=gid=0, empty uname/gname. Identical output trees produce byte-identical tarballs regardless of when or where they were captured.
If the remote is unreachable (network, auth, misconfig):
- ntask prints one warning to stderr:
warning: remote cache unreachable: <type>: <message>. Falling back to local. - Subsequent remote operations are silently skipped for the rest of the process.
Builds never fail because the remote is down. --offline is a faster opt-out when you know the remote is unavailable.
The remote operates on trust-your-team. A team member with write access can publish a cache entry with corrupt outputs. Same model as shared CI artefact storage. Use per-team buckets with appropriate IAM. Don't expose to untrusted parties.
- Uploads are synchronous; a task with large outputs blocks completion until upload finishes.
- Output tarballs are constructed in memory; avoid declaring outputs that are gigabytes in size.
- No per-task remote disable; use bucket-level IAM if you need exclusion.
When stdout is a TTY and the TUI is enabled (default), ntask displays a live full-screen interface using Textual.
+-- ntask -----------------------------------------------------+
| > ntask |
| +- > check waiting |
| +- o lint cached |
| +- o typecheck cached |
| +- > test running (8.3s) |
| |
| starting... |
+---------------------------------------------------------------+
| Icon | State |
|---|---|
. |
waiting |
| spinner | running |
+ |
ok |
o |
cached (local) |
o [remote] |
cached (remote) |
x |
failed |
~ |
skipped (upstream failed; --keep-going was set) |
The spinner rotates at 10 Hz via a Textual interval timer.
The footer shows starting... for the duration of the run. After the executor finishes, the TUI exits and the CLI prints the final summary line:
3 ran, 2 cached, 0 failed, 0 skipped - logs: .ntask/logs/20260422-103045-731
When the TUI is active, shell() output is redirected to per-task files at .ntask/logs/<run-id>/<fqn>.log. The run-id is YYYYMMDD-HHMMSS-mmm.
cat .ntask/logs/20260422-103045-731/test.logLog directories accumulate; clean manually with rm -rf .ntask/logs/.
In order of precedence:
ntask check --no-tui # one-off[tool.ntask]
tui = false # always off for this projectntask check | tee build.log # non-TTY: TUI auto-disablesThe TUI also auto-disables in CI environments where stdout isn't a TTY. Setting tui = true in pyproject.toml prevents auto-disable by config but doesn't override the TTY check.
Textual's App.run() must own the main thread (signal handlers). The executor is an anyio coroutine running multiple tasks concurrently. ntask resolves this by inverting:
Main thread Background thread
--------------------- ----------------------------------
tui_app.run() anyio.run(executor.run, ...)
blocks on app exit runs the DAG
receives events via calls app.call_from_thread(...)
call_from_thread to push state updates
call_from_thread is Textual's thread-safe widget update mechanism. The executor uses it for every state event (on_running, on_ok, on_cached, on_failed).
The TUIRenderer.start() lifecycle hook waits on a threading.Event set when the app's on_mount fires. This avoids racing widget mutation against widget construction.
When the executor completes, TUIRenderer.stop() posts app.exit() via call_from_thread. The main thread's app.run() then returns and the CLI joins the bg thread.
[tool.ntask] in pyproject.toml. No pyproject.toml -> all defaults apply.
| Key | Type | Default | Description |
|---|---|---|---|
cache_dir |
string | ".ntask" |
Cache directory, relative to project root. |
default_concurrency |
int | 1 |
Used when -j isn't passed. |
tui |
bool / null | null |
null auto-detect; false always off; true prefer TUI on TTY. |
| Field | Applies to | Required |
|---|---|---|
type |
all | yes |
path |
local-fs |
yes |
url |
http |
yes |
auth_header |
http |
no |
bucket |
s3, gcs |
yes |
prefix |
s3, gcs |
no |
endpoint_url |
s3 |
no |
All string values support $VAR expansion at load time.
[tool.ntask]
cache_dir = ".ntask"
default_concurrency = 4
tui = false
[tool.ntask.remote_cache]
type = "s3"
bucket = "my-company-build-cache"
prefix = "myproject/"
endpoint_url = "https://s3.us-east-1.amazonaws.com"ntask doesn't read env for its own configuration (uses pyproject.toml). However:
- AWS_*, GOOGLE_APPLICATION_CREDENTIALS, etc. are honoured by the underlying SDKs.
$VARreferences inside pyproject.toml string values expand from the environment at load time.- Tasks declare which env affects their cache key via
@cached(env=["NAME"]).
For contributors and embedders.
src/ntask/
+-- __init__.py public re-exports
+-- __main__.py python -m ntask entry
+-- _cli.py argparse, renderer selection, main()
+-- _cli_args.py signature -> argparse converter
+-- _cli_docstring.py docstring summary extraction
+-- _cli_format.py --list and --graph rendering
+-- _cli_why.py --why renderer
+-- _config.py ProjectConfig, RemoteCacheConfig, loader
+-- _coordinator.py _ParallelCoordinator (parallel=False barriers)
+-- _dag.py Graph, toposort (Kahn), build_graph
+-- _depends.py depends() + _DependsVisitor (AST)
+-- _discovery.py find_project_root, discover
+-- _errors.py NtaskError, CycleError, DiscoveryError, ShellError
+-- _executor.py Executor, ExecutionConfig, RunResult
+-- _registry.py Registry, default_registry()
+-- _shell.py shell(), ShellResult, ContextVars for prefix/log-file
+-- _task.py task(), cached(), group(), Task, CachedConfig
+-- _watch.py watch_loop(), filter builder
+-- _cache/
| +-- __init__.py CacheEngine
| +-- body.py structural body hash
| +-- diff.py MissReport, MissItem, diff_cache_state
| +-- hash.py xxh3_128 helpers
| +-- key.py CacheKeyInputs, CacheBreakdown, compute_cache_key
| +-- manifest.py compute_input_manifest
| +-- outputs.py OutputStore (capture + restore)
| +-- store.py CacheStore, CacheEntry
+-- _remote/
| +-- __init__.py make_backend factory
| +-- base.py RemoteBackend protocol
| +-- local_fs.py LocalFSBackend
| +-- http.py HTTPBackend (urllib)
| +-- s3.py S3Backend (boto3, optional extra)
| +-- gcs.py GCSBackend (google-cloud-storage, optional extra)
| +-- _tar.py deterministic tar.gz helpers
+-- _render/
+-- __init__.py re-exports
+-- base.py Renderer protocol
+-- log.py LogRenderer (plain text)
+-- rich.py RichRenderer (TTY, colour)
+-- tui.py TUIRenderer + _DAGApp (Textual)
discover(start) walks from start (typically Path.cwd()) up through parents looking for tasks.py or tasks/__init__.py. The first match is imported with a synthetic module name. Top-level decorators run during import and populate default_registry().
The registry is a module-level dict from fqn to Task. Registering the same fqn twice raises. Registry is built once per process; hot-reload isn't supported.
build_graph(reg) collects two kinds of edges:
- Declared deps (
@task(deps=[a, b])):_resolve_reffollows function-object identity, the__ntask_task__marker, or string fqns. - Static
depends()calls:scan_static_depends(func)parses the function source, walks the body (stopping at nested function/class scopes), and collects bareNamearguments. Dotted attributes are recorded as dotted strings; string literals are flagged dynamic and contribute nothing.
toposort(g) runs Kahn's algorithm (BFS from zero-indegree). If the result is shorter than the node count, a cycle exists; _find_cycle runs DFS over the remaining nodes and reports it via CycleError.
reachable_from([target]) prunes the graph before execution.
Executor(registry, config).run(targets, task_kwargs) -> RunResult
For each task in topological order:
- Await all dependency completion events.
- If any dep failed: mark this task skipped, set its event, return.
- If
@cachedand not--no-cache: compute key + breakdown. - Check local cache. Hit -> restore outputs, signal done, return.
- Check remote cache (if not
--offline). Hit -> mirror to local, restore, signal done, return. - Emit
on_miss_reason. - Enter
_ParallelCoordinator(respectingparallel=False). - Acquire the
CapacityLimiter. - Set per-task ContextVars (
_current_log_filefor TUI,_current_line_prefixfor parallel non-TUI). - Invoke the task. Sync functions go through
anyio.to_thread.run_sync; async functions are awaited directly. - Store cache entry, push to remote. Emit
on_ok. - On failure: emit
on_failed, record in failures dict, re-raise (or suppress with--keep-going). - Signal done.
CacheEngine interface:
compute_key_and_breakdown(t, workspace, upstream_keys_by_dep)- returns(key, CacheBreakdown). One call avoids redundant work; the breakdown is used for both lookup and miss reporting.check(t, key)- local lookup.check_remote(t, key)- remote lookup; mirrors entry+outputs to local on hit. Non-fatal on error.store_entry(t, key, ...)- capture outputs, write JSON entry.restore_outputs(entry, workspace)- hard-link or copy.push_remote(t, entry)- upload entry + output tarball.
CacheStore serialises CacheEntry to JSON at .ntask/cache/<fqn>/<key>.json. The breakdown is included so --why can diff against it.
File hashing uses a ThreadPoolExecutor for parallel reads when many inputs are present.
class Renderer(Protocol):
def on_running(self, fqn: str, *, cmd: str | None) -> None: ...
def on_ok(self, fqn: str, *, duration: float) -> None: ...
def on_cached(self, fqn: str, *, key: str, source: str = "local") -> None: ...
def on_miss_reason(self, fqn: str, *, report: MissReport) -> None: ...
def on_failed(self, fqn: str, *, error: BaseException, tail_lines: list[str]) -> None: ...
def summary(self, *, ran: int, cached: int, failed: int, skipped: int) -> None: ...Optional lifecycle methods start(graph, logs_dir) and stop() are detected via hasattr - present on TUIRenderer, absent on LogRenderer and RichRenderer.
| Class | Used when |
|---|---|
LogRenderer |
Non-TTY (pipe / CI) or --no-color. |
RichRenderer |
TTY with --no-tui, or watch mode. |
TUIRenderer |
TTY, colour enabled, --no-tui not set, Textual importable. |
Selection lives in _cli.py.
Two ContextVars thread information from the executor into shell() without argument plumbing:
_current_line_prefix: ContextVar[str | None]- set to the task fqn when-j > 1and TUI inactive._current_log_file: ContextVar[Path | None]- set to the per-task log path when TUI is active.
ContextVars are copy-on-write within anyio tasks, so values don't leak across coroutines. The executor resets the token after each task call.
depends(*tasks):
- Static (registration time): an AST visitor reads
depends(name)calls in the function source and adds DAG edges before any task runs. Bare-name references are resolved against the registry at graph-build time. - Runtime: the body-level call is a no-op. The static analysis already wired everything into the DAG; no runtime dispatch is needed.
For dynamic dependencies that can't be statically determined, use @task(deps=[...]) - that resolution path accepts function objects and string fqns at registration time.
from ntask import task, group, cached, shell
@group("build")
class Build:
@task
@cached(inputs=["src/**/*.py"], outputs=["dist/"])
def wheel():
shell("python -m build")
@group("publish")
class Publish:
@task(deps=[Build.wheel])
def pypi():
shell("twine upload dist/*.whl")ntask publish.pypi runs build.wheel first.
@task
@cached(
inputs=["src/**/*.proto"],
outputs=["src/generated/**/*.py"],
)
def codegen():
shell("protoc --python_out=src/generated src/**/*.proto")
@task(deps=[codegen])
@cached(inputs=["src/**/*.py"])
def test():
shell("pytest")propagate=True (default) means a codegen miss invalidates test automatically. The generated files are also picked up by test's own inputs= glob, so their content is tracked twice for safety.
@task
@cached(
inputs=["src/**/*.py", "tests/**/*.py"],
env=["CI", "TEST_DATABASE_URL"],
)
def test(pattern: str = ""):
k = f"-k {pattern}" if pattern else ""
shell(f"pytest -v {k}")A run with CI=true TEST_DATABASE_URL=postgres://... has a different key than one with those unset, so CI test results don't get served as hits during local development.
@task(parallel=False, deps=[check, build])
def release(version: str):
"""Cut a release. Never overlaps with anything else."""
shell(f"git tag v{version}")
shell("git push --tags")
shell("twine upload dist/*.whl")ntask release --version=1.2.0 -j 16 still runs release alone after the in-flight DAG drains.
@task
@cached(inputs=["pyproject.toml"])
def maybe_publish():
version = shell("python -m build --version", capture=True).stdout.strip()
existing = shell(f"pip show mypackage", capture=True, check=False)
if version in existing.stdout:
print(f"version {version} already published, skipping")
return
shell("twine upload dist/*.whl")capture=True always uses captured mode (no streaming, no prefix). The returned ShellResult is never None in capture mode.
One-time setup:
- Create a bucket (
my-team-ntask-cache). - IAM policy granting
s3:GetObject,s3:PutObject,s3:HeadObjectto team members / CI roles.
Project (committed):
[tool.ntask.remote_cache]
type = "s3"
bucket = "my-team-ntask-cache"
prefix = "myproject/"Developer:
pip install ntask[s3]
ntask check # first run populates remote
ntask check --offline # fast dev loopCI:
- name: Run checks
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
run: ntask checkCause: the body hash treats formatting-only changes (whitespace, comments, docstrings) as no-ops. If you only reformatted, the key is identical.
Verify: ntask --why <task>. If it says HIT - no changes and you expected a miss, the actual logic didn't change.
Force: ntask --force <task>.
Reset: ntask clean.
Check 1: Is the file matched by the inputs glob? Run ntask --why <task> and look at "Files matched". If it's zero or the file isn't listed, the glob doesn't match.
Check 2: Is the file gitignored? Watch applies the same rules as the cache. Ignored files don't trigger reruns.
Check 3: Did you edit tasks.py? It's not in the watch set. Ctrl-C and restart.
Check 4: Is the file outside the project root? Watch uses the project root as the base directory.
Try ntask check --no-tui. The TUI requires a terminal that handles modern ANSI sequences. Some multiplexers (tmux, screen) without TERM=xterm-256color and COLORTERM=truecolor may misrender.
Permanent fix: tui = false in [tool.ntask].
Symptom: warning: remote cache unreachable: ClientError: ... Falling back to local.
Check credentials: aws s3 ls s3://your-bucket/ from the same shell. boto3 uses the same chain.
Check bucket / prefix: match what's in pyproject.toml.
Offline fallback: ntask check --offline skips remote entirely.
Retry behaviour: none. ntask warns once and uses the local cache for the rest of the process. If the remote is genuinely flaky, fix it at the infrastructure level (transfer acceleration, VPC endpoint).
Under -j > 1, every line through shell() is prefixed [fqn] so lines don't interleave in confusing ways. print() in your task body is NOT prefixed - those writes go raw to stdout.
To label your own non-shell output:
@task
def my_task():
fqn = "my_task"
for item in items:
print(f"[{fqn}] processing {item}")Or run sequentially (-j 1).
error: cycle in task graph: build -> publish -> build
The cycle is the shortest one DFS found in the unresolved nodes. Review @task(deps=[...]) and depends(...) calls and break the cycle.
Cycles are detected at graph-build time; no partial execution occurs.
All of the following are imported from ntask:
from ntask import (
task, cached, group, depends,
shell, ShellResult,
NtaskError, CycleError, DiscoveryError, ShellError,
__version__,
)@task
def fn() -> None: ...
@task(deps: list[Callable | str] = [], parallel: bool = True)
def fn() -> None: ...Registers the function. concurrency keyword was deprecated in 0.3 and is removed in 1.0.
@cached(
inputs: list[str] = [],
outputs: list[str] = [],
env: list[str] = [],
propagate: bool = True,
strict: bool = True,
)
def fn() -> None: ...Stack with @task (either order).
@group(name: str)
class Tasks:
@task
def method() -> None: ...Methods take no self. Group names must be non-empty and must not contain ..
depends(*tasks: Callable | str) -> NoneRuntime no-op; static AST analysis lifts bare-name references into the DAG at registration time.
def shell(
cmd: str | list[str],
*,
check: bool = True,
capture: bool = False,
cwd: str | Path | None = None,
env: dict[str, str] | None = None,
) -> ShellResult | None@dataclass(frozen=True, slots=True)
class ShellResult:
returncode: int
stdout: str
stderr: str
duration: float
@property
def ok(self) -> bool: ...NtaskError (base), DiscoveryError, CycleError (with .cycle: list[str]), ShellError (with .returncode: int and .cmd).
__version__: str # "1.0.0"ntask attaches three dunder attributes to decorated functions for runtime introspection:
<fn>.__ntask_task__: str- registered fqn<fn>.__ntask_cached_config__: CachedConfig- present when@cachedwrapped before@taskran<fn>.__ntask_group__: str | None- present when wrapped inside@group(...)
Read these for diagnostics; don't mutate them.
body hash. Structural AST hash of a task function, computed after stripping docstrings. Stable under whitespace, comment, and formatting changes; sensitive to logic changes.
cache hit. The computed cache key matches an entry in the local or remote store. The task is skipped; outputs are restored from the captured blob.
cache key. A 128-bit xxh3 hash uniquely identifying a particular combination of task identity, inputs, environment, body, and upstream state.
CacheBreakdown. The structured record stored alongside each cache entry containing per-file input digests, env values, body hash, Python version, platform tag, and upstream keys. Used by --why to compute diffs.
CacheEntry. The JSON record at .ntask/cache/<fqn>/<key>.json.
CapacityLimiter. anyio primitive; semaphore that caps concurrent task count to the value given by -j.
content-addressed. Storage scheme where data is located by its hash rather than its name.
DAG. Directed Acyclic Graph. Used for execution order and reachability.
fqn. Fully Qualified Name. build for a bare task; ci.build for a method inside @group("ci").
gitignore-style glob. Pattern following .gitignore rules: ** matches any path component, ! negates, directory prefixes match recursively. Implemented by pathspec in gitwildmatch mode.
input manifest. Ordered list of (path, content-digest, mode) triples for every file matched by inputs=. Its aggregate hash feeds the cache key.
MissItem. A single record in a MissReport describing one cache-miss reason.
MissReport. A tuple of MissItem values describing all reasons a lookup didn't hit. is_hit is True when the tuple is empty.
outputs hash. xxh3_128 of (path, content-digest) pairs for every file matched by outputs=. Used as the directory name in the local store and as the filename stem in the remote store.
parallel=False. Task attribute making the task a DAG-wide barrier: it waits for everything in flight to drain, then holds the DAG exclusively.
propagate. @cached(propagate=True) causes upstream cache keys to be included. Default True. Setting False makes this task's key independent of upstream changes.
remote cache. Shared cache backend (S3, GCS, HTTP, local-fs) from which teams and CI can read and write. Transparent to the task author.
run-id. Timestamp string YYYYMMDD-HHMMSS-mmm used as the directory name for per-run TUI logs.
strict mode. When strict=True (default), the cache key includes body hash, Python version, and platform tag. When False, those three are replaced with placeholders to allow cross-platform / cross-version cache sharing.
transitive key propagation. A downstream task's key includes its upstream dependencies' keys, so any upstream change invalidates the downstream automatically.
toposort. Topological sort of the DAG (Kahn's algorithm). Produces a stable order where each task appears after all its dependencies.
xxh3_128. 128-bit variant of the xxHash non-cryptographic hash. About 10x faster than SHA-256; sufficient collision resistance for build tooling.