Skip to content

Commit 9e41f48

Browse files
committed
add local chroma mode with grep/list retrieval commands
1 parent 26bbe07 commit 9e41f48

10 files changed

Lines changed: 908 additions & 116 deletions

File tree

README.md

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,16 @@ Index a text document into ChromaDB (with optional chunking and metadata).
3939

4040
```bash
4141
mpipe index --file notes.txt --collection docs --embedding-model accounts/fireworks/models/kimi-k2-instruct-0905
42-
mpipe index --document "Hello world" --collection scratch --embedding-model accounts/fireworks/models/kimi-k2-instruct-0905
42+
mpipe index --document "Hello world" --source "chat://session/42" --collection scratch --embedding-model accounts/fireworks/models/kimi-k2-instruct-0905
43+
mpipe index --file notes.txt --collection docs --chroma-path ./.chroma --embedding-model accounts/fireworks/models/kimi-k2-instruct-0905
4344
```
4445

46+
Source metadata policy:
47+
48+
- `--file`: `source` is auto-filled with the file path (unless `--source` is provided).
49+
- `--document`: `--source` is required.
50+
- `--source` always wins over metadata values.
51+
4552
Provide embeddings via stdin (one vector per line, comma-separated):
4653

4754
```bash
@@ -59,9 +66,30 @@ mpipe index --file notes.txt --metadata-json metadata.json --metadata lang=fr --
5966
ChromaDB connection resolution:
6067

6168
- CLI: `--chroma-url` or `--chroma-host`/`--chroma-port`/`--chroma-scheme`
62-
- Env: `CHROMA_URL`, `CHROMA_HOST`, `CHROMA_PORT`, `CHROMA_SCHEME`
69+
- Local persistent mode: `--chroma-path <dir>` (or env `CHROMA_PATH`) auto-starts `chroma run` and stores data in that directory
70+
- Env: `CHROMA_URL`, `CHROMA_HOST`, `CHROMA_PORT`, `CHROMA_SCHEME`, `CHROMA_PATH`
6371
- Collection: `--collection` or `CHROMA_COLLECTION` (default `mpipe`)
6472

73+
Note: `--chroma-url` cannot be combined with `--chroma-path`.
74+
75+
## `mpipe list`
76+
77+
List entries in a collection.
78+
79+
```bash
80+
mpipe list --collection docs --limit 20
81+
mpipe list --collection docs --offset 20 --limit 20 --json
82+
```
83+
84+
## `mpipe grep`
85+
86+
Classic RAG: retrieve top chunks from ChromaDB, then ask the LLM with those chunks as context.
87+
88+
```bash
89+
mpipe grep --collection docs --embedding-model accounts/fireworks/models/qwen3-embedding-8b --model gpt-4o-mini "What does the retry logic do?"
90+
mpipe grep --collection docs --top-k 8 --embedding-model accounts/fireworks/models/qwen3-embedding-8b --provider fireworks --model accounts/fireworks/models/kimi-k2-instruct-0905 "Resume this document"
91+
```
92+
6593
### Prompt input
6694

6795
- `mpipe ask "question"` uses the CLI argument as prompt.

src/commands/chroma.rs

Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
use std::env;
2+
use std::fs;
3+
use std::io;
4+
use std::path::PathBuf;
5+
use std::process::{Child, Command, Stdio};
6+
use std::time::Duration;
7+
8+
use chromadb::client::{ChromaAuthMethod, ChromaClient, ChromaClientOptions};
9+
use clap::Args;
10+
11+
const DEFAULT_CHROMA_LOCAL_HOST: &str = "127.0.0.1";
12+
const DEFAULT_CHROMA_PORT: u16 = 8000;
13+
const LOCAL_CHROMA_START_TIMEOUT: Duration = Duration::from_secs(10);
14+
const LOCAL_CHROMA_POLL_INTERVAL: Duration = Duration::from_millis(250);
15+
16+
#[derive(Debug, Args, Clone, Default)]
17+
pub struct ChromaConnectArgs {
18+
#[arg(long = "chroma-url")]
19+
pub chroma_url: Option<String>,
20+
21+
#[arg(long = "chroma-host")]
22+
pub chroma_host: Option<String>,
23+
24+
#[arg(long = "chroma-port")]
25+
pub chroma_port: Option<u16>,
26+
27+
#[arg(long = "chroma-scheme")]
28+
pub chroma_scheme: Option<String>,
29+
30+
#[arg(long = "chroma-path")]
31+
pub chroma_path: Option<PathBuf>,
32+
}
33+
34+
#[derive(Debug, Clone)]
35+
enum ChromaConnection {
36+
Remote {
37+
url: Option<String>,
38+
},
39+
LocalPersistent {
40+
url: String,
41+
host: String,
42+
port: u16,
43+
path: PathBuf,
44+
},
45+
}
46+
47+
impl ChromaConnection {
48+
fn url(&self) -> Option<&str> {
49+
match self {
50+
Self::Remote { url } => url.as_deref(),
51+
Self::LocalPersistent { url, .. } => Some(url.as_str()),
52+
}
53+
}
54+
}
55+
56+
pub struct LocalChromaGuard {
57+
child: Option<Child>,
58+
}
59+
60+
impl Drop for LocalChromaGuard {
61+
fn drop(&mut self) {
62+
if let Some(mut child) = self.child.take() {
63+
let _ = child.kill();
64+
let _ = child.wait();
65+
}
66+
}
67+
}
68+
69+
pub async fn connect(
70+
args: &ChromaConnectArgs,
71+
) -> Result<(ChromaClient, Option<LocalChromaGuard>), String> {
72+
let connection = resolve_chroma_connection(args)?;
73+
let guard = start_local_chroma_if_needed(&connection).await?;
74+
let client = ChromaClient::new(ChromaClientOptions {
75+
url: connection.url().map(str::to_string),
76+
auth: ChromaAuthMethod::None,
77+
database: "default_database".to_string(),
78+
})
79+
.await
80+
.map_err(|err| format!("Failed to connect to ChromaDB: {err}"))?;
81+
Ok((client, guard))
82+
}
83+
84+
fn resolve_chroma_connection(args: &ChromaConnectArgs) -> Result<ChromaConnection, String> {
85+
let chroma_path = resolve_chroma_path(args);
86+
if let Some(path) = chroma_path {
87+
if args.chroma_url.is_some() {
88+
return Err("--chroma-url cannot be used with --chroma-path/CHROMA_PATH.".to_string());
89+
}
90+
91+
let scheme = args
92+
.chroma_scheme
93+
.clone()
94+
.or_else(|| env::var("CHROMA_SCHEME").ok())
95+
.unwrap_or_else(|| "http".to_string());
96+
if !scheme.eq_ignore_ascii_case("http") {
97+
return Err(format!(
98+
"--chroma-path requires --chroma-scheme=http (got '{scheme}')."
99+
));
100+
}
101+
102+
let host = args
103+
.chroma_host
104+
.clone()
105+
.or_else(|| env::var("CHROMA_HOST").ok())
106+
.unwrap_or_else(|| DEFAULT_CHROMA_LOCAL_HOST.to_string());
107+
if host.contains("://") {
108+
return Err(
109+
"--chroma-host must be a hostname when using --chroma-path (no scheme)."
110+
.to_string(),
111+
);
112+
}
113+
114+
let port = resolve_chroma_port(args)?;
115+
let url = format!("http://{host}:{port}");
116+
117+
return Ok(ChromaConnection::LocalPersistent {
118+
url,
119+
host,
120+
port,
121+
path,
122+
});
123+
}
124+
125+
Ok(ChromaConnection::Remote {
126+
url: resolve_chroma_url(args)?,
127+
})
128+
}
129+
130+
fn resolve_chroma_url(args: &ChromaConnectArgs) -> Result<Option<String>, String> {
131+
if let Some(url) = &args.chroma_url {
132+
let trimmed = url.trim();
133+
if trimmed.is_empty() {
134+
return Err("--chroma-url cannot be empty".to_string());
135+
}
136+
return Ok(Some(trimmed.to_string()));
137+
}
138+
139+
let scheme = args
140+
.chroma_scheme
141+
.clone()
142+
.or_else(|| env::var("CHROMA_SCHEME").ok());
143+
let host = args
144+
.chroma_host
145+
.clone()
146+
.or_else(|| env::var("CHROMA_HOST").ok());
147+
let port = resolve_chroma_port_opt(args)?;
148+
149+
if scheme.is_none() && host.is_none() && port.is_none() {
150+
return Ok(None);
151+
}
152+
153+
if let Some(host) = host.clone()
154+
&& host.contains("://")
155+
&& scheme.is_none()
156+
&& port.is_none()
157+
{
158+
return Ok(Some(host));
159+
}
160+
161+
let scheme = scheme.unwrap_or_else(|| "http".to_string());
162+
let host = host.unwrap_or_else(|| "localhost".to_string());
163+
let port = port.unwrap_or(DEFAULT_CHROMA_PORT);
164+
Ok(Some(format!("{scheme}://{host}:{port}")))
165+
}
166+
167+
fn resolve_chroma_port(args: &ChromaConnectArgs) -> Result<u16, String> {
168+
Ok(resolve_chroma_port_opt(args)?.unwrap_or(DEFAULT_CHROMA_PORT))
169+
}
170+
171+
fn resolve_chroma_port_opt(args: &ChromaConnectArgs) -> Result<Option<u16>, String> {
172+
if let Some(port) = args.chroma_port {
173+
return Ok(Some(port));
174+
}
175+
176+
match env::var("CHROMA_PORT") {
177+
Ok(raw) => {
178+
let trimmed = raw.trim();
179+
if trimmed.is_empty() {
180+
return Ok(None);
181+
}
182+
let parsed = trimmed.parse::<u16>().map_err(|_| {
183+
format!("Invalid CHROMA_PORT '{trimmed}': expected integer 0-65535")
184+
})?;
185+
Ok(Some(parsed))
186+
}
187+
Err(_) => Ok(None),
188+
}
189+
}
190+
191+
fn resolve_chroma_path(args: &ChromaConnectArgs) -> Option<PathBuf> {
192+
if let Some(path) = &args.chroma_path {
193+
return Some(path.clone());
194+
}
195+
196+
env::var("CHROMA_PATH").ok().and_then(|value| {
197+
let trimmed = value.trim();
198+
if trimmed.is_empty() {
199+
None
200+
} else {
201+
Some(PathBuf::from(trimmed))
202+
}
203+
})
204+
}
205+
206+
async fn start_local_chroma_if_needed(
207+
connection: &ChromaConnection,
208+
) -> Result<Option<LocalChromaGuard>, String> {
209+
let ChromaConnection::LocalPersistent {
210+
url,
211+
host,
212+
port,
213+
path,
214+
} = connection
215+
else {
216+
return Ok(None);
217+
};
218+
219+
fs::create_dir_all(path).map_err(|err| {
220+
format!(
221+
"Failed to create Chroma persistence directory '{}': {err}",
222+
path.display()
223+
)
224+
})?;
225+
226+
let mut child = Command::new("chroma")
227+
.arg("run")
228+
.arg("--path")
229+
.arg(path)
230+
.arg("--host")
231+
.arg(host)
232+
.arg("--port")
233+
.arg(port.to_string())
234+
.stdin(Stdio::null())
235+
.stdout(Stdio::null())
236+
.stderr(Stdio::null())
237+
.spawn()
238+
.map_err(|err| {
239+
if err.kind() == io::ErrorKind::NotFound {
240+
"Failed to start local ChromaDB: 'chroma' command not found. Install Chroma or run a server manually and use --chroma-url."
241+
.to_string()
242+
} else {
243+
format!("Failed to start local ChromaDB process: {err}")
244+
}
245+
})?;
246+
247+
wait_for_chroma_ready(url).await.map_err(|err| {
248+
let _ = child.kill();
249+
let _ = child.wait();
250+
err
251+
})?;
252+
253+
eprintln!(
254+
"mpipe: started local ChromaDB at {url} (path: {})",
255+
path.display()
256+
);
257+
258+
Ok(Some(LocalChromaGuard { child: Some(child) }))
259+
}
260+
261+
async fn wait_for_chroma_ready(url: &str) -> Result<(), String> {
262+
let deadline = std::time::Instant::now() + LOCAL_CHROMA_START_TIMEOUT;
263+
264+
loop {
265+
let attempt_error = match ChromaClient::new(ChromaClientOptions {
266+
url: Some(url.to_string()),
267+
auth: ChromaAuthMethod::None,
268+
database: "default_database".to_string(),
269+
})
270+
.await
271+
{
272+
Ok(client) => match client.heartbeat().await {
273+
Ok(_) => return Ok(()),
274+
Err(err) => format!("heartbeat failed: {err}"),
275+
},
276+
Err(err) => format!("connection failed: {err}"),
277+
};
278+
279+
if std::time::Instant::now() >= deadline {
280+
return Err(format!(
281+
"Local ChromaDB did not become ready at {url} within {}s ({}).",
282+
LOCAL_CHROMA_START_TIMEOUT.as_secs(),
283+
attempt_error
284+
));
285+
}
286+
287+
tokio::time::sleep(LOCAL_CHROMA_POLL_INTERVAL).await;
288+
}
289+
}

0 commit comments

Comments
 (0)