-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathpython.go
More file actions
376 lines (316 loc) · 9.32 KB
/
python.go
File metadata and controls
376 lines (316 loc) · 9.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
package masktunnel
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rs/zerolog"
)
// This file provides small, gopy-friendly helpers for Python bindings.
// ========== Context helpers ==========
// Package-level state backing Background and CancelGlobalContext.
var (
	// globalCtx is the shared context handed out by Background.
	globalCtx context.Context
	// globalCancel cancels globalCtx; it stays nil until Background has run once.
	globalCancel context.CancelFunc
	// globalOnce guards the one-time creation of globalCtx and globalCancel.
	globalOnce sync.Once
)

// Background returns the shared, cancellable context for this package.
//
// The context is created on the first call (derived from context.Background)
// and the same value is returned by every later call.
func Background() context.Context {
	globalOnce.Do(func() {
		ctx, cancel := context.WithCancel(context.Background())
		globalCtx, globalCancel = ctx, cancel
	})
	return globalCtx
}
// ContextWithCancel bundles a context together with the function that cancels it,
// so both can be carried around as a single value.
type ContextWithCancel struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// NewContextWithCancel returns a ContextWithCancel whose context is derived
// from context.Background and is cancelled by calling Cancel.
func NewContextWithCancel() *ContextWithCancel {
	ctx, cancel := context.WithCancel(context.Background())
	return &ContextWithCancel{ctx: ctx, cancel: cancel}
}

// Cancel cancels the wrapped context.
//
// It is a no-op on a nil receiver or on a value that has no cancel function
// (for example a zero-value ContextWithCancel).
func (c *ContextWithCancel) Cancel() {
	if c == nil || c.cancel == nil {
		return
	}
	c.cancel()
}

