From 162d2751f267df6ddda7df80be4730059ce021ba Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Wed, 12 Aug 2015 10:37:05 +0200 Subject: [PATCH 01/22] added dead letter box + retry count option --- config/config.go | 20 +++++--- consumer/consumer.go | 110 ++++++++++++++++++++++++++++++++++++++----- example.conf | 8 ++++ 3 files changed, 119 insertions(+), 19 deletions(-) diff --git a/config/config.go b/config/config.go index 71bf8c6..1c84757 100644 --- a/config/config.go +++ b/config/config.go @@ -16,14 +16,22 @@ type Config struct { Compression bool } Prefetch struct { - Count int - Global bool + Count int + Global bool } Exchange struct { - Name string - Autodelete bool - Type string - Durable bool + Name string + Autodelete bool + Type string + Durable bool + } + Deadexchange struct { + Name string + AutoDelete bool + Type string + Durable bool + Queue string + Retry int } Logs struct { Error string diff --git a/consumer/consumer.go b/consumer/consumer.go index f53e3d7..c6f7528 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -11,6 +11,8 @@ import ( "github.com/streadway/amqp" "log" "net/url" + "strconv" + "time" ) type Consumer struct { @@ -21,6 +23,8 @@ type Consumer struct { ErrLogger *log.Logger InfLogger *log.Logger Executer *command.CommandExecuter + DeadLetter bool + Retry int Compression bool } @@ -32,19 +36,27 @@ func (c *Consumer) Consume() { } c.InfLogger.Println("Succeeded registering consumer.") + sendCh, err := c.Connection.Channel() + if err != nil { + c.ErrLogger.Println("Could not open channel to republish failed jobs %s", err) + } + defer c.Connection.Close() defer c.Channel.Close() + defer sendCh.Close() forever := make(chan bool) go func() { for d := range msgs { + c.InfLogger.Println("reading deliveries") input := d.Body if c.Compression { var b bytes.Buffer w, err := zlib.NewWriterLevel(&b, zlib.BestCompression) if err != nil { c.ErrLogger.Println("Could not create zlib handler") + d.Nack(true, true) } c.InfLogger.Println("Compressed message") @@ -54,12 +66,55 @@ func (c *Consumer) Consume() { input = b.Bytes() } - cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) - if c.Executer.Execute(cmd) { - d.Ack(true) + if c.DeadLetter { + var retryCount int + if d.Headers == nil { + d.Headers = make(map[string]interface{}, 0) + } + retry, ok := d.Headers["retry_count"] + if !ok { + retry = "0" + } + c.InfLogger.Println(fmt.Sprintf("retry %s", retry)) + + retryCount, err = strconv.Atoi(retry.(string)) + if err != nil { + c.ErrLogger.Fatal("could not parse retry header") + } + + c.InfLogger.Println(fmt.Sprintf("retryCount : %d max retries: %d", retryCount, c.Retry)) + + cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) + if c.Executer.Execute(cmd) { + d.Ack(true) + } else if retryCount >= c.Retry { + d.Nack(true, false) + } else { + //republish message with new retry header + retryCount++ + d.Headers["retry_count"] = strconv.Itoa(retryCount) + republish := amqp.Publishing{ + ContentType: d.ContentType, + ContentEncoding: d.ContentEncoding, + Timestamp: time.Now(), + Body: d.Body, + Headers: d.Headers, + } + err = sendCh.Publish("", c.Queue, false, false, republish) + if err != nil { + c.ErrLogger.Println("error republish %s", err) + } + d.Ack(true) + } } else { - d.Nack(true, true) + cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) + if c.Executer.Execute(cmd) { + d.Ack(true) + } else { + d.Nack(true, false) + } } + } }() c.InfLogger.Println("Waiting for messages...") @@ -100,18 +155,38 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg } infLogger.Println("Succeeded setting QoS.") - infLogger.Printf("Declaring queue \"%s\"...", cfg.RabbitMq.Queue) - _, err = ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, nil) - - if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) - } - // Check for missing exchange settings to preserve BC if "" == cfg.Exchange.Name && "" == cfg.Exchange.Type && !cfg.Exchange.Durable && !cfg.Exchange.Autodelete { cfg.Exchange.Type = "direct" } + var table map[string]interface{} + deadLetter := false + + if "" != cfg.Deadexchange.Name { + infLogger.Printf("Declaring exchange \"%s\"...", cfg.Deadexchange.Name) + err = ch.ExchangeDeclare(cfg.Deadexchange.Name, cfg.Deadexchange.Type, cfg.Deadexchange.Durable, cfg.Deadexchange.AutoDelete, false, false, amqp.Table{}) + + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error())) + } + + table = make(map[string]interface{}, 0) + table["x-dead-letter-exchange"] = cfg.Deadexchange.Name + + infLogger.Printf("Declaring error queue \"%s\"...", cfg.Deadexchange.Queue) + _, err = ch.QueueDeclare(cfg.Deadexchange.Queue, true, false, false, false, amqp.Table{}) + + // Bind queue + infLogger.Printf("Binding error queue \"%s\" to dead letter exchange \"%s\"...", cfg.Deadexchange.Queue, cfg.Deadexchange.Name) + err = ch.QueueBind(cfg.Deadexchange.Queue, "", cfg.Deadexchange.Name, false, amqp.Table{}) + + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to bind queue to exchange: %s", err.Error())) + } + deadLetter = true + } + // Empty Exchange name means default, no need to declare if "" != cfg.Exchange.Name { infLogger.Printf("Declaring exchange \"%s\"...", cfg.Exchange.Name) @@ -121,15 +196,22 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error())) } - // Bind queue + // Bind queue (before declare??) infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.RabbitMq.Queue, cfg.Exchange.Name) - err = ch.QueueBind(cfg.RabbitMq.Queue, "", cfg.Exchange.Name, false, nil) + err = ch.QueueBind(cfg.RabbitMq.Queue, "", cfg.Exchange.Name, false, table) if nil != err { return nil, errors.New(fmt.Sprintf("Failed to bind queue to exchange: %s", err.Error())) } } + infLogger.Printf("Declaring queue \"%s\"...with args: %+v", cfg.RabbitMq.Queue, table) + _, err = ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, table) + + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) + } + return &Consumer{ Channel: ch, Connection: conn, @@ -139,5 +221,7 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg InfLogger: infLogger, Executer: command.New(errLogger, infLogger), Compression: cfg.RabbitMq.Compression, + DeadLetter: deadLetter, + Retry: cfg.Deadexchange.Retry, }, nil } diff --git a/example.conf b/example.conf index e359ef7..619bac7 100644 --- a/example.conf +++ b/example.conf @@ -17,6 +17,14 @@ autodelete=Off type=direct durable=On +[Deadexchange] +name = name +autodelete=Off +type=fanout +durable=true +queue=taskerrors +retry=3 + [logs] error = /tmp/error.log info = /tmp/info.log From 69748c71b4c69360d04bc529996772ca529bf935 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Fri, 14 Aug 2015 11:39:33 +0200 Subject: [PATCH 02/22] added info loggin php scripts --- command/command_executer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/command/command_executer.go b/command/command_executer.go index 3c3bfd8..dafb55a 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -21,6 +21,9 @@ func (me CommandExecuter) Execute(cmd *exec.Cmd) bool { me.infLogger.Println("Processing message...") out, err := cmd.CombinedOutput() + //log output php script to info + me.infLogger.Printf("Output php: %s\n", string(out)) + if err != nil { me.infLogger.Println("Failed. Check error log for details.") me.errLogger.Printf("Failed: %s\n", string(out[:])) From 5023d47b39a20c6f3634b00167118155f3dec793 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Fri, 14 Aug 2015 13:00:07 +0200 Subject: [PATCH 03/22] info logging php script --- command/command_executer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/command_executer.go b/command/command_executer.go index dafb55a..f2c070b 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -22,7 +22,7 @@ func (me CommandExecuter) Execute(cmd *exec.Cmd) bool { out, err := cmd.CombinedOutput() //log output php script to info - me.infLogger.Printf("Output php: %s\n", string(out)) + me.infLogger.Printf("Output php: %s\n", string(out[:])) if err != nil { me.infLogger.Println("Failed. Check error log for details.") From 4ee5455a080ed864d814ca5ea57e17130960c27d Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Wed, 19 Aug 2015 15:26:28 +0200 Subject: [PATCH 04/22] republish channel only created in case of dead-letter-box config --- consumer/consumer.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index c6f7528..ce5dbb5 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -36,14 +36,19 @@ func (c *Consumer) Consume() { } c.InfLogger.Println("Succeeded registering consumer.") - sendCh, err := c.Connection.Channel() - if err != nil { - c.ErrLogger.Println("Could not open channel to republish failed jobs %s", err) + var sendCh *amqp.Channel + + if c.DeadLetter { + var err error + sendCh, err = c.Connection.Channel() + if err != nil { + c.ErrLogger.Println("Could not open channel to republish failed jobs %s", err) + } + defer sendCh.Close() } defer c.Connection.Close() defer c.Channel.Close() - defer sendCh.Close() forever := make(chan bool) From 8a1dfb8afd7117e925a09a6ef3a8739987e72ab9 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Wed, 19 Aug 2015 15:39:58 +0200 Subject: [PATCH 05/22] changed some info messages realted to deadletter exchange initialization --- consumer/consumer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index ce5dbb5..7ee6d64 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -194,19 +194,19 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg // Empty Exchange name means default, no need to declare if "" != cfg.Exchange.Name { - infLogger.Printf("Declaring exchange \"%s\"...", cfg.Exchange.Name) + infLogger.Printf("Declaring deadletter exchange \"%s\"...", cfg.Exchange.Name) err = ch.ExchangeDeclare(cfg.Exchange.Name, cfg.Exchange.Type, cfg.Exchange.Durable, cfg.Exchange.Autodelete, false, false, amqp.Table{}) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error())) + return nil, errors.New(fmt.Sprintf("Failed to declare deadletter exchange: %s", err.Error())) } // Bind queue (before declare??) - infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.RabbitMq.Queue, cfg.Exchange.Name) + infLogger.Printf("Binding queue \"%s\" to deadletter exchange \"%s\"...", cfg.RabbitMq.Queue, cfg.Exchange.Name) err = ch.QueueBind(cfg.RabbitMq.Queue, "", cfg.Exchange.Name, false, table) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to bind queue to exchange: %s", err.Error())) + return nil, errors.New(fmt.Sprintf("Failed to bind queue to deadletter exchange: %s", err.Error())) } } From e3437d554c8007f411df86765c26727deed26e24 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Fri, 4 Dec 2015 11:15:14 +0100 Subject: [PATCH 06/22] CP2-472 routing keys --- config/config.go | 5 ++++- consumer/consumer.go | 28 ++++++++++++++-------------- example.conf | 9 ++++++--- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/config/config.go b/config/config.go index 1c84757..f637448 100644 --- a/config/config.go +++ b/config/config.go @@ -12,7 +12,6 @@ type Config struct { Password string Port string Vhost string - Queue string Compression bool } Prefetch struct { @@ -25,6 +24,10 @@ type Config struct { Type string Durable bool } + Queue struct { + Key string + Name string + } Deadexchange struct { Name string AutoDelete bool diff --git a/consumer/consumer.go b/consumer/consumer.go index 7ee6d64..552fce0 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -169,7 +169,7 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg deadLetter := false if "" != cfg.Deadexchange.Name { - infLogger.Printf("Declaring exchange \"%s\"...", cfg.Deadexchange.Name) + infLogger.Printf("Declaring deadletter exchange \"%s\"...", cfg.Deadexchange.Name) err = ch.ExchangeDeclare(cfg.Deadexchange.Name, cfg.Deadexchange.Type, cfg.Deadexchange.Durable, cfg.Deadexchange.AutoDelete, false, false, amqp.Table{}) if nil != err { @@ -187,40 +187,40 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg err = ch.QueueBind(cfg.Deadexchange.Queue, "", cfg.Deadexchange.Name, false, amqp.Table{}) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to bind queue to exchange: %s", err.Error())) + return nil, errors.New(fmt.Sprintf("Failed to bind queue to dead-letter exchange: %s", err.Error())) } deadLetter = true } // Empty Exchange name means default, no need to declare if "" != cfg.Exchange.Name { - infLogger.Printf("Declaring deadletter exchange \"%s\"...", cfg.Exchange.Name) + infLogger.Printf("Declaring exchange \"%s\"...", cfg.Exchange.Name) err = ch.ExchangeDeclare(cfg.Exchange.Name, cfg.Exchange.Type, cfg.Exchange.Durable, cfg.Exchange.Autodelete, false, false, amqp.Table{}) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to declare deadletter exchange: %s", err.Error())) + return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error())) } - // Bind queue (before declare??) - infLogger.Printf("Binding queue \"%s\" to deadletter exchange \"%s\"...", cfg.RabbitMq.Queue, cfg.Exchange.Name) - err = ch.QueueBind(cfg.RabbitMq.Queue, "", cfg.Exchange.Name, false, table) + infLogger.Printf("Declaring queue \"%s\"...with args: %+v", cfg.Queue.Name, table) + _, err = ch.QueueDeclare(cfg.Queue.Name, true, false, false, false, table) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to bind queue to deadletter exchange: %s", err.Error())) + return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) } - } - infLogger.Printf("Declaring queue \"%s\"...with args: %+v", cfg.RabbitMq.Queue, table) - _, err = ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, table) + // Bind queue (before declare??) + infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.Queue.Name, cfg.Exchange.Name) + err = ch.QueueBind(cfg.Queue.Name, cfg.Queue.Key, cfg.Exchange.Name, false, table) - if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to bind queue to deadletter exchange: %s", err.Error())) + } } return &Consumer{ Channel: ch, Connection: conn, - Queue: cfg.RabbitMq.Queue, + Queue: cfg.Queue.Name, Factory: factory, ErrLogger: errLogger, InfLogger: infLogger, diff --git a/example.conf b/example.conf index 619bac7..64b5ef1 100644 --- a/example.conf +++ b/example.conf @@ -4,20 +4,23 @@ username = vagrant password = vagrant vhost=/vagrant port=5672 -queue=mail compression=On +[queue] +name=mail +key=mail + [prefetch] count=3 global=Off [exchange] -name=mail +name=master autodelete=Off type=direct durable=On -[Deadexchange] +[deadexchange] name = name autodelete=Off type=fanout From 4282f2978756b7c35810dbd67445d793cfcf75ec Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Fri, 4 Dec 2015 13:25:52 +0100 Subject: [PATCH 07/22] use default direct exchange --- consumer/consumer.go | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 552fce0..03288f2 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -201,20 +201,23 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error())) } - infLogger.Printf("Declaring queue \"%s\"...with args: %+v", cfg.Queue.Name, table) - _, err = ch.QueueDeclare(cfg.Queue.Name, true, false, false, false, table) + } - if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) - } + //binding to default direct exchange - // Bind queue (before declare??) - infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.Queue.Name, cfg.Exchange.Name) - err = ch.QueueBind(cfg.Queue.Name, cfg.Queue.Key, cfg.Exchange.Name, false, table) + infLogger.Printf("Declaring queue \"%s\"...with args: %+v", cfg.Queue.Name, table) + _, err = ch.QueueDeclare(cfg.Queue.Name, true, false, false, false, table) - if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to bind queue to deadletter exchange: %s", err.Error())) - } + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) + } + + // Bind queue with key + infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.Queue.Name, "") + err = ch.QueueBind(cfg.Queue.Name, cfg.Queue.Key, cfg.Exchange.Name, false, table) + + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to bind queue exchange: %s", err.Error())) } return &Consumer{ From b5c396f06b2b512ac1c9205465e814b74a4997c8 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Fri, 4 Dec 2015 13:37:58 +0100 Subject: [PATCH 08/22] empty exchange name -> default direct exchange --- consumer/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 03288f2..cc5fbec 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -214,7 +214,7 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg // Bind queue with key infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.Queue.Name, "") - err = ch.QueueBind(cfg.Queue.Name, cfg.Queue.Key, cfg.Exchange.Name, false, table) + err = ch.QueueBind(cfg.Queue.Name, cfg.Queue.Key, "", false, table) if nil != err { return nil, errors.New(fmt.Sprintf("Failed to bind queue exchange: %s", err.Error())) From c22d929a5ac22f4d6ff7eefb0f39bd27b00a6d03 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Fri, 4 Dec 2015 13:46:11 +0100 Subject: [PATCH 09/22] cannot use default direct exchange with routing keys and deadletter --- consumer/consumer.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index cc5fbec..eb7abb7 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -201,23 +201,23 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error())) } - } + //binding to exchange - //binding to default direct exchange + infLogger.Printf("Declaring queue \"%s\"...with args: %+v", cfg.Queue.Name, table) + _, err = ch.QueueDeclare(cfg.Queue.Name, true, false, false, false, table) - infLogger.Printf("Declaring queue \"%s\"...with args: %+v", cfg.Queue.Name, table) - _, err = ch.QueueDeclare(cfg.Queue.Name, true, false, false, false, table) + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) + } - if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) - } + // Bind queue with key + infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.Queue.Name, cfg.Exchange.Name) + err = ch.QueueBind(cfg.Queue.Name, cfg.Queue.Key, cfg.Exchange.Name, false, table) - // Bind queue with key - infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.Queue.Name, "") - err = ch.QueueBind(cfg.Queue.Name, cfg.Queue.Key, "", false, table) + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to bind queue exchange: %s", err.Error())) + } - if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to bind queue exchange: %s", err.Error())) } return &Consumer{ From c585292d07072a174ff28cb5e7337287cc98e962 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Fri, 22 Jan 2016 11:48:07 +0100 Subject: [PATCH 10/22] added rpc netlogger --- command/command.go | 5 +++++ command/command_executer.go | 8 +++++++ command/netlogger.go | 42 +++++++++++++++++++++++++++++++++++++ config/config.go | 1 + example.conf | 1 + main.go | 2 ++ 6 files changed, 59 insertions(+) create mode 100644 command/netlogger.go diff --git a/command/command.go b/command/command.go index 6f1f990..85332b9 100644 --- a/command/command.go +++ b/command/command.go @@ -1,9 +1,14 @@ package command import ( + "github.com/ricbra/rabbitmq-cli-consumer/config" "strings" ) +var ( + Cconf *config.Config +) + func Factory(baseCmd string) *CommandFactory { var pcs []string if split := strings.Split(baseCmd, " "); len(split) > 1 { diff --git a/command/command_executer.go b/command/command_executer.go index f2c070b..d082090 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -8,12 +8,17 @@ import ( type CommandExecuter struct { errLogger *log.Logger infLogger *log.Logger + netLogger *NetLogger } func New(errLogger, infLogger *log.Logger) *CommandExecuter { + netLogger := new(NetLogger) + netLogger.Address = Cconf.Logs.Rpc + return &CommandExecuter{ errLogger: errLogger, infLogger: infLogger, + netLogger: netLogger, } } @@ -28,10 +33,13 @@ func (me CommandExecuter) Execute(cmd *exec.Cmd) bool { me.infLogger.Println("Failed. Check error log for details.") me.errLogger.Printf("Failed: %s\n", string(out[:])) me.errLogger.Printf("Error: %s\n", err) + me.netLogger.SendError([]byte(err.Error())) + me.netLogger.SendError(out[:]) return false } me.infLogger.Println("Processed!") + me.netLogger.SendOK(out) return true } diff --git a/command/netlogger.go b/command/netlogger.go new file mode 100644 index 0000000..800dba1 --- /dev/null +++ b/command/netlogger.go @@ -0,0 +1,42 @@ +package command + +import ( + "bytes" + "encoding/json" + "net/http" +) + +type ProvisionEvent struct { + Error bool `json:"is_error"` + Output string `json:"out"` +} + +type NetLogger struct { + Address string +} + +func (n *NetLogger) SendOK(p []byte) error { + event := ProvisionEvent{ + Error: false, + Output: string(p), + } + return n.send(&event) +} + +func (n *NetLogger) SendError(p []byte) error { + event := ProvisionEvent{ + Error: true, + Output: string(p), + } + return n.send(&event) +} + +func (n *NetLogger) send(p *ProvisionEvent) error { + post, _ := json.Marshal(p) + _, err := http.Post(n.Address, "encoding/json", bytes.NewBuffer(post)) + if err != nil { + return err + } + return nil + +} diff --git a/config/config.go b/config/config.go index f637448..898b69b 100644 --- a/config/config.go +++ b/config/config.go @@ -39,6 +39,7 @@ type Config struct { Logs struct { Error string Info string + Rpc string } } diff --git a/example.conf b/example.conf index 64b5ef1..d6f7869 100644 --- a/example.conf +++ b/example.conf @@ -31,3 +31,4 @@ retry=3 [logs] error = /tmp/error.log info = /tmp/info.log +rpc = https://cims2.dev.nucleus.be/rpc/index.php diff --git a/main.go b/main.go index c84a3c6..6b5cfe5 100644 --- a/main.go +++ b/main.go @@ -42,6 +42,8 @@ func main() { logger := log.New(os.Stderr, "", log.Ldate|log.Ltime) cfg, err := config.LoadAndParse(c.String("configuration")) + command.Cconf = cfg + if err != nil { logger.Fatalf("Failed parsing configuration: %s\n", err) } From d44081eafea1fd0b5ea9c3648655c14c234e00dc Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Fri, 22 Jan 2016 12:25:27 +0100 Subject: [PATCH 11/22] added message body to provisioning rpc message --- command/command_executer.go | 2 +- command/netlogger.go | 19 +++++++++++-------- consumer/consumer.go | 4 ++-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/command/command_executer.go b/command/command_executer.go index d082090..695cbe2 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -22,7 +22,7 @@ func New(errLogger, infLogger *log.Logger) *CommandExecuter { } } -func (me CommandExecuter) Execute(cmd *exec.Cmd) bool { +func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool { me.infLogger.Println("Processing message...") out, err := cmd.CombinedOutput() diff --git a/command/netlogger.go b/command/netlogger.go index 800dba1..ca5e2c3 100644 --- a/command/netlogger.go +++ b/command/netlogger.go @@ -7,26 +7,29 @@ import ( ) type ProvisionEvent struct { - Error bool `json:"is_error"` - Output string `json:"out"` + Error bool `json:"is_error"` + Output string `json:"out"` + Message []byte `json:"message"` } type NetLogger struct { Address string } -func (n *NetLogger) SendOK(p []byte) error { +func (n *NetLogger) SendOK(p []byte, bod []byte) error { event := ProvisionEvent{ - Error: false, - Output: string(p), + Error: false, + Output: string(p), + Message: bod, } return n.send(&event) } -func (n *NetLogger) SendError(p []byte) error { +func (n *NetLogger) SendError(p []byte, bod []byte) error { event := ProvisionEvent{ - Error: true, - Output: string(p), + Error: true, + Output: string(p), + Message: bod, } return n.send(&event) } diff --git a/consumer/consumer.go b/consumer/consumer.go index eb7abb7..b6388b8 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -90,7 +90,7 @@ func (c *Consumer) Consume() { c.InfLogger.Println(fmt.Sprintf("retryCount : %d max retries: %d", retryCount, c.Retry)) cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) - if c.Executer.Execute(cmd) { + if c.Executer.Execute(cmd, d.Body[:]) { d.Ack(true) } else if retryCount >= c.Retry { d.Nack(true, false) @@ -113,7 +113,7 @@ func (c *Consumer) Consume() { } } else { cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) - if c.Executer.Execute(cmd) { + if c.Executer.Execute(cmd, d.Body[:]) { d.Ack(true) } else { d.Nack(true, false) From aed968df96afe2d0244d34f0d63e53fa1aeb46a7 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Fri, 22 Jan 2016 12:39:03 +0100 Subject: [PATCH 12/22] added message body to function calls --- command/command_executer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/command/command_executer.go b/command/command_executer.go index 695cbe2..5ad73ce 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -33,13 +33,13 @@ func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool { me.infLogger.Println("Failed. Check error log for details.") me.errLogger.Printf("Failed: %s\n", string(out[:])) me.errLogger.Printf("Error: %s\n", err) - me.netLogger.SendError([]byte(err.Error())) - me.netLogger.SendError(out[:]) + me.netLogger.SendError([]byte(err.Error()), body) + me.netLogger.SendError(out[:], body) return false } me.infLogger.Println("Processed!") - me.netLogger.SendOK(out) + me.netLogger.SendOK(out, body) return true } From 741267866b91e406c42c3e29851469f1c18dac16 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Fri, 22 Jan 2016 15:09:58 +0100 Subject: [PATCH 13/22] added required rpc fields/structure --- command/netlogger.go | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/command/netlogger.go b/command/netlogger.go index ca5e2c3..6a49a03 100644 --- a/command/netlogger.go +++ b/command/netlogger.go @@ -3,33 +3,59 @@ package command import ( "bytes" "encoding/json" + "math/rand" "net/http" ) type ProvisionEvent struct { + JsonRpc string `json:"jsonrpc"` + Method string `json:"method"` + Id int32 `json:"id"` + Params *Parameters `json:"params"` +} + +type Data struct { Error bool `json:"is_error"` Output string `json:"out"` Message []byte `json:"message"` } +type Parameters struct { + Data *Data `json:"data"` +} + type NetLogger struct { Address string } func (n *NetLogger) SendOK(p []byte, bod []byte) error { event := ProvisionEvent{ - Error: false, - Output: string(p), - Message: bod, + JsonRpc: "2.0", + Method: "Event::createProvisioningEvent", + Id: rand.Int31(), + Params: &Parameters{ + Data: &Data{ + Error: false, + Output: string(p), + Message: bod, + }, + }, } return n.send(&event) } func (n *NetLogger) SendError(p []byte, bod []byte) error { event := ProvisionEvent{ - Error: true, - Output: string(p), - Message: bod, + JsonRpc: "2.0", + Method: "Event::createProvisioningEvent", + Id: rand.Int31(), + Params: &Parameters{ + Data: &Data{ + Error: true, + Output: string(p), + Message: bod, + }, + }, } return n.send(&event) } From c3e6ae6401886765e3bad1372b468df4f5e393f4 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Fri, 22 Jan 2016 15:36:11 +0100 Subject: [PATCH 14/22] conditional provisioning log to cims2 --- command/command_executer.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/command/command_executer.go b/command/command_executer.go index 5ad73ce..edcaf76 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -12,14 +12,23 @@ type CommandExecuter struct { } func New(errLogger, infLogger *log.Logger) *CommandExecuter { - netLogger := new(NetLogger) - netLogger.Address = Cconf.Logs.Rpc + + if len(Cconf.Logs.Rpc) > 1 { + netLogger := new(NetLogger) + netLogger.Address = Cconf.Logs.Rpc + + return &CommandExecuter{ + errLogger: errLogger, + infLogger: infLogger, + netLogger: netLogger, + } + } return &CommandExecuter{ errLogger: errLogger, infLogger: infLogger, - netLogger: netLogger, } + } func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool { @@ -33,13 +42,20 @@ func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool { me.infLogger.Println("Failed. Check error log for details.") me.errLogger.Printf("Failed: %s\n", string(out[:])) me.errLogger.Printf("Error: %s\n", err) - me.netLogger.SendError([]byte(err.Error()), body) - me.netLogger.SendError(out[:], body) + + if len(Cconf.Logs.Rpc) > 1 { + me.netLogger.SendError([]byte(err.Error()), body) + me.netLogger.SendError(out[:], body) + } + return false } me.infLogger.Println("Processed!") - me.netLogger.SendOK(out, body) + + if len(Cconf.Logs.Rpc) > 1 { + me.netLogger.SendOK(out, body) + } return true } From eead0e3ea5fababbae71b8d2cecfaddd1e6c23b0 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Tue, 26 Jan 2016 12:33:44 +0100 Subject: [PATCH 15/22] bind dead-letter-box with keys to mathich error queues --- command/command_executer.go | 1 + consumer/consumer.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/command/command_executer.go b/command/command_executer.go index edcaf76..f639ca7 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -33,6 +33,7 @@ func New(errLogger, infLogger *log.Logger) *CommandExecuter { func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool { me.infLogger.Println("Processing message...") + me.infLogger.Printf("Cmd: %s\n", cmd.Path) out, err := cmd.CombinedOutput() //log output php script to info diff --git a/consumer/consumer.go b/consumer/consumer.go index b6388b8..1cb7063 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -184,7 +184,7 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg // Bind queue infLogger.Printf("Binding error queue \"%s\" to dead letter exchange \"%s\"...", cfg.Deadexchange.Queue, cfg.Deadexchange.Name) - err = ch.QueueBind(cfg.Deadexchange.Queue, "", cfg.Deadexchange.Name, false, amqp.Table{}) + err = ch.QueueBind(cfg.Deadexchange.Queue, cfg.Queue.Name, cfg.Deadexchange.Name, false, amqp.Table{}) if nil != err { return nil, errors.New(fmt.Sprintf("Failed to bind queue to dead-letter exchange: %s", err.Error())) From 957d8200c93ec72b8deb70fd518b3266a6477f12 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Tue, 26 Jan 2016 12:50:06 +0100 Subject: [PATCH 16/22] aded dead-letter-routing key --- consumer/consumer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 1cb7063..65bd949 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -178,13 +178,14 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg table = make(map[string]interface{}, 0) table["x-dead-letter-exchange"] = cfg.Deadexchange.Name + table["x-dead-letter-routing-key"] = cfg.Queue.Key infLogger.Printf("Declaring error queue \"%s\"...", cfg.Deadexchange.Queue) _, err = ch.QueueDeclare(cfg.Deadexchange.Queue, true, false, false, false, amqp.Table{}) // Bind queue infLogger.Printf("Binding error queue \"%s\" to dead letter exchange \"%s\"...", cfg.Deadexchange.Queue, cfg.Deadexchange.Name) - err = ch.QueueBind(cfg.Deadexchange.Queue, cfg.Queue.Name, cfg.Deadexchange.Name, false, amqp.Table{}) + err = ch.QueueBind(cfg.Deadexchange.Queue, "", cfg.Deadexchange.Name, false, amqp.Table{}) if nil != err { return nil, errors.New(fmt.Sprintf("Failed to bind queue to dead-letter exchange: %s", err.Error())) From 792a6d97411dcddcb0e32d504b0bef1ea7a04158 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Tue, 26 Jan 2016 12:56:26 +0100 Subject: [PATCH 17/22] binding error queues --- consumer/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 65bd949..ae3547f 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -185,7 +185,7 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg // Bind queue infLogger.Printf("Binding error queue \"%s\" to dead letter exchange \"%s\"...", cfg.Deadexchange.Queue, cfg.Deadexchange.Name) - err = ch.QueueBind(cfg.Deadexchange.Queue, "", cfg.Deadexchange.Name, false, amqp.Table{}) + err = ch.QueueBind(cfg.Deadexchange.Queue, cfg.Queue.Key, cfg.Deadexchange.Name, false, amqp.Table{}) if nil != err { return nil, errors.New(fmt.Sprintf("Failed to bind queue to dead-letter exchange: %s", err.Error())) From a71f8c0212a880988adef88c55a252be727ac2fb Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Tue, 26 Jan 2016 16:49:35 +0100 Subject: [PATCH 18/22] netlogger refactor --- command/command_executer.go | 7 ++++--- command/netlogger.go | 20 ++------------------ 2 files changed, 6 insertions(+), 21 deletions(-) diff --git a/command/command_executer.go b/command/command_executer.go index f639ca7..378b1c2 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -45,8 +45,9 @@ func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool { me.errLogger.Printf("Error: %s\n", err) if len(Cconf.Logs.Rpc) > 1 { - me.netLogger.SendError([]byte(err.Error()), body) - me.netLogger.SendError(out[:], body) + me.infLogger.Println("rpc parameters: %s", Cconf.Logs.Rpc) + me.netLogger.Send([]byte(err.Error()), body[:], true) + me.netLogger.Send(out[:], body[:], true) } return false @@ -55,7 +56,7 @@ func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool { me.infLogger.Println("Processed!") if len(Cconf.Logs.Rpc) > 1 { - me.netLogger.SendOK(out, body) + me.netLogger.Send(out[:], body[:], false) } return true diff --git a/command/netlogger.go b/command/netlogger.go index 6a49a03..72de62a 100644 --- a/command/netlogger.go +++ b/command/netlogger.go @@ -28,30 +28,14 @@ type NetLogger struct { Address string } -func (n *NetLogger) SendOK(p []byte, bod []byte) error { +func (n *NetLogger) Send(p []byte, bod []byte, isError bool) error { event := ProvisionEvent{ JsonRpc: "2.0", Method: "Event::createProvisioningEvent", Id: rand.Int31(), Params: &Parameters{ Data: &Data{ - Error: false, - Output: string(p), - Message: bod, - }, - }, - } - return n.send(&event) -} - -func (n *NetLogger) SendError(p []byte, bod []byte) error { - event := ProvisionEvent{ - JsonRpc: "2.0", - Method: "Event::createProvisioningEvent", - Id: rand.Int31(), - Params: &Parameters{ - Data: &Data{ - Error: true, + Error: isError, Output: string(p), Message: bod, }, From bc6aaf176ec537cc4cc65ee026a240d678c12a4e Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Tue, 26 Jan 2016 17:09:30 +0100 Subject: [PATCH 19/22] logging rpc --- command/command_executer.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/command/command_executer.go b/command/command_executer.go index 378b1c2..24fe993 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -45,9 +45,13 @@ func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool { me.errLogger.Printf("Error: %s\n", err) if len(Cconf.Logs.Rpc) > 1 { - me.infLogger.Println("rpc parameters: %s", Cconf.Logs.Rpc) - me.netLogger.Send([]byte(err.Error()), body[:], true) - me.netLogger.Send(out[:], body[:], true) + me.infLogger.Printf("rpc parameters: %s", Cconf.Logs.Rpc) + if err := me.netLogger.Send([]byte(err.Error()), body[:], true); err != nil { + me.infLogger.Printf("failed sending provision error event -> error: %s", err) + } + if err := me.netLogger.Send(out[:], body[:], true); err != nil { + me.infLogger.Printf("failed sending provision error event -> error: %s", err) + } } return false @@ -56,7 +60,9 @@ func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool { me.infLogger.Println("Processed!") if len(Cconf.Logs.Rpc) > 1 { - me.netLogger.Send(out[:], body[:], false) + if err := me.netLogger.Send(out[:], body[:], false); err != nil { + me.infLogger.Printf("failed sending provision success event -> error: %s", err) + } } return true From 8ae86802901fdfe9a7de6b28bbcc5c4dea9d9818 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Tue, 26 Jan 2016 17:22:05 +0100 Subject: [PATCH 20/22] netlogger fix verify tls --- command/command_executer.go | 3 +-- command/netlogger.go | 17 ++++++++++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/command/command_executer.go b/command/command_executer.go index 24fe993..5aa03dc 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -14,8 +14,7 @@ type CommandExecuter struct { func New(errLogger, infLogger *log.Logger) *CommandExecuter { if len(Cconf.Logs.Rpc) > 1 { - netLogger := new(NetLogger) - netLogger.Address = Cconf.Logs.Rpc + netLogger := NewNetLogger() return &CommandExecuter{ errLogger: errLogger, diff --git a/command/netlogger.go b/command/netlogger.go index 72de62a..6741461 100644 --- a/command/netlogger.go +++ b/command/netlogger.go @@ -2,6 +2,7 @@ package command import ( "bytes" + "crypto/tls" "encoding/json" "math/rand" "net/http" @@ -44,6 +45,21 @@ func (n *NetLogger) Send(p []byte, bod []byte, isError bool) error { return n.send(&event) } +func NewNetLogger() *NetLogger { + cfg := &tls.Config{ + InsecureSkipVerify: true, + } + + http.DefaultClient.Transport = &http.Transport{ + TLSClientConfig: cfg, + } + + netLogger := new(NetLogger) + netLogger.Address = Cconf.Logs.Rpc + + return netLogger +} + func (n *NetLogger) send(p *ProvisionEvent) error { post, _ := json.Marshal(p) _, err := http.Post(n.Address, "encoding/json", bytes.NewBuffer(post)) @@ -51,5 +67,4 @@ func (n *NetLogger) send(p *ProvisionEvent) error { return err } return nil - } From 4b4b08b058bbebe5415885a898a91c0d9d6755c9 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Wed, 27 Jan 2016 16:54:02 +0100 Subject: [PATCH 21/22] only one rpc call on failure --- command/command_executer.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/command/command_executer.go b/command/command_executer.go index 5aa03dc..e9e6be9 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -32,7 +32,7 @@ func New(errLogger, infLogger *log.Logger) *CommandExecuter { func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool { me.infLogger.Println("Processing message...") - me.infLogger.Printf("Cmd: %s\n", cmd.Path) + me.infLogger.Printf("Cmd: %s params: %s \n", cmd.Path, cmd.Args) out, err := cmd.CombinedOutput() //log output php script to info @@ -45,9 +45,6 @@ func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool { if len(Cconf.Logs.Rpc) > 1 { me.infLogger.Printf("rpc parameters: %s", Cconf.Logs.Rpc) - if err := me.netLogger.Send([]byte(err.Error()), body[:], true); err != nil { - me.infLogger.Printf("failed sending provision error event -> error: %s", err) - } if err := me.netLogger.Send(out[:], body[:], true); err != nil { me.infLogger.Printf("failed sending provision error event -> error: %s", err) } From f3361a468c55a7349fd65e69a65796ad9e171738 Mon Sep 17 00:00:00 2001 From: Dries Pauwels Date: Fri, 5 Feb 2016 16:11:03 +0100 Subject: [PATCH 22/22] 1 based isntead of zero based retry count --- consumer/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index ae3547f..34cc876 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -78,7 +78,7 @@ func (c *Consumer) Consume() { } retry, ok := d.Headers["retry_count"] if !ok { - retry = "0" + retry = "1" } c.InfLogger.Println(fmt.Sprintf("retry %s", retry))