Skip to content

Commit a054ff0

Browse files
authored
Merge pull request #69 from tinh-tinh/feat/ren/68-list-failed-job-in-redis
feat: add methods to list and manage failed jobs in Redis
2 parents f713235 + 435cfa9 commit a054ff0

2 files changed

Lines changed: 299 additions & 1 deletion

File tree

failed_jobs_test.go

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
package queue_test
2+
3+
import (
4+
"errors"
5+
"testing"
6+
"time"
7+
8+
"github.com/redis/go-redis/v9"
9+
"github.com/stretchr/testify/require"
10+
"github.com/tinh-tinh/queue/v2"
11+
)
12+
13+
func Test_GetFailedJobs(t *testing.T) {
14+
failedQueue := queue.New("failed_jobs_test", &queue.Options{
15+
Connect: &redis.Options{
16+
Addr: "localhost:6379",
17+
Password: "",
18+
DB: 0,
19+
},
20+
Workers: 3,
21+
RetryFailures: 0, // No retries, so jobs fail immediately
22+
})
23+
24+
// Clear any existing failed jobs first
25+
err := failedQueue.ClearFailedJobs()
26+
require.Nil(t, err)
27+
28+
failedQueue.Process(func(job *queue.Job) {
29+
job.Process(func() error {
30+
// All jobs will fail
31+
return errors.New("intentional failure for test")
32+
})
33+
})
34+
35+
// Add multiple jobs that will fail
36+
failedQueue.AddJob(queue.AddJobOptions{
37+
Id: "fail1",
38+
Data: "value 1",
39+
})
40+
failedQueue.AddJob(queue.AddJobOptions{
41+
Id: "fail2",
42+
Data: "value 2",
43+
})
44+
failedQueue.AddJob(queue.AddJobOptions{
45+
Id: "fail3",
46+
Data: "value 3",
47+
})
48+
49+
// Wait a bit for jobs to be processed
50+
time.Sleep(500 * time.Millisecond)
51+
52+
// Retrieve failed jobs
53+
failedJobs, err := failedQueue.GetFailedJobs()
54+
require.Nil(t, err)
55+
require.Equal(t, 3, len(failedJobs))
56+
57+
// Verify job IDs are present
58+
jobIds := make(map[string]bool)
59+
for _, job := range failedJobs {
60+
jobIds[job.Id] = true
61+
require.NotEmpty(t, job.FailedReason)
62+
require.Contains(t, job.FailedReason, "intentional failure for test")
63+
require.Equal(t, queue.FailedStatus, job.Status)
64+
}
65+
require.True(t, jobIds["fail1"])
66+
require.True(t, jobIds["fail2"])
67+
require.True(t, jobIds["fail3"])
68+
69+
// Clean up
70+
err = failedQueue.ClearFailedJobs()
71+
require.Nil(t, err)
72+
}
73+
74+
func Test_GetFailedJob(t *testing.T) {
75+
singleFailQueue := queue.New("single_fail_test", &queue.Options{
76+
Connect: &redis.Options{
77+
Addr: "localhost:6379",
78+
Password: "",
79+
DB: 0,
80+
},
81+
Workers: 3,
82+
RetryFailures: 0,
83+
})
84+
85+
// Clear any existing failed jobs first
86+
err := singleFailQueue.ClearFailedJobs()
87+
require.Nil(t, err)
88+
89+
singleFailQueue.Process(func(job *queue.Job) {
90+
job.Process(func() error {
91+
return errors.New("specific error for job " + job.Id)
92+
})
93+
})
94+
95+
// Add a job that will fail
96+
singleFailQueue.AddJob(queue.AddJobOptions{
97+
Id: "specific_fail",
98+
Data: "test data",
99+
})
100+
101+
// Wait for job to be processed
102+
time.Sleep(500 * time.Millisecond)
103+
104+
// Retrieve the specific failed job
105+
reason, err := singleFailQueue.GetFailedJob("specific_fail")
106+
require.Nil(t, err)
107+
require.Contains(t, reason, "specific error for job specific_fail")
108+
109+
// Try to get a non-existent failed job
110+
_, err = singleFailQueue.GetFailedJob("non_existent")
111+
require.NotNil(t, err)
112+
require.Contains(t, err.Error(), "not found")
113+
114+
// Clean up
115+
err = singleFailQueue.ClearFailedJobs()
116+
require.Nil(t, err)
117+
}
118+
119+
func Test_ClearFailedJobs(t *testing.T) {
120+
clearQueue := queue.New("clear_test", &queue.Options{
121+
Connect: &redis.Options{
122+
Addr: "localhost:6379",
123+
Password: "",
124+
DB: 0,
125+
},
126+
Workers: 3,
127+
RetryFailures: 0,
128+
})
129+
130+
// Clear any existing failed jobs first
131+
err := clearQueue.ClearFailedJobs()
132+
require.Nil(t, err)
133+
134+
clearQueue.Process(func(job *queue.Job) {
135+
job.Process(func() error {
136+
return errors.New("error for clearing test")
137+
})
138+
})
139+
140+
// Add multiple jobs that will fail
141+
clearQueue.BulkAddJob([]queue.AddJobOptions{
142+
{Id: "clear1", Data: "value 1"},
143+
{Id: "clear2", Data: "value 2"},
144+
{Id: "clear3", Data: "value 3"},
145+
{Id: "clear4", Data: "value 4"},
146+
{Id: "clear5", Data: "value 5"},
147+
})
148+
149+
// Wait for jobs to be processed
150+
time.Sleep(500 * time.Millisecond)
151+
152+
// Verify failed jobs exist
153+
failedJobs, err := clearQueue.GetFailedJobs()
154+
require.Nil(t, err)
155+
require.Equal(t, 5, len(failedJobs))
156+
157+
// Clear all failed jobs
158+
err = clearQueue.ClearFailedJobs()
159+
require.Nil(t, err)
160+
161+
// Verify all failed jobs are cleared
162+
failedJobs, err = clearQueue.GetFailedJobs()
163+
require.Nil(t, err)
164+
require.Equal(t, 0, len(failedJobs))
165+
166+
// Clearing again should not cause an error
167+
err = clearQueue.ClearFailedJobs()
168+
require.Nil(t, err)
169+
}
170+
171+
func Test_GetFailedJobs_RedisError(t *testing.T) {
172+
// Create a queue with invalid Redis connection
173+
invalidQueue := queue.New("redis_error_test", &queue.Options{
174+
Connect: &redis.Options{
175+
Addr: "localhost:9999", // Invalid port
176+
Password: "",
177+
DB: 0,
178+
},
179+
Workers: 3,
180+
RetryFailures: 0,
181+
})
182+
183+
// Attempt to get failed jobs should return an error
184+
// This tests that SCAN errors are propagated
185+
_, err := invalidQueue.GetFailedJobs()
186+
require.NotNil(t, err)
187+
require.Contains(t, err.Error(), "failed to scan Redis keys")
188+
}
189+
190+
func Test_ClearFailedJobs_RedisError(t *testing.T) {
191+
// Create a queue with invalid Redis connection
192+
invalidQueue := queue.New("redis_clear_error_test", &queue.Options{
193+
Connect: &redis.Options{
194+
Addr: "localhost:9999", // Invalid port
195+
Password: "",
196+
DB: 0,
197+
},
198+
Workers: 3,
199+
RetryFailures: 0,
200+
})
201+
202+
// Attempt to clear failed jobs should return an error
203+
// This tests that SCAN errors are propagated
204+
err := invalidQueue.ClearFailedJobs()
205+
require.NotNil(t, err)
206+
require.Contains(t, err.Error(), "failed to scan Redis keys")
207+
}

