Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions mistralrs-cli/src/ui/handlers/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ pub async fn delete_chat(
Extension(app): Extension<Arc<AppState>>,
Json(req): Json<DeleteChatRequest>,
) -> impl IntoResponse {

let path = format!("{}/{}.json", app.chats_dir, req.id);
match fs::remove_file(&path).await {
Ok(_) => (StatusCode::OK, "Deleted").into_response(),
Expand All @@ -384,6 +385,7 @@ pub async fn load_chat(
Extension(app): Extension<Arc<AppState>>,
Json(req): Json<LoadChatRequest>,
) -> impl IntoResponse {

let path = format!("{}/{}.json", app.chats_dir, req.id);
if let Ok(bytes) = fs::read(&path).await {
if let Ok(chat) = serde_json::from_slice::<ChatFile>(&bytes) {
Expand All @@ -399,6 +401,7 @@ pub async fn rename_chat(
Extension(app): Extension<Arc<AppState>>,
Json(req): Json<RenameChatRequest>,
) -> impl IntoResponse {

let path = format!("{}/{}.json", app.chats_dir, req.id);
if let Ok(bytes) = fs::read(&path).await {
if let Ok(mut chat) = serde_json::from_slice::<ChatFile>(&bytes) {
Expand Down Expand Up @@ -427,6 +430,7 @@ pub async fn append_message(
Extension(app): Extension<Arc<AppState>>,
Json(req): Json<AppendMessageRequest>,
) -> impl IntoResponse {

if let Err(e) = append_chat_message(&app, &req.id, &req.role, &req.content, req.images).await {
error!("append message error: {}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, "append failed").into_response();
Expand Down
1 change: 1 addition & 0 deletions mistralrs-cli/src/ui/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ pub fn get_cache_dir() -> PathBuf {
});
cache_home.join("mistralrs")
}

54 changes: 30 additions & 24 deletions mistralrs-core/src/paged_attention/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,16 @@ impl PagedAttentionScheduler {
return buckets.into_values().next().unwrap();
}

// Find the bucket with the shortest sequence length
// Find the bucket containing the OLDEST sequence (lowest timestamp) to ensure FCFS priority
let min_key = *buckets
.keys()
.min_by_key(|(len, _, _)| *len)
.iter()
.min_by_key(|(_, seqs)| {
seqs.iter()
.map(|seq| get_mut_arcmutex!(seq).timestamp())
.min()
.unwrap()
})
.map(|(key, _)| key)
.expect("No sequence buckets");

let selected = buckets.remove(&min_key).unwrap();
Expand Down Expand Up @@ -232,8 +238,9 @@ impl PagedAttentionScheduler {
*count += 1;

if *count > WAITING_TIMEOUT {
// Try to preempt a running sequence
if let Some(seq_to_preempt) = self.running.pop_back() {
// Continuously preempt running sequences until allocation succeeds
let mut success = false;
while let Some(seq_to_preempt) = self.running.pop_back() {
self._preempt(seq_to_preempt);

// Retry allocation
Expand All @@ -242,25 +249,28 @@ impl PagedAttentionScheduler {
kv_mgr.allocate_slots(seq_id, num_tokens, &computed.block_ids);
drop(kv_mgr);

if retry.is_none() {
if retry.is_some() {
self.waiting_counts.remove(&seq_id);
success = true;
break;
}
}

if !success {
// Even after emptying `running`, it doesn't fit.
if self.running.is_empty() {
let id = seq_id;
warn!(
"Sequence {id} with length of {num_tokens} tokens still exceeds KV cache size \
even after evicting another sequence.",
"Sequence {id} with length of {num_tokens} tokens is too long and exceeds max KV cache size. \
Ignored."
);
get_mut_arcmutex!(seq).set_state(SequenceState::FinishedIgnored);
did_ignore = true;
} else {
self.waiting_counts.remove(&seq_id);
warn!("Sequence {seq_id} still waiting for memory...");
// Safely break the loop to wait for the next iteration without dropping the request!
break;
}
} else {
warn!(
"Sequence {seq_id} with length of {num_tokens} tokens is too long and exceeds KV cache size. \
To fix, increase the maximum sequence length for the KV cache, for example with \
`--max-seq-len`/ `max_seq_len` in automatic device mapping parameters.",
);
get_mut_arcmutex!(seq).set_state(SequenceState::FinishedIgnored);
did_ignore = true;
}
} else {
break;
Expand Down Expand Up @@ -334,6 +344,7 @@ impl PagedAttentionScheduler {
self.sort_running_by_priority_fcfs();

let mut running: VecDeque<Arc<Mutex<Sequence>>> = VecDeque::new();
let mut deferred_running: VecDeque<Arc<Mutex<Sequence>>> = VecDeque::new();
while !self.running.is_empty() {
let seq = self.running.pop_front().unwrap();
let mut finished_with_break = false;
Expand Down Expand Up @@ -367,16 +378,12 @@ impl PagedAttentionScheduler {
{
running.push_back(seq);
} else {
self.running.push_back(seq);
deferred_running.push_back(seq);
}
}
}
self.running = running;

// Bucket running completions by sequence length
let running_for_bucket = std::mem::take(&mut self.running);
let bucketed = self.bucket_and_preempt_sequences(running_for_bucket);
self.running = bucketed;
self.running.extend(deferred_running);

self.running
.iter()
Expand Down Expand Up @@ -497,7 +504,6 @@ impl PagedAttentionScheduler {
self.running
.make_contiguous()
.sort_by_key(|seq| get_mut_arcmutex!(seq).timestamp());
self.running.make_contiguous().reverse();
}
}

Expand Down
4 changes: 4 additions & 0 deletions mistralrs-web-chat/src/handlers/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ pub async fn delete_chat(
State(app): State<Arc<AppState>>,
Json(req): Json<DeleteChatRequest>,
) -> impl IntoResponse {

let path = format!("{}/{}.json", app.chats_dir, req.id);
if let Err(e) = tokio::fs::remove_file(&path).await {
error!("delete chat error: {}", e);
Expand All @@ -478,6 +479,7 @@ pub async fn load_chat(
State(app): State<Arc<AppState>>,
Json(req): Json<LoadChatRequest>,
) -> impl IntoResponse {

let path = format!("{}/{}.json", app.chats_dir, req.id);
match fs::read(&path).await {
Ok(data) => match serde_json::from_slice::<ChatFile>(&data) {
Expand Down Expand Up @@ -510,6 +512,7 @@ pub async fn rename_chat(
State(app): State<Arc<AppState>>,
Json(req): Json<RenameChatRequest>,
) -> impl IntoResponse {

let path = format!("{}/{}.json", app.chats_dir, req.id);
if let Ok(data) = fs::read(&path).await {
if let Ok(mut chat) = serde_json::from_slice::<ChatFile>(&data) {
Expand Down Expand Up @@ -538,6 +541,7 @@ pub async fn append_message(
State(app): State<Arc<AppState>>,
Json(req): Json<AppendMessageRequest>,
) -> impl IntoResponse {

if let Err(e) =
crate::chat::append_chat_message(&app, &req.id, &req.role, &req.content, req.images).await
{
Expand Down
1 change: 1 addition & 0 deletions mistralrs-web-chat/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ pub fn get_cache_dir() -> PathBuf {
});
cache_home.join("mistralrs-web-chat")
}

Loading