Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions cmd/api/api/instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestCreateInstance_AutoPullImage(t *testing.T) {

t.Log("Creating instance without pre-pulling image (testing auto-pull)...")
networkEnabled := false
resp, err := svc.CreateInstance(ctx(), oapi.CreateInstanceRequestObject{
createReq := oapi.CreateInstanceRequestObject{
Body: &oapi.CreateInstanceRequest{
Name: "test-auto-pull",
Image: "docker.io/library/alpine:latest",
Expand All @@ -74,19 +74,37 @@ func TestCreateInstance_AutoPullImage(t *testing.T) {
Enabled: &networkEnabled,
},
},
})
require.NoError(t, err)
}

deadline := time.Now().Add(integrationTestTimeout(15 * time.Second))
var created oapi.CreateInstance201JSONResponse
for {
resp, err := svc.CreateInstance(ctx(), createReq)
require.NoError(t, err)

if okResp, ok := resp.(oapi.CreateInstance201JSONResponse); ok {
created = okResp
break
}

notReadyResp, ok := resp.(oapi.CreateInstance400JSONResponse)
require.True(t, ok, "expected create to either succeed or report image_not_ready while auto-pull finishes")
require.Equal(t, "image_not_ready", notReadyResp.Code)

if time.Now().After(deadline) {
t.Fatalf("auto-pull did not finish before deadline: %s", notReadyResp.Message)
}
time.Sleep(500 * time.Millisecond)
}

created, ok := resp.(oapi.CreateInstance201JSONResponse)
require.True(t, ok, "expected 201 response — auto-pull should have fetched the image")
t.Logf("Instance created via auto-pull: %s", created.Id)

// Cleanup: delete the instance
instanceID := created.Id
t.Log("Deleting instance...")
deleteResp, err := svc.DeleteInstance(ctxWithInstance(svc, instanceID), oapi.DeleteInstanceRequestObject{Id: instanceID})
require.NoError(t, err)
_, ok = deleteResp.(oapi.DeleteInstance204Response)
_, ok := deleteResp.(oapi.DeleteInstance204Response)
require.True(t, ok, "expected 204 response for delete")
t.Log("Instance deleted successfully")
}
Expand Down
8 changes: 6 additions & 2 deletions lib/hypervisor/cloudhypervisor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s
}

// 1. Start the Cloud Hypervisor process
pid, err := vmm.StartProcess(ctx, p, chVersion, socketPath)
processCtx, processSpan := hypervisor.StartProcessSpan(ctx, hypervisor.TypeCloudHypervisor)
pid, err := vmm.StartProcess(processCtx, p, chVersion, socketPath)
hypervisor.FinishTraceSpan(processSpan, err)
if err != nil {
return 0, nil, fmt.Errorf("start process: %w", err)
}
Expand Down Expand Up @@ -117,7 +119,9 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string,

// 1. Start the Cloud Hypervisor process
processStartTime := time.Now()
pid, err := vmm.StartProcess(ctx, p, chVersion, socketPath)
processCtx, processSpan := hypervisor.StartProcessSpan(ctx, hypervisor.TypeCloudHypervisor)
pid, err := vmm.StartProcess(processCtx, p, chVersion, socketPath)
hypervisor.FinishTraceSpan(processSpan, err)
if err != nil {
return 0, nil, fmt.Errorf("start process: %w", err)
}
Expand Down
25 changes: 25 additions & 0 deletions lib/hypervisor/firecracker/firecracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (
"time"

"github.com/kernel/hypeman/lib/hypervisor"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

type apiError struct {
Expand Down Expand Up @@ -273,17 +277,30 @@ func guestTargetBytesToMiB(bytes int64) int64 {
}

func (f *Firecracker) do(ctx context.Context, method, path string, reqBody any, expectedStatus ...int) ([]byte, error) {
attrs := hypervisor.TraceAttributesFromContext(ctx)
attrs = append(attrs,
attribute.String("operation", method+" "+path),
attribute.String("http.method", method),
attribute.String("http.route", path),
)
ctx, span := otel.Tracer("hypeman/hypervisor/firecracker").Start(ctx, "hypervisor.http "+method+" "+path, trace.WithAttributes(attrs...))
defer span.End()

var bodyReader io.Reader
if reqBody != nil {
data, err := json.Marshal(reqBody)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("marshal request body: %w", err)
}
bodyReader = bytes.NewReader(data)
}

req, err := http.NewRequestWithContext(ctx, method, "http://localhost"+path, bodyReader)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("create request: %w", err)
}
req.Header.Set("Accept", "application/json")
Expand All @@ -293,27 +310,35 @@ func (f *Firecracker) do(ctx context.Context, method, path string, reqBody any,

resp, err := f.client.Do(req)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("request %s %s: %w", method, path, err)
}
defer resp.Body.Close()
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))

data, err := io.ReadAll(resp.Body)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("read response body: %w", err)
}

for _, status := range expectedStatus {
if resp.StatusCode == status {
span.SetStatus(codes.Ok, "")
return data, nil
}
}

if len(data) > 0 {
var apiErr apiError
if err := json.Unmarshal(data, &apiErr); err == nil && apiErr.FaultMessage != "" {
span.SetStatus(codes.Error, apiErr.FaultMessage)
return nil, fmt.Errorf("status %d: %s", resp.StatusCode, apiErr.FaultMessage)
}
}
span.SetStatus(codes.Error, resp.Status)
return nil, fmt.Errorf("status %d: %s", resp.StatusCode, string(data))
}

Expand Down
8 changes: 6 additions & 2 deletions lib/hypervisor/firecracker/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ func (s *Starter) GetVersion(p *paths.Paths) (string, error) {
}

