diff --git a/internal/pb/cache.pb.go b/internal/pb/cache.pb.go index c4948d31..6e7bc6f1 100644 --- a/internal/pb/cache.pb.go +++ b/internal/pb/cache.pb.go @@ -109,6 +109,134 @@ func (x *PopulateDiskCacheResponse) GetVersion() int64 { return 0 } +type AssignPodRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + ApplicationId int64 `protobuf:"varint,1,opt,name=applicationId,proto3" json:"applicationId,omitempty"` + EnvironmentId int64 `protobuf:"varint,2,opt,name=environmentId,proto3" json:"environmentId,omitempty"` + EnvironmentVersion int64 `protobuf:"varint,3,opt,name=environmentVersion,proto3" json:"environmentVersion,omitempty"` + PodUuid string `protobuf:"bytes,4,opt,name=podUuid,proto3" json:"podUuid,omitempty"` + VolumeName *string `protobuf:"bytes,5,opt,name=volumeName,proto3,oneof" json:"volumeName,omitempty"` + PodVolumePath *string `protobuf:"bytes,6,opt,name=podVolumePath,proto3,oneof" json:"podVolumePath,omitempty"` // If provided, we will use this as the volume path for the pod + CachePath string `protobuf:"bytes,7,opt,name=cachePath,proto3" json:"cachePath,omitempty"` // If provided, we will use this as the cache path for the pod + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AssignPodRequest) Reset() { + *x = AssignPodRequest{} + mi := &file_internal_pb_cache_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AssignPodRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AssignPodRequest) ProtoMessage() {} + +func (x *AssignPodRequest) ProtoReflect() protoreflect.Message { + mi := &file_internal_pb_cache_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AssignPodRequest.ProtoReflect.Descriptor instead. +func (*AssignPodRequest) Descriptor() ([]byte, []int) { + return file_internal_pb_cache_proto_rawDescGZIP(), []int{2} +} + +func (x *AssignPodRequest) GetApplicationId() int64 { + if x != nil { + return x.ApplicationId + } + return 0 +} + +func (x *AssignPodRequest) GetEnvironmentId() int64 { + if x != nil { + return x.EnvironmentId + } + return 0 +} + +func (x *AssignPodRequest) GetEnvironmentVersion() int64 { + if x != nil { + return x.EnvironmentVersion + } + return 0 +} + +func (x *AssignPodRequest) GetPodUuid() string { + if x != nil { + return x.PodUuid + } + return "" +} + +func (x *AssignPodRequest) GetVolumeName() string { + if x != nil && x.VolumeName != nil { + return *x.VolumeName + } + return "" +} + +func (x *AssignPodRequest) GetPodVolumePath() string { + if x != nil && x.PodVolumePath != nil { + return *x.PodVolumePath + } + return "" +} + +func (x *AssignPodRequest) GetCachePath() string { + if x != nil { + return x.CachePath + } + return "" +} + +type AssignPodResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AssignPodResponse) Reset() { + *x = AssignPodResponse{} + mi := &file_internal_pb_cache_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AssignPodResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AssignPodResponse) ProtoMessage() {} + +func (x *AssignPodResponse) ProtoReflect() protoreflect.Message { + mi := &file_internal_pb_cache_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AssignPodResponse.ProtoReflect.Descriptor instead. +func (*AssignPodResponse) Descriptor() ([]byte, []int) { + return file_internal_pb_cache_proto_rawDescGZIP(), []int{3} +} + var File_internal_pb_cache_proto protoreflect.FileDescriptor var file_internal_pb_cache_proto_rawDesc = string([]byte{ @@ -120,16 +248,40 @@ var file_internal_pb_cache_proto_rawDesc = string([]byte{ 0x19, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, 0x68, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x76, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x32, 0x5a, 0x0a, 0x06, 0x43, 0x61, 0x63, 0x68, 0x65, 0x64, 0x12, 0x50, + 0x73, 0x69, 0x6f, 0x6e, 0x22, 0xb7, 0x02, 0x0a, 0x10, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x50, + 0x6f, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x24, 0x0a, 0x0d, 0x61, 0x70, 0x70, + 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x0d, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, + 0x24, 0x0a, 0x0d, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, + 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x2e, 0x0a, 0x12, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, + 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x12, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x6f, 0x64, 0x55, 0x75, 0x69, 0x64, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x6f, 0x64, 0x55, 0x75, 0x69, 0x64, 0x12, + 0x23, 0x0a, 0x0a, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0a, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4e, 0x61, 0x6d, + 0x65, 0x88, 0x01, 0x01, 0x12, 0x29, 0x0a, 0x0d, 0x70, 0x6f, 0x64, 0x56, 0x6f, 0x6c, 0x75, 0x6d, + 0x65, 0x50, 0x61, 0x74, 0x68, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0d, 0x70, + 0x6f, 0x64, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x50, 0x61, 0x74, 0x68, 0x88, 0x01, 0x01, 0x12, + 0x1c, 0x0a, 0x09, 0x63, 0x61, 0x63, 0x68, 0x65, 0x50, 0x61, 0x74, 0x68, 0x18, 0x07, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x09, 0x63, 0x61, 0x63, 0x68, 0x65, 0x50, 0x61, 0x74, 0x68, 0x42, 0x0d, 0x0a, + 0x0b, 0x5f, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x10, 0x0a, 0x0e, + 0x5f, 0x70, 0x6f, 0x64, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x50, 0x61, 0x74, 0x68, 0x22, 0x13, + 0x0a, 0x11, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x50, 0x6f, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x32, 0x94, 0x01, 0x0a, 0x06, 0x43, 0x61, 0x63, 0x68, 0x65, 0x64, 0x12, 0x50, 0x0a, 0x11, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, 0x68, 0x65, 0x12, 0x1c, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, 0x68, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x6f, 0x70, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x44, 0x69, 0x73, 0x6b, 0x43, 0x61, 0x63, 0x68, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, - 0x61, 0x64, 0x67, 0x65, 0x74, 0x2d, 0x69, 0x6e, 0x63, 0x2f, 0x64, 0x61, 0x74, 0x65, 0x69, 0x6c, - 0x61, 0x67, 0x65, 0x72, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x12, 0x38, 0x0a, 0x09, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x50, 0x6f, 0x64, 0x12, 0x14, 0x2e, + 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x50, 0x6f, 0x64, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x50, + 0x6f, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x61, 0x64, 0x67, 0x65, 0x74, 0x2d, + 0x69, 0x6e, 0x63, 0x2f, 0x64, 0x61, 0x74, 0x65, 0x69, 0x6c, 0x61, 0x67, 0x65, 0x72, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, }) var ( @@ -144,16 +296,20 @@ func file_internal_pb_cache_proto_rawDescGZIP() []byte { return file_internal_pb_cache_proto_rawDescData } -var file_internal_pb_cache_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_internal_pb_cache_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_internal_pb_cache_proto_goTypes = []any{ (*PopulateDiskCacheRequest)(nil), // 0: pb.PopulateDiskCacheRequest (*PopulateDiskCacheResponse)(nil), // 1: pb.PopulateDiskCacheResponse + (*AssignPodRequest)(nil), // 2: pb.AssignPodRequest + (*AssignPodResponse)(nil), // 3: pb.AssignPodResponse } var file_internal_pb_cache_proto_depIdxs = []int32{ 0, // 0: pb.Cached.PopulateDiskCache:input_type -> pb.PopulateDiskCacheRequest - 1, // 1: pb.Cached.PopulateDiskCache:output_type -> pb.PopulateDiskCacheResponse - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type + 2, // 1: pb.Cached.AssignPod:input_type -> pb.AssignPodRequest + 1, // 2: pb.Cached.PopulateDiskCache:output_type -> pb.PopulateDiskCacheResponse + 3, // 3: pb.Cached.AssignPod:output_type -> pb.AssignPodResponse + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name @@ -164,13 +320,14 @@ func file_internal_pb_cache_proto_init() { if File_internal_pb_cache_proto != nil { return } + file_internal_pb_cache_proto_msgTypes[2].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_internal_pb_cache_proto_rawDesc), len(file_internal_pb_cache_proto_rawDesc)), NumEnums: 0, - NumMessages: 2, + NumMessages: 4, NumExtensions: 0, NumServices: 1, }, diff --git a/internal/pb/cache.proto b/internal/pb/cache.proto index c4468590..1806dd48 100644 --- a/internal/pb/cache.proto +++ b/internal/pb/cache.proto @@ -6,8 +6,21 @@ option go_package = "github.com/gadget-inc/dateilager/pkg/pb"; service Cached { rpc PopulateDiskCache(PopulateDiskCacheRequest) returns (PopulateDiskCacheResponse); + rpc AssignPod(AssignPodRequest) returns (AssignPodResponse); } message PopulateDiskCacheRequest { string path = 1; } message PopulateDiskCacheResponse { int64 version = 1; }; + +message AssignPodRequest { + int64 applicationId = 1; + int64 environmentId = 2; + int64 environmentVersion = 3; + string podUuid = 4; + optional string volumeName = 5; + optional string podVolumePath = 6; // If provided, we will use this as the volume path for the pod + string cachePath = 7; // If provided, we will use this as the cache path for the pod +} + +message AssignPodResponse { }; diff --git a/internal/pb/cache_grpc.pb.go b/internal/pb/cache_grpc.pb.go index 44a44e70..f5e20e35 100644 --- a/internal/pb/cache_grpc.pb.go +++ b/internal/pb/cache_grpc.pb.go @@ -20,6 +20,7 @@ const _ = grpc.SupportPackageIsVersion9 const ( Cached_PopulateDiskCache_FullMethodName = "/pb.Cached/PopulateDiskCache" + Cached_AssignPod_FullMethodName = "/pb.Cached/AssignPod" ) // CachedClient is the client API for Cached service. @@ -27,6 +28,7 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type CachedClient interface { PopulateDiskCache(ctx context.Context, in *PopulateDiskCacheRequest, opts ...grpc.CallOption) (*PopulateDiskCacheResponse, error) + AssignPod(ctx context.Context, in *AssignPodRequest, opts ...grpc.CallOption) (*AssignPodResponse, error) } type cachedClient struct { @@ -47,11 +49,22 @@ func (c *cachedClient) PopulateDiskCache(ctx context.Context, in *PopulateDiskCa return out, nil } +func (c *cachedClient) AssignPod(ctx context.Context, in *AssignPodRequest, opts ...grpc.CallOption) (*AssignPodResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(AssignPodResponse) + err := c.cc.Invoke(ctx, Cached_AssignPod_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // CachedServer is the server API for Cached service. // All implementations must embed UnimplementedCachedServer // for forward compatibility. type CachedServer interface { PopulateDiskCache(context.Context, *PopulateDiskCacheRequest) (*PopulateDiskCacheResponse, error) + AssignPod(context.Context, *AssignPodRequest) (*AssignPodResponse, error) mustEmbedUnimplementedCachedServer() } @@ -65,6 +78,9 @@ type UnimplementedCachedServer struct{} func (UnimplementedCachedServer) PopulateDiskCache(context.Context, *PopulateDiskCacheRequest) (*PopulateDiskCacheResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method PopulateDiskCache not implemented") } +func (UnimplementedCachedServer) AssignPod(context.Context, *AssignPodRequest) (*AssignPodResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method AssignPod not implemented") +} func (UnimplementedCachedServer) mustEmbedUnimplementedCachedServer() {} func (UnimplementedCachedServer) testEmbeddedByValue() {} @@ -104,6 +120,24 @@ func _Cached_PopulateDiskCache_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } +func _Cached_AssignPod_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AssignPodRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CachedServer).AssignPod(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Cached_AssignPod_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CachedServer).AssignPod(ctx, req.(*AssignPodRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Cached_ServiceDesc is the grpc.ServiceDesc for Cached service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -115,6 +149,10 @@ var Cached_ServiceDesc = grpc.ServiceDesc{ MethodName: "PopulateDiskCache", Handler: _Cached_PopulateDiskCache_Handler, }, + { + MethodName: "AssignPod", + Handler: _Cached_AssignPod_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "internal/pb/cache.proto", diff --git a/pkg/api/cached.go b/pkg/api/cached.go index 4462c744..b7055def 100644 --- a/pkg/api/cached.go +++ b/pkg/api/cached.go @@ -28,11 +28,13 @@ import ( ) const ( - DriverName = "com.gadget.dateilager.cached" - CACHE_PATH_SUFFIX = "dl_cache" - UPPER_DIR = "upper" - WORK_DIR = "work" - NO_CHANGE_USER = -1 + DriverName = "com.gadget.dateilager.cached" + CACHE_PATH_SUFFIX = "dl_cache" + UPPER_DIR = "upper" + WORK_DIR = "work" + NO_CHANGE_USER = -1 + POD_LOCAL_DIR = "/var/lib/kubelet/pods" + DEFAULT_VOLUME_NAME = "a" ) type Cached struct { @@ -64,6 +66,56 @@ func (c *Cached) PopulateDiskCache(ctx context.Context, req *pb.PopulateDiskCach return &pb.PopulateDiskCacheResponse{Version: version}, nil } +func (c *Cached) AssignPod(ctx context.Context, req *pb.AssignPodRequest) (*pb.AssignPodResponse, error) { + podId := req.PodUuid + var volumeName string + var podVolumePath string + + if req.VolumeName != nil { + volumeName = *req.VolumeName + } else { + volumeName = DEFAULT_VOLUME_NAME + } + + if req.PodVolumePath != nil { + podVolumePath = *req.PodVolumePath + } else { + podVolumePath = path.Join(POD_LOCAL_DIR, podId, "volumes", "kubernetes.io~csi", volumeName) + } + _, err := os.Stat(podVolumePath) + if err != nil { + return nil, fmt.Errorf("failed to stat pod storage path %s: %v", podVolumePath, err) + } + + appDir := path.Join(podVolumePath, "app") + nodeModulesDir := path.Join(appDir, "node_modules") + + err = os.MkdirAll(nodeModulesDir, 0o777) + if err != nil { + return nil, fmt.Errorf("failed to create node_modules directory %s: %v", nodeModulesDir, err) + } + + workDir := path.Join(podVolumePath, "work") + err = os.MkdirAll(workDir, 0o777) + if err != nil { + return nil, fmt.Errorf("failed to create work directory %s: %v", workDir, err) + } + + upperDir := path.Join(podVolumePath, "upper") + err = os.MkdirAll(upperDir, 0o777) + if err != nil { + return nil, fmt.Errorf("failed to create upper directory %s: %v", upperDir, err) + } + + cachePath := req.CachePath + err = c.createOverlayDirs(upperDir, workDir, cachePath, podVolumePath) + if err != nil { + return nil, fmt.Errorf("failed to create overlay directories: %v", err) + } + + return &pb.AssignPodResponse{}, nil +} + func (c *Cached) GetCachePath() string { return filepath.Join(c.StagingPath, CACHE_PATH_SUFFIX) } @@ -216,19 +268,9 @@ func (c *Cached) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu return nil, fmt.Errorf("failed to create target path directory %s: %v", targetPath, err) } - mountArgs := []string{ - "-t", - "overlay", - "overlay", - "-n", - "--options", - fmt.Sprintf("redirect_dir=on,volatile,lowerdir=%s,upperdir=%s,workdir=%s", c.StagingPath, upperdir, workdir), - targetPath, - } - - err = execCommand("mount", mountArgs...) + err = c.createOverlayDirs(upperdir, workdir, c.StagingPath, targetPath) if err != nil { - return nil, fmt.Errorf("failed to mount overlay: %s", err) + return nil, fmt.Errorf("failed to create overlay directories: %v", err) } cachePath := path.Join(targetPath, CACHE_PATH_SUFFIX) @@ -274,6 +316,24 @@ func (c *Cached) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu return &csi.NodePublishVolumeResponse{}, nil } +func (c *Cached) createOverlayDirs(upperdir, workdir, lowerdir, targetPath string) error { + mountArgs := []string{ + "-t", + "overlay", + "overlay", + "-n", + "--options", + fmt.Sprintf("redirect_dir=on,volatile,lowerdir=%s,upperdir=%s,workdir=%s", lowerdir, upperdir, workdir), + targetPath, + } + + err := execCommand("mount", mountArgs...) + if err != nil { + return fmt.Errorf("failed to mount overlay: %s", err) + } + return nil +} + func (s *Cached) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { if req.VolumeId == "" { return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Volume ID must be provided") diff --git a/pkg/client/client.go b/pkg/client/client.go index 127fa323..14906796 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -24,6 +24,7 @@ import ( "github.com/gadget-inc/dateilager/internal/telemetry" fsdiff_pb "github.com/gadget-inc/fsdiff/pkg/pb" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "golang.org/x/oauth2" "golang.org/x/sync/errgroup" @@ -1069,6 +1070,31 @@ func (c *CachedClient) PopulateDiskCache(ctx context.Context, destination string return response.Version, nil } +func (c *CachedClient) AssignPod(ctx context.Context, applicationId int64, environmentId int64, environmentVersion int64, podUuid string, volumeName *string) error { + ctx, span := telemetry.Start(ctx, "client.assign-pod", trace.WithAttributes( + attribute.Int64("application_id", applicationId), + attribute.Int64("environment_id", environmentId), + attribute.Int64("environment_version", environmentVersion), + attribute.String("pod_uuid", podUuid), + )) + defer span.End() + + request := &pb.AssignPodRequest{ + ApplicationId: applicationId, + EnvironmentId: environmentId, + EnvironmentVersion: environmentVersion, + PodUuid: podUuid, + VolumeName: volumeName, + } + + _, err := c.cached.AssignPod(ctx, request) + if err != nil { + return fmt.Errorf("failed to assign pod %s for app %d and env %d: %w", podUuid, applicationId, environmentId, err) + } + + return nil +} + func (c *CachedClient) Probe(ctx context.Context) (bool, error) { request := &csi.ProbeRequest{} diff --git a/test/cached_csi_test.go b/test/cached_csi_test.go index 49935732..4eb5b73f 100644 --- a/test/cached_csi_test.go +++ b/test/cached_csi_test.go @@ -13,6 +13,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/gadget-inc/dateilager/internal/auth" "github.com/gadget-inc/dateilager/internal/db" + "github.com/gadget-inc/dateilager/internal/pb" util "github.com/gadget-inc/dateilager/internal/testutil" "github.com/gadget-inc/dateilager/pkg/api" "github.com/gadget-inc/dateilager/pkg/cached" @@ -340,6 +341,67 @@ func TestCachedCSIDriverAppUserSet(t *testing.T) { require.Equal(t, 1000, int(sysstat.Gid)) } +func TestCachedCSIDriverAssignPod(t *testing.T) { + tc := util.NewTestCtx(t, auth.Project, 1) + defer tc.Close() + + writeProject(tc, 1, 4) + writeObject(tc, 1, 1, nil, "a", "a v1") + writeObject(tc, 1, 2, i(3), "b", "b v2") + writeObject(tc, 1, 3, i(4), "b/c", "b/c v3") + writeObject(tc, 1, 3, nil, "b/d", "b/d v3") + writeObject(tc, 1, 4, nil, "b/e", "b/e v4") + + tmpDir := emptyTmpDir(t) + defer os.RemoveAll(tmpDir) + + cached, _, close := createTestCachedServer(tc, tmpDir) + defer close() + + err := cached.Prepare(tc.Context(), -1) + require.NoError(t, err, "cached.Prepare must succeed") + + targetDir := path.Join(tmpDir, "vol-target") + podPrebuildDir := path.Join(tmpDir, "pod-prebuild") + + stagingDir := path.Join(tmpDir, "vol-staging-target") + _, err = cached.NodePublishVolume(tc.Context(), &csi.NodePublishVolumeRequest{ + VolumeId: "foobar", + StagingTargetPath: stagingDir, + TargetPath: targetDir, + VolumeCapability: &csi.VolumeCapability{}, + VolumeContext: map[string]string{ + "appUser": "1000", + "appGroup": "1000", + }, + }) + require.NoError(t, err) + + c, _, close := createTestClient(tc) + defer close() + + dlCacheDir := path.Join(targetDir, "dl_cache") + + // Do the rebuild into the pod prebuild dir to simulate a ready made cache + rebuild(tc, c, 1, nil, podPrebuildDir, &dlCacheDir, expectedResponse{ + version: 4, + count: 1, + }, nil) + + assignPodRequest := &pb.AssignPodRequest{ + PodUuid: "123", + ApplicationId: 1, + EnvironmentId: 1, + EnvironmentVersion: 1, + PodVolumePath: &targetDir, + CachePath: podPrebuildDir, + } + + _, err = cached.AssignPod(tc.Context(), assignPodRequest) + require.NoError(t, err) + +} + func formatFileMode(mode os.FileMode) string { return fmt.Sprintf("%#o", mode) }