Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions lib/atq.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,44 @@ int scx_atq_nr_queued(scx_atq_t *atq)
{
return atq->size;
}

/*
 * Withdraw a task from the ATQ it is currently queued on, if any.
 *
 * Returns 0 when the task is not on an ATQ, or when a concurrent pop
 * detached it before we acquired the queue lock. Otherwise returns the
 * error reported by scx_atq_lock() or scx_atq_remove_unlocked().
 */
__weak
int scx_atq_cancel(scx_task_common __arg_arena *taskc)
{
	scx_atq_t *queue;
	int err;

	/*
	 * Snapshot the ATQ pointer on the stack so that a racing
	 * scx_atq_pop() rewriting taskc->atq cannot change which queue
	 * we lock and unlock. A NULL snapshot means there is nothing
	 * to cancel.
	 */
	queue = taskc->atq;
	if (!queue)
		return 0;

	err = scx_atq_lock(queue);
	if (err) {
		bpf_printk("Failed to lock ATQ for task");
		return err;
	}

	/*
	 * Re-check membership now that we hold the lock. If the pointer
	 * changed we lost the race and whoever popped the task owns it;
	 * err is still 0 at this point, so that path reports success.
	 */
	if (taskc->atq == queue) {
		/* The lock keeps taskc->atq stable for the removal. */
		err = scx_atq_remove_unlocked(taskc->atq, taskc);
		if (err) {
			/* There is an unavoidable race with scx_atq_pop. */
			bpf_printk("Failed to remove node from task");
		}
	}

	scx_atq_unlock(queue);
	return err;
}

33 changes: 1 addition & 32 deletions lib/cgroup_bw.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -1626,38 +1626,7 @@ bool cbw_transit_replenish_stat(int from, int to)
/*
 * Cancel a task's ATQ membership on behalf of the cgroup bandwidth
 * controller.
 *
 * @ctx: task context pointer carried as a u64; it is cast back to
 *       scx_task_common * before use.
 *
 * The lock / re-check / remove sequence lives in scx_atq_cancel(), so
 * this function only forwards to it instead of keeping a second copy of
 * the same logic. Returns whatever scx_atq_cancel() returns: 0 when the
 * task is not queued or was already popped by someone else, otherwise
 * the lock or removal error.
 */
__hidden
int scx_cgroup_bw_cancel(u64 ctx)
{
	return scx_atq_cancel((scx_task_common *)ctx);
}

/*
Expand Down
1 change: 1 addition & 0 deletions scheds/include/lib/atq.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ int scx_atq_remove_unlocked(scx_atq_t *atq, scx_task_common __arg_arena *taskc);
int scx_atq_nr_queued(scx_atq_t *atq);
u64 scx_atq_pop(scx_atq_t *atq);
u64 scx_atq_peek(scx_atq_t *atq);
/*
 * Remove @taskc from the ATQ it is queued on, if any. The parameter is
 * tagged __arg_arena so the prototype agrees with the definition and
 * with the other prototypes in this header that take a task context.
 */
int scx_atq_cancel(scx_task_common __arg_arena *taskc);

static __always_inline
int scx_atq_lock(scx_atq_t __arg_arena *atq)
Expand Down
2 changes: 1 addition & 1 deletion scheds/rust/scx_lavd/src/bpf/idle.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ static s32 find_cpu_in(const struct cpumask *src_mask, struct cpu_ctx *cpuc_cur)
const struct cpumask *online_mask;
struct bpf_cpumask *online_src_mask;
s32 cpu;
int i;
unsigned int i;