func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, socketPath string, config hypervisor.VMConfig) (int, hypervisor.Hypervisor, error) {
pid, err := s.startProcess(ctx, p, version, socketPath)
processCtx, processSpan := hypervisor.StartProcessSpan(ctx, hypervisor.TypeFirecracker)
pid, err := s.startProcess(processCtx, p, version, socketPath)
hypervisor.FinishTraceSpan(processSpan, err)
if err != nil {
return 0, nil, fmt.Errorf("start firecracker process: %w", err)
}
Expand Down Expand Up @@ -92,7 +94,9 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s
}

func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, socketPath string, snapshotPath string) (int, hypervisor.Hypervisor, error) {
pid, err := s.startProcess(ctx, p, version, socketPath)
processCtx, processSpan := hypervisor.StartProcessSpan(ctx, hypervisor.TypeFirecracker)
pid, err := s.startProcess(processCtx, p, version, socketPath)
hypervisor.FinishTraceSpan(processSpan, err)
if err != nil {
return 0, nil, fmt.Errorf("start firecracker process: %w", err)
}
Expand Down
6 changes: 5 additions & 1 deletion lib/hypervisor/hypervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,5 +288,9 @@ func NewClient(hvType Type, socketPath string) (Hypervisor, error) {
if !ok {
return nil, fmt.Errorf("no client factory registered for hypervisor type: %s", hvType)
}
return factory(socketPath)
client, err := factory(socketPath)
if err != nil {
return nil, err
}
return WrapHypervisor(hvType, client), nil
}
29 changes: 27 additions & 2 deletions lib/hypervisor/qemu/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"github.com/kernel/hypeman/lib/hypervisor"
"github.com/kernel/hypeman/lib/logger"
"github.com/kernel/hypeman/lib/paths"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"gvisor.dev/gvisor/pkg/cleanup"
)

Expand Down Expand Up @@ -128,15 +132,25 @@ func buildQMPArgs(socketPath string) []string {
// The cleanup function must be called on error; call cleanup.Release() on success.
func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version string, socketPath string, args []string) (int, *QEMU, *cleanup.Cleanup, error) {
log := logger.FromContext(ctx)
processAttrs := hypervisor.TraceAttributesFromContext(ctx)
processAttrs = append(processAttrs,
attribute.String("operation", "start_process"),
attribute.String("hypervisor", string(hypervisor.TypeQEMU)),
)
processCtx, processSpan := otel.Tracer("hypeman/hypervisor/qemu").Start(ctx, "hypervisor.start_process", trace.WithAttributes(processAttrs...))
defer processSpan.End()

// Get binary path
binaryPath, err := s.GetBinaryPath(p, version)
if err != nil {
processSpan.RecordError(err)
processSpan.SetStatus(codes.Error, err.Error())
return 0, nil, nil, fmt.Errorf("get binary: %w", err)
}

// Check if socket is already in use
if isSocketInUse(socketPath) {
processSpan.SetStatus(codes.Error, "socket already in use")
return 0, nil, nil, fmt.Errorf("socket already in use, QEMU may be running at %s", socketPath)
}

Expand All @@ -155,6 +169,8 @@ func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version
instanceDir := filepath.Dir(socketPath)
logsDir := filepath.Join(instanceDir, "logs")
if err := os.MkdirAll(logsDir, 0755); err != nil {
processSpan.RecordError(err)
processSpan.SetStatus(codes.Error, err.Error())
return 0, nil, nil, fmt.Errorf("create logs directory: %w", err)
}

Expand All @@ -164,6 +180,8 @@ func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version
0644,
)
if err != nil {
processSpan.RecordError(err)
processSpan.SetStatus(codes.Error, err.Error())
return 0, nil, nil, fmt.Errorf("create vmm log: %w", err)
}
defer vmmLogFile.Close()
Expand All @@ -173,11 +191,13 @@ func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version

processStartTime := time.Now()
if err := cmd.Start(); err != nil {
processSpan.RecordError(err)
processSpan.SetStatus(codes.Error, err.Error())
return 0, nil, nil, fmt.Errorf("start qemu: %w", err)
}

pid := cmd.Process.Pid
log.DebugContext(ctx, "QEMU process started", "pid", pid, "duration_ms", time.Since(processStartTime).Milliseconds())
log.DebugContext(processCtx, "QEMU process started", "pid", pid, "duration_ms", time.Since(processStartTime).Milliseconds())

// Setup cleanup to kill the process if subsequent steps fail
cu := cleanup.Make(func() {
Expand All @@ -187,10 +207,12 @@ func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version
// Wait for socket to be ready
socketWaitStart := time.Now()
if err := waitForSocket(socketPath, socketWaitTimeout); err != nil {
processSpan.RecordError(err)
processSpan.SetStatus(codes.Error, err.Error())
cu.Clean()
return 0, nil, nil, appendVMMLog(err, logsDir)
}
log.DebugContext(ctx, "QMP socket ready", "duration_ms", time.Since(socketWaitStart).Milliseconds())
log.DebugContext(processCtx, "QMP socket ready", "duration_ms", time.Since(socketWaitStart).Milliseconds())

// Create QMP client. The socket file may exist before QEMU can actually
// accept monitor connections, so retry briefly on transient dial failures.
Expand All @@ -202,12 +224,15 @@ func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version
break
}
if time.Now().After(clientDeadline) {
processSpan.RecordError(err)
processSpan.SetStatus(codes.Error, err.Error())
cu.Clean()
return 0, nil, nil, appendVMMLog(fmt.Errorf("create client: %w", err), logsDir)
}
time.Sleep(socketPollInterval)
}

processSpan.SetStatus(codes.Ok, "")
return pid, hv, &cu, nil
}

Expand Down
Loading
Loading