-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
209 lines (169 loc) · 6.17 KB
/
Copy pathmain.go
File metadata and controls
209 lines (169 loc) · 6.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package main
import (
"errors"
"net/http"
"os"
"os/signal"
"syscall"
"time"
_ "net/http/pprof"
"github.com/Shimmur/logtailer/cache"
"github.com/Shimmur/logtailer/reporter"
"github.com/kelseyhightower/envconfig"
director "github.com/relistan/go-director"
"github.com/relistan/rubberneck"
log "github.com/sirupsen/logrus"
)
var (
// Version identifies the build of this binary and defaults to "local-build".
// NOTE(review): presumably overridden at link time (e.g. -ldflags "-X ...");
// confirm against the build scripts. It is not read anywhere in this file.
Version = "local-build"
)
const (
// NewRelicBaseURL is the base URL where we'll send events. main hands it,
// together with the configured New Relic key and account, to
// reporter.NewLimitExceededReporter.
NewRelicBaseURL = "https://insights-collector.newrelic.com/v1/accounts/"
)
// Config holds the service settings. It is populated from environment
// variables by envconfig in configureService, using the variable names and
// defaults given in the struct tags.
type Config struct {
// Environment and BasePath are handed to NewDirListDiscoverer in main.
Environment string `envconfig:"ENVIRONMENT" default:"dev"`
BasePath string `envconfig:"BASE_PATH" default:"/var/log/pods"`
// DiscoInterval is the period of the pod discovery looper.
DiscoInterval time.Duration `envconfig:"DISCO_INTERVAL" default:"5s"`
// MaxTrackedLogs and CacheFilePath are the arguments to cache.NewCache.
MaxTrackedLogs int `envconfig:"MAX_TRACKED_LOGS" default:"100"`
CacheFilePath string `envconfig:"CACHE_FILE_PATH" default:"/var/log/logtailer.json"`
// CacheFlushInterval is the period of the looper that flushes offsets and
// persists the cache.
CacheFlushInterval time.Duration `envconfig:"CACHE_FLUSH_INTERVAL" default:"3s"`
// SyslogAddress is passed to NewUDPSyslogger for every new tailer.
SyslogAddress string `envconfig:"SYSLOG_ADDRESS" default:"127.0.0.1:514"`
// NewRelicAccount and NewRelicKey are passed to the limit-exceeded reporter.
// NewRelicKey is masked as "[REDACTED]" when the config is printed at startup.
NewRelicAccount string `envconfig:"NEW_RELIC_ACCOUNT"`
NewRelicKey string `envconfig:"NEW_RELIC_LICENSE_KEY"`
// TokenLimit and LimitInterval are passed to NewRateLimitingLogger.
TokenLimit int `envconfig:"TOKEN_LIMIT" default:"300"`
LimitInterval time.Duration `envconfig:"LIMIT_INTERVAL" default:"1m"`
// The Kube* settings are the arguments to NewPodFilter.
KubeHost string `envconfig:"KUBERNETES_SERVICE_HOST" default:"127.0.0.1"`
KubePort int `envconfig:"KUBERNETES_SERVICE_PORT" default:"8080"`
KubeTimeout time.Duration `envconfig:"KUBERNETES_TIMEOUT" default:"3s"`
KubeCredsPath string `envconfig:"KUBERNETES_CREDS_PATH" default:"/var/run/secrets/kubernetes.io/serviceaccount"`
// EnableRegexLogLevelParsing is passed through to NewUDPSyslogger.
EnableRegexLogLevelParsing bool `envconfig:"ENABLE_REGEX_LOG_LEVEL_PARSING" default:"false"`
// Debug switches logging to debug level and starts the pprof HTTP server.
Debug bool `envconfig:"DEBUG" default:"false"`
}
// configureCache creates the cache from the configured maximum number of
// tracked logs and cache file path. Load is called on the new cache unless
// os.Stat reports that the cache file does not exist, in which case the cache
// is returned as constructed.
func configureCache(config *Config) *cache.Cache {
	offsets := cache.NewCache(config.MaxTrackedLogs, config.CacheFilePath)

	// Only a definite "not exist" skips loading; a present file (or any other
	// Stat outcome) falls through to Load.
	_, statErr := os.Stat(config.CacheFilePath)
	if !errors.Is(statErr, os.ErrNotExist) {
		offsets.Load()
	}

	return offsets
}
// NewTailerWithUDPSyslog builds the NewTailerFunc that is given to PodTracker
// so it can create Tailers that write to Syslog over UDP. The cache, hostname,
// config, and reporter are captured by the returned closure, so each new
// Tailer shares them.
func NewTailerWithUDPSyslog(c *cache.Cache, hostname string,
	config *Config, rptr *reporter.LimitExceededReporter) NewTailerFunc {

	return func(pod *Pod) LogTailer {
		// Fields attached to what we log to Syslog for this pod
		fields := map[string]string{
			"ServiceName": pod.ServiceName,
			"Environment": pod.Environment,
			"PodName":     pod.Name,
			"Hostname":    hostname,
		}

		syslogger := NewUDPSyslogger(
			fields, config.SyslogAddress, config.EnableRegexLogLevelParsing, pod.LogFormat,
		)

		// The rate limiter wraps the UDP syslogger
		limited := NewRateLimitingLogger(
			rptr, config.TokenLimit, config.LimitInterval, pod.ServiceName, syslogger,
		)

		// The Tailer is handed back as the LogTailer interface
		return NewTailer(pod, c, limited)
	}
}
// getHostname returns the hostname to put on log records. A non-empty
// HOSTNAME environment variable takes precedence, which lets a containerized
// run report the host's name; otherwise the result of os.Hostname is used.
func getHostname() string {
	override := os.Getenv("HOSTNAME")
	if override == "" {
		// The lookup error is discarded, so a failed lookup yields whatever
		// (likely empty) name os.Hostname returned.
		name, _ := os.Hostname()
		return name
	}

	return override
}
// waitForInterrupt registers signalChan for SIGINT and SIGTERM, then blocks
// until a value arrives on the channel and logs that shutdown is starting.
func waitForInterrupt(signalChan chan os.Signal) {
	// os.Interrupt is the os package's name for syscall.SIGINT
	signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

	<-signalChan

	log.Info("Caught signal, shutting down")
}
// configureService reads the service configuration from the environment with
// envconfig (prefix "log"), exiting via log.Fatal if processing fails. It then
// prints the resulting settings with the New Relic key masked, enables debug
// logging when requested, and returns the populated Config.
func configureService() *Config {
	cfg := new(Config)
	if err := envconfig.Process("log", cfg); err != nil {
		log.Fatal(err.Error())
	}

	// Print the settings, substituting a placeholder for the secret key
	placeholder := "[REDACTED]"
	hideSecrets := func(argument string) *string {
		if argument != "NewRelicKey" {
			return nil
		}
		return &placeholder
	}
	rubberneck.NewPrinterWithKeyMasking(log.Printf, hideSecrets, rubberneck.NoAddLineFeed).Print(*cfg)

	// Debug logging is switched on only after the settings have been printed
	if cfg.Debug {
		log.SetLevel(log.DebugLevel)
	}

	return cfg
}
// main wires the service together and runs it until SIGINT or SIGTERM.
//
// It loads the configuration, builds the cache, pod filter, discoverer and
// limit-exceeded reporter, then starts the pod tracker, the reporter, and a
// timed loop that flushes tracker offsets into the cache and persists it.
// With Debug enabled it also serves pprof on localhost:8081. On a signal it
// asks both loopers to quit and waits for them to finish.
func main() {
	// Address of the debug-only pprof server; kept in one place so the log
	// message always matches the address actually bound.
	const pprofAddr = "localhost:8081"

	config := configureService()

	var filter DiscoveryFilter

	// Some deps for injection. The local is not called "cache" so that it
	// does not shadow the imported cache package.
	offsetCache := configureCache(config)
	podFilter := NewPodFilter(
		config.KubeHost, config.KubePort, config.KubeTimeout, config.KubeCredsPath,
	)
	disco := NewDirListDiscoverer(config.BasePath, config.Environment)
	rptr := reporter.NewLimitExceededReporter(
		NewRelicBaseURL, config.NewRelicKey, config.NewRelicAccount,
	)

	podDiscoveryLooper := director.NewImmediateTimedLooper(
		director.FOREVER, config.DiscoInterval, make(chan error))
	cacheLooper := director.NewTimedLooper(
		director.FOREVER, config.CacheFlushInterval, make(chan error))

	// In the event our filter can't find the right creds, etc, we fail open
	if podFilter != nil {
		filter = podFilter
	} else {
		log.Warn("Failed to configure filter, proceeding anyway using stub...")
		filter = &StubFilter{}
	}

	// Set up and run the tracker
	newTailerFunc := NewTailerWithUDPSyslog(offsetCache, getHostname(), config, rptr)
	tracker := NewPodTracker(podDiscoveryLooper, disco, newTailerFunc, filter)
	go tracker.Run()

	// Set up the state server for debugging.
	// NOTE(review): called synchronously, so this presumably starts its own
	// goroutine and returns — confirm against PodTracker.ServeHTTP.
	tracker.ServeHTTP()

	// Run the reporter
	go rptr.Run()

	// Persist the cache on a timer
	go cacheLooper.Loop(func() error {
		// Get the latest offsets into the main cache
		tracker.FlushOffsets()

		// Write them out. A failure is only logged; nil is returned so that
		// the error is not propagated to the looper.
		if err := offsetCache.Persist(); err != nil {
			log.Errorf("Persisting offsets failed: %s", err)
		}
		return nil
	})

	if config.Debug {
		go func() {
			// Handlers come from the blank net/http/pprof import, which
			// registers on the default mux used when the handler is nil.
			log.Infof("Starting pprof server on %s", pprofAddr)
			if err := http.ListenAndServe(pprofAddr, nil); err != nil {
				log.Fatalf("pprof server failed: %v", err)
			}
		}()
	}

	// Block waiting on signal. The channel is buffered because signal.Notify
	// does not block when delivering to it.
	signalChan := make(chan os.Signal, 1)
	waitForInterrupt(signalChan)

	podDiscoveryLooper.Quit()
	cacheLooper.Quit()

	// Let these shut down properly.
	// NOTE(review): nothing here performs a final FlushOffsets/Persist after
	// the loopers stop — confirm whether offsets recorded since the last
	// timed flush are saved elsewhere on shutdown.
	podDiscoveryLooper.WaitWithoutError()
	cacheLooper.WaitWithoutError()
}