Skip to content

Commit ac47b87

Browse files
committed
feat: switch VectorCourt from sync consult to async submit flow
1 parent ec33eab commit ac47b87

3 files changed

Lines changed: 343 additions & 12 deletions

File tree

internal/tui/app.go

Lines changed: 202 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package tui
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"net/http"
68
"strings"
79
"time"
810

@@ -96,12 +98,44 @@ type deliberationState struct {
9698
status deliberationStatus
9799
startTime time.Time
98100
cancel context.CancelFunc
99-
flightID string // ID of the flight record to update with VectorCourt data
101+
flightID string // ID of the flight record to update with VectorCourt data
102+
sparMsg string // latest spar event message for live display
103+
queuePos int // queue position (0 = not queued / processing)
104+
sparCh <-chan vectorcourt.SparEvent // SSE event channel (nil when not streaming)
100105
}
101106

102107
// deliberationTickMsg fires every second to update the elapsed timer.
103108
type deliberationTickMsg struct{}
104109

110+
// deliberationSubmittedMsg signals that /v1/submit returned successfully.
111+
// The Update handler starts polling and SSE streaming.
112+
type deliberationSubmittedMsg struct {
113+
submissionID string
114+
caseID string
115+
position int
116+
client *vectorcourt.Client
117+
question string
118+
gate *vectorcourt.GateResult
119+
vcSnapshot *flight.VectorCourtSnapshot
120+
}
121+
122+
// deliberationPollMsg carries an intermediate poll result.
123+
type deliberationPollMsg struct {
124+
status *vectorcourt.SubmissionStatus
125+
err error
126+
client *vectorcourt.Client
127+
submissionID string
128+
caseID string
129+
question string
130+
gate *vectorcourt.GateResult
131+
vcSnapshot *flight.VectorCourtSnapshot
132+
}
133+
134+
// deliberationSparMsg carries a live spar event into the TUI.
135+
type deliberationSparMsg struct {
136+
event vectorcourt.SparEvent
137+
}
138+
105139
// deliberationResultMsg carries the async VectorCourt verdict back to Update.
106140
type deliberationResultMsg struct {
107141
statusMsg string
@@ -458,14 +492,90 @@ func (m AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
458492
m.editor.deliberationMsg = ""
459493
return m, nil
460494
}
461-
// Update elapsed time display.
462495
elapsed := time.Since(m.deliberation.startTime).Truncate(time.Second)
463-
m.editor.deliberationMsg = fmt.Sprintf(" ⏳ deliberating... (%s)", elapsed)
464-
// Keep ticking every second while deliberation is active.
496+
if m.deliberation.sparMsg != "" {
497+
m.editor.deliberationMsg = fmt.Sprintf(" ⏳ %s (%s)", m.deliberation.sparMsg, elapsed)
498+
} else if m.deliberation.queuePos > 0 {
499+
m.editor.deliberationMsg = fmt.Sprintf(" ⏳ queued #%d (%s)", m.deliberation.queuePos, elapsed)
500+
} else {
501+
m.editor.deliberationMsg = fmt.Sprintf(" ⏳ deliberating... (%s)", elapsed)
502+
}
465503
return m, tea.Tick(time.Second, func(_ time.Time) tea.Msg {
466504
return deliberationTickMsg{}
467505
})
468506

507+
case deliberationSubmittedMsg:
508+
if m.deliberation.status != deliberationActive {
509+
return m, nil
510+
}
511+
m.deliberation.queuePos = msg.position
512+
m.editor.copyMsg = fmt.Sprintf("submitted — queue #%d", msg.position)
513+
514+
// Start polling and SSE stream in parallel.
515+
pollCmd := m.startPollCmd(msg)
516+
streamCmd := m.startStreamCmd(msg.submissionID, msg.client)
517+
return m, tea.Batch(pollCmd, streamCmd)
518+
519+
case deliberationPollMsg:
520+
if m.deliberation.status != deliberationActive {
521+
return m, nil
522+
}
523+
if msg.err != nil {
524+
return m, tea.Tick(pollInterval, func(_ time.Time) tea.Msg {
525+
return m.pollOnce(msg.client, msg.submissionID, msg.caseID, msg.question, msg.gate, msg.vcSnapshot)
526+
})
527+
}
528+
switch msg.status.Status {
529+
case "completed":
530+
raw := msg.status.Verdict
531+
if len(raw) == 0 {
532+
// Fetch full case if verdict not inline.
533+
var err error
534+
raw, err = msg.client.GetCase(context.Background(), msg.caseID)
535+
if err != nil {
536+
return m, nil
537+
}
538+
}
539+
stashVerdict(raw, msg.question)
540+
// Trigger the result message to finalize.
541+
resultCmd := func() tea.Msg {
542+
return deliberationResultMsg{
543+
statusMsg: formatVerdictSummary(raw, msg.gate),
544+
vcSnapshot: msg.vcSnapshot,
545+
}
546+
}
547+
return m, resultCmd
548+
case "failed":
549+
errMsg := "deliberation failed"
550+
if msg.status.Error != "" {
551+
errMsg = msg.status.Error
552+
}
553+
resultCmd := func() tea.Msg {
554+
return deliberationResultMsg{
555+
err: fmt.Errorf("%s", errMsg),
556+
vcSnapshot: msg.vcSnapshot,
557+
}
558+
}
559+
return m, resultCmd
560+
default:
561+
m.deliberation.queuePos = msg.status.Position
562+
return m, tea.Tick(pollInterval, func(_ time.Time) tea.Msg {
563+
return m.pollOnce(msg.client, msg.submissionID, msg.caseID, msg.question, msg.gate, msg.vcSnapshot)
564+
})
565+
}
566+
567+
case deliberationSparMsg:
568+
if m.deliberation.status != deliberationActive {
569+
return m, nil
570+
}
571+
m.deliberation.sparMsg = msg.event.Message
572+
// Continue reading from the stream.
573+
if m.deliberation.sparCh != nil && !msg.event.Final {
574+
ch := m.deliberation.sparCh
575+
return m, func() tea.Msg { return readNextSpar(ch) }
576+
}
577+
return m, nil
578+
469579
case deliberationResultMsg:
470580
// Attach VectorCourt snapshot to flight record (best-effort).
471581
if m.recorder != nil && m.deliberation.flightID != "" && msg.vcSnapshot != nil {
@@ -474,6 +584,9 @@ func (m AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
474584
m.deliberation.status = deliberationIdle
475585
m.deliberation.cancel = nil
476586
m.deliberation.flightID = ""
587+
m.deliberation.sparMsg = ""
588+
m.deliberation.queuePos = 0
589+
m.deliberation.sparCh = nil
477590
m.editor.deliberationMsg = ""
478591
if msg.err != nil {
479592
m.editor.copyStatus = copyError
@@ -975,19 +1088,38 @@ func (m *AppModel) startDeliberation(payload string) tea.Cmd {
9751088
snap.FilingQuality = gate.Quality
9761089
}
9771090

978-
// Submit for deliberation.
979-
raw, err := client.Consult(ctx, &vectorcourt.ConsultRequest{
1091+
// Try async submit flow; fall back to sync Consult on 404/501.
1092+
sub, submitErr := client.Submit(ctx, &vectorcourt.SubmitRequest{
9801093
Question: question,
9811094
Filing: filing,
9821095
})
983-
if err != nil {
984-
return deliberationResultMsg{err: fmt.Errorf("consult: %w", err), vcSnapshot: snap}
1096+
if submitErr != nil {
1097+
var apiErr *vectorcourt.APIError
1098+
if errors.As(submitErr, &apiErr) && (apiErr.StatusCode == http.StatusNotFound || apiErr.StatusCode == http.StatusNotImplemented) {
1099+
// Server doesn't support async flow — use sync Consult.
1100+
raw, err := client.Consult(ctx, &vectorcourt.ConsultRequest{
1101+
Question: question,
1102+
Filing: filing,
1103+
})
1104+
if err != nil {
1105+
return deliberationResultMsg{err: fmt.Errorf("consult: %w", err), vcSnapshot: snap}
1106+
}
1107+
stashVerdict(raw, question)
1108+
return deliberationResultMsg{statusMsg: formatVerdictSummary(raw, gate), vcSnapshot: snap}
1109+
}
1110+
return deliberationResultMsg{err: fmt.Errorf("submit: %w", submitErr), vcSnapshot: snap}
9851111
}
9861112

987-
// Auto-stash the verdict.
988-
stashVerdict(raw, question)
989-
990-
return deliberationResultMsg{statusMsg: formatVerdictSummary(raw, gate), vcSnapshot: snap}
1113+
// Async submit accepted — hand off to poll+stream phase.
1114+
return deliberationSubmittedMsg{
1115+
submissionID: sub.SubmissionID,
1116+
caseID: sub.CaseID,
1117+
position: sub.Position,
1118+
client: client,
1119+
question: question,
1120+
gate: gate,
1121+
vcSnapshot: snap,
1122+
}
9911123
}
9921124

9931125
tickCmd := tea.Tick(time.Second, func(_ time.Time) tea.Msg {
@@ -997,6 +1129,64 @@ func (m *AppModel) startDeliberation(payload string) tea.Cmd {
9971129
return tea.Batch(submitCmd, tickCmd)
9981130
}
9991131

1132+
const pollInterval = 2 * time.Second
1133+
1134+
// startPollCmd returns a tea.Cmd that does the first poll after a delay.
1135+
func (m *AppModel) startPollCmd(msg deliberationSubmittedMsg) tea.Cmd {
1136+
return tea.Tick(pollInterval, func(_ time.Time) tea.Msg {
1137+
return m.pollOnce(msg.client, msg.submissionID, msg.caseID, msg.question, msg.gate, msg.vcSnapshot)
1138+
})
1139+
}
1140+
1141+
// pollOnce performs a single poll and returns a deliberationPollMsg.
1142+
func (m *AppModel) pollOnce(client *vectorcourt.Client, submissionID, caseID, question string, gate *vectorcourt.GateResult, snap *flight.VectorCourtSnapshot) tea.Msg {
1143+
status, err := client.PollSubmission(context.Background(), submissionID)
1144+
if err != nil {
1145+
return deliberationPollMsg{
1146+
err: err,
1147+
client: client,
1148+
submissionID: submissionID,
1149+
caseID: caseID,
1150+
question: question,
1151+
gate: gate,
1152+
vcSnapshot: snap,
1153+
}
1154+
}
1155+
1156+
return deliberationPollMsg{
1157+
status: status,
1158+
client: client,
1159+
submissionID: submissionID,
1160+
caseID: caseID,
1161+
question: question,
1162+
gate: gate,
1163+
vcSnapshot: snap,
1164+
}
1165+
}
1166+
1167+
// startStreamCmd connects to the SSE stream and returns the first event read command.
1168+
func (m *AppModel) startStreamCmd(submissionID string, client *vectorcourt.Client) tea.Cmd {
1169+
endpoint := client.Endpoint()
1170+
1171+
return func() tea.Msg {
1172+
ch, err := vectorcourt.StreamSpar(context.Background(), endpoint, submissionID)
1173+
if err != nil {
1174+
return nil
1175+
}
1176+
m.deliberation.sparCh = ch
1177+
return readNextSpar(ch)
1178+
}
1179+
}
1180+
1181+
// readNextSpar reads the next event from the spar channel.
1182+
func readNextSpar(ch <-chan vectorcourt.SparEvent) tea.Msg {
1183+
ev, ok := <-ch
1184+
if !ok {
1185+
return nil
1186+
}
1187+
return deliberationSparMsg{event: ev}
1188+
}
1189+
10001190
// recordLaunch appends a flight record for the launch and returns its ID.
10011191
func (m *AppModel) recordLaunch(target, payload string) string {
10021192
if m.recorder == nil {

0 commit comments

Comments
 (0)