Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 52 additions & 35 deletions heft.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,68 +38,85 @@ def calc_heft(dag: TaskDAG, network: NetworkGraph) -> dict:

def calc_hepft(dag: TaskDAG, network: NetworkGraph, dynamic_net: DynamicNetwork) -> tuple[dict, dict, dict]:

# task prioritization
ranks = dag.compute_ranks(network)
# task prioritization
ranks = dag.compute_ranks(network, dynamic_network=dynamic_net)
sorted_tasks = sorted(ranks.keys(), key=lambda t: ranks[t], reverse=True)

# processor selection
proc_available_time = {proc_id: 0 for proc_id in network.processors}
schedule = {} # task_id: processor, start_time, finish_time
# processor selection
schedule = {}
proc_available = {proc_id: 0.0 for proc_id in network.processors}

for task_id in sorted_tasks:
task = dag.nodes[task_id]
best_proc, best_est, best_eft = None, 0.0, float('inf')
best_proc, best_est, best_eft = None, None, float('inf')

# Always consider ALL processors
for proc_id in network.processors:
base_est = proc_available_time.get(proc_id, 0.0)
est = base_est

# ── compute EST ───────────────────────────────────────────────
ready_time = 0.0
for parent_id in task.parents:
parent_proc, _, parent_finish = schedule[parent_id]

transfer_time = max(parent_finish, est)
future_network = dynamic_net.pred_net_func(transfer_time)

parent_proc, _, parent_eft = schedule[parent_id]
data_size = dag.edges[(parent_id, task_id)]
comm = future_network.comm_cost(
parent_proc,
proc_id,
data_size,
pred_net = dynamic_net.pred_net_func(parent_eft)
comm = pred_net.comm_cost(
parent_proc, proc_id, data_size,
fallback_bandwidth=network.bandwidth
)
ready_time = max(ready_time, parent_eft + comm)

est = max(est, parent_finish + comm)

# Check processor availability at actual start time
net_at_start = dynamic_net.pred_net_func(est)
if not net_at_start.has_processor(proc_id):
continue

est = max(ready_time, proc_available[proc_id])
eft = est + task.comp_costs[proc_id]

# ── full window check [est, eft] ───────────────────────────────
# with perfect knowledge we know every future snapshot, so scan
# every snapshot that falls inside this task's execution window.
# if the processor is absent from ANY of them, it will fail
# mid-execution — skip it for this task.
proc_goes_down = False

# always check the boundaries first
for boundary in [est, eft]:
snapshot = dynamic_net.pred_net_func(boundary)
if not snapshot.has_processor(proc_id):
proc_goes_down = True
break

# then check every snapshot whose timestamp falls inside (est, eft)
if not proc_goes_down:
for t, snapshot in dynamic_net.snapshots:
if t <= est:
continue # before execution window
if t >= eft:
break # past execution window — snapshots are sorted
if not snapshot.has_processor(proc_id):
proc_goes_down = True
break # one failure inside the window is enough

if proc_goes_down:
continue # skip for this task only — reconsidered for next task

# ── update best ───────────────────────────────────────────────
if eft < best_eft:
best_eft = eft
best_est = est
best_proc = proc_id
best_eft, best_est, best_proc = eft, est, proc_id

# Fallback (unchanged, but now rarely triggered)
# ── fallback: all processors fail during this window ──────────────
if best_proc is None:
for proc_id in network.processors:
est = proc_available_time.get(proc_id, 0.0)
ready_time = 0.0
for parent_id in task.parents:
parent_proc, _, parent_finish = schedule[parent_id]
parent_proc, _, parent_eft = schedule[parent_id]
data_size = dag.edges[(parent_id, task_id)]
comm = network.comm_cost(parent_proc, proc_id, data_size)
est = max(est, parent_finish + comm)
comm = network.comm_cost(parent_proc, proc_id, data_size)
ready_time = max(ready_time, parent_eft + comm)

est = max(ready_time, proc_available[proc_id])
eft = est + task.comp_costs[proc_id]

if eft < best_eft:
best_eft, best_est, best_proc = eft, est, proc_id

schedule[task_id] = (best_proc, best_est, best_eft)
proc_available_time[best_proc] = best_eft
schedule[task_id] = (best_proc, best_est, best_eft)
proc_available[best_proc] = best_eft

return schedule

Expand Down