Skip to content

Commit b4e596a

Browse files
Adds lazy loading + streaming queue population for minitest-queue
1 parent 3de93c6 commit b4e596a

38 files changed

+2302
-92
lines changed

redis/_entry_helpers.lua

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
local function test_id_from_entry(value, delimiter)
2+
if delimiter then
3+
local pos = string.find(value, delimiter, 1, true)
4+
if pos then
5+
return string.sub(value, 1, pos - 1)
6+
end
7+
end
8+
return value
9+
end

redis/acknowledge.lua

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@ local processed_key = KEYS[2]
33
local owners_key = KEYS[3]
44
local error_reports_key = KEYS[4]
55

6-
local test = ARGV[1]
7-
local error = ARGV[2]
8-
local ttl = ARGV[3]
9-
redis.call('zrem', zset_key, test)
10-
redis.call('hdel', owners_key, test) -- Doesn't matter if it was reclaimed by another workers
11-
local acknowledged = redis.call('sadd', processed_key, test) == 1
6+
local entry = ARGV[1]
7+
local test_id = ARGV[2]
8+
local error = ARGV[3]
9+
local ttl = ARGV[4]
10+
redis.call('zrem', zset_key, entry)
11+
redis.call('hdel', owners_key, entry) -- Doesn't matter if it was reclaimed by another workers
12+
local acknowledged = redis.call('sadd', processed_key, test_id) == 1
1213

1314
if acknowledged and error ~= "" then
14-
redis.call('hset', error_reports_key, test, error)
15+
redis.call('hset', error_reports_key, test_id, error)
1516
redis.call('expire', error_reports_key, ttl)
1617
end
1718

redis/heartbeat.lua

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
1+
-- @include _entry_helpers
2+
13
local zset_key = KEYS[1]
24
local processed_key = KEYS[2]
35
local owners_key = KEYS[3]
46
local worker_queue_key = KEYS[4]
57

68
local current_time = ARGV[1]
7-
local test = ARGV[2]
9+
local entry = ARGV[2]
10+
local entry_delimiter = ARGV[3]
11+
12+
local test_id = test_id_from_entry(entry, entry_delimiter)
813

914
-- already processed, we do not need to bump the timestamp
10-
if redis.call('sismember', processed_key, test) == 1 then
15+
if redis.call('sismember', processed_key, test_id) == 1 then
1116
return false
1217
end
1318

1419
-- we're still the owner of the test, we can bump the timestamp
15-
if redis.call('hget', owners_key, test) == worker_queue_key then
16-
return redis.call('zadd', zset_key, current_time, test)
20+
if redis.call('hget', owners_key, entry) == worker_queue_key then
21+
return redis.call('zadd', zset_key, current_time, entry)
1722
end

redis/requeue.lua

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ local error_reports_key = KEYS[7]
88

99
local max_requeues = tonumber(ARGV[1])
1010
local global_max_requeues = tonumber(ARGV[2])
11-
local test = ARGV[3]
12-
local offset = ARGV[4]
11+
local entry = ARGV[3]
12+
local test_id = ARGV[4]
13+
local offset = ARGV[5]
1314

14-
if redis.call('hget', owners_key, test) == worker_queue_key then
15-
redis.call('hdel', owners_key, test)
15+
if redis.call('hget', owners_key, entry) == worker_queue_key then
16+
redis.call('hdel', owners_key, entry)
1617
end
1718

18-
if redis.call('sismember', processed_key, test) == 1 then
19+
if redis.call('sismember', processed_key, test_id) == 1 then
1920
return false
2021
end
2122

@@ -24,23 +25,23 @@ if global_requeues and global_requeues >= tonumber(global_max_requeues) then
2425
return false
2526
end
2627

27-
local requeues = tonumber(redis.call('hget', requeues_count_key, test))
28+
local requeues = tonumber(redis.call('hget', requeues_count_key, test_id))
2829
if requeues and requeues >= max_requeues then
2930
return false
3031
end
3132

3233
redis.call('hincrby', requeues_count_key, '___total___', 1)
33-
redis.call('hincrby', requeues_count_key, test, 1)
34+
redis.call('hincrby', requeues_count_key, test_id, 1)
3435

35-
redis.call('hdel', error_reports_key, test)
36+
redis.call('hdel', error_reports_key, test_id)
3637

3738
local pivot = redis.call('lrange', queue_key, -1 - offset, 0 - offset)[1]
3839
if pivot then
39-
redis.call('linsert', queue_key, 'BEFORE', pivot, test)
40+
redis.call('linsert', queue_key, 'BEFORE', pivot, entry)
4041
else
41-
redis.call('lpush', queue_key, test)
42+
redis.call('lpush', queue_key, entry)
4243
end
4344

44-
redis.call('zrem', zset_key, test)
45+
redis.call('zrem', zset_key, entry)
4546

4647
return true