/*
* online_src_mask = src_mask ∩ online_mask
Expand Down
97 changes: 43 additions & 54 deletions scheds/rust/scx_p2dq/src/bpf/main.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,7 @@ static void async_p2dq_enqueue(struct enqueue_promise *ret,

static void complete_p2dq_enqueue(struct enqueue_promise *pro, struct task_struct *p)
{
task_ctx *taskc;
int ret;

switch (pro->kind) {
Expand All @@ -1293,39 +1294,33 @@ static void complete_p2dq_enqueue(struct enqueue_promise *pro, struct task_struc
break;
case P2DQ_ENQUEUE_PROMISE_ATQ_FIFO:
if (!pro->fifo.atq) {
scx_bpf_error("invalid ATQ");
scx_bpf_error("promise has no fifo ATQ");
break;
}
ret = scx_atq_insert(pro->fifo.atq, (u64)p->pid);
if (!ret) {
// The ATQ was full, fallback to the DSQ.
scx_bpf_dsq_insert(p,
pro->vtime.dsq_id,
pro->vtime.slice_ns,
pro->vtime.enq_flags);
stat_inc(P2DQ_STAT_ATQ_REENQ);
} else {
stat_inc(P2DQ_STAT_ATQ_ENQ);

taskc = lookup_task_ctx(p);
ret = scx_atq_insert(pro->fifo.atq, &taskc->common);
if (ret) {
scx_bpf_error("error %d on scx_atq_insert", ret);
break;
}

stat_inc(P2DQ_STAT_ATQ_ENQ);
break;
case P2DQ_ENQUEUE_PROMISE_ATQ_VTIME:

if (!pro->vtime.atq) {
scx_bpf_error("invalid ATQ");
scx_bpf_error("promise has no vtime ATQ");
break;
}

taskc = lookup_task_ctx(p);
ret = scx_atq_insert_vtime(pro->vtime.atq,
(u64)p->pid,
&taskc->common,
pro->vtime.vtime);
if (!ret) {
// The ATQ was full, fallback to the DSQ.
scx_bpf_dsq_insert_vtime(p,
pro->vtime.dsq_id,
pro->vtime.slice_ns,
pro->vtime.vtime,
pro->vtime.enq_flags);
stat_inc(P2DQ_STAT_ATQ_REENQ);
} else {
stat_inc(P2DQ_STAT_ATQ_ENQ);
if (ret) {
scx_bpf_error("error %d on scx_atq_insert", ret);
break;
}
break;
case P2DQ_ENQUEUE_PROMISE_FAILED:
Expand Down Expand Up @@ -1532,22 +1527,16 @@ static bool consume_llc(struct llc_ctx *llcx)
{
struct task_struct *p;
task_ctx *taskc;
u64 pid;

if (!llcx)
return false;

if (p2dq_config.atq_enabled &&
scx_atq_nr_queued(llcx->mig_atq) > 0) {
pid = scx_atq_pop(llcx->mig_atq);
p = bpf_task_from_pid((s32)pid);
taskc = (task_ctx *)scx_atq_pop(llcx->mig_atq);
p = bpf_task_from_pid((s32)taskc->pid);
if (!p) {
trace("ATQ failed to get pid %llu", pid);
return false;
}

if (!(taskc = lookup_task_ctx(p))) {
bpf_task_release(p);
trace("ATQ failed to get pid %llu", taskc->pid);
return false;
}

Expand Down Expand Up @@ -1677,7 +1666,7 @@ static void p2dq_dispatch_impl(s32 cpu, struct task_struct *prev)
task_ctx *taskc;
struct cpu_ctx *cpuc;
struct llc_ctx *llcx;
u64 pid, peeked_pid, dsq_id = 0;
u64 peeked_pid, dsq_id = 0;
scx_atq_t *min_atq = NULL;

cpuc = lookup_cpu_ctx(cpu);
Expand Down Expand Up @@ -1742,8 +1731,8 @@ static void p2dq_dispatch_impl(s32 cpu, struct task_struct *prev)
// Migration eligible vtime
if (topo_config.nr_llcs > 1) {
if (p2dq_config.atq_enabled) {
pid = scx_atq_peek(cpuc->mig_atq);
if ((p = bpf_task_from_pid((s32)pid))) {
taskc = (task_ctx *)scx_atq_peek(cpuc->mig_atq);
if ((p = bpf_task_from_pid((s32)taskc->pid))) {
if (likely(bpf_cpumask_test_cpu(cpu, p->cpus_ptr)) &&
(p->scx.dsq_vtime < min_vtime || min_vtime == 0)) {
min_vtime = p->scx.dsq_vtime;
Expand Down Expand Up @@ -1777,18 +1766,8 @@ static void p2dq_dispatch_impl(s32 cpu, struct task_struct *prev)
// First try the DSQ with the lowest vtime for fairness.
if (unlikely(min_atq)) {
trace("ATQ dispatching %llu with min vtime %llu", min_atq, min_vtime);
pid = scx_atq_pop(min_atq);
if (likely((p = bpf_task_from_pid((s32)pid)))) {
/*
* Need to ensure the peeked_pid is the pid popped off
* the ATQ. Otherwise there may be priority inversions.
* This probably needs to be done for the DSQs as well.
*/
if (unlikely(!(taskc = lookup_task_ctx(p)))) {
bpf_task_release(p);
scx_bpf_error("failed to get task ctx");
return;
}
taskc = (task_ctx *)scx_atq_pop(min_atq);
if (likely((p = bpf_task_from_pid((s32)taskc->pid)))) {
if (p->pid == peeked_pid) {
scx_bpf_dsq_insert(p,
SCX_DSQ_LOCAL,
Expand Down Expand Up @@ -1847,13 +1826,8 @@ static void p2dq_dispatch_impl(s32 cpu, struct task_struct *prev)
}

if (unlikely(p2dq_config.atq_enabled)) {
pid = scx_atq_pop(cpuc->mig_atq);
if (likely((p = bpf_task_from_pid((s32)pid)))) {
if (unlikely(!(taskc = lookup_task_ctx(p)))) {
bpf_task_release(p);
scx_bpf_error("failed to get task ctx");
return;
}
taskc = (task_ctx *)scx_atq_pop(cpuc->mig_atq);
if (likely((p = bpf_task_from_pid((s32)taskc->pid)))) {
scx_bpf_dsq_insert(p,
SCX_DSQ_LOCAL,
taskc->slice_ns,
Expand Down Expand Up @@ -2074,6 +2048,8 @@ static s32 p2dq_init_task_impl(struct task_struct *p, struct scx_init_task_args
else
taskc->dsq_id = cpuc->llc_dsq;

taskc->pid = p->pid;

return 0;
}

Expand Down Expand Up @@ -2568,6 +2544,18 @@ void BPF_STRUCT_OPS(p2dq_enqueue, struct task_struct *p __arg_trusted, u64 enq_f
complete_p2dq_enqueue(&pro, p);
}

/*
 * dequeue callback: when a task is dequeued, make sure it is no longer
 * sitting on an ATQ.
 *
 * @p:         task being dequeued.
 * @deq_flags: dequeue flags (unused here).
 *
 * Any non-zero result from scx_atq_cancel() is reported through
 * scx_bpf_error().
 */
void BPF_STRUCT_OPS(p2dq_dequeue, struct task_struct *p __arg_trusted, u64 deq_flags)
{
	task_ctx *taskc;
	int ret;

	/*
	 * lookup_task_ctx() can come back empty (other call sites in this
	 * file check for that). Without a context there is no ATQ
	 * membership to cancel, so bail out rather than handing a NULL
	 * derived pointer to scx_atq_cancel().
	 */
	taskc = lookup_task_ctx(p);
	if (!taskc)
		return;

	ret = scx_atq_cancel(&taskc->common);
	if (ret)
		scx_bpf_error("scx_atq_cancel returned %d", ret);
}

void BPF_STRUCT_OPS(p2dq_dispatch, s32 cpu, struct task_struct *prev)
{
return p2dq_dispatch_impl(cpu, prev);
Expand All @@ -2588,6 +2576,7 @@ SCX_OPS_DEFINE(p2dq,
.select_cpu = (void *)p2dq_select_cpu,
.cpu_release = (void *)p2dq_cpu_release,
.enqueue = (void *)p2dq_enqueue,
.dequeue = (void *)p2dq_dequeue,
.dispatch = (void *)p2dq_dispatch,
.running = (void *)p2dq_running,
.stopping = (void *)p2dq_stopping,
Expand Down
6 changes: 6 additions & 0 deletions scheds/rust/scx_p2dq/src/bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ struct node_ctx {
#define task_ctx_test_flag(taskc, flag) ((taskc)->flags & (flag))

struct task_p2dq {
/*
* Do NOT change the position of common. It should be at the beginning
* of the task_ctx.
*/
struct scx_task_common common;
s32 pid;
u64 dsq_id;
u64 slice_ns;
int dsq_index;
Expand Down
Loading