queue.go

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ type RateLimiter struct {
3939
Max int
4040
Duration time.Duration
4141
}
42-
4342
type Options struct {
4443
Connect *redis.Options
4544
Workers int
@@ -476,3 +475,95 @@ func (q *Queue) log(logType LoggerType, format string, v ...any) {
476475
// getKey returns the queue's cachedKey field.
func (q *Queue) getKey() string {
477476
return q.cachedKey
478477
}
478+
479+
// scanFailedJobKeys scans Redis for all keys matching the failed job pattern.
480+
// Returns a slice of all matching keys or an error if the scan fails.
481+
func (q *Queue) scanFailedJobKeys() ([]string, error) {
482+
pattern := q.cachedKey + ":*"
483+
var allKeys []string
484+
485+
var cursor uint64
486+
for {
487+
keys, nextCursor, err := q.client.Scan(q.ctx, cursor, pattern, 100).Result()
488+
if err != nil {
489+
return nil, fmt.Errorf("failed to scan Redis keys: %w", err)
490+
}
491+
492+
allKeys = append(allKeys, keys...)
493+
494+
cursor = nextCursor
495+
if cursor == 0 {
496+
break
497+
}
498+
}
499+
500+
return allKeys, nil
501+
}
502+
503+
// GetFailedJobs retrieves all failed jobs stored in Redis for this queue.
504+
// It returns a slice of Job with Id, FailedReason, and Status populated.
505+
// Other fields (Data, Priority, etc.) are not available as only the failure
506+
// reason is stored in Redis. Returns an error if the Redis operation fails.
507+
func (q *Queue) GetFailedJobs() ([]Job, error) {
508+
keys, err := q.scanFailedJobKeys()
509+
if err != nil {
510+
return nil, err
511+
}
512+
513+
var failedJobs []Job
514+
for _, key := range keys {
515+
reason, err := q.client.Get(q.ctx, key).Result()
516+
if err != nil {
517+
// Only skip missing keys; propagate all other errors (network, timeout, etc.)
518+
if err == redis.Nil {
519+
continue
520+
}
521+
return nil, fmt.Errorf("failed to retrieve job data: %w", err)
522+
}
523+
524+
// Extract job ID from key (format: {prefix}{queueName}:{jobId})
525+
jobId := strings.TrimPrefix(key, q.cachedKey+":")
526+
failedJobs = append(failedJobs, Job{
527+
Id: jobId,
528+
FailedReason: reason,
529+
Status: FailedStatus,
530+
queue: q,
531+
})
532+
}
533+
534+
return failedJobs, nil
535+
}
536+
537+
// GetFailedJob retrieves the failure reason for a specific job by its ID.
538+
// Returns the failure reason string or an error if the job is not found
539+
// or if the Redis operation fails.
540+
func (q *Queue) GetFailedJob(jobId string) (string, error) {
541+
key := q.cachedKey + ":" + jobId
542+
reason, err := q.client.Get(q.ctx, key).Result()
543+
if err != nil {
544+
if err == redis.Nil {
545+
return "", fmt.Errorf("failed job with ID '%s' not found", jobId)
546+
}
547+
return "", fmt.Errorf("failed to retrieve job: %w", err)
548+
}
549+
return reason, nil
550+
}
551+
552+
// ClearFailedJobs removes all failed job records from Redis for this queue.
553+
// Returns an error if the Redis operation fails.
554+
func (q *Queue) ClearFailedJobs() error {
555+
keysToDelete, err := q.scanFailedJobKeys()
556+
if err != nil {
557+
return err
558+
}
559+
560+
if len(keysToDelete) > 0 {
561+
_, err := q.client.Del(q.ctx, keysToDelete...).Result()
562+
if err != nil {
563+
return fmt.Errorf("failed to delete keys: %w", err)
564+
}
565+
q.formatLog(LoggerInfo, "Cleared %d failed job(s)", len(keysToDelete))
566+
}
567+
568+
return nil
569+
}

0 commit comments

Comments
 (0)