Skip to content

Commit a560a76

Browse files
committed
Fix fork compression preemption metric
1 parent 8638cce commit a560a76

6 files changed

Lines changed: 61 additions & 6 deletions

File tree

lib/instances/fork.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func (m *manager) forkInstanceFromStoppedOrStandby(ctx context.Context, id strin
245245
defer cu.Clean()
246246

247247
if source.State == StateStandby {
248-
if err := m.ensureSnapshotMemoryReady(ctx, m.paths.InstanceSnapshotLatest(id), m.snapshotJobKeyForInstance(id), stored.HypervisorType); err != nil {
248+
if err := m.ensureSnapshotMemoryReady(ctx, m.paths.InstanceSnapshotLatest(id), m.snapshotJobKeyForInstance(id), stored.HypervisorType, snapshotCompressionPreemptionForkInstance); err != nil {
249249
return nil, fmt.Errorf("prepare standby snapshot for fork: %w", err)
250250
}
251251
}

lib/instances/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type snapshotCompressionPreemptionOperation string
3939
const (
4040
snapshotCompressionPreemptionRestoreInstance snapshotCompressionPreemptionOperation = "restore_instance"
4141
snapshotCompressionPreemptionRestoreSnapshot snapshotCompressionPreemptionOperation = "restore_snapshot"
42+
snapshotCompressionPreemptionForkInstance snapshotCompressionPreemptionOperation = "fork_instance"
4243
snapshotCompressionPreemptionForkSnapshot snapshotCompressionPreemptionOperation = "fork_snapshot"
4344
snapshotCompressionPreemptionCreateSnapshot snapshotCompressionPreemptionOperation = "create_snapshot"
4445
snapshotCompressionPreemptionDeleteInstance snapshotCompressionPreemptionOperation = "delete_instance"

lib/instances/metrics_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package instances
22

33
import (
4+
"os"
45
"testing"
56
"time"
67

@@ -113,6 +114,59 @@ func TestSnapshotCompressionMetrics_RecordAndObserve(t *testing.T) {
113114
assert.Equal(t, "standby", metricLabel(t, active.DataPoints[0].Attributes, "source"))
114115
}
115116

117+
func TestEnsureSnapshotMemoryReadyRecordsProvidedPreemptionOperation(t *testing.T) {
118+
t.Parallel()
119+
120+
reader := otelmetric.NewManualReader()
121+
provider := otelmetric.NewMeterProvider(otelmetric.WithReader(reader))
122+
123+
rawDir := t.TempDir()
124+
rawPath := rawDir + "/memory-ranges"
125+
require.NoError(t, os.WriteFile(rawPath, []byte("raw snapshot"), 0644))
126+
127+
jobDone := make(chan struct{})
128+
m := &manager{
129+
paths: paths.New(t.TempDir()),
130+
compressionJobs: map[string]*compressionJob{
131+
"job-1": {
132+
done: jobDone,
133+
target: compressionTarget{
134+
Key: "job-1",
135+
HypervisorType: hypervisor.TypeCloudHypervisor,
136+
Source: snapshotCompressionSourceStandby,
137+
Policy: snapshotstore.SnapshotCompressionConfig{
138+
Enabled: true,
139+
Algorithm: snapshotstore.SnapshotCompressionAlgorithmLz4,
140+
},
141+
},
142+
},
143+
},
144+
}
145+
m.compressionJobs["job-1"].cancel = func() {
146+
select {
147+
case <-jobDone:
148+
default:
149+
close(jobDone)
150+
}
151+
}
152+
153+
metrics, err := newInstanceMetrics(provider.Meter("test"), nil, m)
154+
require.NoError(t, err)
155+
m.metrics = metrics
156+
157+
err = m.ensureSnapshotMemoryReady(t.Context(), rawDir, "job-1", hypervisor.TypeCloudHypervisor, snapshotCompressionPreemptionForkInstance)
158+
require.NoError(t, err)
159+
160+
var rm metricdata.ResourceMetrics
161+
require.NoError(t, reader.Collect(t.Context(), &rm))
162+
163+
preemptionsMetric := findMetric(t, rm, "hypeman_snapshot_compression_preemptions_total")
164+
preemptions, ok := preemptionsMetric.Data.(metricdata.Sum[int64])
165+
require.True(t, ok)
166+
require.Len(t, preemptions.DataPoints, 1)
167+
assert.Equal(t, "fork_instance", metricLabel(t, preemptions.DataPoints[0].Attributes, "operation"))
168+
}
169+
116170
func assertMetricNames(t *testing.T, rm metricdata.ResourceMetrics, expected []string) {
117171
t.Helper()
118172

lib/instances/restore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (m *manager) restoreInstance(
7373
if m.metrics != nil && m.metrics.tracer != nil {
7474
ctx, prepareSnapshotSpan = m.metrics.tracer.Start(ctx, "PrepareSnapshotMemory")
7575
}
76-
err = m.ensureSnapshotMemoryReady(ctx, snapshotDir, m.snapshotJobKeyForInstance(id), stored.HypervisorType)
76+
err = m.ensureSnapshotMemoryReady(ctx, snapshotDir, m.snapshotJobKeyForInstance(id), stored.HypervisorType, snapshotCompressionPreemptionRestoreInstance)
7777
if prepareSnapshotSpan != nil {
7878
prepareSnapshotSpan.End()
7979
}

lib/instances/snapshot.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (m *manager) createSnapshot(ctx context.Context, id string, req CreateSnaps
103103
if target != nil {
104104
m.recordSnapshotCompressionPreemption(ctx, snapshotCompressionPreemptionCreateSnapshot, *target)
105105
}
106-
if err := m.ensureSnapshotMemoryReady(ctx, m.paths.InstanceSnapshotLatest(id), "", stored.HypervisorType); err != nil {
106+
if err := m.ensureSnapshotMemoryReady(ctx, m.paths.InstanceSnapshotLatest(id), "", stored.HypervisorType, snapshotCompressionPreemptionCreateSnapshot); err != nil {
107107
return nil, fmt.Errorf("prepare source snapshot memory for copy: %w", err)
108108
}
109109
default:
@@ -393,7 +393,7 @@ func (m *manager) forkSnapshot(ctx context.Context, snapshotID string, req ForkS
393393
if target != nil {
394394
m.recordSnapshotCompressionPreemption(ctx, snapshotCompressionPreemptionForkSnapshot, *target)
395395
}
396-
if err := m.ensureSnapshotMemoryReady(ctx, m.paths.SnapshotGuestDir(snapshotID), "", rec.StoredMetadata.HypervisorType); err != nil {
396+
if err := m.ensureSnapshotMemoryReady(ctx, m.paths.SnapshotGuestDir(snapshotID), "", rec.StoredMetadata.HypervisorType, snapshotCompressionPreemptionForkSnapshot); err != nil {
397397
return nil, fmt.Errorf("prepare snapshot memory for fork: %w", err)
398398
}
399399

lib/instances/snapshot_compression.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ func (m *manager) cancelAndWaitCompressionJob(ctx context.Context, key string) (
417417
}
418418
}
419419

420-
func (m *manager) ensureSnapshotMemoryReady(ctx context.Context, snapshotDir, jobKey string, hvType hypervisor.Type) error {
420+
func (m *manager) ensureSnapshotMemoryReady(ctx context.Context, snapshotDir, jobKey string, hvType hypervisor.Type, preemptionOp snapshotCompressionPreemptionOperation) error {
421421
start := time.Now()
422422

423423
if jobKey != "" {
@@ -426,7 +426,7 @@ func (m *manager) ensureSnapshotMemoryReady(ctx context.Context, snapshotDir, jo
426426
return err
427427
}
428428
if target != nil {
429-
m.recordSnapshotCompressionPreemption(ctx, snapshotCompressionPreemptionRestoreInstance, *target)
429+
m.recordSnapshotCompressionPreemption(ctx, preemptionOp, *target)
430430
}
431431
}
432432

0 commit comments

Comments
 (0)