Skip to content

Commit ff540a7

Browse files
committed
feat: added CEL based visual policy builder.
1 parent e4ee2b2 commit ff540a7

19 files changed

Lines changed: 1180 additions & 398 deletions

README.md

Lines changed: 269 additions & 112 deletions
Large diffs are not rendered by default.

analytics/broadcaster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func BroadcastOutputPatch(serverName, jsonrpcID, outputPayload string) {
6868
func Subscribe() (chan []byte, func()) {
6969
// Buffer allows handling spikes without instantly dropping events
7070
ch := make(chan []byte, 100)
71-
71+
7272
sseClientsMu.Lock()
7373
sseClients[ch] = true
7474
sseClientsMu.Unlock()

analytics/client.go

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"log"
87
"time"
98
)
109

@@ -30,7 +29,6 @@ func (c *MCPClient) Discover(ctx context.Context) ([]MCPTool, error) {
3029

3130
// STEP 1: Initialize
3231
initReq := []byte(`{"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "AgentGate", "version": "1.0.0"}}}`)
33-
log.Printf("[MCPClient] Sending Initialize: %s", string(initReq))
3432
if err := c.transport.Send(initReq); err != nil {
3533
return nil, fmt.Errorf("initialize send failed: %w", err)
3634
}
@@ -40,20 +38,17 @@ func (c *MCPClient) Discover(ctx context.Context) ([]MCPTool, error) {
4038
var lastErr error
4139
for i := 0; i < 10; i++ { // Try reading up to 10 messages safely without hanging
4240
raw, err := c.transport.Receive(ctx)
43-
41+
4442
if err != nil {
45-
log.Printf("[MCPClient] Initialize Receive Error: %v", err)
4643
lastErr = err
4744
break
4845
}
49-
log.Printf("[MCPClient] Initialize Received Raw: %s", string(raw))
50-
46+
5147
resp, err := parseTolerantResponse(raw)
5248
if err != nil {
53-
log.Printf("[MCPClient] Initialize Parse Error (Ignoring): %v", err)
5449
continue // Ignore garbage preamble prints
5550
}
56-
51+
5752
if resp.ID != nil {
5853
idVal := fmt.Sprintf("%v", resp.ID)
5954
if idVal == "1" || idVal == "1.0" {
@@ -65,7 +60,7 @@ func (c *MCPClient) Discover(ctx context.Context) ([]MCPTool, error) {
6560
} else if sess, ok := resp.Result["sessionId"].(string); ok {
6661
c.SessionID = sess
6762
}
68-
63+
6964
// Ensure HTTP transport picks it up for subsequent stateful loops
7065
if httpT, ok := c.transport.(*HTTPTransport); ok {
7166
if httpT.SessionID == "" {
@@ -87,7 +82,6 @@ func (c *MCPClient) Discover(ctx context.Context) ([]MCPTool, error) {
8782

8883
// STEP 3: Initialized Notification
8984
notifyReq := []byte(`{"jsonrpc": "2.0", "method": "notifications/initialized"}`)
90-
log.Printf("[MCPClient] Sending Initialized Notification: %s", string(notifyReq))
9185
if err := c.transport.Send(notifyReq); err != nil {
9286
// Just a notification broadcast, ignore send errors natively
9387
}
@@ -108,38 +102,32 @@ func (c *MCPClient) Discover(ctx context.Context) ([]MCPTool, error) {
108102
var toolsFound []MCPTool
109103

110104
for _, v := range variants {
111-
log.Printf("[MCPClient] Sending Tools Request Variant (ID: %d): %s", v.ID, v.Req)
112105
if err := c.transport.Send([]byte(v.Req)); err != nil {
113-
log.Printf("[MCPClient] Sending Variant Failed: %v", err)
114106
continue
115107
}
116-
117-
// Attempt to receive for this fallback variant
108+
118109
var innerBreak bool
119110
var matchedVariant bool
120-
111+
121112
for j := 0; j < 5; j++ {
122-
receiveCtx, cancel := context.WithTimeout(ctx, 3*time.Second) // Force 3-second rapid discard
113+
receiveCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
123114
raw, err := c.transport.Receive(receiveCtx)
124115
cancel()
125-
116+
126117
if err != nil {
127-
log.Printf("[MCPClient] Tools Request Receive Error: %v", err)
128118
innerBreak = true
129119
break
130120
}
131-
log.Printf("[MCPClient] Tools Request Received Raw: %s", string(raw))
132-
121+
133122
resp, err := parseTolerantResponse(raw)
134123
if err != nil {
135-
log.Printf("[MCPClient] Tools Request Parse Error (Ignoring): %v", err)
136124
continue
137125
}
138-
126+
139127
if resp.ID != nil {
140128
idVal := fmt.Sprintf("%v", resp.ID)
141129
expectedID := fmt.Sprintf("%d", v.ID)
142-
130+
143131
if idVal == expectedID || idVal == expectedID+".0" {
144132
matchedVariant = true
145133
if len(resp.Result) > 0 {
@@ -155,11 +143,11 @@ func (c *MCPClient) Discover(ctx context.Context) ([]MCPTool, error) {
155143
}
156144
}
157145
}
158-
146+
159147
if matchedVariant || innerBreak {
160148
continue
161149
}
162150
}
163-
151+
164152
return nil, fmt.Errorf("failed to discover tools across all fallback variants")
165153
}

analytics/db.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -83,21 +83,21 @@ func RecordRequest(serverName, status, agentID, jsonrpcID, inputPayload, toolNam
8383
}
8484

8585
id, _ := res.LastInsertId()
86-
86+
8787
record := RequestRecord{
88-
ID: id,
89-
Timestamp: time.Now().UTC().Format(time.RFC3339),
90-
Status: status,
91-
ServerName: serverName,
92-
AgentID: agentID,
93-
ToolName: toolName,
94-
Arguments: args,
95-
Reason: reason,
96-
LatencyMs: latencyMs,
97-
JSONRPCID: jsonrpcID,
98-
InputPayload: inputPayload,
88+
ID: id,
89+
Timestamp: time.Now().UTC().Format(time.RFC3339),
90+
Status: status,
91+
ServerName: serverName,
92+
AgentID: agentID,
93+
ToolName: toolName,
94+
Arguments: args,
95+
Reason: reason,
96+
LatencyMs: latencyMs,
97+
JSONRPCID: jsonrpcID,
98+
InputPayload: inputPayload,
9999
}
100-
100+
101101
// Push to the in-memory SSE channels
102102
Broadcast(record)
103103
}()

analytics/transport.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,17 @@ type StdioTransport struct {
8787
stdin io.WriteCloser
8888
stdout io.ReadCloser
8989
ctx context.Context
90-
91-
queue chan []byte
92-
errCh chan error
90+
91+
queue chan []byte
92+
errCh chan error
9393
stderrBuf *bytes.Buffer
9494
stderrMu sync.Mutex
9595
}
9696

9797
func (t *StdioTransport) Connect(ctx context.Context) error {
9898
t.ctx = ctx
9999
t.cmd = exec.CommandContext(ctx, t.Command, t.Args...)
100-
100+
101101
t.cmd.Env = os.Environ()
102102
for k, v := range t.Env {
103103
t.cmd.Env = append(t.cmd.Env, fmt.Sprintf("%s=%s", k, v))
@@ -182,13 +182,13 @@ func (t *StdioTransport) getStderr() string {
182182

183183
func (t *StdioTransport) Receive(ctx context.Context) ([]byte, error) {
184184
select {
185-
case <-ctx.Done(): // Short-circuited scoped fallback context
185+
case <-ctx.Done(): // Short-circuited scoped fallback context
186186
stderr := t.getStderr()
187187
if stderr != "" {
188188
return nil, fmt.Errorf("%w. Last Stderr: %s", ctx.Err(), stderr)
189189
}
190190
return nil, ctx.Err()
191-
case <-t.ctx.Done(): // Global connection died
191+
case <-t.ctx.Done(): // Global connection died
192192
stderr := t.getStderr()
193193
if stderr != "" {
194194
return nil, fmt.Errorf("%w. Last Stderr: %s", t.ctx.Err(), stderr)
@@ -238,7 +238,7 @@ func (t *HTTPTransport) Send(req []byte) error {
238238
if err != nil {
239239
return err
240240
}
241-
241+
242242
httpReq.Header.Set("Content-Type", "application/json")
243243
httpReq.Header.Set("Accept", "application/json, text/event-stream")
244244
if t.SessionID != "" {

0 commit comments

Comments
 (0)