Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/handlers/proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ title: Proxy Handler
The Proxy handler implements a layer 4 proxy capable of multiple upstreams with load balancing and health checks.
This handler is at the core of the package functionality and supports both TCP and UDP.

## Metrics

The handler exposes Prometheus metrics on the instance metrics registry (served by Caddy's admin `/metrics`
endpoint), labeled by `upstream`:

- `caddy_layer4_proxy_connections_total` — counter of connections proxied to an upstream;
- `caddy_layer4_proxy_active_connections` — gauge of connections currently being proxied to an upstream;
- `caddy_layer4_proxy_upstream_healthy` — gauge that is `1` when an upstream is healthy and `0` when it is down,
as determined by active health checks.

## Syntax

The handler has the following optional fields:
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/fsnotify/fsnotify v1.10.1
github.com/miekg/dns v1.1.72
github.com/pires/go-proxyproto v0.12.0
github.com/prometheus/client_golang v1.23.2
github.com/quic-go/quic-go v0.59.1
github.com/things-go/go-socks5 v0.1.1
go.uber.org/zap v1.28.0
Expand Down Expand Up @@ -77,6 +78,7 @@ require (
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/klauspost/compress v1.18.5 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/libdns/libdns v1.1.1 // indirect
github.com/manifoldco/promptui v0.9.0 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
Expand All @@ -89,7 +91,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.23.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/otlptranslator v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions modules/l4proxy/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (h *Handler) doActiveHealthCheck(upstream *Upstream, p *peer) error {
if err2 != nil {
return fmt.Errorf("marking unhealthy: %v (original error: %v)", err2, err)
}
h.metrics.setUpstreamHealthy(p.dialAddr, false)
return nil
}
_ = conn.Close()
Expand All @@ -200,6 +201,7 @@ func (h *Handler) doActiveHealthCheck(upstream *Upstream, p *peer) error {
if err != nil {
return fmt.Errorf("marking healthy: %v", err)
}
h.metrics.setUpstreamHealthy(p.dialAddr, true)

return nil
}
83 changes: 83 additions & 0 deletions modules/l4proxy/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2020 Matthew Holt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package l4proxy

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// proxyMetrics holds the Prometheus collectors for a proxy handler. They are
// registered against the instance's metrics registry (obtained from the Caddy
// context) so they reset cleanly across config reloads.
type proxyMetrics struct {
connectionsTotal *prometheus.CounterVec
activeConns *prometheus.GaugeVec
upstreamHealthy *prometheus.GaugeVec
}

// newProxyMetrics creates and registers the proxy metrics on reg.
func newProxyMetrics(reg *prometheus.Registry) *proxyMetrics {
const ns, sub = "caddy", "layer4_proxy"
return &proxyMetrics{
connectionsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: sub,
Name: "connections_total",
Help: "Total number of connections proxied, labeled by upstream.",
}, []string{"upstream"}),
activeConns: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: sub,
Name: "active_connections",
Help: "Number of connections currently being proxied, labeled by upstream.",
}, []string{"upstream"}),
upstreamHealthy: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: sub,
Name: "upstream_healthy",
Help: "Whether an upstream is currently healthy (1) or down (0), per active health checks.",
}, []string{"upstream"}),
}
}

// connectionOpened records the start of a proxied connection to upstream.
func (m *proxyMetrics) connectionOpened(upstream string) {
if m == nil {
return
}
m.connectionsTotal.WithLabelValues(upstream).Inc()
m.activeConns.WithLabelValues(upstream).Inc()
}

// connectionClosed records the end of a proxied connection to upstream.
func (m *proxyMetrics) connectionClosed(upstream string) {
if m == nil {
return
}
m.activeConns.WithLabelValues(upstream).Dec()
}

// setUpstreamHealthy records an upstream's current health state.
func (m *proxyMetrics) setUpstreamHealthy(upstream string, healthy bool) {
if m == nil {
return
}
v := 0.0
if healthy {
v = 1.0
}
m.upstreamHealthy.WithLabelValues(upstream).Set(v)
}
124 changes: 124 additions & 0 deletions modules/l4proxy/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2020 Matthew Holt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package l4proxy

import (
"net"
"testing"
"time"

"github.com/caddyserver/caddy/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"go.uber.org/zap"
)

func TestProxyMetricsConnections(t *testing.T) {
m := newProxyMetrics(prometheus.NewRegistry())

m.connectionOpened("up1")
m.connectionOpened("up1")
if got := testutil.ToFloat64(m.connectionsTotal.WithLabelValues("up1")); got != 2 {
t.Errorf("connections_total = %v, want 2", got)
}
if got := testutil.ToFloat64(m.activeConns.WithLabelValues("up1")); got != 2 {
t.Errorf("active_connections = %v, want 2", got)
}

m.connectionClosed("up1")
if got := testutil.ToFloat64(m.activeConns.WithLabelValues("up1")); got != 1 {
t.Errorf("active_connections after one close = %v, want 1", got)
}
}

