**`docker/Dockerfile`** — 1 change: 1 addition & 0 deletions

```diff
@@ -17,6 +17,7 @@ RUN --mount=type=cache,target=/go/pkg/mod \
 # Copy source code
 COPY cmd/ cmd/
 COPY pkg/ pkg/
+COPY client-go/ client-go/

 # Build with dynamic architecture support and caching
 # Supports amd64, arm64, arm/v7, etc.
```
**`pkg/workloadmanager/informers.go`** — 23 changes: 15 additions & 8 deletions

```diff
@@ -24,6 +24,9 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/tools/cache"
+
+	agentcubeinformers "github.com/volcano-sh/agentcube/client-go/informers/externalversions"
+	agentruntimev1alpha1 "github.com/volcano-sh/agentcube/client-go/informers/externalversions/runtime/v1alpha1"
 )
```
**Contributor** commented on lines 24 to 30 (severity: medium):

To support including the SharedInformerFactory for AgentCube resources in the `Informers` struct, add the corresponding package to the imports.

Suggested change:

```diff
 "k8s.io/apimachinery/pkg/runtime/schema"
 "k8s.io/client-go/informers"
 "k8s.io/client-go/tools/cache"
+agentcubeinformers "github.com/volcano-sh/agentcube/client-go/informers/externalversions"
 agentruntimev1alpha1 "github.com/volcano-sh/agentcube/client-go/informers/externalversions/runtime/v1alpha1"
 )
```


```diff
@@ -50,18 +53,20 @@ var (
 )

 type Informers struct {
-	AgentRuntimeInformer    cache.SharedIndexInformer
-	CodeInterpreterInformer cache.SharedIndexInformer
+	AgentRuntimeInformer    agentruntimev1alpha1.AgentRuntimeInformer
+	CodeInterpreterInformer agentruntimev1alpha1.CodeInterpreterInformer
 	PodInformer             cache.SharedIndexInformer
```
**Copilot AI** commented on lines 55 to 58 (Apr 26, 2026):

Changing `Informers.AgentRuntimeInformer`/`CodeInterpreterInformer` from `cache.SharedIndexInformer` to the typed informer interfaces breaks tests and any code that constructs `Informers` with fake `SharedIndexInformer`s (e.g. `pkg/workloadmanager/informers_test.go` currently assigns `cache.SharedIndexInformer` and will no longer compile). Update tests to use a generated fake AgentCube clientset and `SharedInformerFactory`, or provide small test fakes that implement the typed informer interfaces (exposing `Informer()` and `Lister()`).
```diff
 	informerFactory         informers.SharedInformerFactory
+	agentcubeInformer       agentcubeinformers.SharedInformerFactory
 }

 func NewInformers(k8sClient *K8sClient) *Informers {
 	return &Informers{
-		AgentRuntimeInformer:    k8sClient.dynamicInformer.ForResource(AgentRuntimeGVR).Informer(),
-		CodeInterpreterInformer: k8sClient.dynamicInformer.ForResource(CodeInterpreterGVR).Informer(),
+		AgentRuntimeInformer:    k8sClient.agentcubeInformer.Runtime().V1alpha1().AgentRuntimes(),
+		CodeInterpreterInformer: k8sClient.agentcubeInformer.Runtime().V1alpha1().CodeInterpreters(),
 		PodInformer:             k8sClient.podInformer,
 		informerFactory:         k8sClient.informerFactory,
+		agentcubeInformer:       k8sClient.agentcubeInformer,
 	}
 }
```

**Contributor** commented on lines 55 to 70 (severity: medium):

It is recommended to include the agentcubeInformer factory in the `Informers` struct. This allows for more consistent and robust lifecycle management of the informers (e.g., using the factory's `Start` method) rather than manually running individual informers in goroutines.

```go
type Informers struct {
	AgentRuntimeInformer    agentruntimev1alpha1.AgentRuntimeInformer
	CodeInterpreterInformer agentruntimev1alpha1.CodeInterpreterInformer
	PodInformer             cache.SharedIndexInformer
	informerFactory         informers.SharedInformerFactory
	agentcubeInformer       agentcubeinformers.SharedInformerFactory
}

func NewInformers(k8sClient *K8sClient) *Informers {
	return &Informers{
		AgentRuntimeInformer:    k8sClient.agentcubeInformer.Runtime().V1alpha1().AgentRuntimes(),
		CodeInterpreterInformer: k8sClient.agentcubeInformer.Runtime().V1alpha1().CodeInterpreters(),
		PodInformer:             k8sClient.podInformer,
		informerFactory:         k8sClient.informerFactory,
		agentcubeInformer:       k8sClient.agentcubeInformer,
	}
}
```

```diff
@@ -77,18 +82,20 @@ func (ifm *Informers) RunAndWaitForCacheSync(ctx context.Context) error {

 func (ifm *Informers) run(stopCh <-chan struct{}) {
 	ifm.informerFactory.Start(stopCh)
```

**Copilot AI** commented on `ifm.informerFactory.Start(stopCh)` (Apr 26, 2026):

`agentcubeInformer.Start(stopCh)` is called before any AgentCube typed informers are instantiated via `.Informer()`. In the generated informer code, `AgentRuntimes()`/`CodeInterpreters()` are lazy and only register the underlying `SharedIndexInformer` with the factory when `.Informer()` (or `.Lister()`) is invoked; `Start()` only starts informers already present in the factory map. As a result, the AgentRuntime/CodeInterpreter informers may never be started and `waitForCacheSync` can time out in production. Instantiate the informers (e.g., call `ifm.AgentRuntimeInformer.Informer()` and `ifm.CodeInterpreterInformer.Informer()`) before calling `ifm.agentcubeInformer.Start(stopCh)` (either in `NewInformers` or at the top of `run`).

Suggested change:

```diff
 ifm.informerFactory.Start(stopCh)
+ifm.AgentRuntimeInformer.Informer()
+ifm.CodeInterpreterInformer.Informer()
```

```diff
-	go ifm.AgentRuntimeInformer.Run(stopCh)
-	go ifm.CodeInterpreterInformer.Run(stopCh)
+	// Instantiate the informers to ensure they are registered with the factory
+	ifm.AgentRuntimeInformer.Informer()
+	ifm.CodeInterpreterInformer.Informer()
```

**Member** commented on lines +86 to +87:

You can change these two types to informer.

```diff
+	ifm.agentcubeInformer.Start(stopCh)
 }
```

**Contributor** commented on lines 83 to 89 (severity: medium):

Use the `agentcubeInformer.Start` method to start the informers managed by the AgentCube factory. This is more consistent with how the core `informerFactory` is handled and avoids manual goroutine management for individual informers.

```go
func (ifm *Informers) run(stopCh <-chan struct{}) {
	ifm.informerFactory.Start(stopCh)
	ifm.agentcubeInformer.Start(stopCh)
}
```


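The lazy-registration pitfall flagged in the Copilot review above can be reproduced with a small self-contained sketch. The `factory` and `informer` types below are simplified stand-ins for client-go's generated `SharedInformerFactory` and `SharedIndexInformer` (assumptions for illustration, not the real API):

```go
package main

import "fmt"

// informer is a toy stand-in for a SharedIndexInformer.
type informer struct{ started bool }

// factory mimics a generated SharedInformerFactory: typed accessors register
// informers lazily, and Start only runs what is already registered.
type factory struct{ registered map[string]*informer }

// InformerFor mimics a typed accessor chain such as
// Runtime().V1alpha1().AgentRuntimes().Informer(): the underlying informer is
// only registered with the factory on first access.
func (f *factory) InformerFor(name string) *informer {
	if inf, ok := f.registered[name]; ok {
		return inf
	}
	inf := &informer{}
	f.registered[name] = inf
	return inf
}

// Start mimics SharedInformerFactory.Start: it only starts informers
// already present in the factory map.
func (f *factory) Start() {
	for _, inf := range f.registered {
		inf.started = true
	}
}

func main() {
	// Wrong order: Start before any typed accessor has registered an informer.
	f1 := &factory{registered: map[string]*informer{}}
	f1.Start()
	late := f1.InformerFor("agentruntimes")
	fmt.Println("accessed after Start, started:", late.started) // false

	// Right order: access (and thereby register) first, then Start.
	f2 := &factory{registered: map[string]*informer{}}
	early := f2.InformerFor("agentruntimes")
	f2.Start()
	fmt.Println("accessed before Start, started:", early.started) // true
}
```

This is why the merged diff calls `ifm.AgentRuntimeInformer.Informer()` and `ifm.CodeInterpreterInformer.Informer()` before `ifm.agentcubeInformer.Start(stopCh)`.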
```diff
 func (ifm *Informers) waitForCacheSync(ctx context.Context) error {
-	if !cache.WaitForCacheSync(ctx.Done(), ifm.AgentRuntimeInformer.HasSynced) {
+	if !cache.WaitForCacheSync(ctx.Done(), ifm.AgentRuntimeInformer.Informer().HasSynced) {
 		if err := ctx.Err(); err != nil {
 			return fmt.Errorf("timed out waiting for %v caches to sync: %w", AgentRuntimeGVR, err)
 		}
 		return fmt.Errorf("timed out waiting for %v caches to sync", AgentRuntimeGVR)
 	}
-	if !cache.WaitForCacheSync(ctx.Done(), ifm.CodeInterpreterInformer.HasSynced) {
+	if !cache.WaitForCacheSync(ctx.Done(), ifm.CodeInterpreterInformer.Informer().HasSynced) {
 		if err := ctx.Err(); err != nil {
 			return fmt.Errorf("timed out waiting for %v caches to sync: %w", CodeInterpreterGVR, err)
 		}
```
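The `cache.WaitForCacheSync` semantics relied on here — poll a set of `HasSynced` functions until all report true or the stop channel closes — can be sketched with a toy poller. This is a simplified stand-in, not client-go's actual implementation:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForCacheSync is a simplified stand-in for cache.WaitForCacheSync:
// it polls every HasSynced func until all return true, or gives up when
// stopCh closes (e.g. on context cancellation).
func waitForCacheSync(stopCh <-chan struct{}, synced ...func() bool) bool {
	for {
		all := true
		for _, s := range synced {
			if !s() {
				all = false
				break
			}
		}
		if all {
			return true
		}
		select {
		case <-stopCh:
			return false
		case <-time.After(10 * time.Millisecond):
			// poll again
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel immediately, mirroring the cancellation tests in this PR

	never := func() bool { return false }
	always := func() bool { return true }

	fmt.Println("all synced:", waitForCacheSync(ctx.Done(), always, always))
	fmt.Println("never syncs:", waitForCacheSync(ctx.Done(), always, never))
}
```

This mirrors why the tests below expect `context.Canceled` whenever any single informer never syncs.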
**`pkg/workloadmanager/informers_test.go`** — 60 changes: 41 additions & 19 deletions

```diff
@@ -25,23 +25,40 @@ import (
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/cache"
+
+	agentcubeinformers "github.com/volcano-sh/agentcube/client-go/informers/externalversions"
+	agentruntimev1alpha1 "github.com/volcano-sh/agentcube/client-go/informers/externalversions/runtime/v1alpha1"
+	runtimelister "github.com/volcano-sh/agentcube/client-go/listers/runtime/v1alpha1"
 )

 // neverSyncedInformer is a cache.SharedIndexInformer whose HasSynced always returns false.
 type neverSyncedInformer struct {
 	cache.SharedIndexInformer
 }

-func (n *neverSyncedInformer) HasSynced() bool            { return false }
-func (n *neverSyncedInformer) Run(stopCh <-chan struct{}) { <-stopCh }
+func (n *neverSyncedInformer) HasSynced() bool                          { return false }
+func (n *neverSyncedInformer) Run(stopCh <-chan struct{})               { <-stopCh }
+func (n *neverSyncedInformer) Informer() cache.SharedIndexInformer      { return n }
+func (n *neverSyncedInformer) Lister() runtimelister.AgentRuntimeLister { return nil }
+
+// fakeCodeInterpreterInformer implements CodeInterpreterInformer
+type fakeCodeInterpreterInformer struct {
+	cache.SharedIndexInformer
+}
+
+func (f *fakeCodeInterpreterInformer) HasSynced() bool                             { return f.SharedIndexInformer.HasSynced() }
+func (f *fakeCodeInterpreterInformer) Run(stopCh <-chan struct{})                  { f.SharedIndexInformer.Run(stopCh) }
+func (f *fakeCodeInterpreterInformer) Informer() cache.SharedIndexInformer         { return f.SharedIndexInformer }
+func (f *fakeCodeInterpreterInformer) Lister() runtimelister.CodeInterpreterLister { return nil }

 // alwaysSyncedInformer is a cache.SharedIndexInformer whose HasSynced always returns true.
 type alwaysSyncedInformer struct {
 	cache.SharedIndexInformer
 }

-func (a *alwaysSyncedInformer) HasSynced() bool            { return true }
-func (a *alwaysSyncedInformer) Run(stopCh <-chan struct{}) { <-stopCh }
+func (a *alwaysSyncedInformer) HasSynced() bool                          { return true }
+func (a *alwaysSyncedInformer) Run(stopCh <-chan struct{})               { <-stopCh }
+func (a *alwaysSyncedInformer) Informer() cache.SharedIndexInformer      { return a }
+func (a *alwaysSyncedInformer) Lister() runtimelister.AgentRuntimeLister { return nil }

 // runCanceled starts RunAndWaitForCacheSync in a goroutine, cancels the context
 // immediately, and returns the error. Fails the test if it takes more than 2s.
@@ -60,47 +77,50 @@ func runCanceled(t *testing.T, ifm *Informers) error {
 	}
 }

-func newFactory() informers.SharedInformerFactory {
-	return informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
+func newFactory() (informers.SharedInformerFactory, agentcubeinformers.SharedInformerFactory) {
+	return informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0),
+		agentcubeinformers.NewSharedInformerFactory(nil, 0)
 }

 func TestRunAndWaitForCacheSync_ContextCancellation(t *testing.T) {
-	never := func() cache.SharedIndexInformer { return &neverSyncedInformer{} }
-	always := func() cache.SharedIndexInformer { return &alwaysSyncedInformer{} }
+	never := func() *neverSyncedInformer { return &neverSyncedInformer{} }
+	always := func() *alwaysSyncedInformer { return &alwaysSyncedInformer{} }

 	tests := []struct {
-		name            string
-		agentRuntime    cache.SharedIndexInformer
-		codeInterpreter cache.SharedIndexInformer
-		pod             cache.SharedIndexInformer
+		name            string
+		agentRuntime    agentruntimev1alpha1.AgentRuntimeInformer
+		codeInterpreter agentruntimev1alpha1.CodeInterpreterInformer
+		pod             cache.SharedIndexInformer
 	}{
 		{
 			name:            "AgentRuntimeInformer never syncs",
 			agentRuntime:    never(),
-			codeInterpreter: never(),
+			codeInterpreter: &fakeCodeInterpreterInformer{SharedIndexInformer: never()},
 			pod:             never(),
 		},
 		{
 			name:            "CodeInterpreterInformer never syncs",
 			agentRuntime:    always(),
-			codeInterpreter: never(),
+			codeInterpreter: &fakeCodeInterpreterInformer{SharedIndexInformer: never()},
 			pod:             never(),
 		},
 		{
 			name:            "PodInformer never syncs",
 			agentRuntime:    always(),
-			codeInterpreter: always(),
+			codeInterpreter: &fakeCodeInterpreterInformer{SharedIndexInformer: always()},
 			pod:             never(),
 		},
 	}

 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
+			factory, agentcubeFactory := newFactory()
 			ifm := &Informers{
 				AgentRuntimeInformer:    tc.agentRuntime,
 				CodeInterpreterInformer: tc.codeInterpreter,
 				PodInformer:             tc.pod,
-				informerFactory:         newFactory(),
+				informerFactory:         factory,
+				agentcubeInformer:       agentcubeFactory,
 			}
 			err := runCanceled(t, ifm)
 			if !errors.Is(err, context.Canceled) {
@@ -111,11 +131,13 @@ func TestRunAndWaitForCacheSync_ContextCancellation(t *testing.T) {
 }

 func TestRunAndWaitForCacheSync_AllSynced(t *testing.T) {
+	factory, agentcubeFactory := newFactory()
 	ifm := &Informers{
 		AgentRuntimeInformer:    &alwaysSyncedInformer{},
-		CodeInterpreterInformer: &alwaysSyncedInformer{},
+		CodeInterpreterInformer: &fakeCodeInterpreterInformer{SharedIndexInformer: &alwaysSyncedInformer{}},
 		PodInformer:             &alwaysSyncedInformer{},
-		informerFactory:         newFactory(),
+		informerFactory:         factory,
+		agentcubeInformer:       agentcubeFactory,
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
```
**`pkg/workloadmanager/k8s_client.go`** — 40 changes: 27 additions & 13 deletions

```diff
@@ -40,6 +40,9 @@ import (
 	"k8s.io/client-go/tools/clientcmd"
 	sandboxv1alpha1 "sigs.k8s.io/agent-sandbox/api/v1alpha1"
 	extensionsv1alpha1 "sigs.k8s.io/agent-sandbox/extensions/api/v1alpha1"
+
+	agentcubeclientset "github.com/volcano-sh/agentcube/client-go/clientset/versioned"
+	agentcubeinformers "github.com/volcano-sh/agentcube/client-go/informers/externalversions"
 )

 const (
@@ -67,10 +70,11 @@ type K8sClient struct {
 	scheme      *runtime.Scheme
 	baseConfig  *rest.Config // Store base config for creating user clients
 	clientCache *ClientCache // LRU cache for user clients
-	dynamicInformer dynamicinformer.DynamicSharedInformerFactory
-	informerFactory informers.SharedInformerFactory
-	podInformer     cache.SharedIndexInformer
-	podLister       listersv1.PodLister
+	dynamicInformer   dynamicinformer.DynamicSharedInformerFactory
+	informerFactory   informers.SharedInformerFactory
+	agentcubeInformer agentcubeinformers.SharedInformerFactory
+	podInformer       cache.SharedIndexInformer
+	podLister         listersv1.PodLister
 }

 type sandboxEntry struct {
@@ -128,16 +132,26 @@ func NewK8sClient() (*K8sClient, error) {
 	podInformer := informerFactory.Core().V1().Pods().Informer()
 	podLister := informerFactory.Core().V1().Pods().Lister()

+	// Create AgentCube clientset
+	agentcubeClient, err := agentcubeclientset.NewForConfig(config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create agentcube clientset: %w", err)
+	}
+
+	// Create informer factory for AgentCube resources
+	agentcubeInformer := agentcubeinformers.NewSharedInformerFactory(agentcubeClient, 0)
+
 	return &K8sClient{
-		clientset:       clientset,
-		dynamicClient:   dynamicClient,
-		scheme:          scheme,
-		baseConfig:      config,
-		clientCache:     NewClientCache(100), // Cache up to 100 clients
-		dynamicInformer: dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0),
-		informerFactory: informerFactory,
-		podInformer:     podInformer,
-		podLister:       podLister,
+		clientset:         clientset,
+		dynamicClient:     dynamicClient,
+		scheme:            scheme,
+		baseConfig:        config,
+		clientCache:       NewClientCache(100), // Cache up to 100 clients
+		dynamicInformer:   dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0),
+		informerFactory:   informerFactory,
+		agentcubeInformer: agentcubeInformer,
+		podInformer:       podInformer,
+		podLister:         podLister,
 	}, nil
 }
```
**`pkg/workloadmanager/workload_builder.go`** — 53 changes: 16 additions & 37 deletions

```diff
@@ -29,9 +29,8 @@ import (
 	runtimev1alpha1 "github.com/volcano-sh/agentcube/pkg/apis/runtime/v1alpha1"
 	"github.com/volcano-sh/agentcube/pkg/common/types"
 	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/ptr"
@@ -242,22 +241,13 @@ func buildSandboxClaimObject(params *buildSandboxClaimParams) *extensionsv1alpha
 }

 func buildSandboxByAgentRuntime(namespace string, name string, ifm *Informers) (*sandboxv1alpha1.Sandbox, *sandboxEntry, error) {
-	agentRuntimeKey := namespace + "/" + name
-	// TODO(hzxuzhonghu): make use of typed informer, so we don't need to do type conversion below
-	runtimeObj, exists, _ := ifm.AgentRuntimeInformer.GetStore().GetByKey(agentRuntimeKey)
-	if !exists {
-		return nil, nil, api.ErrAgentRuntimeNotFound
-	}
-
-	unstructuredObj, ok := runtimeObj.(*unstructured.Unstructured)
-	if !ok {
-		klog.Errorf("agent runtime %s type asserting unstructured.Unstructured failed", agentRuntimeKey)
-		return nil, nil, fmt.Errorf("agent runtime type asserting failed")
-	}
-
-	var agentRuntimeObj runtimev1alpha1.AgentRuntime
-	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.Object, &agentRuntimeObj); err != nil {
-		return nil, nil, fmt.Errorf("failed to convert unstructured to AgentRuntime: %w", err)
+	// Use typed informer lister to get AgentRuntime
+	agentRuntimeObj, err := ifm.AgentRuntimeInformer.Lister().AgentRuntimes(namespace).Get(name)
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			return nil, nil, api.ErrAgentRuntimeNotFound
+		}
+		return nil, nil, fmt.Errorf("failed to get agent runtime %s/%s from informer cache: %w", namespace, name, err)
 	}

 	sessionID := uuid.New().String()
@@ -316,24 +306,13 @@ func buildCodeInterpreterEnvVars(templateEnv []corev1.EnvVar, authMode runtimev1
 func buildSandboxByCodeInterpreter(namespace string, codeInterpreterName string, informer *Informers) (*sandboxv1alpha1.Sandbox, *extensionsv1alpha1.SandboxClaim, *sandboxEntry, error) {
-	codeInterpreterKey := namespace + "/" + codeInterpreterName
-	// TODO(hzxuzhonghu): make use of typed informer, so we don't need to do type conversion below
-	runtimeObj, exists, err := informer.CodeInterpreterInformer.GetStore().GetByKey(codeInterpreterKey)
+	// Use typed informer lister to get CodeInterpreter
+	codeInterpreterObj, err := informer.CodeInterpreterInformer.Lister().CodeInterpreters(namespace).Get(codeInterpreterName)
 	if err != nil {
-		return nil, nil, nil, fmt.Errorf("failed to get code interpreter %s from informer cache: %w", codeInterpreterKey, err)
-	}
-	if !exists {
-		return nil, nil, nil, api.ErrCodeInterpreterNotFound
-	}
-	unstructuredObj, ok := runtimeObj.(*unstructured.Unstructured)
-	if !ok {
-		klog.Errorf("code interpreter %s type asserting unstructured.Unstructured failed", codeInterpreterKey)
-		return nil, nil, nil, fmt.Errorf("code interpreter type asserting failed")
-	}
-
-	var codeInterpreterObj runtimev1alpha1.CodeInterpreter
-	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.Object, &codeInterpreterObj); err != nil {
-		return nil, nil, nil, fmt.Errorf("failed to convert unstructured to CodeInterpreter: %w", err)
+		if apierrors.IsNotFound(err) {
+			return nil, nil, nil, api.ErrCodeInterpreterNotFound
+		}
+		return nil, nil, nil, fmt.Errorf("failed to get code interpreter %s/%s from informer cache: %w", namespace, codeInterpreterName, err)
 	}

 	// Check public key available if authMode is picod
@@ -375,8 +354,8 @@ func buildSandboxByCodeInterpreter(namespace string, codeInterpreterName string,
 		sessionID:   sessionID,
 		idleTimeout: idleTimeout,
 		ownerReference: &metav1.OwnerReference{
-			APIVersion: codeInterpreterObj.APIVersion,
-			Kind:       codeInterpreterObj.Kind,
+			APIVersion: runtimev1alpha1.SchemeGroupVersion.String(),
+			Kind:       runtimev1alpha1.CodeInterpreterKind,
 			Name:       codeInterpreterObj.Name,
 			UID:        codeInterpreterObj.UID,
 		},
```

**Member** commented on the new `CodeInterpreterInformer.Lister()` call:

Can we set the warm-pool OwnerReference APIVersion/Kind explicitly instead of reading them from `codeInterpreterObj.TypeMeta`? Typed informer objects can have empty TypeMeta, and the current E2E run rejects every warm-pool SandboxClaim with empty ownerReferences.

**Author** replied:

Fixed it. You are right that objects retrieved from typed informers often have empty TypeMeta. I have updated the code to explicitly set the APIVersion and Kind in the OwnerReference using the `runtimev1alpha1.SchemeGroupVersion` and `runtimev1alpha1.CodeInterpreterKind` constants, ensuring that SandboxClaims are correctly populated and accepted by the API server.
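The empty-TypeMeta behavior discussed in the review thread above can be sketched with toy stand-ins. The struct shapes mimic `metav1.TypeMeta`/`metav1.OwnerReference`, and the group-version string is an illustrative placeholder, not the project's actual value:

```go
package main

import "fmt"

// Stand-ins for metav1.TypeMeta and metav1.OwnerReference.
type TypeMeta struct{ APIVersion, Kind string }

type CodeInterpreter struct {
	TypeMeta
	Name string
}

type OwnerReference struct{ APIVersion, Kind, Name string }

// Assumed placeholder values; the real code uses
// runtimev1alpha1.SchemeGroupVersion.String() and the CodeInterpreterKind constant.
const (
	schemeGroupVersion  = "runtime.example.dev/v1alpha1"
	codeInterpreterKind = "CodeInterpreter"
)

func main() {
	// Objects returned by typed listers typically have TypeMeta cleared by the
	// decoder, so APIVersion and Kind read back as empty strings.
	obj := CodeInterpreter{Name: "demo"}

	fromTypeMeta := OwnerReference{APIVersion: obj.APIVersion, Kind: obj.Kind, Name: obj.Name}
	explicit := OwnerReference{APIVersion: schemeGroupVersion, Kind: codeInterpreterKind, Name: obj.Name}

	// An OwnerReference with empty apiVersion/kind is rejected by the API server,
	// which is exactly the E2E failure the reviewer observed.
	fmt.Println("from TypeMeta is empty:", fromTypeMeta.APIVersion == "" && fromTypeMeta.Kind == "")
	fmt.Println("explicit:", explicit.APIVersion, explicit.Kind)
}
```

This is the design choice behind the final diff: derive the OwnerReference identity from compile-time constants rather than from fields the decoder may have blanked.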