Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Datadog/Datadog.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,9 @@
61DA8CB828647A500074A606 /* InternalLoggerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 61DA8CB728647A500074A606 /* InternalLoggerTests.swift */; };
61DB33B225DEDFC200F7EA71 /* CustomObjcViewController.m in Sources */ = {isa = PBXBuildFile; fileRef = 61DB33B125DEDFC200F7EA71 /* CustomObjcViewController.m */; };
61DCC8472C05CD0000CB59E5 /* SessionEndedMetricControllerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 61DCC8462C05CD0000CB59E5 /* SessionEndedMetricControllerTests.swift */; };
61DCC8482C05CD0000CB59E5 /* SessionEndedMetricControllerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 61DCC8462C05CD0000CB59E5 /* SessionEndedMetricControllerTests.swift */; };
B51668F3A97DEAC2917B0F44 /* TimeseriesSessionCollectorTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7F8C00B87F71287FB535342C /* TimeseriesSessionCollectorTests.swift */; };
21A5D61036FA5C28E71ED824 /* TimeseriesSessionCollectorTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7F8C00B87F71287FB535342C /* TimeseriesSessionCollectorTests.swift */; };
BB1A2B3C4D5E6F7800000004 /* DeltaEncoder.swift in Sources */ = {isa = PBXBuildFile; fileRef = BB1A2B3C4D5E6F7800000003 /* DeltaEncoder.swift */; };
BB1A2B3C4D5E6F7800000005 /* DeltaEncoder.swift in Sources */ = {isa = PBXBuildFile; fileRef = BB1A2B3C4D5E6F7800000003 /* DeltaEncoder.swift */; };
7F8C00B87F71287FB535342E /* DeltaEncoderTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7F8C00B87F71287FB535342D /* DeltaEncoderTests.swift */; };
Expand Down Expand Up @@ -1107,6 +1110,7 @@
D29A9F5929DD85BB005C54A4 /* RUMCommand.swift in Sources */ = {isa = PBXBuildFile; fileRef = 61C3E63A24BF1A4B008053F2 /* RUMCommand.swift */; };
D29A9F5A29DD85BB005C54A4 /* RUMScopeDependencies.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6122514727FDFF82004F5AE4 /* RUMScopeDependencies.swift */; };
D29A9F5B29DD85BB005C54A4 /* VitalMemoryReader.swift in Sources */ = {isa = PBXBuildFile; fileRef = B3BBBCB0265E71C600943419 /* VitalMemoryReader.swift */; };
D29A9FFF29DD85BB005C54A4 /* TimeseriesSessionCollector.swift in Sources */ = {isa = PBXBuildFile; fileRef = BB1A2B3C4D5E6F7800000001 /* TimeseriesSessionCollector.swift */; };
D29A9F5C29DD85BB005C54A4 /* RUMSessionScope.swift in Sources */ = {isa = PBXBuildFile; fileRef = 61C2C20624C098FC00C0321C /* RUMSessionScope.swift */; };
D29A9F5D29DD85BB005C54A4 /* RUMCommandSubscriber.swift in Sources */ = {isa = PBXBuildFile; fileRef = 616CCE12250A1868009FED46 /* RUMCommandSubscriber.swift */; };
D29A9F5E29DD85BB005C54A4 /* UIKitRUMViewsPredicate.swift in Sources */ = {isa = PBXBuildFile; fileRef = 61F3CDA62512144600C816E5 /* UIKitRUMViewsPredicate.swift */; };
Expand Down Expand Up @@ -2731,7 +2735,9 @@
AA0001012A000006000A0001 /* UIScrollViewDelegateProxyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UIScrollViewDelegateProxyTests.swift; sourceTree = "<group>"; };
AA0001012A000007000A0001 /* UIScrollViewSwizzlerTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UIScrollViewSwizzlerTests.swift; sourceTree = "<group>"; };
B3BBBCB0265E71C600943419 /* VitalMemoryReader.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = VitalMemoryReader.swift; sourceTree = "<group>"; };
BB1A2B3C4D5E6F7800000001 /* TimeseriesSessionCollector.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TimeseriesSessionCollector.swift; sourceTree = "<group>"; };
BB1A2B3C4D5E6F7800000003 /* DeltaEncoder.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DeltaEncoder.swift; sourceTree = "<group>"; };
7F8C00B87F71287FB535342C /* TimeseriesSessionCollectorTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TimeseriesSessionCollectorTests.swift; sourceTree = "<group>"; };
7F8C00B87F71287FB535342D /* DeltaEncoderTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DeltaEncoderTests.swift; sourceTree = "<group>"; };
B3BBBCBB265E71D100943419 /* VitalMemoryReaderTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = VitalMemoryReaderTests.swift; sourceTree = "<group>"; };
B3E46CAA2D91B3A400BABF66 /* NetworkContextProvider.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NetworkContextProvider.swift; sourceTree = "<group>"; };
Expand Down
256 changes: 256 additions & 0 deletions DatadogRUM/Sources/Timeseries/TimeseriesSessionCollector.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2019-Present Datadog, Inc.
*/

