Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/handlers/proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ The handler has the following optional fields:
- `upstreams` may contain a list of `l4proxy.Upstream` structures (valid for JSON). In a Caddyfile, multiple `upstream`
options or blocks are unmarshalled into a list of such structures.

- `dynamic_upstreams` may contain an upstream-source module that discovers the upstreams at runtime instead of listing
them statically, so the backend set need not be restated in config when DNS already publishes it. In a Caddyfile it
is `dynamic <source> { ... }`. Two DNS sources are provided:
- `srv` resolves SRV records. Options: `service`, `proto` and `name` (or `name` alone, given as the full domain to
query), `refresh` (default `1m`), `grace_period` (how long stale results keep being served after a lookup failure),
`dial_network`.
- `a` resolves A/AAAA records for a `name` on a configured `port`. Options: `name`, `port`, `refresh`,
`grace_period`, `dial_network`.

When `dynamic_upstreams` is configured, the static `upstreams` list may be empty. Active health checks run on the
dynamically-discovered upstreams too (re-resolved each interval), so a discovered cluster can be health-gated — for
example to route only to the node whose HTTP health check passes.

**Active health checks** run independently of proxied connections, in a background goroutine driven by a timer.
To minimally enable active health checks, set `active` field equal to an empty structure inside `health_checks` in
a JSON configuration or include any active health check option into a Caddyfile.
Expand Down
46 changes: 46 additions & 0 deletions integration/caddyfile_adapt/gd_handler_proxy_dynamic_srv.caddytest
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
layer4 {
:5432 {
route {
proxy {
dynamic srv {
service postgres
proto tcp
name db.internal
refresh 30s
}
}
}
}
}
}
----------
{
"apps": {
"layer4": {
"servers": {
"srv0": {
"listen": [
":5432"
],
"routes": [
{
"handle": [
{
"dynamic_upstreams": {
"name": "db.internal",
"proto": "tcp",
"refresh": 30000000000,
"service": "postgres",
"source": "srv"
},
"handler": "proxy"
}
]
}
]
}
}
}
}
}
93 changes: 93 additions & 0 deletions modules/l4proxy/dynamichealth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2020 Matthew Holt
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package l4proxy

import (
"errors"
"testing"
"time"

"github.com/caddyserver/caddy/v2"
"go.uber.org/zap"
)

// erroringSource is a stub UpstreamSource that simulates a broken discovery
// backend: every lookup fails.
type erroringSource struct{}

// GetUpstreams ignores the replacer and always returns a nil pool together
// with a "discovery failed" error.
func (erroringSource) GetUpstreams(_ *caddy.Replacer) (UpstreamPool, error) {
	var none UpstreamPool
	return none, errors.New("discovery failed")
}

// TestActiveHealthCheckDynamicSourceError exercises the path where the dynamic
// upstream source returns an error during a health-check pass: it must be
// logged and swallowed (no panic).
func TestActiveHealthCheckDynamicSourceError(t *testing.T) {
h := &Handler{
dynamicUpstreams: erroringSource{},
HealthChecks: &HealthChecks{Active: &ActiveHealthChecks{
Timeout: caddy.Duration(200 * time.Millisecond),
logger: zap.NewNop(),
}},
}
// Must not panic; the error from the source is logged and ignored.
h.doActiveHealthCheckForAllHosts()
}

// TestActiveHealthCheckMarksDynamicUpstream verifies that the active health
// checker also covers dynamically-discovered upstreams: a discovered peer that
// cannot be dialed must end up marked unhealthy.
func TestActiveHealthCheckMarksDynamicUpstream(t *testing.T) {
	const (
		// upper bound on how long we wait for the background check:
		// pollAttempts * pollInterval = 2s
		pollAttempts = 100
		pollInterval = 20 * time.Millisecond
	)

	// dynamic source returns a single dead address (nothing listens on port 1)
	au := aWith("db.dyn-health.test", "1", []string{"127.0.0.1"}, nil, nil)

	h := &Handler{
		dynamicUpstreams: au,
		HealthChecks: &HealthChecks{
			Active: &ActiveHealthChecks{
				Timeout: caddy.Duration(200 * time.Millisecond),
				logger:  zap.NewNop(),
			},
		},
	}

	// resolve once so we hold the same peer the checker will mark (the cache
	// returns the same pool/peer pointers)
	pool, err := au.GetUpstreams(caddy.NewReplacer())
	if err != nil {
		t.Fatalf("GetUpstreams: %v", err)
	}
	// Check the two counts separately so a failure names the count that is
	// actually wrong.
	if len(pool) != 1 {
		t.Fatalf("expected exactly one discovered upstream, got %d", len(pool))
	}
	if n := len(pool[0].peers); n != 1 {
		t.Fatalf("expected exactly one discovered peer, got %d", n)
	}
	p := pool[0].peers[0]
	if !p.healthy() {
		t.Fatal("peer should start healthy")
	}

	h.doActiveHealthCheckForAllHosts()

	// the check runs in a goroutine; poll until it marks the peer down
	for range pollAttempts {
		if !p.healthy() {
			return
		}
		time.Sleep(pollInterval)
	}
	t.Fatal("dynamically-discovered dead peer was not marked unhealthy")
}
29 changes: 26 additions & 3 deletions modules/l4proxy/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,33 @@ func (h *Handler) activeHealthChecker() {
}
}