// Context returns the wrapped context.
//
// It never returns nil: a nil receiver, or a value that was not built by
// NewContextWithCancel and therefore holds no context, yields
// context.Background() instead.
func (c *ContextWithCancel) Context() context.Context {
	if c == nil || c.ctx == nil {
		return context.Background()
	}
	return c.ctx
}
// CancelGlobalContext cancels the shared context returned by Background.
//
// If the cancel function has not been set yet (globalCancel is still nil),
// the call does nothing.
func CancelGlobalContext() {
	if globalCancel == nil {
		return
	}
	globalCancel()
}
// NewContext returns context.Background(): a context that is never cancelled
// and carries no deadline or values.
func NewContext() context.Context {
	ctx := context.Background()
	return ctx
}
// ========== Time constants ==========
// Duration units re-exported from the time package as package variables.
var (
	// Nanosecond is time.Nanosecond.
	Nanosecond = time.Nanosecond
	// Microsecond is time.Microsecond.
	Microsecond = time.Microsecond
	// Millisecond is time.Millisecond.
	Millisecond = time.Millisecond
	// Second is time.Second.
	Second = time.Second
	// Minute is time.Minute.
	Minute = time.Minute
	// Hour is time.Hour.
	Hour = time.Hour
)
// ParseDuration converts a duration string such as "300ms" or "2h45m" into a
// time.Duration. It delegates to time.ParseDuration and returns its result
// and error unchanged.
func ParseDuration(s string) (time.Duration, error) {
	d, err := time.ParseDuration(s)
	return d, err
}
// ========== Logger bridge ==========
// zerolog levels re-exported as package variables for the Python bindings.
var (
	// LevelTrace is zerolog.TraceLevel.
	LevelTrace = zerolog.TraceLevel
	// LevelDebug is zerolog.DebugLevel.
	LevelDebug = zerolog.DebugLevel
	// LevelInfo is zerolog.InfoLevel.
	LevelInfo = zerolog.InfoLevel
	// LevelWarn is zerolog.WarnLevel.
	LevelWarn = zerolog.WarnLevel
	// LevelError is zerolog.ErrorLevel.
	LevelError = zerolog.ErrorLevel
	// LevelFatal is zerolog.FatalLevel.
	LevelFatal = zerolog.FatalLevel
	// LevelPanic is zerolog.PanicLevel.
	LevelPanic = zerolog.PanicLevel
)
// LogEntry is one buffered log line together with the metadata recorded when
// it was added to the buffer.
type LogEntry struct {
	// LoggerID identifies the logger that produced the line.
	LoggerID string
	// Message is the log line text.
	Message string
	// Time is the Unix timestamp, in nanoseconds, at which the entry was buffered.
	Time int64
}
// Shared state for the in-memory log buffer and its listeners.
var (
	// logBuffer holds pending log entries, oldest first.
	logBuffer []LogEntry
	// bufferMutex guards logBuffer.
	bufferMutex sync.RWMutex
	// bufferSize is the maximum number of entries kept in logBuffer.
	bufferSize = 10000
	// loggerCount is the counter used to derive auto-generated logger IDs.
	loggerCount int64

	// logNotifyChannels holds one signalling channel per waiting listener.
	logNotifyChannels []chan struct{}
	// channelsMutex guards logNotifyChannels.
	channelsMutex sync.RWMutex
)
// addLogEntry appends a timestamped entry for loggerID to the shared buffer,
// drops the oldest entries if the buffer exceeds bufferSize, and then signals
// every registered listener.
func addLogEntry(loggerID, message string) {
	bufferMutex.Lock()
	defer bufferMutex.Unlock()

	logBuffer = append(logBuffer, LogEntry{
		LoggerID: loggerID,
		Message:  message,
		Time:     time.Now().UnixNano(),
	})

	// FIFO trim: shift the newest bufferSize entries to the front and cut the tail.
	if overflow := len(logBuffer) - bufferSize; overflow > 0 {
		copy(logBuffer, logBuffer[overflow:])
		logBuffer = logBuffer[:bufferSize]
	}

	notifyLogListeners()
}
// notifyLogListeners sends a wake-up signal to every registered listener
// channel. Sends are non-blocking: a channel that cannot accept the signal
// right now is skipped.
func notifyLogListeners() {
	channelsMutex.RLock()
	defer channelsMutex.RUnlock()

	for i := range logNotifyChannels {
		select {
		case logNotifyChannels[i] <- struct{}{}:
		default:
			// Listener cannot take the signal without blocking; move on.
		}
	}
}
// GetLogEntries drains the shared log buffer and returns its contents as a
// new slice. It returns nil when the buffer is empty.
func GetLogEntries() []LogEntry {
	bufferMutex.Lock()
	defer bufferMutex.Unlock()

	pending := len(logBuffer)
	if pending == 0 {
		return nil
	}

	drained := make([]LogEntry, pending)
	copy(drained, logBuffer)

	// Reset the length but keep the backing array for reuse.
	logBuffer = logBuffer[:0]
	return drained
}
// WaitForLogEntries returns buffered log entries, blocking until some are
// available or the timeout expires.
//
// timeoutMs is the maximum wait in milliseconds; a value of 0 (or less) waits
// indefinitely. The result is nil on timeout. It can also be nil after a
// wake-up if the buffer is empty by the time it is drained, e.g. because
// another caller drained it first or CancelLogWaiters closed the channel.
func WaitForLogEntries(timeoutMs int64) []LogEntry {
	// Fast path: entries are already waiting.
	if entries := GetLogEntries(); entries != nil {
		return entries
	}

	// Register a private, 1-buffered notification channel for this waiter.
	notifyCh := make(chan struct{}, 1)
	channelsMutex.Lock()
	logNotifyChannels = append(logNotifyChannels, notifyCh)
	channelsMutex.Unlock()

	// Unregister on exit. The channel is located by identity rather than by
	// the index it was appended at: other waiters may have removed themselves
	// in the meantime, shifting positions, and a stale index would remove
	// somebody else's channel.
	defer func() {
		channelsMutex.Lock()
		defer channelsMutex.Unlock()
		for i, ch := range logNotifyChannels {
			if ch == notifyCh {
				logNotifyChannels = append(logNotifyChannels[:i], logNotifyChannels[i+1:]...)
				return
			}
		}
	}()

	// Re-check after registering: an entry added between the first check and
	// the registration above produced no signal for this waiter.
	if entries := GetLogEntries(); entries != nil {
		return entries
	}

	if timeoutMs <= 0 {
		// Wait indefinitely.
		<-notifyCh
		return GetLogEntries()
	}

	timer := time.NewTimer(time.Duration(timeoutMs) * time.Millisecond)
	defer timer.Stop()

	select {
	case <-notifyCh:
		return GetLogEntries()
	case <-timer.C:
		return nil // Timeout
	}
}
// CancelLogWaiters wakes every registered listener by closing its
// notification channel, then empties the listener list.
func CancelLogWaiters() {
	channelsMutex.Lock()
	defer channelsMutex.Unlock()

	// A receive on a closed channel returns immediately, which unblocks the waiter.
	for i := range logNotifyChannels {
		close(logNotifyChannels[i])
	}

	// Drop all registrations but keep the backing array.
	logNotifyChannels = logNotifyChannels[:0]
}
// NewLoggerWithID returns a timestamping zerolog.Logger whose output is
// written through a bufferWriter tagged with id.
func NewLoggerWithID(id string) zerolog.Logger {
	sink := &bufferWriter{id: id}
	return zerolog.New(sink).With().Timestamp().Logger()
}
// NewLoggerWithIDAndLevel returns a timestamping zerolog.Logger set to the
// given level whose output is written through a bufferWriter tagged with id.
func NewLoggerWithIDAndLevel(id string, level zerolog.Level) zerolog.Logger {
	sink := &bufferWriter{id: id}
	leveled := zerolog.New(sink).Level(level)
	return leveled.With().Timestamp().Logger()
}
// NewLogger returns a timestamping zerolog.Logger with an auto-generated ID
// of the form "logger_<n>", writing through a bufferWriter tagged with that ID.
//
// The cb parameter is accepted but not used by this function.
func NewLogger(cb func(line string)) zerolog.Logger {
	// Atomic increment so concurrent callers never observe the same counter
	// value and end up sharing an ID.
	id := fmt.Sprintf("logger_%d", atomic.AddInt64(&loggerCount, 1))
	return zerolog.New(&bufferWriter{id: id}).With().Timestamp().Logger()
}
// NewLoggerWithLevel returns a timestamping zerolog.Logger set to the given
// level, with an auto-generated ID of the form "logger_<n>", writing through
// a bufferWriter tagged with that ID.
//
// The cb parameter is accepted but not used by this function.
func NewLoggerWithLevel(level zerolog.Level, cb func(line string)) zerolog.Logger {
	// Atomic increment so concurrent callers never observe the same counter
	// value and end up sharing an ID.
	id := fmt.Sprintf("logger_%d", atomic.AddInt64(&loggerCount, 1))
	return zerolog.New(&bufferWriter{id: id}).Level(level).With().Timestamp().Logger()
}
// SetLoggerGlobalLevel forwards level to zerolog.SetGlobalLevel, which sets
// zerolog's global log level.
func SetLoggerGlobalLevel(level zerolog.Level) {
	zerolog.SetGlobalLevel(level)
}
// bufferWriter is an io.Writer-style sink that splits incoming bytes into
// lines and forwards each non-blank line to the shared log buffer under its id.
type bufferWriter struct {
	id  string     // tag recorded as LoggerID on every forwarded entry
	mu  sync.Mutex // guards buf
	buf []byte     // bytes received so far that do not yet form a complete line
}

