Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/scheduler/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (

// Define events for ResourceBinding, ResourceFilter objects and their associated resources.
const (
// EventReasonInsufficientGPU indicates that insufficient GPU resources are available.
EventReasonInsufficientGPU = "InsufficientGPU"
// EventReasonFilteringFailed indicates that filtering failed.
EventReasonFilteringFailed = "FilteringFailed"
// EventReasonFilteringSucceed indicates that filtering succeed.
Expand Down
11 changes: 11 additions & 0 deletions pkg/scheduler/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ func (m *podManager) ListPodsInfo() []*podInfo {
return pods
}

// PodInfoToPodObj converts the scheduler's internal podInfo record into a
// minimal corev1.Pod object carrying only the fields needed to address the
// pod in the cluster: namespace, name, UID, and the assigned-node
// annotation (util.AssignedNodeAnnotations -> pod.NodeID).
//
// Returns nil when pod is nil, so the result can be passed straight to
// DeletePodFromCluster, which treats a nil pod as a no-op; previously a nil
// input would have panicked here before that guard could apply.
func (m *podManager) PodInfoToPodObj(pod *podInfo) *corev1.Pod {
	if pod == nil {
		return nil
	}
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   pod.Namespace,
			Name:        pod.Name,
			UID:         pod.UID,
			Annotations: map[string]string{util.AssignedNodeAnnotations: pod.NodeID},
		},
	}
}

