From dc600c636d8d65ec55abfdba961106551ff6d275 Mon Sep 17 00:00:00 2001 From: klprakhar Date: Sun, 26 Apr 2026 18:01:42 +0530 Subject: [PATCH 1/4] refactor: replace unstructured informers with typed informers in workload manager Signed-off-by: klprakhar --- pkg/workloadmanager/informers.go | 18 +++++----- pkg/workloadmanager/k8s_client.go | 40 ++++++++++++++------- pkg/workloadmanager/workload_builder.go | 48 ++++++++----------------- 3 files changed, 51 insertions(+), 55 deletions(-) diff --git a/pkg/workloadmanager/informers.go b/pkg/workloadmanager/informers.go index fd39561f..a96c2c4d 100644 --- a/pkg/workloadmanager/informers.go +++ b/pkg/workloadmanager/informers.go @@ -24,6 +24,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" + + agentruntimev1alpha1 "github.com/volcano-sh/agentcube/client-go/informers/externalversions/runtime/v1alpha1" ) var ( @@ -50,16 +52,16 @@ var ( ) type Informers struct { - AgentRuntimeInformer cache.SharedIndexInformer - CodeInterpreterInformer cache.SharedIndexInformer + AgentRuntimeInformer agentruntimev1alpha1.AgentRuntimeInformer + CodeInterpreterInformer agentruntimev1alpha1.CodeInterpreterInformer PodInformer cache.SharedIndexInformer informerFactory informers.SharedInformerFactory } func NewInformers(k8sClient *K8sClient) *Informers { return &Informers{ - AgentRuntimeInformer: k8sClient.dynamicInformer.ForResource(AgentRuntimeGVR).Informer(), - CodeInterpreterInformer: k8sClient.dynamicInformer.ForResource(CodeInterpreterGVR).Informer(), + AgentRuntimeInformer: k8sClient.agentcubeInformer.Runtime().V1alpha1().AgentRuntimes(), + CodeInterpreterInformer: k8sClient.agentcubeInformer.Runtime().V1alpha1().CodeInterpreters(), PodInformer: k8sClient.podInformer, informerFactory: k8sClient.informerFactory, } @@ -77,18 +79,18 @@ func (ifm *Informers) RunAndWaitForCacheSync(ctx context.Context) error { func (ifm *Informers) run(stopCh <-chan struct{}) { ifm.informerFactory.Start(stopCh) - go ifm.AgentRuntimeInformer.Run(stopCh) - go ifm.CodeInterpreterInformer.Run(stopCh) + go ifm.AgentRuntimeInformer.Informer().Run(stopCh) + go ifm.CodeInterpreterInformer.Informer().Run(stopCh) } func (ifm *Informers) waitForCacheSync(ctx context.Context) error { - if !cache.WaitForCacheSync(ctx.Done(), ifm.AgentRuntimeInformer.HasSynced) { + if !cache.WaitForCacheSync(ctx.Done(), ifm.AgentRuntimeInformer.Informer().HasSynced) { if err := ctx.Err(); err != nil { return fmt.Errorf("timed out waiting for %v caches to sync: %w", AgentRuntimeGVR, err) } return fmt.Errorf("timed out waiting for %v caches to sync", AgentRuntimeGVR) } - if !cache.WaitForCacheSync(ctx.Done(), ifm.CodeInterpreterInformer.HasSynced) { + if !cache.WaitForCacheSync(ctx.Done(), ifm.CodeInterpreterInformer.Informer().HasSynced) { if err := ctx.Err(); err != nil { return fmt.Errorf("timed out waiting for %v caches to sync: %w", CodeInterpreterGVR, err) } diff --git a/pkg/workloadmanager/k8s_client.go b/pkg/workloadmanager/k8s_client.go index 15cc47bd..646d4016 100644 --- a/pkg/workloadmanager/k8s_client.go +++ b/pkg/workloadmanager/k8s_client.go @@ -40,6 +40,9 @@ import ( "k8s.io/client-go/tools/clientcmd" sandboxv1alpha1 "sigs.k8s.io/agent-sandbox/api/v1alpha1" extensionsv1alpha1 "sigs.k8s.io/agent-sandbox/extensions/api/v1alpha1" + + agentcubeclientset "github.com/volcano-sh/agentcube/client-go/clientset/versioned" + agentcubeinformers "github.com/volcano-sh/agentcube/client-go/informers/externalversions" ) const ( @@ -67,10 +70,11 @@ type K8sClient struct { scheme *runtime.Scheme baseConfig *rest.Config // Store base config for creating user clients clientCache *ClientCache // LRU cache for user clients - dynamicInformer dynamicinformer.DynamicSharedInformerFactory - informerFactory informers.SharedInformerFactory - podInformer cache.SharedIndexInformer - podLister listersv1.PodLister + dynamicInformer dynamicinformer.DynamicSharedInformerFactory + informerFactory informers.SharedInformerFactory + agentcubeInformer agentcubeinformers.SharedInformerFactory + podInformer cache.SharedIndexInformer + podLister listersv1.PodLister } type sandboxEntry struct { @@ -128,16 +132,26 @@ func NewK8sClient() (*K8sClient, error) { podInformer := informerFactory.Core().V1().Pods().Informer() podLister := informerFactory.Core().V1().Pods().Lister() + // Create AgentCube clientset + agentcubeClient, err := agentcubeclientset.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("failed to create agentcube clientset: %w", err) + } + + // Create informer factory for AgentCube resources + agentcubeInformer := agentcubeinformers.NewSharedInformerFactory(agentcubeClient, 0) + return &K8sClient{ - clientset: clientset, - dynamicClient: dynamicClient, - scheme: scheme, - baseConfig: config, - clientCache: NewClientCache(100), // Cache up to 100 clients - dynamicInformer: dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0), - informerFactory: informerFactory, - podInformer: podInformer, - podLister: podLister, + clientset: clientset, + dynamicClient: dynamicClient, + scheme: scheme, + baseConfig: config, + clientCache: NewClientCache(100), // Cache up to 100 clients + dynamicInformer: dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0), + informerFactory: informerFactory, + agentcubeInformer: agentcubeInformer, + podInformer: podInformer, + podLister: podLister, }, nil } diff --git a/pkg/workloadmanager/workload_builder.go b/pkg/workloadmanager/workload_builder.go index 86956e04..0ec1bac9 100644 --- a/pkg/workloadmanager/workload_builder.go +++ b/pkg/workloadmanager/workload_builder.go @@ -29,8 +29,8 @@ import ( runtimev1alpha1 "github.com/volcano-sh/agentcube/pkg/apis/runtime/v1alpha1" "github.com/volcano-sh/agentcube/pkg/common/types" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" @@ -242,22 +242,13 @@ func buildSandboxClaimObject(params *buildSandboxClaimParams) *extensionsv1alpha } func buildSandboxByAgentRuntime(namespace string, name string, ifm *Informers) (*sandboxv1alpha1.Sandbox, *sandboxEntry, error) { - agentRuntimeKey := namespace + "/" + name - // TODO(hzxuzhonghu): make use of typed informer, so we don't need to do type conversion below - runtimeObj, exists, _ := ifm.AgentRuntimeInformer.GetStore().GetByKey(agentRuntimeKey) - if !exists { - return nil, nil, api.ErrAgentRuntimeNotFound - } - - unstructuredObj, ok := runtimeObj.(*unstructured.Unstructured) - if !ok { - klog.Errorf("agent runtime %s type asserting unstructured.Unstructured failed", agentRuntimeKey) - return nil, nil, fmt.Errorf("agent runtime type asserting failed") - } - - var agentRuntimeObj runtimev1alpha1.AgentRuntime - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.Object, &agentRuntimeObj); err != nil { - return nil, nil, fmt.Errorf("failed to convert unstructured to AgentRuntime: %w", err) + // Use typed informer lister to get AgentRuntime + agentRuntimeObj, err := ifm.AgentRuntimeInformer.Lister().AgentRuntimes(namespace).Get(name) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil, api.ErrAgentRuntimeNotFound + } + return nil, nil, fmt.Errorf("failed to get agent runtime %s/%s from informer cache: %w", namespace, name, err) } sessionID := uuid.New().String() @@ -316,24 +307,13 @@ func buildCodeInterpreterEnvVars(templateEnv []corev1.EnvVar, authMode runtimev1 } func buildSandboxByCodeInterpreter(namespace string, codeInterpreterName string, informer *Informers) (*sandboxv1alpha1.Sandbox, *extensionsv1alpha1.SandboxClaim, *sandboxEntry, error) { - codeInterpreterKey := namespace + "/" + codeInterpreterName - // TODO(hzxuzhonghu): make use of typed informer, so we don't need to do type conversion below - runtimeObj, exists, err := informer.CodeInterpreterInformer.GetStore().GetByKey(codeInterpreterKey) + // Use typed informer lister to get CodeInterpreter + codeInterpreterObj, err := informer.CodeInterpreterInformer.Lister().CodeInterpreters(namespace).Get(codeInterpreterName) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to get code interpreter %s from informer cache: %w", codeInterpreterKey, err) - } - if !exists { - return nil, nil, nil, api.ErrCodeInterpreterNotFound - } - unstructuredObj, ok := runtimeObj.(*unstructured.Unstructured) - if !ok { - klog.Errorf("code interpreter %s type asserting unstructured.Unstructured failed", codeInterpreterKey) - return nil, nil, nil, fmt.Errorf("code interpreter type asserting failed") - } - - var codeInterpreterObj runtimev1alpha1.CodeInterpreter - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.Object, &codeInterpreterObj); err != nil { - return nil, nil, nil, fmt.Errorf("failed to convert unstructured to CodeInterpreter: %w", err) + if apierrors.IsNotFound(err) { + return nil, nil, nil, api.ErrCodeInterpreterNotFound + } + return nil, nil, nil, fmt.Errorf("failed to get code interpreter %s/%s from informer cache: %w", namespace, codeInterpreterName, err) } // Check public key available if authMode is picod From 925c535c58244a4ebbe6a88f693aaa03be5ce7e5 Mon Sep 17 00:00:00 2001 From: klprakhar Date: Sun, 26 Apr 2026 18:16:51 +0530 Subject: [PATCH 2/4] fix the copilot review changes Signed-off-by: klprakhar --- docker/Dockerfile | 1 + pkg/workloadmanager/informers.go | 6 ++- pkg/workloadmanager/informers_test.go | 51 +++++++++++++++++-------- pkg/workloadmanager/workload_builder.go | 1 - 4 files changed, 41 insertions(+), 18 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 6656a0cf..da25c8eb 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -17,6 +17,7 @@ RUN --mount=type=cache,target=/go/pkg/mod \ # Copy source code COPY cmd/ cmd/ COPY pkg/ pkg/ +COPY client-go/ client-go/ # Build with dynamic architecture support and caching # Supports amd64, arm64, arm/v7, etc. diff --git a/pkg/workloadmanager/informers.go b/pkg/workloadmanager/informers.go index a96c2c4d..e9b51079 100644 --- a/pkg/workloadmanager/informers.go +++ b/pkg/workloadmanager/informers.go @@ -25,6 +25,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" + agentcubeinformers "github.com/volcano-sh/agentcube/client-go/informers/externalversions" agentruntimev1alpha1 "github.com/volcano-sh/agentcube/client-go/informers/externalversions/runtime/v1alpha1" ) @@ -56,6 +57,7 @@ type Informers struct { CodeInterpreterInformer agentruntimev1alpha1.CodeInterpreterInformer PodInformer cache.SharedIndexInformer informerFactory informers.SharedInformerFactory + agentcubeInformer agentcubeinformers.SharedInformerFactory } func NewInformers(k8sClient *K8sClient) *Informers { @@ -64,6 +66,7 @@ func NewInformers(k8sClient *K8sClient) *Informers { CodeInterpreterInformer: k8sClient.agentcubeInformer.Runtime().V1alpha1().CodeInterpreters(), PodInformer: k8sClient.podInformer, informerFactory: k8sClient.informerFactory, + agentcubeInformer: k8sClient.agentcubeInformer, } } @@ -79,8 +82,7 @@ func (ifm *Informers) RunAndWaitForCacheSync(ctx context.Context) error { func (ifm *Informers) run(stopCh <-chan struct{}) { ifm.informerFactory.Start(stopCh) - go ifm.AgentRuntimeInformer.Informer().Run(stopCh) - go ifm.CodeInterpreterInformer.Informer().Run(stopCh) + ifm.agentcubeInformer.Start(stopCh) } func (ifm *Informers) waitForCacheSync(ctx context.Context) error { diff --git a/pkg/workloadmanager/informers_test.go b/pkg/workloadmanager/informers_test.go index 993e914f..3c36f12f 100644 --- a/pkg/workloadmanager/informers_test.go +++ b/pkg/workloadmanager/informers_test.go @@ -23,25 +23,41 @@ import ( "time" "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" + + agentcubeinformers "github.com/volcano-sh/agentcube/client-go/informers/externalversions" + agentruntimev1alpha1 "github.com/volcano-sh/agentcube/client-go/informers/externalversions/runtime/v1alpha1" + runtimelister "github.com/volcano-sh/agentcube/client-go/listers/runtime/v1alpha1" ) -// neverSyncedInformer is a cache.SharedIndexInformer whose HasSynced always returns false. type neverSyncedInformer struct { cache.SharedIndexInformer } -func (n *neverSyncedInformer) HasSynced() bool { return false } -func (n *neverSyncedInformer) Run(stopCh <-chan struct{}) { <-stopCh } +func (n *neverSyncedInformer) HasSynced() bool { return false } +func (n *neverSyncedInformer) Run(stopCh <-chan struct{}) { <-stopCh } +func (n *neverSyncedInformer) Informer() cache.SharedIndexInformer { return n } +func (n *neverSyncedInformer) Lister() runtimelister.AgentRuntimeLister { return nil } + +// fakeCodeInterpreterInformer implements CodeInterpreterInformer +type fakeCodeInterpreterInformer struct { + cache.SharedIndexInformer +} + +func (f *fakeCodeInterpreterInformer) HasSynced() bool { return f.SharedIndexInformer.HasSynced() } +func (f *fakeCodeInterpreterInformer) Run(stopCh <-chan struct{}) { f.SharedIndexInformer.Run(stopCh) } +func (f *fakeCodeInterpreterInformer) Informer() cache.SharedIndexInformer { return f.SharedIndexInformer } +func (f *fakeCodeInterpreterInformer) Lister() runtimelister.CodeInterpreterLister { return nil } // alwaysSyncedInformer is a cache.SharedIndexInformer whose HasSynced always returns true. type alwaysSyncedInformer struct { cache.SharedIndexInformer } -func (a *alwaysSyncedInformer) HasSynced() bool { return true } -func (a *alwaysSyncedInformer) Run(stopCh <-chan struct{}) { <-stopCh } +func (a *alwaysSyncedInformer) HasSynced() bool { return true } +func (a *alwaysSyncedInformer) Run(stopCh <-chan struct{}) { <-stopCh } +func (a *alwaysSyncedInformer) Informer() cache.SharedIndexInformer { return a } +func (a *alwaysSyncedInformer) Lister() runtimelister.AgentRuntimeLister { return nil } // runCanceled starts RunAndWaitForCacheSync in a goroutine, cancels the context // immediately, and returns the error. Fails the test if it takes more than 2s. @@ -60,8 +76,9 @@ func runCanceled(t *testing.T, ifm *Informers) error { } } -func newFactory() informers.SharedInformerFactory { - return informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0) +func newFactory() (informers.SharedInformerFactory, agentcubeinformers.SharedInformerFactory) { + return informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0), + agentcubeinformers.NewSharedInformerFactory(nil, 0) } func TestRunAndWaitForCacheSync_ContextCancellation(t *testing.T) { @@ -69,10 +86,10 @@ func TestRunAndWaitForCacheSync_ContextCancellation(t *testing.T) { always := func() cache.SharedIndexInformer { return &alwaysSyncedInformer{} } tests := []struct { - name string - agentRuntime cache.SharedIndexInformer - codeInterpreter cache.SharedIndexInformer - pod cache.SharedIndexInformer + name string + agentRuntime agentruntimev1alpha1.AgentRuntimeInformer + codeInterpreter agentruntimev1alpha1.CodeInterpreterInformer + pod cache.SharedIndexInformer }{ { name: "AgentRuntimeInformer never syncs", @@ -96,11 +113,13 @@ func TestRunAndWaitForCacheSync_ContextCancellation(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + factory, agentcubeFactory := newFactory() ifm := &Informers{ AgentRuntimeInformer: tc.agentRuntime, CodeInterpreterInformer: tc.codeInterpreter, PodInformer: tc.pod, - informerFactory: newFactory(), + informerFactory: factory, + agentcubeInformer: agentcubeFactory, } err := runCanceled(t, ifm) if !errors.Is(err, context.Canceled) { @@ -111,11 +130,13 @@ func TestRunAndWaitForCacheSync_ContextCancellation(t *testing.T) { } func TestRunAndWaitForCacheSync_AllSynced(t *testing.T) { + factory, agentcubeFactory := newFactory() ifm := &Informers{ AgentRuntimeInformer: &alwaysSyncedInformer{}, - CodeInterpreterInformer: &alwaysSyncedInformer{}, + CodeInterpreterInformer: &fakeCodeInterpreterInformer{SharedIndexInformer: &alwaysSyncedInformer{}}, PodInformer: &alwaysSyncedInformer{}, - informerFactory: newFactory(), + informerFactory: factory, + agentcubeInformer: agentcubeFactory, } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/pkg/workloadmanager/workload_builder.go b/pkg/workloadmanager/workload_builder.go index 0ec1bac9..c2d31a1a 100644 --- a/pkg/workloadmanager/workload_builder.go +++ b/pkg/workloadmanager/workload_builder.go @@ -31,7 +31,6 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "k8s.io/utils/ptr" From 121e3fb1ff6eeaf3253d2ec0e832caef9ab19179 Mon Sep 17 00:00:00 2001 From: klprakhar Date: Sun, 26 Apr 2026 18:23:36 +0530 Subject: [PATCH 3/4] fix informers_test.go for golang CI Signed-off-by: klprakhar --- pkg/workloadmanager/informers_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/workloadmanager/informers_test.go b/pkg/workloadmanager/informers_test.go index 3c36f12f..7cb76813 100644 --- a/pkg/workloadmanager/informers_test.go +++ b/pkg/workloadmanager/informers_test.go @@ -23,6 +23,7 @@ import ( "time" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" agentcubeinformers "github.com/volcano-sh/agentcube/client-go/informers/externalversions" @@ -82,8 +83,8 @@ func newFactory() (informers.SharedInformerFactory, agentcubeinformers.SharedInf } func TestRunAndWaitForCacheSync_ContextCancellation(t *testing.T) { - never := func() cache.SharedIndexInformer { return &neverSyncedInformer{} } - always := func() cache.SharedIndexInformer { return &alwaysSyncedInformer{} } + never := func() *neverSyncedInformer { return &neverSyncedInformer{} } + always := func() *alwaysSyncedInformer { return &alwaysSyncedInformer{} } tests := []struct { name string @@ -94,19 +95,19 @@ func TestRunAndWaitForCacheSync_ContextCancellation(t *testing.T) { { name: "AgentRuntimeInformer never syncs", agentRuntime: never(), - codeInterpreter: never(), + codeInterpreter: &fakeCodeInterpreterInformer{SharedIndexInformer: never()}, pod: never(), }, { name: "CodeInterpreterInformer never syncs", agentRuntime: always(), - codeInterpreter: never(), + codeInterpreter: &fakeCodeInterpreterInformer{SharedIndexInformer: never()}, pod: never(), }, { name: "PodInformer never syncs", agentRuntime: always(), - codeInterpreter: always(), + codeInterpreter: &fakeCodeInterpreterInformer{SharedIndexInformer: always()}, pod: never(), }, } From 34c366d5c5215551a6555407f898773f5a44cf80 Mon Sep 17 00:00:00 2001 From: klprakhar Date: Sun, 26 Apr 2026 18:39:56 +0530 Subject: [PATCH 4/4] fix the changes of the second review of copilot Signed-off-by: klprakhar --- pkg/workloadmanager/informers.go | 3 +++ pkg/workloadmanager/workload_builder.go | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/workloadmanager/informers.go b/pkg/workloadmanager/informers.go index e9b51079..1a7617c5 100644 --- a/pkg/workloadmanager/informers.go +++ b/pkg/workloadmanager/informers.go @@ -82,6 +82,9 @@ func (ifm *Informers) RunAndWaitForCacheSync(ctx context.Context) error { func (ifm *Informers) run(stopCh <-chan struct{}) { ifm.informerFactory.Start(stopCh) + // Instantiate the informers to ensure they are registered with the factory + ifm.AgentRuntimeInformer.Informer() + ifm.CodeInterpreterInformer.Informer() ifm.agentcubeInformer.Start(stopCh) } diff --git a/pkg/workloadmanager/workload_builder.go b/pkg/workloadmanager/workload_builder.go index c2d31a1a..7f7342aa 100644 --- a/pkg/workloadmanager/workload_builder.go +++ b/pkg/workloadmanager/workload_builder.go @@ -354,8 +354,8 @@ func buildSandboxByCodeInterpreter(namespace string, codeInterpreterName string, sessionID: sessionID, idleTimeout: idleTimeout, ownerReference: &metav1.OwnerReference{ - APIVersion: codeInterpreterObj.APIVersion, - Kind: codeInterpreterObj.Kind, + APIVersion: runtimev1alpha1.SchemeGroupVersion.String(), + Kind: runtimev1alpha1.CodeInterpreterKind, Name: codeInterpreterObj.Name, UID: codeInterpreterObj.UID, },