Skip to content

Commit 2c5a5e8

Browse files
authored
Merge pull request #16 from pvelx/add_context_to_net_operation
heap optimisation added context to net operation refactoring: merge packages fix race conditions
2 parents d66044a + d59afb7 commit 2c5a5e8

41 files changed

Lines changed: 726 additions & 334 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@
1313

1414
# Dependency directories (remove the comment below to include it)
1515
# vendor/
16-
.idea/
16+
.idea/

.pre-commit-config.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
repos:
2+
- repo: https://github.com/pre-commit/pre-commit-hooks
3+
rev: v2.3.0
4+
hooks:
5+
- id: check-yaml
6+
- id: end-of-file-fixer
7+
- id: trailing-whitespace
8+
- repo: git://github.com/dnephin/pre-commit-golang
9+
rev: master
10+
hooks:
11+
- id: golangci-lint
12+
# - id: go-fmt
13+
# - id: go-vet
14+
# - id: go-lint
15+
# - id: go-imports
16+
# - id: go-cyclo
17+
# args: [-over=15]
18+
# - id: validate-toml
19+
# - id: no-go-testing
20+
# - id: gometalinter
21+
# - id: go-critic
22+
# - id: go-unit-tests
23+
# - id: go-build
24+
# - id: go-mod-tidy

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ script:
1212
- make test
1313

1414
notifications:
15-
email: false
15+
email: false

Makefile

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,7 @@ sending_and_confirmation:
77
test:
88
GOMAXPROCS=4 go test ./ -v
99
GOMAXPROCS=4 go test ./repository -v
10-
go test ./sender_service ./task_manager ./error_service \
11-
./prioritized_task_list ./preloader_service ./monitoring_service ./waiting_service -v
10+
go test ./sender_service ./task_manager ./error_service ./preloader_service ./monitoring_service ./waiting_service -v
11+
12+
pre-commit:
13+
pre-commit run --all-files

README.md

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
## Trigger Hook - delayed launch of tasks
22

