diff --git a/backend/backends.go b/backend/backends.go index bf073a9a..54349e18 100644 --- a/backend/backends.go +++ b/backend/backends.go @@ -155,18 +155,10 @@ func (bs *Backends) Flush() { bs.wg.Add(1) go func() { defer bs.wg.Done() - var buf bytes.Buffer - err := Compress(&buf, p) - if err != nil { - log.Printf("write file error: %s\n", err) - return - } - - p = buf.Bytes() - + // maybe blocked here, run in another goroutine if bs.HttpBackend.IsActive() { - err = bs.HttpBackend.WriteCompressed(p) + err := bs.HttpBackend.Write(p) switch err { case nil: return @@ -182,7 +174,7 @@ func (bs *Backends) Flush() { log.Printf("write http error: %s\n", err) } - err = bs.fb.Write(p) + err := bs.fb.Write(p) if err != nil { log.Printf("write file error: %s\n", err) } @@ -229,7 +221,7 @@ func (bs *Backends) Rewrite() (err error) { return } - err = bs.HttpBackend.WriteCompressed(p) + err = bs.HttpBackend.Write(p) switch err { case nil: diff --git a/backend/cluster.go b/backend/cluster.go index 934f1989..615e3240 100644 --- a/backend/cluster.go +++ b/backend/cluster.go @@ -182,7 +182,7 @@ func (ic *InfluxCluster) WriteStatistics() (err error) { if err != nil { return } - return ic.Write([]byte(line + "\n")) + return ic.Write([]byte(line + "\n"), "") } func (ic *InfluxCluster) ForbidQuery(s string) (err error) { @@ -443,7 +443,7 @@ func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err er // Wrong in one row will not stop others. // So don't try to return error, just print it. -func (ic *InfluxCluster) WriteRow(line []byte) { +func (ic *InfluxCluster) WriteRow(line []byte, optionParams string) { atomic.AddInt64(&ic.stats.PointsWritten, 1) // maybe trim? line = bytes.TrimRight(line, " \t\r\n") @@ -470,6 +470,7 @@ func (ic *InfluxCluster) WriteRow(line []byte) { // don't block here for a lont time, we just have one worker. for _, b := range bs { + line = append([]byte(optionParams), line...) err = b.Write(line) if err != nil { log.Printf("cluster write fail: %s\n", key) @@ -480,7 +481,7 @@ func (ic *InfluxCluster) WriteRow(line []byte) { return } -func (ic *InfluxCluster) Write(p []byte) (err error) { +func (ic *InfluxCluster) Write(p []byte, optionParams string) (err error) { atomic.AddInt64(&ic.stats.WriteRequests, 1) defer func(start time.Time) { atomic.AddInt64(&ic.stats.WriteRequestDuration, time.Since(start).Nanoseconds()) @@ -503,8 +504,7 @@ func (ic *InfluxCluster) Write(p []byte) (err error) { if len(line) == 0 { break } - - ic.WriteRow(line) + ic.WriteRow(line, optionParams) } ic.lock.RLock() diff --git a/backend/cluster_test.go b/backend/cluster_test.go index f97c2c61..6f2d164c 100644 --- a/backend/cluster_test.go +++ b/backend/cluster_test.go @@ -138,7 +138,7 @@ func TestInfluxdbClusterWrite(t *testing.T) { }, } for _, tt := range tests { - err := ic.Write(tt.args) + err := ic.Write(tt.args, "rp=one_week&precision=s|") if err != nil { t.Error(tt.name, err) continue diff --git a/backend/http.go b/backend/http.go index 62afe84d..7e90f4ee 100644 --- a/backend/http.go +++ b/backend/http.go @@ -166,29 +166,79 @@ func (hb *HttpBackend) Query(w http.ResponseWriter, req *http.Request) (err erro return } +func ScanParams(pointbuf []byte) (params string, point []byte) { + buflen := len(pointbuf) + + for i := 0; i < buflen; i++ { + if pointbuf[i] == '|' { + return string(pointbuf[:i]), pointbuf[i+1:] + } + } + return "", pointbuf +} + func (hb *HttpBackend) Write(p []byte) (err error) { - var buf bytes.Buffer - err = Compress(&buf, p) - if err != nil { - log.Print("compress error: ", err) - return + buf := bytes.NewBuffer(p) + bufMap := make(map[string]*bytes.Buffer) + + for { + line, _ := buf.ReadBytes('\n') + if len(line) ==0 { + break + } + optionParams, point:= ScanParams(line) + + if (bufMap[optionParams] == nil) { + bufMap[optionParams] = &bytes.Buffer{} + } + bufMap[optionParams].Write(point) } - log.Printf("http backend write %s", hb.DB) - err = hb.WriteStream(&buf, true) + for optionParams, buffer := range(bufMap) { + p := buffer.Bytes() + var buf bytes.Buffer + err := Compress(&buf, p) + if err != nil { + log.Printf("write file error: %s\n", err) + continue + } + err = hb.WriteStream(&buf, true, optionParams) + } return } + func (hb *HttpBackend) WriteCompressed(p []byte) (err error) { buf := bytes.NewBuffer(p) - err = hb.WriteStream(buf, true) + err = hb.WriteStream(buf, true, "") return } -func (hb *HttpBackend) WriteStream(stream io.Reader, compressed bool) (err error) { +func parseParamString(optionParams string) (result map[string]string) { + result = make(map[string]string) + + entrys := strings.Split(optionParams, "&") + + for _, entry := range(entrys){ + pair := strings.Split(entry, "=") + if pair[1] != "" { + result[pair[0]] = pair[1] + } + } + return +} + +func (hb *HttpBackend) WriteStream(stream io.Reader, compressed bool, optionParams string) (err error) { q := url.Values{} q.Set("db", hb.DB) + if (optionParams != "") { + paramsObject := parseParamString(optionParams) + for key, value := range(paramsObject){ + q.Set(key, value) + } + } + req, err := http.NewRequest("POST", hb.URL+"/write?"+q.Encode(), stream) if compressed { req.Header.Add("Content-Encoding", "gzip") diff --git a/backend/http_test.go b/backend/http_test.go index a7748ed6..63eaf5b9 100644 --- a/backend/http_test.go +++ b/backend/http_test.go @@ -42,7 +42,7 @@ func TestHttpBackendWrite(t *testing.T) { hb := NewHttpBackend(cfg) defer hb.Close() - err := hb.Write([]byte("cpu,host=server01,region=uswest value=1 1434055562000000000\ncpu value=3,value2=4 1434055562000010000")) + err := hb.Write([]byte("rp=one_week&precision=s|cpu,host=server01,region=uswest value=1 1434055562000000000\nrp=one_week&precision=s|cpu value=3,value2=4 1434055562000010000")) if err != nil { t.Errorf("error: %s", err) return diff --git a/monitor/metric.go b/monitor/metric.go index bb3972dc..0f944c34 100644 --- a/monitor/metric.go +++ b/monitor/metric.go @@ -2,8 +2,8 @@ package monitor import ( "time" - - client "github.com/influxdata/influxdb/client/v2" + + client "github.com/influxdata/influxdb1-client/v2" ) type Metric struct { diff --git a/service/http.go b/service/http.go index 6a63624c..122dca01 100644 --- a/service/http.go +++ b/service/http.go @@ -11,7 +11,7 @@ import ( "net/http" "net/http/pprof" "strings" - + "github.com/shell909090/influx-proxy/backend" ) @@ -102,8 +102,13 @@ func (hs *HttpService) HandlerWrite(w http.ResponseWriter, req *http.Request) { w.Write([]byte("method not allow.")) return } + query := req.URL.Query() + db := query.Get("db") + rp := query.Get("rp") + precision := query.Get("precision") - db := req.URL.Query().Get("db") + // "|"作为分隔符与line进行拼接, 在backend/http.go中进行拆开 + optionParams := "rp=" + rp + "&precision=" + precision + "|" if hs.db != "" { if db != hs.db { @@ -132,7 +137,7 @@ func (hs *HttpService) HandlerWrite(w http.ResponseWriter, req *http.Request) { return } - err = hs.ic.Write(p) + err = hs.ic.Write(p, optionParams) if err == nil { w.WriteHeader(204) } diff --git a/service/main.go b/service/main.go index 3908c88f..74a1a4cb 100644 --- a/service/main.go +++ b/service/main.go @@ -24,6 +24,7 @@ var ( ConfigFile string NodeName string RedisAddr string + RedisPwd string LogFilePath string ) @@ -33,7 +34,8 @@ func init() { flag.StringVar(&LogFilePath, "log-file-path", "/var/log/influx-proxy.log", "output file") flag.StringVar(&ConfigFile, "config", "", "config file") flag.StringVar(&NodeName, "node", "l1", "node name") - flag.StringVar(&RedisAddr, "redis", "localhost:6379", "config file") + flag.StringVar(&RedisAddr, "redis", "localhost:6379", "redis address") + flag.StringVar(&RedisPwd, "redisPwd", "", "redis password") flag.Parse() } @@ -90,6 +92,10 @@ func main() { cfg.Addr = RedisAddr } + if RedisPwd != "" { + cfg.Password = RedisPwd + } + rcs := backend.NewRedisConfigSource(&cfg.Options, cfg.Node) nodecfg, err := rcs.LoadNode()