func TestProxyMetricsHealth(t *testing.T) {
m := newProxyMetrics(prometheus.NewRegistry())

m.setUpstreamHealthy("p1", true)
if got := testutil.ToFloat64(m.upstreamHealthy.WithLabelValues("p1")); got != 1 {
t.Errorf("upstream_healthy = %v, want 1", got)
}
m.setUpstreamHealthy("p1", false)
if got := testutil.ToFloat64(m.upstreamHealthy.WithLabelValues("p1")); got != 0 {
t.Errorf("upstream_healthy = %v, want 0", got)
}
}

func TestProxyMetricsNilSafe(t *testing.T) {
var m *proxyMetrics // a handler that was never provisioned
m.connectionOpened("x")
m.connectionClosed("x")
m.setUpstreamHealthy("x", true)
}

func TestActiveHealthCheckUpdatesHealthMetricDown(t *testing.T) {
addr, err := caddy.ParseNetworkAddress("127.0.0.1:1") // nothing listening
if err != nil {
t.Fatalf("parsing address: %v", err)
}
p := &peer{address: &addr, dialAddr: "127.0.0.1:1"}
h := &Handler{
metrics: newProxyMetrics(prometheus.NewRegistry()),
HealthChecks: &HealthChecks{Active: &ActiveHealthChecks{
Timeout: caddy.Duration(200 * time.Millisecond),
logger: zap.NewNop(),
}},
}
h.metrics.setUpstreamHealthy(p.dialAddr, true) // pretend it was healthy

if err := h.doActiveHealthCheck(&Upstream{peers: []*peer{p}}, p); err != nil {
t.Fatalf("health check: %v", err)
}
if got := testutil.ToFloat64(h.metrics.upstreamHealthy.WithLabelValues(p.dialAddr)); got != 0 {
t.Errorf("upstream_healthy after failed check = %v, want 0", got)
}
}

func TestActiveHealthCheckUpdatesHealthMetricUp(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen: %v", err)
}
defer ln.Close()
go func() {
for {
c, err := ln.Accept()
if err != nil {
return
}
_ = c.Close()
}
}()

addr, err := caddy.ParseNetworkAddress(ln.Addr().String())
if err != nil {
t.Fatalf("parsing address: %v", err)
}
p := &peer{address: &addr, dialAddr: ln.Addr().String()}
h := &Handler{
metrics: newProxyMetrics(prometheus.NewRegistry()),
HealthChecks: &HealthChecks{Active: &ActiveHealthChecks{
Timeout: caddy.Duration(time.Second),
logger: zap.NewNop(),
}},
}

if err := h.doActiveHealthCheck(&Upstream{peers: []*peer{p}}, p); err != nil {
t.Fatalf("health check: %v", err)
}
if got := testutil.ToFloat64(h.metrics.upstreamHealthy.WithLabelValues(p.dialAddr)); got != 1 {
t.Errorf("upstream_healthy after successful check = %v, want 1", got)
}
}
12 changes: 10 additions & 2 deletions modules/l4proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type Handler struct {

proxyProtocolVersion uint8

metrics *proxyMetrics

ctx caddy.Context
logger *zap.Logger
}
Expand All @@ -78,6 +80,7 @@ func (*Handler) CaddyModule() caddy.ModuleInfo {
func (h *Handler) Provision(ctx caddy.Context) error {
h.ctx = ctx
h.logger = ctx.Logger(h)
h.metrics = newProxyMetrics(ctx.GetMetricsRegistry())

// start by loading modules
if h.LoadBalancing != nil && h.LoadBalancing.SelectionPolicyRaw != nil {
Expand Down Expand Up @@ -159,10 +162,11 @@ func (h *Handler) Handle(down *layer4.Connection, _ layer4.Handler) error {

var upConns []net.Conn
var proxyErr error
var upstream *Upstream

for {
// choose an available upstream
upstream := h.LoadBalancing.SelectionPolicy.Select(h.Upstreams, down)
upstream = h.LoadBalancing.SelectionPolicy.Select(h.Upstreams, down)
if upstream == nil {
if proxyErr == nil {
proxyErr = fmt.Errorf("no upstreams available")
Expand All @@ -186,11 +190,15 @@ func (h *Handler) Handle(down *layer4.Connection, _ layer4.Handler) error {
break
}

// make sure upstream connections all get closed
upstreamLabel := upstream.String()
h.metrics.connectionOpened(upstreamLabel)

// make sure upstream connections all get closed, and record the close
defer func() {
for _, conn := range upConns {
_ = conn.Close()
}
h.metrics.connectionClosed(upstreamLabel)
}()

// finally, proxy the connection
Expand Down