redis/reserve_lost.lua

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1+
-- @include _entry_helpers
2+
13
local zset_key = KEYS[1]
24
local processed_key = KEYS[2]
35
local worker_queue_key = KEYS[3]
46
local owners_key = KEYS[4]
57

68
local current_time = ARGV[1]
79
local timeout = ARGV[2]
10+
local entry_delimiter = ARGV[3]
811

912
local lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout)
1013
for _, test in ipairs(lost_tests) do
11-
if redis.call('sismember', processed_key, test) == 0 then
14+
local test_id = test_id_from_entry(test, entry_delimiter)
15+
if redis.call('sismember', processed_key, test_id) == 0 then
1216
redis.call('zadd', zset_key, current_time, test)
1317
redis.call('lpush', worker_queue_key, test)
1418
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership

ruby/README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,33 @@ minitest-queue --queue redis://example.com run -Itest test/**/*_test.rb
3838

3939
Additionally you can configure the requeue settings (see main README) with `--max-requeues` and `--requeue-tolerance`.
4040

41+
#### Lazy loading (opt-in)
42+
43+
Lazy loading and streaming are currently supported only by `minitest-queue` (not `rspec-queue`).
44+
45+
To reduce worker memory usage, you can enable lazy loading so test files are loaded on-demand:
46+
47+
```bash
48+
minitest-queue --queue redis://example.com --lazy-load run -Itest test/**/*_test.rb
49+
```
50+
51+
You can tune streaming with `--lazy-load-stream-batch-size` (default: 10000) and `--lazy-load-stream-timeout`.
52+
53+
Environment variables:
54+
55+
- `CI_QUEUE_LAZY_LOAD=1`
56+
- `CI_QUEUE_LAZY_LOAD_STREAM_BATCH_SIZE=10000`
57+
- `CI_QUEUE_LAZY_LOAD_STREAM_TIMEOUT=300`
58+
- `CI_QUEUE_LAZY_LOAD_TEST_HELPERS=test/test_helper.rb`
59+
60+
Backward-compatible aliases still work:
61+
62+
- `CI_QUEUE_STREAM_BATCH_SIZE`
63+
- `CI_QUEUE_STREAM_TIMEOUT`
64+
- `CI_QUEUE_TEST_HELPERS`
65+
66+
When enabled, file loading stats are printed at the end of the run if debug is enabled.
67+
4168

4269
If you'd like to centralize the error reporting you can do so with:
4370

ruby/lib/ci/queue.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
require 'ci/queue/file'
1515
require 'ci/queue/grind'
1616
require 'ci/queue/bisect'
17+
require 'ci/queue/queue_entry'
18+
require 'ci/queue/class_resolver'
19+
require 'ci/queue/file_loader'
1720

1821
module CI
1922
module Queue
@@ -22,6 +25,18 @@ module Queue
2225
attr_accessor :shuffler, :requeueable
2326

2427
Error = Class.new(StandardError)
28+
ClassNotFoundError = Class.new(Error)
29+
30+
class FileLoadError < Error
31+
attr_reader :file_path, :original_error
32+
33+
def initialize(file_path, original_error)
34+
@file_path = file_path
35+
@original_error = original_error
36+
super("Failed to load #{file_path}: #{original_error.class}: #{original_error.message}")
37+
set_backtrace(original_error.backtrace)
38+
end
39+
end
2540

2641
module Warnings
2742
RESERVED_LOST_TEST = :RESERVED_LOST_TEST
@@ -48,6 +63,11 @@ def shuffle(tests, random)
4863
end
4964
end
5065

66+
def debug?
67+
value = ENV['CI_QUEUE_DEBUG']
68+
value && !value.strip.empty? && !%w[0 false].include?(value.strip.downcase)
69+
end
70+
5171
def from_uri(url, config)
5272
uri = URI(url)
5373
implementation = case uri.scheme
@@ -65,3 +85,4 @@ def from_uri(url, config)
6585
end
6686
end
6787
end
88+
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# frozen_string_literal: true
2+
3+
module CI
4+
module Queue
5+
module ClassResolver
6+
def self.resolve(class_name, file_path: nil, loader: nil)
7+
klass = try_direct_lookup(class_name)
8+
return klass if klass
9+
10+
if file_path && loader
11+
loader.load_file(file_path)
12+
klass = try_direct_lookup(class_name)
13+
return klass if klass
14+
end
15+
16+
raise ClassNotFoundError, "Unable to resolve class #{class_name}"
17+
end
18+
19+
def self.try_direct_lookup(class_name)
20+
parts = class_name.sub(/\A::/, '').split('::')
21+
current = Object
22+
23+
parts.each do |name|
24+
return nil unless current.const_defined?(name, false)
25+
26+
current = current.const_get(name, false)
27+
end
28+
29+
return nil unless current.is_a?(Class)
30+
31+
current
32+
rescue NameError
33+
nil
34+
end
35+
private_class_method :try_direct_lookup
36+
end
37+
end
38+
end