33
[![Build Status](https://travis-ci.com/pvelx/triggerhook.svg?branch=master)](https://travis-ci.com/pvelx/triggerhook)
4-
[![GitHub release](https://img.shields.io/github/release/pvelx/triggerhook.svg?include_prereleases)](https://github.com/pvelx/triggerhook/releases/latest)
4+
[![GitHub release](https://img.shields.io/github/release/pvelx/triggerhook.svg?include_prereleases)](https://github.com/pvelx/triggerhook/releases/latest)
55

6-
Often in projects, there is a need to perform deferred tasks,
7-
such as sending email, push, and other tasks specific to the domain area of your application.
8-
Difficulties begin when the usual crontab is no longer enough,
6+
Often in projects, there is a need to perform deferred tasks,
7+
such as sending email, push, and other tasks specific to the domain area of your application.
8+
Difficulties begin when the usual crontab is no longer enough,
99
when batch processing is not suitable, when each task unit has its own execution time or it is assigned dynamically.
1010
To solve this problem, a Trigger Hook was created. You can build a task scheduler based on this library.
1111

@@ -25,11 +25,11 @@ Task | Description
2525

2626
Life cycle tasks:
2727
- When creating a task, it gets into the database (square block) (red and yellow).
28-
- Tasks are loaded into memory (triangular block) if their start time is coming soon (red->yellow).
28+
- Tasks are loaded into memory (triangular block) if their start time is coming soon (red->yellow).
2929
This structure is implemented in the form of a prioritized queue (heap).
30-
- When the task execution time comes, it is sent for execution (yellow->green).
30+
- When the task execution time comes, it is sent for execution (yellow->green).
3131
An intermediate buffer is used before processing to compensate for peak loads.
32-
- If the task is successfully submitted, it is deleted from the database (green->blue).
32+
- If the task is successfully submitted, it is deleted from the database (green->blue).
3333
An intermediate buffer is used before deletion, also to compensate for peak loads.
3434

3535

@@ -39,7 +39,7 @@ Metric|Description
3939
---|---
4040
All|Total number of tasks
4141
Creating rate | Number of created tasks (via the Create method) per unit of time.
42-
Deleting rate | Number of deleted tasks (via the Delete method) per unit of time.
42+
Deleting rate | Number of deleted tasks (via the Delete method) per unit of time.
4343
Sending rate | The number of processed tasks (via the Consume method) per unit of time.
4444
Preloaded | The number of tasks preloaded into memory.
4545
Preloading rate | The number of tasks loaded per unit of time.
@@ -48,14 +48,14 @@ Waiting for confirmation | The number of tasks waiting for confirmation after se
4848
Confirmation rate | The number of confirmed tasks after sending per unit of time.
4949

5050
### Demo
51-
[Use the demo](https://github.com/pvelx/k8s-message-demo)
51+
[Use the demo](https://github.com/pvelx/k8s-message-demo)
5252
[Read the article](https://vlad-pavlenko.medium.com/deferred-tasks-in-a-microservice-architecture-8e7273089ee7)
5353

5454
### Features
5555
- Simple API.
5656
- Performing tasks with second precision.
5757
- High performance of sending tasks for execution. This is achieved through a simple task storage scheme, indexing, and multithreaded database access.
58-
- High peak performance. Tasks that will be completed soon are loaded into memory in advance. This is especially important, for example, if several hundred thousand tasks are assigned at one time.
58+
- High peak performance. Tasks that will be completed soon are loaded into memory in advance. This is especially important, for example, if several hundred thousand tasks are assigned at one time.
5959
- The system is durable to failures. Only after the task is completed, the task is deleted from the database. This ensures that the task is sent for execution. The sudden stop of the application will not lead to inconsistent data in the database.
6060
- It is designed for a micro-service, event-driven architecture. It is easy to implement a fully asynchronous API.
6161
- The modular structure of the library. You can easily replace any part with your own implementation.
@@ -125,13 +125,13 @@ func main() {
125125
Id: uuid.NewV4().String(),
126126
ExecTime: time.Now().Add(time.Minute).Unix(),
127127
}
128-
if err := tasksDeferredService.Create(&task); err != nil {
128+
if err := tasksDeferredService.CreateCtx(context.Background(), &task); err != nil {
129129
log.Fatalf("error creating task: %v", err)
130130
}
131131

132132
// Delete each tenth task
133133
if i%10 == 0 {
134-
if err := tasksDeferredService.Delete(task.Id); err != nil {
134+
if err := tasksDeferredService.DeleteCtx(context.Background(), task.Id); err != nil {
135135
log.Fatalf("error deleting task: %v", err)
136136
}
137137
}

cmd/benchmark/creating_and_deleting.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"fmt"
56
"log"
67
"math/rand"
@@ -66,7 +67,7 @@ func deleteTasks(tasks <-chan *domain.Task, triggerHookService contracts.Trigger
6667
defer wg.Done()
6768
for task := range tasks {
6869
preparingBar.Add(1)
69-
if err := triggerHookService.Delete(task.Id); err != nil {
70+
if err := triggerHookService.DeleteCtx(context.Background(), task.Id); err != nil {
7071
log.Fatal(err)
7172
}
7273
}
@@ -100,7 +101,7 @@ func createTasks(
100101
task := &domain.Task{
101102
ExecTime: time.Now().Add(time.Hour + time.Duration(rand.Intn(dispersion))*time.Second).Unix(),
102103
}
103-
if err := triggerHookService.Create(task); err != nil {
104+
if err := triggerHookService.CreateCtx(context.Background(), task); err != nil {
104105
fmt.Println(err)
105106
}
106107

cmd/benchmark/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func clear() {
2525
Host: mysqlHost,
2626
DbName: mysqlDbName,
2727
})
28-
repository.New(conn, "", nil, nil).Up()
28+
_ = repository.New(conn, "", nil, nil).Up()
2929

3030
if _, err := conn.Exec("DELETE FROM task"); err != nil {
3131
log.Fatal(err)

cmd/benchmark/sending_and_confirmation.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"fmt"
56
"log"
67
"math/rand"
@@ -49,7 +50,7 @@ func upInitialState(taskCount int) {
4950
Id: util.NewId(),
5051
ExecTime: time.Unix(now, 0).Add(-time.Duration(rand.Intn(dispersion)) * time.Second).Unix(),
5152
}
52-
if err := repositoryService.Create(task, false); err != nil {
53+
if err := repositoryService.Create(context.Background(), task, false); err != nil {
5354
log.Println(err)
5455
}
5556
}

contracts/contracts.go

Lines changed: 53 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,13 @@
11
package contracts
22

33
import (
4+
"context"
45
"errors"
56
"time"
67

78
"github.com/pvelx/triggerhook/domain"
89
)
910

10-
/* --------------------------------------------------
11-
Prioritized task list
12-
*/
13-
14-
type PrioritizedTaskListInterface interface {
15-
/*
16-
Count of tasks in the list
17-
*/
18-
Len() int
19-
20-
/*
21-
Add a task to the list based on priority
22-
*/
23-
Add(task domain.Task)
24-
25-
/*
26-
Take the most prioritized task and delete from list
27-
*/
28-
Take() *domain.Task
29-
30-
/*
31-
Searches for a task and deletes it return true when the task was deleted, false - when was not found
32-
*/
33-
DeleteIfExist(taskId string) bool
34-
}
35-
3611
/* --------------------------------------------------
3712
Sender service
3813
*/
@@ -52,10 +27,10 @@ type TaskToSendInterface interface {
5227
Task manager
5328
*/
5429
type TaskManagerInterface interface {
55-
Create(task *domain.Task, isTaken bool) error
56-
Delete(taskId string) error
57-
GetTasksToComplete(preloadingTimeRange time.Duration) (CollectionsInterface, error)
58-
ConfirmExecution(task []domain.Task) error
30+
Create(ctx context.Context, task *domain.Task, isTaken bool) error
31+
Delete(ctx context.Context, taskId string) error
32+
GetTasksToComplete(ctx context.Context, preloadingTimeRange time.Duration) (CollectionsInterface, error)
33+
ConfirmExecution(ctx context.Context, task []domain.Task) error
5934
}
6035

6136
var (
@@ -73,15 +48,15 @@ var (
7348
Repository
7449
*/
7550
type RepositoryInterface interface {
76-
Create(task domain.Task, isTaken bool) error
77-
Delete(tasks []domain.Task) (int64, error)
78-
FindBySecToExecTime(preloadingTimeRange time.Duration) (CollectionsInterface, error)
51+
Create(ctx context.Context, task domain.Task, isTaken bool) error
52+
Delete(ctx context.Context, tasks []domain.Task) (int64, error)
53+
FindBySecToExecTime(ctx context.Context, preloadingTimeRange time.Duration) (CollectionsInterface, error)
7954
Up() error
8055
Count() (int, error)
8156
}
8257

8358
type CollectionsInterface interface {
84-
Next() (tasks []domain.Task, err error)
59+
Next(ctx context.Context) (tasks []domain.Task, err error)
8560
}
8661

8762
var (
@@ -102,7 +77,7 @@ var (
10277
Preloading task service
10378
*/
10479
type PreloadingServiceInterface interface {
105-
AddNewTask(task *domain.Task) error
80+
AddNewTask(ctx context.Context, task *domain.Task) error
10681
GetPreloadedChan() <-chan domain.Task
10782
Run()
10883
}
@@ -111,7 +86,7 @@ type PreloadingServiceInterface interface {
11186
Waiting task service
11287
*/
11388
type WaitingServiceInterface interface {
114-
CancelIfExist(taskId string) error
89+
CancelIfExist(ctx context.Context, taskId string) error
11590
GetReadyToSendChan() chan domain.Task
11691
Run()
11792
}
@@ -224,22 +199,56 @@ type MonitoringInterface interface {
224199
*/
225200

226201
var (
202+
/*
203+
Number of tasks waiting for confirmation after sending
204+
*/
227205
WaitingForConfirmation Topic = "waiting_for_confirmation"
228-
ConfirmationRate Topic = "confirmation_rate"
229-
Preloaded Topic = "preloaded"
230-
PreloadingRate Topic = "preloading_rate"
231-
WaitingForSending Topic = "waiting_for_sending"
232-
CreatingRate Topic = "creating_rate"
233-
DeletingRate Topic = "deleting_rate"
234-
SendingRate Topic = "sending_rate"
235-
All Topic = "all"
206+
207+
/*
208+
The rate of confirmation of the sending task
209+
*/
210+
ConfirmationRate Topic = "confirmation_rate"
211+
212+
/*
213+
Number of preloaded tasks
214+
*/
215+
Preloaded Topic = "preloaded"
216+
217+
/*
218+
Speed of preloading
219+
*/
220+
PreloadingRate Topic = "preloading_rate"
221+
222+
/*
223+
Deprecated
224+
Number of tasks waiting for sending
225+
*/
226+
WaitingForSending Topic = "waiting_for_sending"
227+
228+
CreatingRate Topic = "creating_rate"
229+
230+
DeletingRate Topic = "deleting_rate"
231+
232+
SendingRate Topic = "sending_rate"
233+
234+
/*
235+
Number of all tasks
236+
*/
237+
All Topic = "all"
236238
)
237239

238240
type TriggerHookInterface interface {
241+
242+
// Deprecated
239243
Create(task *domain.Task) error
240244

245+
// Deprecated
241246
Delete(taskId string) error
242247

248+
CreateCtx(ctx context.Context, task *domain.Task) error
249+
250+
DeleteCtx(ctx context.Context, taskId string) error
251+
243252
Consume() TaskToSendInterface
244253

245254
/*

0 commit comments

Comments
 (0)