diff --git a/Datadog/Datadog.xcodeproj/project.pbxproj b/Datadog/Datadog.xcodeproj/project.pbxproj index 1f89bafa4f..fa5f608ee3 100644 --- a/Datadog/Datadog.xcodeproj/project.pbxproj +++ b/Datadog/Datadog.xcodeproj/project.pbxproj @@ -662,6 +662,9 @@ 61DA8CB828647A500074A606 /* InternalLoggerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 61DA8CB728647A500074A606 /* InternalLoggerTests.swift */; }; 61DB33B225DEDFC200F7EA71 /* CustomObjcViewController.m in Sources */ = {isa = PBXBuildFile; fileRef = 61DB33B125DEDFC200F7EA71 /* CustomObjcViewController.m */; }; 61DCC8472C05CD0000CB59E5 /* SessionEndedMetricControllerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 61DCC8462C05CD0000CB59E5 /* SessionEndedMetricControllerTests.swift */; }; + 61DCC8482C05CD0000CB59E5 /* SessionEndedMetricControllerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 61DCC8462C05CD0000CB59E5 /* SessionEndedMetricControllerTests.swift */; }; + B51668F3A97DEAC2917B0F44 /* TimeseriesSessionCollectorTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7F8C00B87F71287FB535342C /* TimeseriesSessionCollectorTests.swift */; }; + 21A5D61036FA5C28E71ED824 /* TimeseriesSessionCollectorTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7F8C00B87F71287FB535342C /* TimeseriesSessionCollectorTests.swift */; }; BB1A2B3C4D5E6F7800000004 /* DeltaEncoder.swift in Sources */ = {isa = PBXBuildFile; fileRef = BB1A2B3C4D5E6F7800000003 /* DeltaEncoder.swift */; }; BB1A2B3C4D5E6F7800000005 /* DeltaEncoder.swift in Sources */ = {isa = PBXBuildFile; fileRef = BB1A2B3C4D5E6F7800000003 /* DeltaEncoder.swift */; }; 7F8C00B87F71287FB535342E /* DeltaEncoderTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7F8C00B87F71287FB535342D /* DeltaEncoderTests.swift */; }; @@ -1107,6 +1110,7 @@ D29A9F5929DD85BB005C54A4 /* RUMCommand.swift in Sources */ = {isa = PBXBuildFile; fileRef = 61C3E63A24BF1A4B008053F2 /* RUMCommand.swift */; }; D29A9F5A29DD85BB005C54A4 /* RUMScopeDependencies.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6122514727FDFF82004F5AE4 /* RUMScopeDependencies.swift */; }; D29A9F5B29DD85BB005C54A4 /* VitalMemoryReader.swift in Sources */ = {isa = PBXBuildFile; fileRef = B3BBBCB0265E71C600943419 /* VitalMemoryReader.swift */; }; + D29A9FFF29DD85BB005C54A4 /* TimeseriesSessionCollector.swift in Sources */ = {isa = PBXBuildFile; fileRef = BB1A2B3C4D5E6F7800000001 /* TimeseriesSessionCollector.swift */; }; D29A9F5C29DD85BB005C54A4 /* RUMSessionScope.swift in Sources */ = {isa = PBXBuildFile; fileRef = 61C2C20624C098FC00C0321C /* RUMSessionScope.swift */; }; D29A9F5D29DD85BB005C54A4 /* RUMCommandSubscriber.swift in Sources */ = {isa = PBXBuildFile; fileRef = 616CCE12250A1868009FED46 /* RUMCommandSubscriber.swift */; }; D29A9F5E29DD85BB005C54A4 /* UIKitRUMViewsPredicate.swift in Sources */ = {isa = PBXBuildFile; fileRef = 61F3CDA62512144600C816E5 /* UIKitRUMViewsPredicate.swift */; }; @@ -2731,7 +2735,9 @@ AA0001012A000006000A0001 /* UIScrollViewDelegateProxyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UIScrollViewDelegateProxyTests.swift; sourceTree = ""; }; AA0001012A000007000A0001 /* UIScrollViewSwizzlerTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UIScrollViewSwizzlerTests.swift; sourceTree = ""; }; B3BBBCB0265E71C600943419 /* VitalMemoryReader.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = VitalMemoryReader.swift; sourceTree = ""; }; + BB1A2B3C4D5E6F7800000001 /* TimeseriesSessionCollector.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TimeseriesSessionCollector.swift; sourceTree = ""; }; BB1A2B3C4D5E6F7800000003 /* DeltaEncoder.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DeltaEncoder.swift; sourceTree = ""; }; + 7F8C00B87F71287FB535342C /* TimeseriesSessionCollectorTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TimeseriesSessionCollectorTests.swift; sourceTree = ""; }; 7F8C00B87F71287FB535342D /* DeltaEncoderTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DeltaEncoderTests.swift; sourceTree = ""; }; B3BBBCBB265E71D100943419 /* VitalMemoryReaderTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = VitalMemoryReaderTests.swift; sourceTree = ""; }; B3E46CAA2D91B3A400BABF66 /* NetworkContextProvider.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NetworkContextProvider.swift; sourceTree = ""; }; diff --git a/DatadogRUM/Sources/Timeseries/TimeseriesSessionCollector.swift b/DatadogRUM/Sources/Timeseries/TimeseriesSessionCollector.swift new file mode 100644 index 0000000000..4a93758381 --- /dev/null +++ b/DatadogRUM/Sources/Timeseries/TimeseriesSessionCollector.swift @@ -0,0 +1,256 @@ +/* + * Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. + * This product includes software developed at Datadog (https://www.datadoghq.com/). + * Copyright 2019-Present Datadog, Inc. + */ + +import Foundation +import DatadogInternal + +/// Defines the interface for collecting timeseries data during a RUM session. +internal protocol TimeseriesCollecting: AnyObject { + func start(sessionID: String, applicationID: String, sessionType: RUMSessionType) + func stop() +} + +/// Collects memory and CPU samples at 1s intervals during a RUM session and flushes them +/// as `RUMTimeseriesMemoryEvent` / `RUMTimeseriesCpuEvent` batches via the RUM feature scope. +/// +/// At session start a coin is flipped: 50% of sessions send full-array `object` schema events, +/// 50% send delta-compressed events (`delta-object` for memory, `delta-scalar` for CPU). +internal class TimeseriesSessionCollector: TimeseriesCollecting { + private let memoryReader: SamplingBasedVitalReader + private let cpuUsageProvider: () -> Double? + private let compressionSampler: () -> Bool + private let batchSize: Int + private let samplingInterval: TimeInterval + private let featureScope: FeatureScope + private let totalRAM: Double + + private var memoryBuffer: [RUMTimeseriesMemoryEvent.Timeseries.Data] = [] + private var cpuBuffer: [RUMTimeseriesCpuEvent.Timeseries.Data] = [] + private var sessionID: String = "" + private var applicationID: String = "" + private var sessionType: RUMSessionType = .user + private var useDeltaCompression: Bool = false + private var timer: DispatchSourceTimer? + + /// All buffer mutations and timer events run on this queue. + private let queue = DispatchQueue(label: "com.datadoghq.timeseries-collector", qos: .utility) + + init( + memoryReader: SamplingBasedVitalReader, + featureScope: FeatureScope, + batchSize: Int = 30, + samplingInterval: TimeInterval = 1, + cpuUsageProvider: (() -> Double?)? = nil, + compressionSampler: @escaping () -> Bool = { Bool.random() } + ) { + self.memoryReader = memoryReader + self.batchSize = batchSize + self.samplingInterval = samplingInterval + self.featureScope = featureScope + self.totalRAM = Double(ProcessInfo.processInfo.physicalMemory) + self.cpuUsageProvider = cpuUsageProvider ?? { TimeseriesSessionCollector.processCPU() } + self.compressionSampler = compressionSampler + } + + /// Per-process CPU as a percentage (0–100+), summed across all app threads. + /// Separated into a static so it can be called from the init closure without capturing self. + private static func processCPU() -> Double? { + #if os(watchOS) + return nil + #else + var threadsList: thread_act_array_t? + var threadsCount = mach_msg_type_number_t() + let kr = withUnsafeMutablePointer(to: &threadsList) { + $0.withMemoryRebound(to: thread_act_array_t?.self, capacity: 1) { + task_threads(mach_task_self_, $0, &threadsCount) + } + } + guard kr == KERN_SUCCESS, let threadsList = threadsList else { + return nil + } + defer { + vm_deallocate( + mach_task_self_, + vm_address_t(bitPattern: threadsList), + vm_size_t(Int(threadsCount) * MemoryLayout.stride) + ) + } + var total = 0.0 + for i in 0.. 0 ? bytes / totalRAM * 100 : 0 + let dataPoint = RUMTimeseriesMemoryEvent.Timeseries.Data( + dataPoint: .init(memoryMax: bytes, memoryPercent: memoryPercent), + timestamp: now + ) + memoryBuffer.append(dataPoint) + if memoryBuffer.count >= batchSize { + flushMemory() + } + } + + if let cpuUsage = cpuUsageProvider() { + let dataPoint = RUMTimeseriesCpuEvent.Timeseries.Data( + dataPoint: .init(cpuUsage: cpuUsage), + timestamp: now + ) + cpuBuffer.append(dataPoint) + if cpuBuffer.count >= batchSize { + flushCPU() + } + } + } + + private func flushMemory() { + guard !memoryBuffer.isEmpty else { + return + } + let batch = memoryBuffer + memoryBuffer = [] + let sessionID = self.sessionID + let applicationID = self.applicationID + let sessionType = self.sessionType + let start = batch[0].timestamp + let end = batch[batch.count - 1].timestamp + let eventID = UUID().uuidString.lowercased() + let useDelta = self.useDeltaCompression + + featureScope.eventWriteContext { context, writer in + let objectEvent = RUMTimeseriesMemoryEvent( + dd: .init(), + application: .init(id: applicationID), + date: start / 1_000_000, + service: context.service, + session: .init(id: sessionID, type: sessionType), + source: .ios, + timeseries: .init( + data: batch, + end: end, + id: eventID, + schema: .object, + start: start + ), + version: context.version + ) + + if useDelta, + let deltaData = DeltaEncoder.encodeMemory(batch), + let eventData = try? JSONEncoder().encode(objectEvent), + var dict = try? JSONSerialization.jsonObject(with: eventData) as? [String: Any], + var ts = dict["timeseries"] as? [String: Any] { + ts["schema"] = "delta-object" + ts["data"] = deltaData + dict["timeseries"] = ts + writer.write(value: AnyEncodable(dict)) + } else { + writer.write(value: objectEvent) + } + } + } + + private func flushCPU() { + guard !cpuBuffer.isEmpty else { + return + } + let batch = cpuBuffer + cpuBuffer = [] + let sessionID = self.sessionID + let applicationID = self.applicationID + let sessionType = self.sessionType + let start = batch[0].timestamp + let end = batch[batch.count - 1].timestamp + let eventID = UUID().uuidString.lowercased() + let useDelta = self.useDeltaCompression + + featureScope.eventWriteContext { context, writer in + let objectEvent = RUMTimeseriesCpuEvent( + dd: .init(), + application: .init(id: applicationID), + date: start / 1_000_000, + service: context.service, + session: .init(id: sessionID, type: sessionType), + source: .ios, + timeseries: .init( + data: batch, + end: end, + id: eventID, + schema: .object, + start: start + ), + version: context.version + ) + + if useDelta, + let deltaData = DeltaEncoder.encodeCPU(batch), + let eventData = try? JSONEncoder().encode(objectEvent), + var dict = try? JSONSerialization.jsonObject(with: eventData) as? [String: Any], + var ts = dict["timeseries"] as? [String: Any] { + ts["schema"] = "delta-scalar" + ts["data"] = deltaData + dict["timeseries"] = ts + writer.write(value: AnyEncodable(dict)) + } else { + writer.write(value: objectEvent) + } + } + } +} diff --git a/DatadogRUM/Tests/Timeseries/TimeseriesSessionCollectorTests.swift b/DatadogRUM/Tests/Timeseries/TimeseriesSessionCollectorTests.swift new file mode 100644 index 0000000000..0e45abf2d4 --- /dev/null +++ b/DatadogRUM/Tests/Timeseries/TimeseriesSessionCollectorTests.swift @@ -0,0 +1,411 @@ +/* + * Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. + * This product includes software developed at Datadog (https://www.datadoghq.com/). + * Copyright 2019-Present Datadog, Inc. + */ + +import XCTest +import TestUtilities +import DatadogInternal +@testable import DatadogRUM + +class TimeseriesSessionCollectorTests: XCTestCase { + private let featureScope = FeatureScopeMock() + private let memoryReader = SamplingBasedVitalReaderMock() + + // MARK: - Memory events + + func testWhenBatchSizeIsReached_itWritesMemoryEvent() { + // Given + memoryReader.vitalData = 1_000_000 + let collector = TimeseriesSessionCollector( + memoryReader: memoryReader, + featureScope: featureScope, + batchSize: 2, + samplingInterval: 0.05, + cpuUsageProvider: { nil }, + compressionSampler: { false } + ) + + // When + let expectation = self.expectation(description: "memory batch written") + expectation.assertForOverFulfill = false + DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { expectation.fulfill() } + + collector.start(sessionID: "session-abc", applicationID: "app-123", sessionType: .user) + waitForExpectations(timeout: 2) + collector.stop() + + // Then + let events = featureScope.eventsWritten(ofType: RUMTimeseriesMemoryEvent.self) + XCTAssertFalse(events.isEmpty, "Expected at least one memory batch to be written") + + let event = events[0] + XCTAssertEqual(event.session.id, "session-abc") + XCTAssertEqual(event.application.id, "app-123") + XCTAssertEqual(event.session.type, .user) + XCTAssertEqual(event.source, .ios) + XCTAssertEqual(event.timeseries.name, "memory") + XCTAssertEqual(event.timeseries.data.count, 2) + XCTAssertEqual(event.timeseries.data[0].dataPoint.memoryMax, 1_000_000) + XCTAssertGreaterThan(event.timeseries.data[0].dataPoint.memoryPercent, 0) + } + + func testWhenBatchSizeIsReached_itWritesCpuEvent() { + // Given + memoryReader.vitalData = nil + let collector = TimeseriesSessionCollector( + memoryReader: memoryReader, + featureScope: featureScope, + batchSize: 2, + samplingInterval: 0.05, + cpuUsageProvider: { 42.5 }, + compressionSampler: { false } + ) + + // When + let expectation = self.expectation(description: "cpu batch written") + expectation.assertForOverFulfill = false + DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { expectation.fulfill() } + + collector.start(sessionID: "session-abc", applicationID: "app-123", sessionType: .user) + waitForExpectations(timeout: 2) + collector.stop() + + // Then + let events = featureScope.eventsWritten(ofType: RUMTimeseriesCpuEvent.self) + XCTAssertFalse(events.isEmpty, "Expected at least one CPU batch to be written") + + let event = events[0] + XCTAssertEqual(event.session.id, "session-abc") + XCTAssertEqual(event.application.id, "app-123") + XCTAssertEqual(event.session.type, .user) + XCTAssertEqual(event.source, .ios) + XCTAssertEqual(event.timeseries.name, "cpu") + XCTAssertEqual(event.timeseries.data.count, 2) + XCTAssertEqual(event.timeseries.data[0].dataPoint.cpuUsage, 42.5) + } + + // MARK: - Flush on stop + + func testWhenStopIsCalled_itFlushesPartialMemoryBuffer() { + // Given + memoryReader.vitalData = 2_000_000 + let collector = TimeseriesSessionCollector( + memoryReader: memoryReader, + featureScope: featureScope, + batchSize: 100, // large batch — won't auto-flush + samplingInterval: 0.05, + cpuUsageProvider: { nil }, + compressionSampler: { false } + ) + + // When — let a few samples accumulate then stop + let expectation = self.expectation(description: "samples collected") + expectation.assertForOverFulfill = false + DispatchQueue.main.asyncAfter(deadline: .now() + 0.3) { expectation.fulfill() } + + collector.start(sessionID: "session-xyz", applicationID: "app-456", sessionType: .synthetics) + waitForExpectations(timeout: 2) + + let syncExpectation = self.expectation(description: "stop completed") + collector.stop() + DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + 0.2) { syncExpectation.fulfill() } + waitForExpectations(timeout: 2) + + // Then + let events = featureScope.eventsWritten(ofType: RUMTimeseriesMemoryEvent.self) + XCTAssertFalse(events.isEmpty, "Expected partial buffer to be flushed on stop") + XCTAssertEqual(events[0].session.id, "session-xyz") + XCTAssertEqual(events[0].application.id, "app-456") + XCTAssertEqual(events[0].session.type, .synthetics) + } + + func testWhenStopIsCalled_itFlushesPartialCpuBuffer() { + // Given + memoryReader.vitalData = nil + let collector = TimeseriesSessionCollector( + memoryReader: memoryReader, + featureScope: featureScope, + batchSize: 100, + samplingInterval: 0.05, + cpuUsageProvider: { 10.0 }, + compressionSampler: { false } + ) + + let expectation = self.expectation(description: "samples collected") + expectation.assertForOverFulfill = false + DispatchQueue.main.asyncAfter(deadline: .now() + 0.3) { expectation.fulfill() } + + collector.start(sessionID: "session-xyz", applicationID: "app-456", sessionType: .ciTest) + waitForExpectations(timeout: 2) + + let syncExpectation = self.expectation(description: "stop completed") + collector.stop() + DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + 0.2) { syncExpectation.fulfill() } + waitForExpectations(timeout: 2) + + // Then + let events = featureScope.eventsWritten(ofType: RUMTimeseriesCpuEvent.self) + XCTAssertFalse(events.isEmpty, "Expected partial CPU buffer to be flushed on stop") + XCTAssertEqual(events[0].session.type, .ciTest) + } + + // MARK: - No-data readers + + func testWhenReadersReturnNil_itWritesNoEvents() { + // Given + memoryReader.vitalData = nil + let collector = TimeseriesSessionCollector( + memoryReader: memoryReader, + featureScope: featureScope, + batchSize: 2, + samplingInterval: 0.05, + cpuUsageProvider: { nil } + ) + + let expectation = self.expectation(description: "sampling time elapsed") + expectation.assertForOverFulfill = false + DispatchQueue.main.asyncAfter(deadline: .now() + 0.3) { expectation.fulfill() } + + collector.start(sessionID: "session-abc", applicationID: "app-123", sessionType: .user) + waitForExpectations(timeout: 2) + collector.stop() + + // Then + XCTAssertTrue(featureScope.eventsWritten(ofType: RUMTimeseriesMemoryEvent.self).isEmpty) + XCTAssertTrue(featureScope.eventsWritten(ofType: RUMTimeseriesCpuEvent.self).isEmpty) + } + + // MARK: - Session restart + + func testWhenStartIsCalledAgain_itUsesNewSessionMetadata() { + // Given + memoryReader.vitalData = 512_000 + let collector = TimeseriesSessionCollector( + memoryReader: memoryReader, + featureScope: featureScope, + batchSize: 100, + samplingInterval: 0.05, + cpuUsageProvider: { nil }, + compressionSampler: { false } + ) + + // First session + let firstExpectation = self.expectation(description: "first session samples") + firstExpectation.assertForOverFulfill = false + DispatchQueue.main.asyncAfter(deadline: .now() + 0.3) { firstExpectation.fulfill() } + collector.start(sessionID: "session-1", applicationID: "app-1", sessionType: .user) + waitForExpectations(timeout: 2) + + // Second session — start() resets buffers and updates metadata + let secondExpectation = self.expectation(description: "second session samples") + secondExpectation.assertForOverFulfill = false + DispatchQueue.main.asyncAfter(deadline: .now() + 0.3) { secondExpectation.fulfill() } + collector.start(sessionID: "session-2", applicationID: "app-1", sessionType: .user) + waitForExpectations(timeout: 2) + + // Flush second session + let stopExpectation = self.expectation(description: "stop completed") + collector.stop() + DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + 0.2) { stopExpectation.fulfill() } + waitForExpectations(timeout: 2) + + // Then — the flushed event should carry session-2 metadata + let events = featureScope.eventsWritten(ofType: RUMTimeseriesMemoryEvent.self) + let lastEvent = try! XCTUnwrap(events.last) + XCTAssertEqual(lastEvent.session.id, "session-2") + } + + // MARK: - Schema coin flip + + func testWhenDeltaCompressionSampled_itWritesDeltaEventForMemory() { + // Given + memoryReader.vitalData = 1_000_000 + let collector = TimeseriesSessionCollector( + memoryReader: memoryReader, + featureScope: featureScope, + batchSize: 3, + samplingInterval: 0.05, + cpuUsageProvider: { nil }, + compressionSampler: { true } + ) + + let expectation = self.expectation(description: "memory batch written") + expectation.assertForOverFulfill = false + DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { expectation.fulfill() } + + collector.start(sessionID: "session-delta", applicationID: "app-delta", sessionType: .user) + waitForExpectations(timeout: 2) + collector.stop() + + // Then — AnyEncodable delta-object event written, no typed object event + let typedEvents = featureScope.eventsWritten(ofType: RUMTimeseriesMemoryEvent.self) + XCTAssertTrue(typedEvents.isEmpty, "Object-schema typed event must not be written when delta is sampled") + + let anyEncodableEvents = featureScope.eventsWritten.compactMap { $0 as? AnyEncodable } + XCTAssertFalse(anyEncodableEvents.isEmpty, "Expected delta-schema AnyEncodable event") + + let jsonData = try! JSONEncoder().encode(anyEncodableEvents[0]) + let dict = try! XCTUnwrap(try? JSONSerialization.jsonObject(with: jsonData) as? [String: Any]) + let tsDict = try! XCTUnwrap(dict["timeseries"] as? [String: Any]) + XCTAssertEqual(tsDict["schema"] as? String, "delta-object") + let dataDict = try! XCTUnwrap(tsDict["data"] as? [String: Any]) + XCTAssertEqual(dataDict["resolution"] as? String, "ns") + XCTAssertNotNil(dataDict["ts"]) + XCTAssertNotNil(dataDict["memory_max"]) + XCTAssertNotNil(dataDict["memory_percent"]) + } + + func testWhenObjectSchemaSampled_itWritesObjectEventForMemory() { + // Given + memoryReader.vitalData = 1_000_000 + let collector = TimeseriesSessionCollector( + memoryReader: memoryReader, + featureScope: featureScope, + batchSize: 3, + samplingInterval: 0.05, + cpuUsageProvider: { nil }, + compressionSampler: { false } + ) + + let expectation = self.expectation(description: "memory batch written") + expectation.assertForOverFulfill = false + DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { expectation.fulfill() } + + collector.start(sessionID: "session-object", applicationID: "app-object", sessionType: .user) + waitForExpectations(timeout: 2) + collector.stop() + + // Then — typed object event written, no AnyEncodable delta event + let typedEvents = featureScope.eventsWritten(ofType: RUMTimeseriesMemoryEvent.self) + XCTAssertFalse(typedEvents.isEmpty, "Expected object-schema typed memory event") + XCTAssertEqual(typedEvents[0].timeseries.schema, .object) + XCTAssertTrue(featureScope.eventsWritten.compactMap { $0 as? AnyEncodable }.isEmpty) + } + + func testWhenDeltaCompressionSampled_itWritesDeltaEventForCPU() { + // Given + memoryReader.vitalData = nil + let collector = TimeseriesSessionCollector( + memoryReader: memoryReader, + featureScope: featureScope, + batchSize: 3, + samplingInterval: 0.05, + cpuUsageProvider: { 50.0 }, + compressionSampler: { true } + ) + + let expectation = self.expectation(description: "cpu batch written") + expectation.assertForOverFulfill = false + DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { expectation.fulfill() } + + collector.start(sessionID: "session-delta-cpu", applicationID: "app-delta", sessionType: .user) + waitForExpectations(timeout: 2) + collector.stop() + + // Then — AnyEncodable delta-scalar event written, no typed object event + let typedEvents = featureScope.eventsWritten(ofType: RUMTimeseriesCpuEvent.self) + XCTAssertTrue(typedEvents.isEmpty, "Object-schema typed event must not be written when delta is sampled") + + let anyEncodableEvents = featureScope.eventsWritten.compactMap { $0 as? AnyEncodable } + XCTAssertFalse(anyEncodableEvents.isEmpty, "Expected delta-schema AnyEncodable event") + + let jsonData = try! JSONEncoder().encode(anyEncodableEvents[0]) + let dict = try! XCTUnwrap(try? JSONSerialization.jsonObject(with: jsonData) as? [String: Any]) + let tsDict = try! XCTUnwrap(dict["timeseries"] as? [String: Any]) + XCTAssertEqual(tsDict["schema"] as? String, "delta-scalar") + let dataDict = try! XCTUnwrap(tsDict["data"] as? [String: Any]) + XCTAssertEqual(dataDict["resolution"] as? String, "ns") + XCTAssertNotNil(dataDict["ts"]) + XCTAssertNotNil(dataDict["value"]) + } + + func testWhenObjectSchemaSampled_itWritesObjectEventForCPU() { + // Given + memoryReader.vitalData = nil + let collector = TimeseriesSessionCollector( + memoryReader: memoryReader, + featureScope: featureScope, + batchSize: 3, + samplingInterval: 0.05, + cpuUsageProvider: { 50.0 }, + compressionSampler: { false } + ) + + let expectation = self.expectation(description: "cpu batch written") + expectation.assertForOverFulfill = false + DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { expectation.fulfill() } + + collector.start(sessionID: "session-object-cpu", applicationID: "app-object", sessionType: .user) + waitForExpectations(timeout: 2) + collector.stop() + + // Then — typed object event written, no AnyEncodable delta event + let typedEvents = featureScope.eventsWritten(ofType: RUMTimeseriesCpuEvent.self) + XCTAssertFalse(typedEvents.isEmpty, "Expected object-schema typed CPU event") + XCTAssertEqual(typedEvents[0].timeseries.schema, .object) + XCTAssertTrue(featureScope.eventsWritten.compactMap { $0 as? AnyEncodable }.isEmpty) + } + + func testWhenDeltaCompressionSampledWithSingleSample_itFallsBackToObjectEvent() { + // Given — delta sampled but only 1 sample: DeltaEncoder returns nil, falls back to object + memoryReader.vitalData = 1_000_000 + let collector = TimeseriesSessionCollector( + memoryReader: memoryReader, + featureScope: featureScope, + batchSize: 100, + samplingInterval: 0.05, + cpuUsageProvider: { nil }, + compressionSampler: { true } + ) + + let expectation = self.expectation(description: "one sample collected") + DispatchQueue.main.asyncAfter(deadline: .now() + 0.08) { expectation.fulfill() } + collector.start(sessionID: "session-fallback", applicationID: "app-fallback", sessionType: .user) + waitForExpectations(timeout: 2) + + let stopExpectation = self.expectation(description: "stop completed") + collector.stop() + DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + 0.2) { stopExpectation.fulfill() } + waitForExpectations(timeout: 2) + + // Then — falls back to object event, no AnyEncodable + XCTAssertFalse(featureScope.eventsWritten(ofType: RUMTimeseriesMemoryEvent.self).isEmpty) + XCTAssertTrue(featureScope.eventsWritten.compactMap { $0 as? AnyEncodable }.isEmpty) + } + + // MARK: - Timeseries range + + func testTimestampsAreMonotonicallyIncreasing() { + // Given + memoryReader.vitalData = 1_000_000 + let collector = TimeseriesSessionCollector( + memoryReader: memoryReader, + featureScope: featureScope, + batchSize: 3, + samplingInterval: 0.05, + cpuUsageProvider: { nil }, + compressionSampler: { false } + ) + + let expectation = self.expectation(description: "first batch written") + expectation.assertForOverFulfill = false + DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) { expectation.fulfill() } + + collector.start(sessionID: "session-abc", applicationID: "app-123", sessionType: .user) + waitForExpectations(timeout: 2) + collector.stop() + + // Then + let events = featureScope.eventsWritten(ofType: RUMTimeseriesMemoryEvent.self) + guard let event = events.first else { + XCTFail("Expected at least one memory event") + return + } + let timestamps = event.timeseries.data.map { $0.timestamp } + XCTAssertEqual(timestamps, timestamps.sorted(), "Timestamps should be monotonically increasing") + XCTAssertEqual(event.timeseries.start, timestamps.first) + XCTAssertEqual(event.timeseries.end, timestamps.last) + } +}