func (m *podManager) GetScheduledPods() (map[k8stypes.UID]*podInfo, error) {
m.mutex.RLock()
defer m.mutex.RUnlock()
Expand Down
22 changes: 6 additions & 16 deletions pkg/scheduler/routes/gpu_manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,8 @@ func AssignGPUToApp(s *scheduler.Scheduler) httprouter.Handle {
for _, cdev := range cdevs {
if cdev.UUID == uuid {
klog.Infof("Forcing out pod %s/%s of exclusive GPU %s in favor of %s", pod.Namespace, pod.Name, uuid, req.AppName)
err = ctrlclient.IgnoreNotFound(client.GetClient().CoreV1().Pods(pod.Namespace).Delete(r.Context(), pod.Name, metav1.DeleteOptions{}))
err = s.DeletePodFromCluster(r.Context(), s.PodInfoToPodObj(pod))
if err != nil {
err = fmt.Errorf("failed to delete existing pod occupying GPU %s/%s: %v", pod.Namespace, pod.Name, err)
klog.Errorln(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -362,10 +360,8 @@ func AssignGPUToApp(s *scheduler.Scheduler) httprouter.Handle {
}

// delete existing pods for this app
err = ctrlclient.IgnoreNotFound(util.DeletePodsBelongToApp(r.Context(), req.AppName))
err = s.DeletePodsBelongToApp(r.Context(), req.AppName)
if err != nil {
err = fmt.Errorf("failed to delete existing pods of app %s: %v", req.AppName, err)
klog.Errorln(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -464,10 +460,8 @@ func SwitchGPUMode(s *scheduler.Scheduler) httprouter.Handle {
for _, cdev := range cdevs {
if cdev.UUID == uuid {
klog.Infof("Deleting pod %s/%s for mode switch of GPU %s", pod.Namespace, pod.Name, uuid)
err = ctrlclient.IgnoreNotFound(client.GetClient().CoreV1().Pods(pod.Namespace).Delete(r.Context(), pod.Name, metav1.DeleteOptions{}))
err = s.DeletePodFromCluster(r.Context(), s.PodInfoToPodObj(pod))
if err != nil {
err = fmt.Errorf("failed to delete existing pod occupying GPU %s/%s: %v", pod.Namespace, pod.Name, err)
klog.Errorln(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -555,7 +549,7 @@ func UnassignGPUFromApp(s *scheduler.Scheduler) httprouter.Handle {
return
}

if err := ctrlclient.IgnoreNotFound(util.DeletePodsBelongToApp(r.Context(), req.AppName)); err != nil {
if err := ctrlclient.IgnoreNotFound(s.DeletePodsBelongToApp(r.Context(), req.AppName)); err != nil {
klog.Errorln(fmt.Errorf("failed to delete pods of app %s: %v", req.AppName, err))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -812,9 +806,7 @@ func BulkManageAssignments(s *scheduler.Scheduler) httprouter.Handle {
for _, cdev := range cdevs {
if _, needEvict := evictUUIDs[cdev.UUID]; needEvict {
klog.Infof("Evicting pod %s/%s occupying exclusive GPU %s", pod.Namespace, pod.Name, cdev.UUID)
if err := ctrlclient.IgnoreNotFound(client.GetClient().CoreV1().Pods(pod.Namespace).Delete(r.Context(), pod.Name, metav1.DeleteOptions{})); err != nil {
err = fmt.Errorf("failed to delete existing pod occupying GPU %s/%s: %v", pod.Namespace, pod.Name, err)
klog.Errorln(err)
if err := s.DeletePodFromCluster(r.Context(), s.PodInfoToPodObj(pod)); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -834,9 +826,7 @@ func BulkManageAssignments(s *scheduler.Scheduler) httprouter.Handle {
}

// 2) Restart this app's pods due to binding changes
if err := ctrlclient.IgnoreNotFound(util.DeletePodsBelongToApp(r.Context(), req.AppName)); err != nil {
err = fmt.Errorf("failed to delete existing pods of app %s: %v", req.AppName, err)
klog.Errorln(err)
if err := s.DeletePodsBelongToApp(r.Context(), req.AppName); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down
62 changes: 45 additions & 17 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (s *Scheduler) onAddPod(obj any) {
if !ok {
return
}
if k8sutil.IsPodInTerminatedState(pod) {
if k8sutil.IsPodInTerminatedState(pod) || pod.DeletionTimestamp != nil {
s.delPod(pod)
return
}
Expand Down Expand Up @@ -139,20 +139,48 @@ func (s *Scheduler) onDelPod(obj any) {
return
}
p := pod.DeepCopy()
go func(nodeName string, p *corev1.Pod) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
node, err := s.kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
klog.Error("Skip releasing node lock: failed to get node", "node", nodeName, "pod", klog.KObj(p), "err", err)
return
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
node, err := s.kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
klog.Error("Skip releasing node lock: failed to get node", "node", nodeName, "pod", klog.KObj(p), "err", err)
return
}
for _, dev := range device.GetDevices() {
if err := dev.ReleaseNodeLock(node, p); err != nil {
klog.Error("ReleaseNodeLock returned error", "node", nodeName, "pod", klog.KObj(p), "err", err)
}
for _, dev := range device.GetDevices() {
if err := dev.ReleaseNodeLock(node, p); err != nil {
klog.Error("ReleaseNodeLock returned error", "node", nodeName, "pod", klog.KObj(p), "err", err)
}
}
}

// DeletePodFromCluster deletes the given pod from the cluster and then runs
// the scheduler's own deletion handling (onDelPod) so that device node locks
// held by the pod are released immediately, rather than waiting for the
// delete event to arrive from the API server.
//
// A nil pod and a NotFound error from the API server are both treated as
// success. Any other error is logged and returned wrapped.
func (s *Scheduler) DeletePodFromCluster(ctx context.Context, pod *corev1.Pod) error {
	if pod == nil {
		return nil
	}
	err := ctrlclient.IgnoreNotFound(s.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
	if err != nil {
		// Include the namespace so the message is unambiguous across
		// namespaces, and wrap with %w so callers can use errors.Is/As.
		err = fmt.Errorf("failed to delete pod %s/%s: %w", pod.Namespace, pod.Name, err)
		klog.Errorln(err)
		return err
	}
	// Proactively release node locks and scheduler-side bookkeeping for the
	// deleted pod instead of waiting for the informer's delete callback.
	s.onDelPod(pod)
	return nil
}

func (s *Scheduler) DeletePodsBelongToApp(ctx context.Context, appName string) error {
pods, err := s.kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", util.AppNameLabelKey, appName)})
if err != nil {
err = fmt.Errorf("failed to list pods belonging to app %s: %v", appName, err)
klog.Errorln(err)
return err
}
for _, pod := range pods.Items {
err := s.DeletePodFromCluster(ctx, &pod)
if err != nil {
return err
}
}(nodeName, p)
}
return nil
}

func (s *Scheduler) Start() {
Expand Down Expand Up @@ -1041,7 +1069,7 @@ func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFi
for _, b := range matchedBindings {
if _, occupied := consumedByApp[b.Spec.UUID]; occupied {
err := fmt.Errorf("bound GPU %s of app %s is already consumed by another pod", b.Spec.UUID, appName)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, "", err)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", err)
return &extenderv1.ExtenderFilterResult{
FailedNodes: map[string]string{},
}, nil
Expand Down Expand Up @@ -1103,7 +1131,7 @@ func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFi
}
if nvidiaSummary.requested > 0 && len(selectedUUIDs) < nvidiaSummary.requested {
err := fmt.Errorf("insufficient GPU candidates for app %s, requested=%d, available=%d", appName, nvidiaSummary.requested, len(selectedUUIDs))
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, "", err)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", err)
return &extenderv1.ExtenderFilterResult{
FailedNodes: map[string]string{},
}, nil
Expand Down Expand Up @@ -1132,7 +1160,7 @@ func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFi
if len((*nodeScores).NodeList) == 0 {
klog.V(4).InfoS("No available nodes meet the required scores",
"pod", args.Pod.Name)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, "", fmt.Errorf("no available node, %d nodes do not meet", len(*args.NodeNames)))
s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", fmt.Errorf("no available GPU resources on all %d nodes", len(*args.NodeNames)))
return &extenderv1.ExtenderFilterResult{
FailedNodes: failedNodes,
}, nil
Expand Down Expand Up @@ -1204,7 +1232,7 @@ func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFi
if totalMem > 0 && bindingAllocatedMemory[uuid]+requiredMem > totalMem {
err := fmt.Errorf("insufficient mem-slicing GPU memory for binding on %s: allocated=%d, request=%d, total=%d", uuid, bindingAllocatedMemory[uuid], requiredMem, totalMem)
klog.ErrorS(err, "Failed to create GPUBinding automatically", "pod", args.Pod.Name, "uuid", uuid)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, "", err)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", err)
return nil, err
}
memQ := resource.NewQuantity(requiredMem, resource.BinarySI)
Expand Down
18 changes: 0 additions & 18 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"time"

"github.com/Project-HAMi/HAMi/pkg/api/gpu/v1alpha1"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/Project-HAMi/HAMi/pkg/util/client"
"github.com/Project-HAMi/HAMi/pkg/util/nodelock"
Expand Down Expand Up @@ -423,23 +422,6 @@ func PatchPodAnnotations(pod *corev1.Pod, annotations map[string]string) error {
return err
}

// DeletePodsBelongToApp deletes every pod across all namespaces whose
// AppNameLabelKey label equals appName, using the global client.
// An empty appName is rejected up front so the label selector cannot
// degenerate into "<key>=" and match unlabelled/empty-labelled pods.
// NotFound errors on individual deletes are ignored (the pod is already
// gone); any other list/delete error aborts and is returned wrapped.
// Deprecated in this change set: superseded by Scheduler.DeletePodsBelongToApp,
// which also runs the scheduler's own pod-deletion handling.
func DeletePodsBelongToApp(ctx context.Context, appName string) error {
if appName == "" {
return errors.New("appName is empty")
}
pods, err := client.GetClient().CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", AppNameLabelKey, appName)})
if err != nil {
return fmt.Errorf("failed to list pods belonging to app %s: %v", appName, err)
}
for _, pod := range pods.Items {
err := ctrlclient.IgnoreNotFound(client.GetClient().CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
if err != nil {
return fmt.Errorf("failed to delete pod %s: %v", pod.Name, err)
}
}
return nil
}

func DeleteGPUBinding(ctx context.Context, name string) error {
binding := &v1alpha1.GPUBinding{
ObjectMeta: metav1.ObjectMeta{
Expand Down
Loading