// doActiveHealthCheckForAllHosts immediately performs a
// health checks for all upstream hosts configured by h.
// doActiveHealthCheckForAllHosts immediately performs health checks for all
// upstream hosts known to h: the statically-configured ones and, if a dynamic
// upstreams source is configured, the currently-discovered ones too. This lets
// a discovered cluster (e.g. via DNS SRV/A) be health-gated — for example to
// route only to the node whose /primary endpoint reports it is the leader —
// without any external coordinator.
func (h *Handler) doActiveHealthCheckForAllHosts() {
for _, upstream := range h.Upstreams {
h.activeHealthCheckUpstreams(h.Upstreams)

if h.dynamicUpstreams != nil {
// Discovery here is connection-independent, so use a bare replacer;
// dynamic upstream sources used with active health checks should not
// rely on connection-scoped placeholders.
dynamic, err := h.dynamicUpstreams.GetUpstreams(caddy.NewReplacer())
if err != nil {
h.HealthChecks.Active.logger.Error("getting dynamic upstreams for active health check",
zap.Error(err))
} else {
h.activeHealthCheckUpstreams(dynamic)
}
}
}

// activeHealthCheckUpstreams runs an active health check against every peer of
// every upstream in the pool, one goroutine per upstream.
func (h *Handler) activeHealthCheckUpstreams(upstreams UpstreamPool) {
for _, upstream := range upstreams {
go func(upstream *Upstream) {
defer func() {
if err := recover(); err != nil {
Expand Down
61 changes: 58 additions & 3 deletions modules/l4proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -46,9 +47,14 @@ func init() {

// Handler is a handler that can proxy connections.
type Handler struct {
// Upstreams is the list of backends to proxy to.
// Upstreams is the static list of backends to proxy to.
Upstreams UpstreamPool `json:"upstreams,omitempty"`

// DynamicUpstreamsRaw is a module that discovers upstreams dynamically (per
// connection) instead of listing them statically — e.g. from DNS SRV
// records, so the backend set need not be restated in config.
DynamicUpstreamsRaw json.RawMessage `json:"dynamic_upstreams,omitempty" caddy:"namespace=layer4.proxy.upstreams inline_key=source"`

// Health checks update the status of backends, whether they are
// up or down. Down backends will not be proxied to.
HealthChecks *HealthChecks `json:"health_checks,omitempty"`
Expand All @@ -62,6 +68,8 @@ type Handler struct {

proxyProtocolVersion uint8

dynamicUpstreams UpstreamSource

ctx caddy.Context
logger *zap.Logger
}
Expand Down Expand Up @@ -98,8 +106,17 @@ func (h *Handler) Provision(ctx caddy.Context) error {
return fmt.Errorf("proxy_protocol: \"%s\" should be empty, or one of \"v1\" \"v2\"", proxyProtocol)
}

// load the dynamic upstreams source module, if configured
if h.DynamicUpstreamsRaw != nil {
mod, err := ctx.LoadModule(h, "DynamicUpstreamsRaw")
if err != nil {
return fmt.Errorf("loading dynamic upstreams source module: %v", err)
}
h.dynamicUpstreams = mod.(UpstreamSource)
}

// prepare upstreams
if len(h.Upstreams) == 0 {
if len(h.Upstreams) == 0 && h.dynamicUpstreams == nil {
return fmt.Errorf("no upstreams defined")
}
for i, ups := range h.Upstreams {
Expand Down Expand Up @@ -160,9 +177,20 @@ func (h *Handler) Handle(down *layer4.Connection, _ layer4.Handler) error {
var upConns []net.Conn
var proxyErr error

// determine the pool: dynamically discovered (per connection) or static
pool := h.Upstreams
if h.dynamicUpstreams != nil {
dynUpstreams, err := h.dynamicUpstreams.GetUpstreams(repl)
if err != nil {
h.logger.Error("getting dynamic upstreams", zap.Error(err))
} else {
pool = dynUpstreams
}
}

for {
// choose an available upstream
upstream := h.LoadBalancing.SelectionPolicy.Select(h.Upstreams, down)
upstream := h.LoadBalancing.SelectionPolicy.Select(pool, down)
if upstream == nil {
if proxyErr == nil {
proxyErr = fmt.Errorf("no upstreams available")
Expand Down Expand Up @@ -502,6 +530,11 @@ func (h *Handler) Cleanup() error {
//
// proxy_protocol <v1|v2>
//
// # discover upstreams dynamically instead of listing them
// dynamic <source> [<args...>] {
// ...
// }
//
// # multiple upstream options are supported
// upstream [<args...>] {
// ...
Expand Down Expand Up @@ -697,6 +730,28 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
return d.Errf("duplicate %s option '%s'", wrapper, optionName)
}
_, h.ProxyProtocol, hasProxyProtocol = d.NextArg(), d.Val(), true
case "dynamic":
if h.DynamicUpstreamsRaw != nil {
return d.Errf("duplicate %s option '%s'", wrapper, optionName)
}
if !d.NextArg() {
return d.ArgErr()
}
sourceName := d.Val()
unm, err := caddyfile.UnmarshalModule(d, "layer4.proxy.upstreams."+sourceName)
if err != nil {
return err
}
source, ok := unm.(UpstreamSource)
if !ok {
return d.Errf("module '%s' is not an upstream source", sourceName)
}
sourceRaw := caddyconfig.JSON(source, nil)
sourceRaw, err = layer4.SetModuleNameInline("source", sourceName, sourceRaw)
if err != nil {
return d.Errf("re-encoding module '%s' configuration: %v", sourceName, err)
}
h.DynamicUpstreamsRaw = sourceRaw
case "upstream":
u := &Upstream{}
if err := u.UnmarshalCaddyfile(d.NewFromNextSegment()); err != nil {
Expand Down
Loading