diff --git a/README.md b/README.md index b1785da..364af5c 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,78 @@ Estas chamadas criarão arquivos de logs no diretório /applog/service-name/INFO ``` +## Envio de logs via UDP (GELF/Graylog) + +O logger suporta envio de logs via UDP no formato **GELF 1.1** (Graylog Extended Log Format), compatível com Graylog, Logstash/Kibana, Fluentd e outras plataformas de observabilidade. + +### Configuração via código + +```go +logger, _ := mchlogtoolkitgo.NewLogger("service-name", "info") +logger.SetUDPTarget("graylog.example.com:12201") // Habilita envio UDP com GZIP +logger.Initialize() + +defer logger.Close() // Fecha a conexão UDP ao encerrar +``` + +Para desabilitar a compressão GZIP ou a saída em arquivo: +```go +logger.SetUDPTargetWithOptions("graylog.example.com:12201", false) // Sem GZIP +logger.DisableFileOutput() // Somente UDP, sem arquivos locais +``` + +### Configuração via variáveis de ambiente + +O logger detecta automaticamente as seguintes variáveis de ambiente durante o `Initialize()`: + +| Variável | Descrição | Exemplo | +|---|---|---| +| `MCHLOG_UDP_TARGET` | Endereço do servidor GELF (host:porta) | `graylog.example.com:12201` | +| `MCHLOG_UDP_COMPRESS` | Compressão GZIP (`true`/`false`, padrão: `true`) | `false` | +| `MCHLOG_FILE_OUTPUT` | Saída em arquivo (`true`/`false`, padrão: `true`) | `false` | + +Exemplo de uso com variáveis de ambiente: +```bash +export MCHLOG_UDP_TARGET=graylog.example.com:12201 +export MCHLOG_UDP_COMPRESS=true +export MCHLOG_FILE_OUTPUT=false +``` + +Com as variáveis definidas, o `Initialize()` configura o envio UDP automaticamente: +```go +logger, _ := mchlogtoolkitgo.NewLogger("service-name", "info") +logger.Initialize() // Detecta MCHLOG_UDP_TARGET e configura UDP +defer logger.Close() +``` + +### Formato da mensagem GELF + +As mensagens são enviadas no formato GELF 1.1: +```json +{ + "version": "1.1", + "host": "nome-do-host", + "short_message": "mensagem de informação", + "timestamp": 1711540800.123, + "level": 6, + "_source": "path/service.go", + "_line": "42", + "_trace": "" +} +``` + +O campo `level` segue o padrão syslog: Emergency (0), Error (3), Warning (4), Informational (6), Debug (7). + +### Modos de operação + +| Modo | Arquivo | UDP | Configuração | +|---|---|---|---| +| Somente arquivo (padrão) | sim | nao | Nenhuma configuração adicional | +| Arquivo + UDP | sim | sim | Definir `MCHLOG_UDP_TARGET` | +| Somente UDP | nao | sim | Definir `MCHLOG_UDP_TARGET` e `MCHLOG_FILE_OUTPUT=false` | + +--- + ## Boas práticas de logs Nesta seção são apresentados exemplos de bons e maus usos de logs. diff --git a/logger.go b/logger.go index a6f8487..5cc2809 100644 --- a/logger.go +++ b/logger.go @@ -3,6 +3,8 @@ package mchlogtoolkitgo import ( "encoding/json" "errors" + "log" + "os" "runtime" "strconv" "strings" @@ -20,6 +22,18 @@ const ( DebugPath = "./applog/" ProdPath = "/applog/" + + // EnvUDPTarget is the environment variable name for the UDP target address. + // Format: "host:port" (e.g., "graylog.example.com:12201") + EnvUDPTarget = "MCHLOG_UDP_TARGET" + + // EnvUDPCompress is the environment variable to enable/disable GZIP compression for UDP. + // Values: "true" or "false" (default: "true") + EnvUDPCompress = "MCHLOG_UDP_COMPRESS" + + // EnvFileOutput is the environment variable to enable/disable file output. + // Values: "true" or "false" (default: "true") + EnvFileOutput = "MCHLOG_FILE_OUTPUT" ) // Logger é a estrutura que encapsula as funcionalidades de log da aplicação @@ -50,8 +64,26 @@ func NewLogger(service, level string) (*Logger, error) { return l, nil } -// Initialize inicializa o logger +// Initialize inicializa o logger. +// If the environment variable MCHLOG_UDP_TARGET is set, UDP output is automatically configured. +// If MCHLOG_FILE_OUTPUT is set to "false", file output is disabled. func (l *Logger) Initialize() { + // Check environment variables for UDP configuration + if target := os.Getenv(EnvUDPTarget); target != "" { + compress := true + if v := os.Getenv(EnvUDPCompress); strings.ToLower(v) == "false" { + compress = false + } + if err := mchlogcore.SetUDPTarget(target, compress); err != nil { + log.Printf("[mchlog] failed to configure UDP target %q from environment: %v", target, err) + } + } + + // Check environment variable for file output + if v := os.Getenv(EnvFileOutput); strings.ToLower(v) == "false" { + mchlogcore.SetFileOutput(false) + } + mchlogcore.InitializeMchLog(l.path + l.service + "/") l.log = &mchlogcore.MchLog } @@ -80,6 +112,39 @@ func (l *Logger) SetLevel(level string) error { return nil } +// SetUDPTarget configures the logger to send GELF messages via UDP to the given address. +// The address should be in "host:port" format (e.g., "graylog.example.com:12201"). +// GZIP compression is enabled by default. +// +// Note: UDP target and file output settings are global and shared across all +// Logger instances. Changing them on one instance affects all others. +func (l *Logger) SetUDPTarget(address string) error { + return mchlogcore.SetUDPTarget(address, true) +} + +// SetUDPTargetWithOptions configures the logger to send GELF messages via UDP with explicit options. +// The address should be in "host:port" format (e.g., "graylog.example.com:12201"). +// If compress is true, messages will be GZIP compressed before sending. +func (l *Logger) SetUDPTargetWithOptions(address string, compress bool) error { + return mchlogcore.SetUDPTarget(address, compress) +} + +// DisableFileOutput disables file-based log output. +// When called, logs are only sent via UDP (if configured). +// This is a global setting shared across all Logger instances. +// When disabled before Initialize(), no log directories or files are created. +func (l *Logger) DisableFileOutput() { + mchlogcore.SetFileOutput(false) +} + +// Close drains any buffered UDP messages and closes the UDP connection. +// It does NOT close file handles used by the file logging backends (V1/V2), +// as those write directly to unbuffered *os.File handles managed by zerolog +// and are kept open for the lifetime of the process. +func (l *Logger) Close() error { + return mchlogcore.CloseUDP() +} + func (l *Logger) Test(message string) { if l.level != TestLevel { return diff --git a/mchlogcore/mchlog.go b/mchlogcore/mchlog.go index 16e32e8..7ffa3cb 100644 --- a/mchlogcore/mchlog.go +++ b/mchlogcore/mchlog.go @@ -1,8 +1,13 @@ package mchlogcore import ( + "log" + "sync" + "sync/atomic" + "github.com/gaudiumsoftware/mchlogtoolkitgo/mchlogcorev1" "github.com/gaudiumsoftware/mchlogtoolkitgo/mchlogcorev2" + "github.com/gaudiumsoftware/mchlogtoolkitgo/mchloggelf" ) // LogVersion is a type to define which version of the logger to use @@ -13,25 +18,178 @@ const ( V1 LogVersion = iota // V2 refers to the second version of the logger, which has a simpler file structure without IP or timestamps in names V2 + + // udpBufferSize is the size of the buffered channel for async UDP sends. + // Messages beyond this buffer are dropped to prevent memory exhaustion. + udpBufferSize = 1000 ) var currentVersion = V1 +// udpMu protects udpTransport, udpChan, and udpDone from concurrent access. +// LogSubject holds RLock during the entire channel send to prevent the channel +// from being closed mid-send. +var udpMu sync.RWMutex +var udpTransport *mchloggelf.UDPTransport +var udpChan chan *mchloggelf.GELFMessage +var udpDone chan struct{} + +// fileOutputEnabled uses atomic.Bool for lock-free concurrent reads in LogSubject. +var fileOutputEnabled atomic.Bool + +func init() { + fileOutputEnabled.Store(true) +} + // SetVersion chooses which version to use (V1 or V2). // This should ideally be called before InitializeMchLog. func SetVersion(v LogVersion) { currentVersion = v } +// SetUDPTarget configures a UDP transport to send GELF messages to the given address. +// The address should be in "host:port" format (e.g., "graylog.example.com:12201"). +// If compress is true, messages will be GZIP compressed. +func SetUDPTarget(address string, compress bool) error { + // Phase 1: under write lock, close the channel and capture references + // to the old worker so we can wait on it without holding the lock. + udpMu.Lock() + oldDone, oldTransport := detachWorkerLocked() + udpMu.Unlock() + + // Phase 2: wait for the old worker to drain without holding any lock, + // so LogSubject calls are not blocked during the drain. + waitAndClose(oldDone, oldTransport) + + // Phase 3: create new transport (network operation, no lock needed) + t, err := mchloggelf.NewUDPTransport(address, compress) + if err != nil { + return err + } + + // Phase 4: install new worker under write lock + udpMu.Lock() + defer udpMu.Unlock() + + // If another SetUDPTarget ran between phase 1 and 4 and installed a new + // worker, shut it down first (last caller wins). This wait is bounded: + // the write lock prevents new sends, so the worker only drains what is + // already in the buffer. + innerDone, innerTransport := detachWorkerLocked() + if innerDone != nil { + <-innerDone + } + if innerTransport != nil { + _ = innerTransport.Close() + } + + udpTransport = t + startWorkerLocked() + return nil +} + +// detachWorkerLocked closes the channel and clears the global references, +// returning the old done channel and transport so the caller can wait and +// clean up outside the lock. Must be called with udpMu write lock held. +// After this call, udpChan is nil so LogSubject will skip UDP sends. +func detachWorkerLocked() (<-chan struct{}, *mchloggelf.UDPTransport) { + if udpChan == nil { + return nil, nil + } + close(udpChan) + done := udpDone + transport := udpTransport + udpChan = nil + udpDone = nil + udpTransport = nil + return done, transport +} + +// waitAndClose waits for the worker to finish draining and closes the transport. +// Safe to call with nil arguments (no-op). +func waitAndClose(done <-chan struct{}, transport *mchloggelf.UDPTransport) { + if done != nil { + <-done + } + if transport != nil { + _ = transport.Close() + } +} + +// startWorkerLocked initializes the buffered channel and starts a single worker +// goroutine that reads messages and sends them over UDP sequentially. +// Must be called with udpMu write lock held. +func startWorkerLocked() { + udpChan = make(chan *mchloggelf.GELFMessage, udpBufferSize) + udpDone = make(chan struct{}) + + // Capture references for the goroutine so it operates independently + // of the global variables. The worker reads from its own channel ref + // and does not need the mutex. + ch := udpChan + done := udpDone + transport := udpTransport + + go func() { + defer close(done) + for msg := range ch { + if err := transport.Send(msg); err != nil { + log.Printf("[mchlog] failed to send GELF message via UDP: %v", err) + } + } + }() +} + +// SetFileOutput enables or disables file-based log output. +// When disabled, logs are only sent via UDP (if configured). +func SetFileOutput(enabled bool) { + fileOutputEnabled.Store(enabled) +} + +// CloseUDP closes the UDP worker and transport connection if active. +// Waits for the worker to drain remaining buffered messages before returning. +func CloseUDP() error { + udpMu.Lock() + done, transport := detachWorkerLocked() + udpMu.Unlock() + + waitAndClose(done, transport) + return nil +} + // LogType is the facade structure that delegates calls to either V1 or V2 implementation type LogType struct{} -// LogSubject records the content to the log file using the selected version +// LogSubject records the content to the log file and/or sends it via UDP using the selected version func (l *LogType) LogSubject(subject string, content any, errLog error, ascendStackFrame ...int) { - if currentVersion == V1 { - mchlogcorev1.MchLog.LogSubject(subject, content, errLog, ascendStackFrame...) - } else { - mchlogcorev2.MchLog.LogSubject(subject, content, errLog, ascendStackFrame...) + if fileOutputEnabled.Load() { + if currentVersion == V1 { + mchlogcorev1.MchLog.LogSubject(subject, content, errLog, ascendStackFrame...) + } else { + mchlogcorev2.MchLog.LogSubject(subject, content, errLog, ascendStackFrame...) + } + } + + // Hold RLock for the entire nil-check + send to prevent detachWorkerLocked + // from closing the channel between the check and the send. The non-blocking + // select ensures we never block while holding the lock. + udpMu.RLock() + defer udpMu.RUnlock() + + if udpChan == nil { + return + } + + msg, err := mchloggelf.NewGELFMessage(subject, content, errLog) + if err != nil { + log.Printf("[mchlog] failed to create GELF message for subject %q: %v", subject, err) + return + } + select { + case udpChan <- msg: + // Message queued successfully + default: + log.Printf("[mchlog] UDP send buffer full, dropping GELF message for subject %q", subject) } } @@ -55,14 +213,18 @@ func (l *LogType) GetIP() string { // MchLog is the global instance of the log facade var MchLog LogType -// InitializeMchLog initializes the selected version's backend with the given path +// InitializeMchLog initializes the selected version's backend with the given path. +// If file output is disabled, the file backend is not initialized and no +// directories or files are created. func InitializeMchLog(path string) { versionName := "V1" - if currentVersion == V1 { - mchlogcorev1.InitializeMchLog(path) - } else { - versionName = "V2" - mchlogcorev2.InitializeMchLog(path) + if fileOutputEnabled.Load() { + if currentVersion == V1 { + mchlogcorev1.InitializeMchLog(path) + } else { + versionName = "V2" + mchlogcorev2.InitializeMchLog(path) + } } // The first log in info should be the version of the logger (v1 or v2) diff --git a/mchloggelf/gelf.go b/mchloggelf/gelf.go new file mode 100644 index 0000000..e31efb9 --- /dev/null +++ b/mchloggelf/gelf.go @@ -0,0 +1,192 @@ +package mchloggelf + +import ( + "encoding/json" + "fmt" + "os" + "sync" + "time" +) + +// cachedHostname is resolved once and reused for all GELF messages, +// avoiding a syscall per log message in high-throughput scenarios. +var cachedHostname string +var hostnameOnce sync.Once + +func getHostname() string { + hostnameOnce.Do(func() { + h, err := os.Hostname() + if err != nil || h == "" { + cachedHostname = "unknown" + } else { + cachedHostname = h + } + }) + return cachedHostname +} + +// GELF syslog severity levels +const ( + SyslogEmergency = 0 + SyslogAlert = 1 + SyslogCritical = 2 + SyslogError = 3 + SyslogWarning = 4 + SyslogNotice = 5 + SyslogInformational = 6 + SyslogDebug = 7 +) + +// GELFMessage represents a GELF 1.1 formatted log message. +// See: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html +type GELFMessage struct { + Version string `json:"version"` + Host string `json:"host"` + ShortMessage string `json:"short_message"` + FullMessage string `json:"full_message,omitempty"` + Timestamp float64 `json:"timestamp"` + Level int `json:"level"` + Extra map[string]any `json:"-"` +} + +// MarshalJSON implements custom JSON marshaling that merges standard GELF fields +// with extra fields prefixed with underscore, as required by the GELF spec. +func (m *GELFMessage) MarshalJSON() ([]byte, error) { + fields := make(map[string]any) + + // Standard GELF fields + fields["version"] = m.Version + fields["host"] = m.Host + fields["short_message"] = m.ShortMessage + if m.FullMessage != "" { + fields["full_message"] = m.FullMessage + } + fields["timestamp"] = m.Timestamp + fields["level"] = m.Level + + // Extra fields with underscore prefix + for k, v := range m.Extra { + if k == "id" { + // GELF spec: _id is not allowed + continue + } + fields["_"+k] = v + } + + return json.Marshal(fields) +} + +// LevelToSyslog maps application log level strings to syslog severity levels. +// The level parameter corresponds to the "subject" used in LogSubject, which in +// the standard Logger API is always one of: "fatal", "error", "warn", "info", +// "debug", or "test". If a non-standard subject is passed, it defaults to +// SyslogInformational (6). +func LevelToSyslog(level string) int { + switch level { + case "fatal": + return SyslogEmergency + case "error": + return SyslogError + case "warn": + return SyslogWarning + case "info": + return SyslogInformational + case "debug", "test": + return SyslogDebug + default: + return SyslogInformational + } +} + +// NewGELFMessage creates a GELF message from the log subject and content. +// The content is expected to be JSON bytes (as produced by formatLog in logger.go) +// or a map[string]any / map[string]string. +func NewGELFMessage(subject string, content any, errLog error) (*GELFMessage, error) { + msg := &GELFMessage{ + Version: "1.1", + Host: getHostname(), + Timestamp: float64(time.Now().UnixNano()) / 1e9, + Level: LevelToSyslog(subject), + Extra: make(map[string]any), + } + + // Attach application error before any early return so it is never lost + if errLog != nil { + msg.Extra["error"] = errLog.Error() + } + + // Parse content into a map to extract fields + contentMap, err := contentToMap(content) + if err != nil { + // Fallback: use subject as short_message, errLog is already attached above + msg.ShortMessage = subject + return msg, nil + } + + // Extract short_message from the "message" field + if message, ok := contentMap["message"]; ok { + msg.ShortMessage = toString(message) + delete(contentMap, "message") + } else { + msg.ShortMessage = subject + } + + // Extract level from content to avoid duplication (already mapped to syslog level) + delete(contentMap, "level") + + // All remaining fields become extra fields (prefixed with _ during marshal) + for k, v := range contentMap { + msg.Extra[k] = v + } + + // GELF requires short_message to be non-empty + if msg.ShortMessage == "" { + msg.ShortMessage = "(empty)" + } + + return msg, nil +} + +// contentToMap normalizes the content ([]byte, string, map) into map[string]any. +func contentToMap(content any) (map[string]any, error) { + switch c := content.(type) { + case []byte: + var m map[string]any + if err := json.Unmarshal(c, &m); err != nil { + return nil, err + } + return m, nil + case string: + var m map[string]any + if err := json.Unmarshal([]byte(c), &m); err != nil { + return nil, err + } + return m, nil + case map[string]any: + // Make a copy to avoid modifying the original + m := make(map[string]any, len(c)) + for k, v := range c { + m[k] = v + } + return m, nil + case map[string]string: + m := make(map[string]any, len(c)) + for k, v := range c { + m[k] = v + } + return m, nil + default: + return nil, fmt.Errorf("unsupported content type: %T", content) + } +} + +// toString converts an interface value to string. +func toString(v any) string { + switch s := v.(type) { + case string: + return s + default: + b, _ := json.Marshal(v) + return string(b) + } +} diff --git a/mchloggelf/gelf_test.go b/mchloggelf/gelf_test.go new file mode 100644 index 0000000..d5974c5 --- /dev/null +++ b/mchloggelf/gelf_test.go @@ -0,0 +1,206 @@ +package mchloggelf + +import ( + "encoding/json" + "errors" + "os" + "testing" + "time" +) + +func TestLevelToSyslog(t *testing.T) { + tests := []struct { + level string + expected int + }{ + {"fatal", SyslogEmergency}, + {"error", SyslogError}, + {"warn", SyslogWarning}, + {"info", SyslogInformational}, + {"debug", SyslogDebug}, + {"test", SyslogDebug}, + {"unknown", SyslogInformational}, + } + + for _, tt := range tests { + t.Run(tt.level, func(t *testing.T) { + got := LevelToSyslog(tt.level) + if got != tt.expected { + t.Errorf("LevelToSyslog(%q) = %d, want %d", tt.level, got, tt.expected) + } + }) + } +} + +func TestNewGELFMessageFromBytes(t *testing.T) { + content := []byte(`{"message":"test message","level":"info","source":"/app/main.go","line":"42","trace":""}`) + + msg, err := NewGELFMessage("info", content, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if msg.Version != "1.1" { + t.Errorf("version = %q, want %q", msg.Version, "1.1") + } + + if msg.Host == "" { + t.Error("host should never be empty (GELF spec requires it)") + } + hostname, _ := os.Hostname() + if hostname == "" { + hostname = "unknown" + } + if msg.Host != hostname { + t.Errorf("host = %q, want %q", msg.Host, hostname) + } + + if msg.ShortMessage != "test message" { + t.Errorf("short_message = %q, want %q", msg.ShortMessage, "test message") + } + + if msg.Level != SyslogInformational { + t.Errorf("level = %d, want %d", msg.Level, SyslogInformational) + } + + if msg.Extra["source"] != "/app/main.go" { + t.Errorf("extra[source] = %v, want %q", msg.Extra["source"], "/app/main.go") + } + + if msg.Extra["line"] != "42" { + t.Errorf("extra[line] = %v, want %q", msg.Extra["line"], "42") + } + + // Timestamp should be recent + now := float64(time.Now().UnixNano()) / 1e9 + if msg.Timestamp < now-5 || msg.Timestamp > now+1 { + t.Errorf("timestamp %f is not recent (now=%f)", msg.Timestamp, now) + } +} + +func TestNewGELFMessageFromMap(t *testing.T) { + content := map[string]string{ + "message": "map message", + "level": "debug", + "source": "/app/handler.go", + } + + msg, err := NewGELFMessage("debug", content, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if msg.ShortMessage != "map message" { + t.Errorf("short_message = %q, want %q", msg.ShortMessage, "map message") + } + + if msg.Level != SyslogDebug { + t.Errorf("level = %d, want %d", msg.Level, SyslogDebug) + } +} + +func TestNewGELFMessageWithError(t *testing.T) { + content := []byte(`{"message":"something failed"}`) + errLog := errors.New("connection refused") + + msg, err := NewGELFMessage("error", content, errLog) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if msg.Extra["error"] != "connection refused" { + t.Errorf("extra[error] = %v, want %q", msg.Extra["error"], "connection refused") + } + + if msg.Level != SyslogError { + t.Errorf("level = %d, want %d", msg.Level, SyslogError) + } +} + +func TestNewGELFMessageEmptyMessage(t *testing.T) { + content := []byte(`{"level":"info"}`) + + msg, err := NewGELFMessage("info", content, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if msg.ShortMessage == "" { + t.Error("short_message should never be empty") + } +} + +func TestGELFMessageMarshalJSON(t *testing.T) { + msg := &GELFMessage{ + Version: "1.1", + Host: "testhost", + ShortMessage: "test", + Timestamp: 1234567890.123, + Level: SyslogInformational, + Extra: map[string]any{ + "source": "/app/main.go", + "service": "my-service", + }, + } + + data, err := msg.MarshalJSON() + if err != nil { + t.Fatalf("MarshalJSON error: %v", err) + } + + var result map[string]any + if err := json.Unmarshal(data, &result); err != nil { + t.Fatalf("Unmarshal error: %v", err) + } + + // Standard fields + if result["version"] != "1.1" { + t.Errorf("version = %v, want %q", result["version"], "1.1") + } + if result["host"] != "testhost" { + t.Errorf("host = %v, want %q", result["host"], "testhost") + } + if result["short_message"] != "test" { + t.Errorf("short_message = %v, want %q", result["short_message"], "test") + } + + // Extra fields should have underscore prefix + if result["_source"] != "/app/main.go" { + t.Errorf("_source = %v, want %q", result["_source"], "/app/main.go") + } + if result["_service"] != "my-service" { + t.Errorf("_service = %v, want %q", result["_service"], "my-service") + } + + // Original keys without underscore should not exist + if _, ok := result["source"]; ok { + t.Error("field 'source' should not exist (should be '_source')") + } +} + +func TestGELFMessageMarshalJSONSkipsID(t *testing.T) { + msg := &GELFMessage{ + Version: "1.1", + Host: "testhost", + ShortMessage: "test", + Timestamp: 1234567890.123, + Level: SyslogInformational, + Extra: map[string]any{ + "id": "should-be-skipped", + }, + } + + data, err := msg.MarshalJSON() + if err != nil { + t.Fatalf("MarshalJSON error: %v", err) + } + + var result map[string]any + if err := json.Unmarshal(data, &result); err != nil { + t.Fatalf("Unmarshal error: %v", err) + } + + if _, ok := result["_id"]; ok { + t.Error("_id field should be skipped per GELF spec") + } +} diff --git a/mchloggelf/udp.go b/mchloggelf/udp.go new file mode 100644 index 0000000..b27428a --- /dev/null +++ b/mchloggelf/udp.go @@ -0,0 +1,148 @@ +package mchloggelf + +import ( + "bytes" + "compress/gzip" + "crypto/rand" + "fmt" + "net" + "sync" +) + +const ( + // maxChunkSize is the maximum size of a single UDP datagram for GELF. + maxChunkSize = 8192 + // chunkHeaderSize is the size of the GELF chunk header (magic + msgID + seqNum + seqCount). + chunkHeaderSize = 12 + // maxChunkDataSize is the maximum payload per chunk. + maxChunkDataSize = maxChunkSize - chunkHeaderSize + // maxChunks is the maximum number of chunks per GELF message. + maxChunks = 128 + // chunkMagicByte0 and chunkMagicByte1 are the GELF chunk magic bytes. + chunkMagicByte0 = 0x1e + chunkMagicByte1 = 0x0f +) + +// UDPTransport sends GELF messages over UDP. +type UDPTransport struct { + addr *net.UDPAddr + conn *net.UDPConn + compress bool + mu sync.Mutex +} + +// NewUDPTransport creates a new UDP transport that sends GELF messages to the given address. +// The address should be in "host:port" format (e.g., "graylog.example.com:12201"). +// If compress is true, messages will be GZIP compressed before sending. +func NewUDPTransport(address string, compress bool) (*UDPTransport, error) { + addr, err := net.ResolveUDPAddr("udp", address) + if err != nil { + return nil, fmt.Errorf("failed to resolve UDP address %q: %w", address, err) + } + + conn, err := net.DialUDP("udp", nil, addr) + if err != nil { + return nil, fmt.Errorf("failed to dial UDP %q: %w", address, err) + } + + return &UDPTransport{ + addr: addr, + conn: conn, + compress: compress, + }, nil +} + +// Send marshals the GELF message and sends it over UDP. +// If the message exceeds maxChunkSize, it is split into GELF chunks. +func (t *UDPTransport) Send(msg *GELFMessage) error { + data, err := msg.MarshalJSON() + if err != nil { + return fmt.Errorf("failed to marshal GELF message: %w", err) + } + + if t.compress { + data, err = gzipCompress(data) + if err != nil { + return fmt.Errorf("failed to compress GELF message: %w", err) + } + } + + t.mu.Lock() + defer t.mu.Unlock() + + if t.conn == nil { + return fmt.Errorf("transport is closed") + } + + if len(data) <= maxChunkSize { + _, err = t.conn.Write(data) + return err + } + + return t.sendChunked(data) +} + +// sendChunked splits the data into GELF chunks and sends each one. +func (t *UDPTransport) sendChunked(data []byte) error { + chunkCount := (len(data) + maxChunkDataSize - 1) / maxChunkDataSize + if chunkCount > maxChunks { + return fmt.Errorf("message too large: would require %d chunks (max %d)", chunkCount, maxChunks) + } + + // Generate a random 8-byte message ID + msgID := make([]byte, 8) + if _, err := rand.Read(msgID); err != nil { + return fmt.Errorf("failed to generate message ID: %w", err) + } + + for i := 0; i < chunkCount; i++ { + start := i * maxChunkDataSize + end := start + maxChunkDataSize + if end > len(data) { + end = len(data) + } + + chunk := make([]byte, 0, chunkHeaderSize+end-start) + // Magic bytes + chunk = append(chunk, chunkMagicByte0, chunkMagicByte1) + // Message ID (8 bytes) + chunk = append(chunk, msgID...) + // Sequence number (1 byte) + chunk = append(chunk, byte(i)) + // Sequence count (1 byte) + chunk = append(chunk, byte(chunkCount)) + // Payload + chunk = append(chunk, data[start:end]...) + + if _, err := t.conn.Write(chunk); err != nil { + return fmt.Errorf("failed to send chunk %d/%d (chunks 1-%d already sent, message will be incomplete on receiver): %w", i+1, chunkCount, i, err) + } + } + + return nil +} + +// Close closes the UDP connection. +func (t *UDPTransport) Close() error { + t.mu.Lock() + defer t.mu.Unlock() + if t.conn != nil { + err := t.conn.Close() + t.conn = nil + return err + } + return nil +} + +// gzipCompress compresses data using GZIP. +func gzipCompress(data []byte) ([]byte, error) { + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + if _, err := w.Write(data); err != nil { + return nil, err + } + if err := w.Close(); err != nil { + return nil, err + } + return buf.Bytes(), nil +} diff --git a/mchloggelf/udp_test.go b/mchloggelf/udp_test.go new file mode 100644 index 0000000..204c9d1 --- /dev/null +++ b/mchloggelf/udp_test.go @@ -0,0 +1,281 @@ +package mchloggelf + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "io" + "net" + "testing" + "time" +) + +func TestUDPTransportSendSmallMessage(t *testing.T) { + // Start a local UDP listener + addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("ResolveUDPAddr: %v", err) + } + listener, err := net.ListenUDP("udp", addr) + if err != nil { + t.Fatalf("ListenUDP: %v", err) + } + defer listener.Close() + + localAddr := listener.LocalAddr().String() + + // Create transport without compression for easier validation + transport, err := NewUDPTransport(localAddr, false) + if err != nil { + t.Fatalf("NewUDPTransport: %v", err) + } + defer transport.Close() + + msg := &GELFMessage{ + Version: "1.1", + Host: "testhost", + ShortMessage: "hello gelf", + Timestamp: 1234567890.123, + Level: SyslogInformational, + Extra: map[string]any{"service": "test-svc"}, + } + + if err := transport.Send(msg); err != nil { + t.Fatalf("Send: %v", err) + } + + // Read from listener + buf := make([]byte, maxChunkSize) + listener.SetReadDeadline(time.Now().Add(2 * time.Second)) + n, _, err := listener.ReadFromUDP(buf) + if err != nil { + t.Fatalf("ReadFromUDP: %v", err) + } + + var received map[string]any + if err := json.Unmarshal(buf[:n], &received); err != nil { + t.Fatalf("Unmarshal received data: %v", err) + } + + if received["version"] != "1.1" { + t.Errorf("version = %v, want %q", received["version"], "1.1") + } + if received["short_message"] != "hello gelf" { + t.Errorf("short_message = %v, want %q", received["short_message"], "hello gelf") + } + if received["_service"] != "test-svc" { + t.Errorf("_service = %v, want %q", received["_service"], "test-svc") + } +} + +func TestUDPTransportSendWithCompression(t *testing.T) { + addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("ResolveUDPAddr: %v", err) + } + listener, err := net.ListenUDP("udp", addr) + if err != nil { + t.Fatalf("ListenUDP: %v", err) + } + defer listener.Close() + + localAddr := listener.LocalAddr().String() + + transport, err := NewUDPTransport(localAddr, true) + if err != nil { + t.Fatalf("NewUDPTransport: %v", err) + } + defer transport.Close() + + msg := &GELFMessage{ + Version: "1.1", + Host: "testhost", + ShortMessage: "compressed message", + Timestamp: 1234567890.123, + Level: SyslogDebug, + Extra: map[string]any{}, + } + + if err := transport.Send(msg); err != nil { + t.Fatalf("Send: %v", err) + } + + buf := make([]byte, maxChunkSize) + listener.SetReadDeadline(time.Now().Add(2 * time.Second)) + n, _, err := listener.ReadFromUDP(buf) + if err != nil { + t.Fatalf("ReadFromUDP: %v", err) + } + + // Data should be gzip compressed + reader, err := gzip.NewReader(bytes.NewReader(buf[:n])) + if err != nil { + t.Fatalf("gzip.NewReader: %v (data might not be compressed)", err) + } + defer reader.Close() + + decompressed, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("ReadAll: %v", err) + } + + var received map[string]any + if err := json.Unmarshal(decompressed, &received); err != nil { + t.Fatalf("Unmarshal: %v", err) + } + + if received["short_message"] != "compressed message" { + t.Errorf("short_message = %v, want %q", received["short_message"], "compressed message") + } +} + +func TestUDPTransportChunking(t *testing.T) { + addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("ResolveUDPAddr: %v", err) + } + listener, err := net.ListenUDP("udp", addr) + if err != nil { + t.Fatalf("ListenUDP: %v", err) + } + defer listener.Close() + + localAddr := listener.LocalAddr().String() + + // Create transport without compression so the message stays large + transport, err := NewUDPTransport(localAddr, false) + if err != nil { + t.Fatalf("NewUDPTransport: %v", err) + } + defer transport.Close() + + // Create a large message that will require chunking + largePayload := make([]byte, 10000) + for i := range largePayload { + largePayload[i] = 'A' + } + + msg := &GELFMessage{ + Version: "1.1", + Host: "testhost", + ShortMessage: string(largePayload), + Timestamp: 1234567890.123, + Level: SyslogInformational, + Extra: map[string]any{}, + } + + if err := transport.Send(msg); err != nil { + t.Fatalf("Send: %v", err) + } + + // Read chunks + listener.SetReadDeadline(time.Now().Add(2 * time.Second)) + chunks := make([][]byte, 0) + for { + buf := make([]byte, maxChunkSize+100) + n, _, err := listener.ReadFromUDP(buf) + if err != nil { + break + } + chunks = append(chunks, buf[:n]) + } + + if len(chunks) < 2 { + t.Fatalf("expected multiple chunks, got %d", len(chunks)) + } + + // Verify chunk headers + for i, chunk := range chunks { + if len(chunk) < chunkHeaderSize { + t.Fatalf("chunk %d too small: %d bytes", i, len(chunk)) + } + if chunk[0] != chunkMagicByte0 || chunk[1] != chunkMagicByte1 { + t.Errorf("chunk %d: invalid magic bytes: %x %x", i, chunk[0], chunk[1]) + } + seqNum := int(chunk[10]) + seqCount := int(chunk[11]) + if seqNum != i { + t.Errorf("chunk %d: sequence number = %d, want %d", i, seqNum, i) + } + if seqCount != len(chunks) { + t.Errorf("chunk %d: sequence count = %d, want %d", i, seqCount, len(chunks)) + } + } + + // All chunks should share the same message ID (bytes 2-9) + if len(chunks) > 1 { + msgID := chunks[0][2:10] + for i := 1; i < len(chunks); i++ { + if !bytes.Equal(chunks[i][2:10], msgID) { + t.Errorf("chunk %d has different message ID", i) + } + } + } +} + +func TestGzipCompress(t *testing.T) { + input := []byte("hello world, this is a test of gzip compression") + + compressed, err := gzipCompress(input) + if err != nil { + t.Fatalf("gzipCompress: %v", err) + } + + // Verify we can decompress + reader, err := gzip.NewReader(bytes.NewReader(compressed)) + if err != nil { + t.Fatalf("gzip.NewReader: %v", err) + } + defer reader.Close() + + decompressed, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("ReadAll: %v", err) + } + + if !bytes.Equal(decompressed, input) { + t.Errorf("decompressed data does not match original") + } +} + +func TestUDPTransportClose(t *testing.T) { + addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("ResolveUDPAddr: %v", err) + } + listener, err := net.ListenUDP("udp", addr) + if err != nil { + t.Fatalf("ListenUDP: %v", err) + } + defer listener.Close() + + transport, err := NewUDPTransport(listener.LocalAddr().String(), false) + if err != nil { + t.Fatalf("NewUDPTransport: %v", err) + } + + if err := transport.Close(); err != nil { + t.Errorf("Close: %v", err) + } + + // Sending after close should fail + msg := &GELFMessage{ + Version: "1.1", + Host: "testhost", + ShortMessage: "after close", + Timestamp: 1234567890.123, + Level: SyslogInformational, + Extra: map[string]any{}, + } + + if err := transport.Send(msg); err == nil { + t.Error("expected error when sending after close") + } +} + +func TestNewUDPTransportInvalidAddress(t *testing.T) { + _, err := NewUDPTransport("invalid:::address", false) + if err == nil { + t.Error("expected error for invalid address") + } +}