// Write buffers p and forwards every complete, non-blank line to addLogEntry.
//
// It always reports len(p) bytes written and a nil error. An empty p is a
// no-op that returns (0, nil).
func (w *bufferWriter) Write(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}

	// Upper bound on bytes held without a newline before they are flushed.
	const maxPending = 1024 * 1024

	w.mu.Lock()
	defer w.mu.Unlock()

	// Write calls are not guaranteed to line up with log lines, so accumulate
	// and peel off one newline-terminated line at a time.
	w.buf = append(w.buf, p...)
	for nl := bytes.IndexByte(w.buf, '\n'); nl >= 0; nl = bytes.IndexByte(w.buf, '\n') {
		text := strings.TrimSpace(string(w.buf[:nl]))
		w.buf = w.buf[nl+1:]
		if text != "" {
			// The line is forwarded as-is (trimmed); formatting happens downstream.
			addLogEntry(w.id, text)
		}
	}

	// Safety valve: if a writer never emits a newline, flush what we have
	// instead of letting the pending buffer grow without bound.
	if len(w.buf) > maxPending {
		if text := strings.TrimSpace(string(w.buf)); text != "" {
			addLogEntry(w.id, text)
		}
		w.buf = w.buf[:0]
	}

	return len(p), nil
}
// formatLogLine normalizes a log line into a two-field JSON object of the form
// {"level": ..., "message": ...}.
//
// If line is not a JSON object it is wrapped as an "info" message. Otherwise
// the level and message ("message", falling back to "msg") are extracted, and
// every other field except "time" is appended to the message as sorted
// key=value pairs. The timestamp is intentionally dropped. If re-encoding
// fails, the original line is returned unchanged.
func formatLogLine(line string) string {
	var fields map[string]interface{}
	if json.Unmarshal([]byte(line), &fields) != nil {
		// Not parseable as a JSON object: wrap the raw text.
		wrapped, err := json.Marshal(map[string]interface{}{
			"level":   "info",
			"message": line,
		})
		if err != nil {
			return line
		}
		return string(wrapped)
	}

	level, _ := fields["level"].(string)
	text, _ := fields["message"].(string)
	if text == "" {
		text, _ = fields["msg"].(string)
	}

	// Everything that is not a standard field becomes a key=value suffix.
	var extras []string
	for name, val := range fields {
		if name == "level" || name == "time" || name == "message" || name == "msg" {
			continue
		}
		extras = append(extras, fmt.Sprintf("%s=%v", name, val))
	}
	// Map iteration order is random; sort for stable output.
	sort.Strings(extras)

	if len(extras) > 0 {
		suffix := strings.Join(extras, " ")
		if text == "" {
			text = suffix
		} else {
			text += " " + suffix
		}
	}

	encoded, err := json.Marshal(map[string]interface{}{
		"level":   level,
		"message": text,
	})
	if err != nil {
		// Fall back to the untouched input if encoding fails.
		return line
	}
	return string(encoded)
}