ruby/lib/ci/queue/configuration.rb

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,16 @@ class Configuration
66
attr_accessor :requeue_tolerance, :namespace, :failing_test, :statsd_endpoint
77
attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration
88
attr_accessor :max_test_failed, :redis_ttl, :warnings_file, :debug_log, :max_missed_heartbeat_seconds
9+
attr_accessor :lazy_load, :lazy_load_stream_batch_size
10+
attr_accessor :lazy_load_streaming_timeout, :lazy_load_test_helpers
911
attr_reader :circuit_breakers
1012
attr_writer :seed, :build_id
1113
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
1214

1315
class << self
1416
def from_env(env)
17+
lazy_load_value = env['CI_QUEUE_LAZY_LOAD']
18+
lazy_load = lazy_load_value && !lazy_load_value.strip.empty? && !%w(0 false).include?(lazy_load_value.strip.downcase)
1519
new(
1620
build_id: env['CIRCLE_BUILD_URL'] || env['BUILDKITE_BUILD_ID'] || env['TRAVIS_BUILD_ID'] || env['HEROKU_TEST_RUN_ID'] || env['SEMAPHORE_PIPELINE_ID'],
1721
worker_id: env['CIRCLE_NODE_INDEX'] || env['BUILDKITE_PARALLEL_JOB'] || env['CI_NODE_INDEX'] || env['SEMAPHORE_JOB_ID'],
@@ -22,6 +26,10 @@ def from_env(env)
2226
debug_log: env['CI_QUEUE_DEBUG_LOG'],
2327
max_requeues: env['CI_QUEUE_MAX_REQUEUES']&.to_i || 0,
2428
requeue_tolerance: env['CI_QUEUE_REQUEUE_TOLERANCE']&.to_f || 0,
29+
lazy_load: lazy_load || false,
30+
lazy_load_stream_batch_size: (env['CI_QUEUE_LAZY_LOAD_STREAM_BATCH_SIZE'] || env['CI_QUEUE_STREAM_BATCH_SIZE'])&.to_i,
31+
lazy_load_streaming_timeout: (env['CI_QUEUE_LAZY_LOAD_STREAM_TIMEOUT'] || env['CI_QUEUE_STREAM_TIMEOUT'])&.to_i,
32+
lazy_load_test_helpers: env['CI_QUEUE_LAZY_LOAD_TEST_HELPERS'] || env['CI_QUEUE_TEST_HELPERS'],
2533
)
2634
end
2735

@@ -46,7 +54,8 @@ def initialize(
4654
grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil,
4755
max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil,
4856
queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
49-
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil)
57+
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil,
58+
lazy_load: false, lazy_load_stream_batch_size: nil, lazy_load_streaming_timeout: nil, lazy_load_test_helpers: nil)
5059
@build_id = build_id
5160
@circuit_breakers = [CircuitBreaker::Disabled]
5261
@failure_file = failure_file
@@ -73,6 +82,16 @@ def initialize(
7382
@warnings_file = warnings_file
7483
@debug_log = debug_log
7584
@max_missed_heartbeat_seconds = max_missed_heartbeat_seconds
85+
@lazy_load = lazy_load
86+
@lazy_load_stream_batch_size = lazy_load_stream_batch_size || 5_000
87+
@lazy_load_streaming_timeout = lazy_load_streaming_timeout
88+
@lazy_load_test_helpers = lazy_load_test_helpers
89+
end
90+
91+
def lazy_load_test_helper_paths
92+
return [] unless @lazy_load_test_helpers
93+
94+
@lazy_load_test_helpers.split(',').map(&:strip)
7695
end
7796

7897
def queue_init_timeout
@@ -83,6 +102,43 @@ def report_timeout
83102
@report_timeout || timeout
84103
end
85104

105+
def lazy_load_streaming_timeout
106+
if @lazy_load_streaming_timeout && @lazy_load_streaming_timeout > 0
107+
@lazy_load_streaming_timeout
108+
else
109+
[queue_init_timeout, 300].max
110+
end
111+
end
112+
113+
# Backward-compatible aliases for existing callers.
114+
def stream_batch_size
115+
lazy_load_stream_batch_size
116+
end
117+
118+
def stream_batch_size=(value)
119+
self.lazy_load_stream_batch_size = value
120+
end
121+
122+
def streaming_timeout
123+
lazy_load_streaming_timeout
124+
end
125+
126+
def streaming_timeout=(value)
127+
self.lazy_load_streaming_timeout = value
128+
end
129+
130+
def test_helpers
131+
lazy_load_test_helpers
132+
end
133+
134+
def test_helpers=(value)
135+
self.lazy_load_test_helpers = value
136+
end
137+
138+
def test_helper_paths
139+
lazy_load_test_helper_paths
140+
end
141+
86142
def inactive_workers_timeout
87143
@inactive_workers_timeout || timeout
88144
end

0 commit comments

Comments
 (0)