Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions backend/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,10 @@ func (bs *Backends) Flush() {
bs.wg.Add(1)
go func() {
defer bs.wg.Done()
var buf bytes.Buffer
err := Compress(&buf, p)
if err != nil {
log.Printf("write file error: %s\n", err)
return
}

p = buf.Bytes()


// maybe blocked here, run in another goroutine
if bs.HttpBackend.IsActive() {
err = bs.HttpBackend.WriteCompressed(p)
err := bs.HttpBackend.Write(p)
switch err {
case nil:
return
Expand All @@ -182,7 +174,7 @@ func (bs *Backends) Flush() {
log.Printf("write http error: %s\n", err)
}

err = bs.fb.Write(p)
err := bs.fb.Write(p)
if err != nil {
log.Printf("write file error: %s\n", err)
}
Expand Down Expand Up @@ -229,7 +221,7 @@ func (bs *Backends) Rewrite() (err error) {
return
}

err = bs.HttpBackend.WriteCompressed(p)
err = bs.HttpBackend.Write(p)

switch err {
case nil:
Expand Down
10 changes: 5 additions & 5 deletions backend/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (ic *InfluxCluster) WriteStatistics() (err error) {
if err != nil {
return
}
return ic.Write([]byte(line + "\n"))
return ic.Write([]byte(line + "\n"), "")
}

func (ic *InfluxCluster) ForbidQuery(s string) (err error) {
Expand Down Expand Up @@ -443,7 +443,7 @@ func (ic *InfluxCluster) Query(w http.ResponseWriter, req *http.Request) (err er

// Wrong in one row will not stop others.
// So don't try to return error, just print it.
func (ic *InfluxCluster) WriteRow(line []byte) {
func (ic *InfluxCluster) WriteRow(line []byte, optionParams string) {
atomic.AddInt64(&ic.stats.PointsWritten, 1)
// maybe trim?
line = bytes.TrimRight(line, " \t\r\n")
Expand All @@ -470,6 +470,7 @@ func (ic *InfluxCluster) WriteRow(line []byte) {

// don't block here for a lont time, we just have one worker.
for _, b := range bs {
line = append([]byte(optionParams), line...)
err = b.Write(line)
if err != nil {
log.Printf("cluster write fail: %s\n", key)
Expand All @@ -480,7 +481,7 @@ func (ic *InfluxCluster) WriteRow(line []byte) {
return
}

func (ic *InfluxCluster) Write(p []byte) (err error) {
func (ic *InfluxCluster) Write(p []byte, optionParams string) (err error) {
atomic.AddInt64(&ic.stats.WriteRequests, 1)
defer func(start time.Time) {
atomic.AddInt64(&ic.stats.WriteRequestDuration, time.Since(start).Nanoseconds())
Expand All @@ -503,8 +504,7 @@ func (ic *InfluxCluster) Write(p []byte) (err error) {
if len(line) == 0 {
break
}

ic.WriteRow(line)
ic.WriteRow(line, optionParams)
}

ic.lock.RLock()
Expand Down
2 changes: 1 addition & 1 deletion backend/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestInfluxdbClusterWrite(t *testing.T) {
},
}
for _, tt := range tests {
err := ic.Write(tt.args)
err := ic.Write(tt.args, "rp=one_week&precision=s|")
if err != nil {
t.Error(tt.name, err)
continue
Expand Down
68 changes: 59 additions & 9 deletions backend/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,29 +166,79 @@ func (hb *HttpBackend) Query(w http.ResponseWriter, req *http.Request) (err erro
return
}

func ScanParams(pointbuf []byte) (params string, point []byte) {
buflen := len(pointbuf)

for i := 0; i < buflen; i++ {
if pointbuf[i] == '|' {
return string(pointbuf[:i]), pointbuf[i+1:]
}
}
return "", pointbuf
}

func (hb *HttpBackend) Write(p []byte) (err error) {
var buf bytes.Buffer
err = Compress(&buf, p)
if err != nil {
log.Print("compress error: ", err)
return
buf := bytes.NewBuffer(p)
bufMap := make(map[string]*bytes.Buffer)

for {
line, _ := buf.ReadBytes('\n')
if len(line) ==0 {
break
}
optionParams, point:= ScanParams(line)

if (bufMap[optionParams] == nil) {
bufMap[optionParams] = &bytes.Buffer{}
}
bufMap[optionParams].Write(point)
}

log.Printf("http backend write %s", hb.DB)
err = hb.WriteStream(&buf, true)
for optionParams, buffer := range(bufMap) {
p := buffer.Bytes()
var buf bytes.Buffer
err := Compress(&buf, p)
if err != nil {
log.Printf("write file error: %s\n", err)
continue
}
err = hb.WriteStream(&buf, true, optionParams)
}
return
}


func (hb *HttpBackend) WriteCompressed(p []byte) (err error) {
buf := bytes.NewBuffer(p)
err = hb.WriteStream(buf, true)
err = hb.WriteStream(buf, true, "")
return
}

func (hb *HttpBackend) WriteStream(stream io.Reader, compressed bool) (err error) {
func parseParamString(optionParams string) (result map[string]string) {
result = make(map[string]string)

entrys := strings.Split(optionParams, "&")

for _, entry := range(entrys){
pair := strings.Split(entry, "=")
if pair[1] != "" {
result[pair[0]] = pair[1]
}
}
return
}

func (hb *HttpBackend) WriteStream(stream io.Reader, compressed bool, optionParams string) (err error) {
q := url.Values{}
q.Set("db", hb.DB)

if (optionParams != "") {
paramsObject := parseParamString(optionParams)
for key, value := range(paramsObject){
q.Set(key, value)
}
}

req, err := http.NewRequest("POST", hb.URL+"/write?"+q.Encode(), stream)
if compressed {
req.Header.Add("Content-Encoding", "gzip")
Expand Down
2 changes: 1 addition & 1 deletion backend/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestHttpBackendWrite(t *testing.T) {
hb := NewHttpBackend(cfg)
defer hb.Close()

err := hb.Write([]byte("cpu,host=server01,region=uswest value=1 1434055562000000000\ncpu value=3,value2=4 1434055562000010000"))
err := hb.Write([]byte("rp=one_week&precision=s|cpu,host=server01,region=uswest value=1 1434055562000000000\nrp=one_week&precision=s|cpu value=3,value2=4 1434055562000010000"))
if err != nil {
t.Errorf("error: %s", err)
return
Expand Down
4 changes: 2 additions & 2 deletions monitor/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package monitor

import (
"time"

client "github.com/influxdata/influxdb/client/v2"
client "github.com/influxdata/influxdb1-client/v2"
)

type Metric struct {
Expand Down
11 changes: 8 additions & 3 deletions service/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"net/http"
"net/http/pprof"
"strings"

"github.com/shell909090/influx-proxy/backend"
)

Expand Down Expand Up @@ -102,8 +102,13 @@ func (hs *HttpService) HandlerWrite(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("method not allow."))
return
}
query := req.URL.Query()
db := query.Get("db")
rp := query.Get("rp")
precision := query.Get("precision")

db := req.URL.Query().Get("db")
// "|"作为分隔符与line进行拼接, 在backend/http.go中进行拆开
optionParams := "rp=" + rp + "&precision=" + precision + "|"

if hs.db != "" {
if db != hs.db {
Expand Down Expand Up @@ -132,7 +137,7 @@ func (hs *HttpService) HandlerWrite(w http.ResponseWriter, req *http.Request) {
return
}

err = hs.ic.Write(p)
err = hs.ic.Write(p, optionParams)
if err == nil {
w.WriteHeader(204)
}
Expand Down
8 changes: 7 additions & 1 deletion service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var (
ConfigFile string
NodeName string
RedisAddr string
RedisPwd string
LogFilePath string
)

Expand All @@ -33,7 +34,8 @@ func init() {
flag.StringVar(&LogFilePath, "log-file-path", "/var/log/influx-proxy.log", "output file")
flag.StringVar(&ConfigFile, "config", "", "config file")
flag.StringVar(&NodeName, "node", "l1", "node name")
flag.StringVar(&RedisAddr, "redis", "localhost:6379", "config file")
flag.StringVar(&RedisAddr, "redis", "localhost:6379", "redis address")
flag.StringVar(&RedisPwd, "redisPwd", "", "redis password")
flag.Parse()
}

Expand Down Expand Up @@ -90,6 +92,10 @@ func main() {
cfg.Addr = RedisAddr
}

if RedisPwd != "" {
cfg.Password = RedisPwd
}

rcs := backend.NewRedisConfigSource(&cfg.Options, cfg.Node)

nodecfg, err := rcs.LoadNode()
Expand Down