Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions token/registry.go → bucket/registry.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package token
package bucket

import (
"sync"
Expand All @@ -8,13 +8,13 @@ type (
Identifier string
Registry struct {
mu sync.Mutex
limiters map[Identifier]*Limiter
limiters map[Identifier]*TokenLimiter
capacity, rate uint32
}
)

func NewRegistry(capacity, rate uint32, users ...Identifier) (*Registry, error) {
limiters := make(map[Identifier]*Limiter)
limiters := make(map[Identifier]*TokenLimiter)

for _, user := range users {
limiter := NewLimiter(capacity, rate)
Expand Down
30 changes: 15 additions & 15 deletions token/registry_test.go → bucket/registry_test.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
package token_test
package bucket_test

import (
"sync"
"sync/atomic"
"testing"

"github.com/serroba/rate/token"
"github.com/serroba/rate/bucket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewRegistry(t *testing.T) {
reg, err := token.NewRegistry(10, 2)
reg, err := bucket.NewRegistry(10, 2)
require.NoError(t, err)
require.NotNil(t, reg)
}

func TestNewRegistry_WithUsers(t *testing.T) {
reg, err := token.NewRegistry(10, 2, "alice", "bob")
reg, err := bucket.NewRegistry(10, 2, "alice", "bob")
require.NoError(t, err)
require.NotNil(t, reg)
}

func TestRegistry_Allow_ExistingUser(t *testing.T) {
reg, err := token.NewRegistry(2, 0, "alice")
reg, err := bucket.NewRegistry(2, 0, "alice")
require.NoError(t, err)

require.True(t, reg.Allow("alice"))
Expand All @@ -32,7 +32,7 @@ func TestRegistry_Allow_ExistingUser(t *testing.T) {
}

func TestRegistry_Allow_NewUser(t *testing.T) {
reg, err := token.NewRegistry(2, 0)
reg, err := bucket.NewRegistry(2, 0)
require.NoError(t, err)

// First call for a new user should create limiter and allow
Expand All @@ -42,7 +42,7 @@ func TestRegistry_Allow_NewUser(t *testing.T) {
}

func TestRegistry_Allow_IndependentUsers(t *testing.T) {
reg, err := token.NewRegistry(1, 0)
reg, err := bucket.NewRegistry(1, 0)
require.NoError(t, err)

// Each user has their own bucket
Expand All @@ -55,7 +55,7 @@ func TestRegistry_Allow_IndependentUsers(t *testing.T) {
}

func TestRegistry_Allow_Concurrent(t *testing.T) {
reg, err := token.NewRegistry(100, 0)
reg, err := bucket.NewRegistry(100, 0)
require.NoError(t, err)

var (
Expand All @@ -64,12 +64,12 @@ func TestRegistry_Allow_Concurrent(t *testing.T) {
)

// 50 goroutines per user, 4 users = 200 goroutines
users := []token.Identifier{"alice", "bob", "charlie", "diana"}
users := []bucket.Identifier{"alice", "bob", "charlie", "diana"}
for _, user := range users {
for range 50 {
wg.Add(1)

go func(u token.Identifier) {
go func(u bucket.Identifier) {
defer wg.Done()

if reg.Allow(u) {
Expand All @@ -86,7 +86,7 @@ func TestRegistry_Allow_Concurrent(t *testing.T) {
}

func TestRegistry_Deny_Concurrent(t *testing.T) {
reg, err := token.NewRegistry(100, 0)
reg, err := bucket.NewRegistry(100, 0)
require.NoError(t, err)

var (
Expand All @@ -96,12 +96,12 @@ func TestRegistry_Deny_Concurrent(t *testing.T) {
)

// 50 goroutines per user, 4 users = 200 goroutines
users := []token.Identifier{"alice", "bob", "charlie", "diana"}
users := []bucket.Identifier{"alice", "bob", "charlie", "diana"}
for _, user := range users {
for range 110 {
wg.Add(1)

go func(u token.Identifier) {
go func(u bucket.Identifier) {
defer wg.Done()

if reg.Allow(u) {
Expand All @@ -121,7 +121,7 @@ func TestRegistry_Deny_Concurrent(t *testing.T) {
}

func TestRegistry_Allow_ConcurrentNewUsers(t *testing.T) {
reg, err := token.NewRegistry(5, 0)
reg, err := bucket.NewRegistry(5, 0)
require.NoError(t, err)

var wg sync.WaitGroup
Expand All @@ -133,7 +133,7 @@ func TestRegistry_Allow_ConcurrentNewUsers(t *testing.T) {
go func(id int) {
defer wg.Done()

user := token.Identifier(rune('a' + id%26))
user := bucket.Identifier(rune('a' + id%26))
reg.Allow(user)
}(i)
}
Expand Down
18 changes: 9 additions & 9 deletions token/rate.go → bucket/token.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package token
package bucket

import (
"sync"
Expand All @@ -15,9 +15,9 @@ func (c realClock) Now() time.Time {
return time.Now()
}

// Limiter implements a token bucket rate limiter. It allows a burst of
// TokenLimiter implements a bucket rate limiter. It allows a burst of
// requests up to capacity, then refills tokens at the specified rate per second.
type Limiter struct {
type TokenLimiter struct {
mu sync.Mutex
capacity, tokens, rate float64
lastRefillAt time.Time
Expand All @@ -26,14 +26,14 @@ type Limiter struct {

// NewLimiter creates a new rate limiter with the given capacity and refill rate.
// Capacity is the maximum burst size. Rate is tokens added per second.
func NewLimiter(capacity, rate uint32) *Limiter {
func NewLimiter(capacity, rate uint32) *TokenLimiter {
return NewLimiterWithClock(capacity, rate, realClock{})
}

// NewLimiterWithClock creates a new rate limiter with a custom clock.
// Use this constructor for testing with a mock clock.
func NewLimiterWithClock(capacity, rate uint32, clock clock) *Limiter {
return &Limiter{
func NewLimiterWithClock(capacity, rate uint32, clock clock) *TokenLimiter {
return &TokenLimiter{
capacity: float64(capacity),
tokens: float64(capacity),
rate: float64(rate),
Expand All @@ -42,10 +42,10 @@ func NewLimiterWithClock(capacity, rate uint32, clock clock) *Limiter {
}
}

// Allow reports whether a request is allowed. It consumes one token if
// Allow reports whether a request is allowed. It consumes one bucket if
// available and returns true. If no tokens are available, it returns false
// without blocking.
func (lim *Limiter) Allow() bool {
func (lim *TokenLimiter) Allow() bool {
lim.mu.Lock()
defer lim.mu.Unlock()

Expand All @@ -60,7 +60,7 @@ func (lim *Limiter) Allow() bool {
return false
}

func (lim *Limiter) refill() {
func (lim *TokenLimiter) refill() {
t := lim.clock.Now()
if t.Before(lim.lastRefillAt) {
return
Expand Down
14 changes: 7 additions & 7 deletions token/rate_test.go → bucket/token_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package token_test
package bucket_test

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/serroba/rate/token"
"github.com/serroba/rate/bucket"
"github.com/stretchr/testify/require"
)

Expand All @@ -24,9 +24,9 @@ func (c *testClock) advance(by time.Duration) {

func TestLimiter_Allow_ClockGoesBackwards(t *testing.T) {
clock := &testClock{now: time.Now()}
lim := token.NewLimiterWithClock(1, 1, clock)
lim := bucket.NewLimiterWithClock(1, 1, clock)

// Drain the token
// Drain the bucket
require.True(t, lim.Allow())

// Move clock backwards - should not refill
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestLimiter_Allow(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lim := token.NewLimiterWithClock(tt.fields.capacity, tt.fields.rate, clock)
lim := bucket.NewLimiterWithClock(tt.fields.capacity, tt.fields.rate, clock)

for range tt.previousAttempts {
lim.Allow()
Expand All @@ -90,7 +90,7 @@ func TestLimiter_Allow(t *testing.T) {
}

func TestLimiter_Allow_Concurrent(t *testing.T) {
lim := token.NewLimiter(100, 0)
lim := bucket.NewLimiter(100, 0)

var (
allowed atomic.Int64
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestLimiter_Allow_Concurrent(t *testing.T) {

func TestLimiter_Allow_ConcurrentWithRefill(t *testing.T) {
clock := &testClock{now: time.Now()}
lim := token.NewLimiterWithClock(10, 1000, clock)
lim := bucket.NewLimiterWithClock(10, 1000, clock)

var (
allowed atomic.Int64
Expand Down