import Foundation
import DatadogInternal

/// Defines the interface for collecting timeseries data during a RUM session.
internal protocol TimeseriesCollecting: AnyObject {
func start(sessionID: String, applicationID: String, sessionType: RUMSessionType)
func stop()
}

/// Collects memory and CPU samples at 1s intervals during a RUM session and flushes them
/// as `RUMTimeseriesMemoryEvent` / `RUMTimeseriesCpuEvent` batches via the RUM feature scope.
///
/// At session start a coin is flipped: 50% of sessions send full-array `object` schema events,
/// 50% send delta-compressed events (`delta-object` for memory, `delta-scalar` for CPU).
internal class TimeseriesSessionCollector: TimeseriesCollecting {
private let memoryReader: SamplingBasedVitalReader
private let cpuUsageProvider: () -> Double?
private let compressionSampler: () -> Bool
private let batchSize: Int
private let samplingInterval: TimeInterval
private let featureScope: FeatureScope
private let totalRAM: Double

private var memoryBuffer: [RUMTimeseriesMemoryEvent.Timeseries.Data] = []
private var cpuBuffer: [RUMTimeseriesCpuEvent.Timeseries.Data] = []
private var sessionID: String = ""
private var applicationID: String = ""
private var sessionType: RUMSessionType = .user
private var useDeltaCompression: Bool = false
private var timer: DispatchSourceTimer?

/// All buffer mutations and timer events run on this queue.
private let queue = DispatchQueue(label: "com.datadoghq.timeseries-collector", qos: .utility)

init(
memoryReader: SamplingBasedVitalReader,
featureScope: FeatureScope,
batchSize: Int = 30,
samplingInterval: TimeInterval = 1,
cpuUsageProvider: (() -> Double?)? = nil,
compressionSampler: @escaping () -> Bool = { Bool.random() }
) {
self.memoryReader = memoryReader
self.batchSize = batchSize
self.samplingInterval = samplingInterval
self.featureScope = featureScope
self.totalRAM = Double(ProcessInfo.processInfo.physicalMemory)
self.cpuUsageProvider = cpuUsageProvider ?? { TimeseriesSessionCollector.processCPU() }
self.compressionSampler = compressionSampler
}

/// Per-process CPU as a percentage (0–100+), summed across all app threads.
/// Separated into a static so it can be called from the init closure without capturing self.
private static func processCPU() -> Double? {
#if os(watchOS)
return nil
#else
var threadsList: thread_act_array_t?
var threadsCount = mach_msg_type_number_t()
let kr = withUnsafeMutablePointer(to: &threadsList) {
$0.withMemoryRebound(to: thread_act_array_t?.self, capacity: 1) {
task_threads(mach_task_self_, $0, &threadsCount)
}
}
guard kr == KERN_SUCCESS, let threadsList = threadsList else {
return nil
}
defer {
vm_deallocate(
mach_task_self_,
vm_address_t(bitPattern: threadsList),
vm_size_t(Int(threadsCount) * MemoryLayout<thread_t>.stride)
)
}
var total = 0.0
for i in 0..<threadsCount {
var info = thread_basic_info()
var infoCount = mach_msg_type_number_t(THREAD_INFO_MAX)
let kr = withUnsafeMutablePointer(to: &info) {
$0.withMemoryRebound(to: integer_t.self, capacity: 1) {
thread_info(threadsList[Int(i)], thread_flavor_t(THREAD_BASIC_INFO), $0, &infoCount)
}
}
guard kr == KERN_SUCCESS, info.flags != TH_FLAGS_IDLE else {
continue
}
total += Double(info.cpu_usage) / Double(TH_USAGE_SCALE) * 100.0
}
return total
#endif
}

/// Resets state, flips the compression coin, and starts a 1s sampling timer for the new session.
func start(sessionID: String, applicationID: String, sessionType: RUMSessionType) {
queue.async { [weak self] in
guard let self = self else {
return
}
self.sessionID = sessionID
self.applicationID = applicationID
self.sessionType = sessionType
self.useDeltaCompression = self.compressionSampler()
self.memoryBuffer = []
self.cpuBuffer = []

self.timer?.cancel()
let timer = DispatchSource.makeTimerSource(queue: self.queue)
timer.schedule(deadline: .now() + samplingInterval, repeating: samplingInterval)
timer.setEventHandler { [weak self] in self?.sample() }
timer.resume()
self.timer = timer
}
}

/// Stops sampling and flushes any remaining buffered data points.
func stop() {
queue.async { [weak self] in
guard let self = self else {
return
}
self.timer?.cancel()
self.timer = nil
self.flushMemory()
self.flushCPU()
}
}

// MARK: - Private

private func sample() {
let now = Int64(Date().timeIntervalSince1970 * 1_000_000_000)

if let bytes = memoryReader.readVitalData() {
let memoryPercent = totalRAM > 0 ? bytes / totalRAM * 100 : 0
let dataPoint = RUMTimeseriesMemoryEvent.Timeseries.Data(
dataPoint: .init(memoryMax: bytes, memoryPercent: memoryPercent),
timestamp: now
)
memoryBuffer.append(dataPoint)
if memoryBuffer.count >= batchSize {
flushMemory()
}
}

if let cpuUsage = cpuUsageProvider() {
let dataPoint = RUMTimeseriesCpuEvent.Timeseries.Data(
dataPoint: .init(cpuUsage: cpuUsage),
timestamp: now
)
cpuBuffer.append(dataPoint)
if cpuBuffer.count >= batchSize {
flushCPU()
}
}
}

private func flushMemory() {
guard !memoryBuffer.isEmpty else {
return
}
let batch = memoryBuffer
memoryBuffer = []
let sessionID = self.sessionID
let applicationID = self.applicationID
let sessionType = self.sessionType
let start = batch[0].timestamp
let end = batch[batch.count - 1].timestamp
let eventID = UUID().uuidString.lowercased()
let useDelta = self.useDeltaCompression

featureScope.eventWriteContext { context, writer in
let objectEvent = RUMTimeseriesMemoryEvent(
dd: .init(),
application: .init(id: applicationID),
date: start / 1_000_000,
service: context.service,
session: .init(id: sessionID, type: sessionType),
source: .ios,
timeseries: .init(
data: batch,
end: end,
id: eventID,
schema: .object,
start: start
),
version: context.version
)

if useDelta,
let deltaData = DeltaEncoder.encodeMemory(batch),
let eventData = try? JSONEncoder().encode(objectEvent),
var dict = try? JSONSerialization.jsonObject(with: eventData) as? [String: Any],
var ts = dict["timeseries"] as? [String: Any] {
ts["schema"] = "delta-object"
ts["data"] = deltaData
dict["timeseries"] = ts
writer.write(value: AnyEncodable(dict))
} else {
writer.write(value: objectEvent)
}
}
}

private func flushCPU() {
guard !cpuBuffer.isEmpty else {
return
}
let batch = cpuBuffer
cpuBuffer = []
let sessionID = self.sessionID
let applicationID = self.applicationID
let sessionType = self.sessionType
let start = batch[0].timestamp
let end = batch[batch.count - 1].timestamp
let eventID = UUID().uuidString.lowercased()
let useDelta = self.useDeltaCompression

featureScope.eventWriteContext { context, writer in
let objectEvent = RUMTimeseriesCpuEvent(
dd: .init(),
application: .init(id: applicationID),
date: start / 1_000_000,
service: context.service,
session: .init(id: sessionID, type: sessionType),
source: .ios,
timeseries: .init(
data: batch,
end: end,
id: eventID,
schema: .object,
start: start
),
version: context.version
)

if useDelta,
let deltaData = DeltaEncoder.encodeCPU(batch),
let eventData = try? JSONEncoder().encode(objectEvent),
var dict = try? JSONSerialization.jsonObject(with: eventData) as? [String: Any],
var ts = dict["timeseries"] as? [String: Any] {
ts["schema"] = "delta-scalar"
ts["data"] = deltaData
dict["timeseries"] = ts
writer.write(value: AnyEncodable(dict))
} else {
writer.write(value: objectEvent)
}
}
}
}
Loading