This repository was archived by the owner on Sep 6, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathlogpruner.go
More file actions
272 lines (249 loc) · 9.49 KB
/
logpruner.go
File metadata and controls
272 lines (249 loc) · 9.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
package main
import (
"bytes"
"encoding/json"
"fmt"
"github.com/juju/deputy"
"github.com/peterbourgon/g2s"
"github.com/spf13/viper"
"log"
"os"
"os/exec"
"time"
)
// debugging is an on/off switch for debug logging. Its Printf method only
// emits output when the switch is on, so debug statements can stay in the code
// and be silenced by editing a single constant.
type debugging bool

// debug is the program-wide debug switch (see:
// https://gist.github.com/a53mt/60c1002955e6d3096078). Flip it to false to
// silence all debug.Printf output.
const debug debugging = true

// Printf behaves like log.Printf with "DEBUG " prepended to format, but does
// nothing at all when d is false.
func (d debugging) Printf(format string, args ...interface{}) {
	if !d {
		return
	}
	log.Printf("DEBUG "+format, args...)
}
// DOCKER_IMAGE_TO_RUN is the Docker image (name:tag) to run.
// NOTE(review): this constant does not appear to be referenced anywhere in
// this file; confirm whether it is still needed.
const DOCKER_IMAGE_TO_RUN = "my/logpruner:0.0.1"
// AlarmFreeLogSpace holds the JSON response of the AWS CloudWatch
// describe-alarms command (`aws cloudwatch describe-alarms`).
// Generated via JSONGen (https://github.com/bemasher/JSONGen).
//
// There are no json tags: encoding/json matches the exported field names
// against the JSON keys case-insensitively. Renaming a field therefore changes
// what gets decoded. Any top-level keys other than MetricAlarms are ignored by
// encoding/json.
type AlarmFreeLogSpace struct {
// One entry per alarm returned for the requested --alarm-names. Code in this
// file only ever reads element 0.
MetricAlarms []struct {
ActionsEnabled bool
AlarmActions []string
AlarmArn string
// Timestamps are kept as raw strings; no time parsing is done.
AlarmConfigurationUpdatedTimestamp string
AlarmDescription string
AlarmName string
ComparisonOperator string
Dimensions []struct {
Name string
Value string
}
EvaluationPeriods int64
InsufficientDataActions []interface{}
MetricName string
Namespace string
OKActions []interface{}
Period int64
StateReason string
StateReasonData string
StateUpdatedTimestamp string
// Current alarm state. isDeleteActionRequired understands "OK" and "ALARM";
// any other value is reported as an error there.
StateValue string
Statistic string
Threshold float64
}
}
// LogprunerCfg holds the per-index settings decoded from one entry of the
// 'es_indexes' config section. It names the CloudWatch alarm to consult via
// the AWS cli and the ElasticSearch curator connection and retention options.
//
// Keys missing from the config keep Go's zero value (0, "", false).
type LogprunerCfg struct {
	// AlarmName is passed to `aws cloudwatch describe-alarms --alarm-names`.
	AlarmName string `mapstructure:"alarm_name"`

	// Host and Port of the ElasticSearch node, passed to curator as --host/--port.
	Host string `mapstructure:"host"`
	Port int    `mapstructure:"port"`
	// OlderThanDays is curator's --older-than value (time unit: days).
	OlderThanDays int `mapstructure:"older_than_days"`

	// UseSSL adds --use_ssl to the curator command line.
	// The tag is lowercase like every other key. mapstructure falls back to a
	// case-insensitive key match, so configs written as "use_SSL" still decode.
	UseSSL bool `mapstructure:"use_ssl"`
	// SSLValidation only matters when UseSSL is true; when it is false,
	// --ssl-no-validate is passed to curator.
	// NOTE(review): because the zero value is false, omitting ssl_validation
	// from the config DISABLES certificate validation. Set it to true explicitly
	// to keep curator's validation.
	SSLValidation bool `mapstructure:"ssl_validation"`
}
// statsd is the StatsD client used to publish metrics. It stays nil (metrics
// disabled) unless init successfully dials the address configured under
// 'statsd_addr'.
var statsd *g2s.Statsd
// init checks that the program can operate at all, and exits fatally if not.
// It requires:
//   - the AWS region/credential environment variables,
//   - an accessible configuration directory,
//   - a readable config file.
//
// Side effects: changes the working directory to the config directory, loads
// the config into viper, and, when 'statsd_addr' is set to a non-empty value
// that can be dialled, sets the package-level StatsD client.
func init() {
	// The AWS cli invoked later reads its region and credentials from these.
	for _, ev := range []string{"AWS_DEFAULT_REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"} {
		if _, err := getEnvVarOrErr(ev); err != nil {
			log.Fatalf("Error: %s. Exiting now.\n", err.Error())
		}
	}

	configDir := "/etc/logpruner"
	if _, err := os.Stat(configDir); err != nil {
		if os.IsNotExist(err) {
			log.Fatalf("Error: Configuration directory '%s' does not exist. Exiting now.\n", configDir)
		}
		// Any other stat failure (e.g. permission denied) also makes the
		// config unreadable. Fail here with the real cause instead of ignoring it.
		log.Fatalf("Error: Unable to access configuration directory '%s': %s. Exiting now.\n", configDir, err.Error())
	}

	// From here on the process works from inside the config directory.
	if err := os.Chdir(configDir); err != nil {
		log.Fatalf("Error: Could not change into config dir '%s'. %s\n", configDir, err.Error())
	}

	viper.SetConfigName("logpruner_config") // config file name without extension
	viper.AddConfigPath(configDir)
	if err := viper.ReadInConfig(); err != nil {
		log.Fatalf("Error: Unable to read config file: %s", err.Error())
	}

	// Metrics are optional. No 'statsd_addr' key, or an empty value, means no StatsD.
	if !viper.IsSet("statsd_addr") {
		return
	}
	statsdAddr := viper.GetString("statsd_addr")
	if statsdAddr == "" {
		return
	}
	s, err := g2s.Dial("udp", statsdAddr)
	if err != nil {
		// Metrics are best-effort: keep running without them.
		log.Printf("WARNING: Unable to connect to StatsD host '%s'; error: %s. *Not* publishing metrics but running anyway.\n", statsdAddr, err.Error())
		return
	}
	statsd = s
}
// incrStatsDCounterBy1 increments the StatsD counter counterName by one, at
// sample rate 1.0, through client. A nil client means metrics publishing is
// disabled, and the call is then a no-op.
func incrStatsDCounterBy1(client *g2s.Statsd, counterName string) {
	if client == nil {
		return
	}
	client.Counter(1.0, counterName, 1)
}
// renderForCuratorDeleteIndexAction builds the complete curator command line
// that deletes indices older than lpc.OlderThanDays days. Index names are
// matched against the '%Y.%m.%d' date format.
//
// The result contains shell quoting (the single-quoted --timestring value), so
// it must be run through a shell such as `/bin/sh -c`, not split into argv.
// NOTE(review): Host is interpolated unquoted; it is trusted to come from the
// local config file.
func (lpc LogprunerCfg) renderForCuratorDeleteIndexAction() string {
	// curator validates certificates unless told otherwise, so the only
	// validation-related flag ever emitted is the opt-out.
	sslFlags := ""
	if lpc.UseSSL {
		sslFlags = " --use_ssl"
		if !lpc.SSLValidation {
			sslFlags += " --ssl-no-validate"
		}
	}
	return fmt.Sprintf(
		"/usr/bin/curator --host %s --port %d%s delete indices --older-than %d --time-unit days --timestring '%%Y.%%m.%%d'",
		lpc.Host, lpc.Port, sslFlags, lpc.OlderThanDays,
	)
}
// getEnvVarOrErr returns the value of the environment variable varName.
// An unset variable and one set to the empty string are treated the same:
// both yield "" and a non-nil error naming the variable.
//
// The error text has no trailing newline, so callers can embed it in their own
// messages (init formats it as "Error: %s. Exiting now.").
func getEnvVarOrErr(varName string) (string, error) {
	val := os.Getenv(varName)
	if val == "" {
		return "", fmt.Errorf("required environment variable '%s' unset or empty", varName)
	}
	return val, nil
}
// retrieveCfgVals decodes every entry under the 'es_indexes' config key into
// its own LogprunerCfg.
//
// The returned map is keyed by the index name, i.e. the second-level key under
// es_indexes as reported by viper. If any entry fails to decode, it returns a
// nil map and an error naming that index.
func retrieveCfgVals() (map[string]*LogprunerCfg, error) {
	// Only the keys (index names) of this map are used. Each entry's nested
	// settings are decoded separately below via UnmarshalKey.
	indexesMap := viper.GetStringMapString("es_indexes")
	cfgVals := make(map[string]*LogprunerCfg, len(indexesMap))
	for idxName := range indexesMap {
		lpCfg := &LogprunerCfg{}
		if err := viper.UnmarshalKey("es_indexes."+idxName, lpCfg); err != nil {
			return nil, fmt.Errorf("Error unmarshalling config values for index '%s' into LogprunerCfg struct: %s", idxName, err.Error())
		}
		cfgVals[idxName] = lpCfg
	}
	return cfgVals, nil
}
// getCloudWatchAlarm runs `aws cloudwatch describe-alarms` for alarmName and
// returns the command's stdout, which is requested as JSON.
//
// The command gets 30 seconds. An error is returned if the command times out
// or exits non-zero; in the latter case deputy builds the error text from
// stderr.
func getCloudWatchAlarm(alarmName string) (string, error) {
	var stdout bytes.Buffer
	d := deputy.Deputy{
		Errors: deputy.FromStderr,
		// deputy delivers stdout one line at a time with the newline removed.
		// Put it back so the captured text matches what the command printed.
		StdoutLog: func(line []byte) {
			stdout.Write(line)
			stdout.WriteByte('\n')
		},
		Timeout: 30 * time.Second,
	}
	// Run aws directly instead of through `sh -c`:
	//   - the alarm name reaches aws as one argument even if it contains
	//     spaces or shell metacharacters;
	//   - `--output json` overrides a text/table default from the AWS profile,
	//     which the JSON decoder in main could not parse.
	cmd := exec.Command("aws", "cloudwatch", "describe-alarms",
		"--output", "json",
		"--alarm-names", alarmName)
	if err := d.Run(cmd); err != nil {
		return "", fmt.Errorf("(getCloudWatchAlarm) >> Error executing aws cloudwatch describe-alarms cmd. Error: %s", err.Error())
	}
	return stdout.String(), nil
}
// deleteESIndex runs the ElasticSearch curator command line built by
// renderForCuratorDeleteIndexAction. That command deletes indexes older than
// the configured number of days.
//
// The command gets 300 seconds. An error is returned if lpCfg is nil, if the
// command times out, or if it exits non-zero; in the latter case deputy builds
// the error text from stderr.
func deleteESIndex(lpCfg *LogprunerCfg) error {
	if lpCfg == nil {
		return fmt.Errorf("(deleteESIndex) >> nil LogprunerCfg")
	}
	d := deputy.Deputy{
		Errors:  deputy.FromStderr,
		Timeout: 300 * time.Second,
	}
	// The rendered command line contains shell quoting (the single-quoted
	// --timestring value), so a shell has to parse it. The argument order is
	// therefore: shell binary, its "-c" option, then the whole command line as
	// a single string.
	cmd := exec.Command("/bin/sh", "-c", lpCfg.renderForCuratorDeleteIndexAction())
	debug.Printf("(deleteESIndex) 'cmd': %v\n", cmd.Args)
	if err := d.Run(cmd); err != nil {
		return fmt.Errorf("(deleteESIndex) >> Error executing curator cmd. Error: %s", err.Error())
	}
	return nil
}
// isDeleteActionRequired reports whether old indexes should be deleted, based
// on the state of the first alarm in alarmDesc:
//   - "ALARM" means true;
//   - "OK" means false;
//   - any other state is an error.
//
// A nil description, or one containing no alarms (e.g. no alarm matched the
// requested name), also returns an error instead of panicking on the index.
func isDeleteActionRequired(alarmDesc *AlarmFreeLogSpace) (bool, error) {
	if alarmDesc == nil || len(alarmDesc.MetricAlarms) == 0 {
		return false, fmt.Errorf("no metric alarm found in CloudWatch response")
	}
	switch alarmState := alarmDesc.MetricAlarms[0].StateValue; alarmState {
	case "OK":
		return false, nil
	case "ALARM":
		return true, nil
	default:
		return false, fmt.Errorf("Unknown StateValue '%s'. Do not know how to handle.", alarmState)
	}
}
// main prunes old ElasticSearch indexes for every index configured under
// 'es_indexes'. For each index it:
//   - fetches the configured CloudWatch alarm via the aws cli;
//   - if the alarm is in state "ALARM", runs curator to delete indexes older
//     than the configured number of days.
//
// Each outcome is logged and counted in StatsD (when configured) as
// logpruner.success.<index>, logpruner.error.<index> or
// logpruner.nothing.<index>.
//
// The following only skip the affected index:
//   - a failed alarm lookup;
//   - a response without alarms.
//
// The following abort the whole run:
//   - undecodable JSON;
//   - an unknown alarm state.
func main() {
	cfgVals, err := retrieveCfgVals()
	if err != nil {
		log.Fatal(err)
	}

	for idxName, lpCfg := range cfgVals {
		fmt.Printf("==> Retrieving alarm values for index: '%s' and alarm name: '%s'\n", idxName, lpCfg.AlarmName)
		debug.Printf("============================================================\n")
		debug.Printf(" %#v\n", lpCfg)
		debug.Printf("============================================================\n")

		cloudWatchAlarmJSON, err := getCloudWatchAlarm(lpCfg.AlarmName)
		if err != nil {
			log.Println(err.Error())
			continue
		}
		debug.Printf("cloudWatchAlarm: '%s'\n", cloudWatchAlarmJSON)

		var alarmDesc AlarmFreeLogSpace
		if err := json.Unmarshal([]byte(cloudWatchAlarmJSON), &alarmDesc); err != nil {
			log.Fatalf("Error unmarshalling AWS CloudWatch response JSON: %s\n", err.Error())
		}
		// An empty MetricAlarms list (e.g. no alarm with that name) would make
		// the [0] accesses below panic. Treat it like a failed lookup.
		if len(alarmDesc.MetricAlarms) == 0 {
			log.Printf("No CloudWatch metric alarm named '%s' found for index '%s'; skipping.\n", lpCfg.AlarmName, idxName)
			continue
		}
		log.Printf("AlarmName: '%s'\n", alarmDesc.MetricAlarms[0].AlarmName)
		log.Printf("AlarmArn: '%s'\n", alarmDesc.MetricAlarms[0].AlarmArn)
		log.Printf("StateValue: '%s'\n", alarmDesc.MetricAlarms[0].StateValue)

		delActnReq, err := isDeleteActionRequired(&alarmDesc)
		if err != nil {
			log.Fatal(err)
		}
		debug.Printf("*** DELETE ACTION REQUIRED? %t ***", delActnReq)

		if !delActnReq {
			log.Printf("Nothing to do for '%s' at host '%s', port %d.\n", idxName, lpCfg.Host, lpCfg.Port)
			incrStatsDCounterBy1(statsd, "logpruner.nothing."+idxName)
			continue
		}

		// Let's delete some old ElasticSearch indexes.
		log.Printf(">>> TRIGGERING DELETE OLD INDEXES ACTION for index '%s' <<<", idxName)
		if err := deleteESIndex(lpCfg); err != nil {
			log.Printf("Could not delete old indexes for '%s' at host '%s', port %d. Error: %s\n", idxName, lpCfg.Host, lpCfg.Port, err.Error())
			incrStatsDCounterBy1(statsd, "logpruner.error."+idxName)
			continue
		}
		log.Printf("Successfully deleted old indexes for '%s' at host '%s', port %d.\n", idxName, lpCfg.Host, lpCfg.Port)
		incrStatsDCounterBy1(statsd, "logpruner.success."+idxName)
	}
}