Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ lazy val root = project
.aggregate(
(sequentially.projectRefs ++
`sequentially-ce`.projectRefs ++
`sequentially-ce-metrics`.projectRefs ++
`sequentially-metrics`.projectRefs :+
projectToRef(benchmark)): _*
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import scala.concurrent.Future
/** Wrapper around SequentiallyF that adds metrics tracking.
* Since SequentiallyF is a final class, we wrap it by delegation.
*/
final class MeteredSequentiallyF[F[_]: Async, -K] private (
final class MeteredSequentiallyF[F[_] : Async, -K] private (
private val sequentially: SequentiallyF[F, K],
private val metrics: SequentiallyMetricsF[F],
) {
Expand All @@ -27,22 +27,22 @@ final class MeteredSequentiallyF[F[_]: Async, -K] private (

def applyF[T](key: K)(task: => F[T]): F[T] = {
val start = System.nanoTime()

metrics.queue(start) *> metrics.run(sequentially.applyF(key)(task))
}
}

object MeteredSequentiallyF {

def apply[F[_]: Async, K](
def apply[F[_] : Async, K](
sequentially: SequentiallyF[F, K],
name: String,
sequentiallyMetrics: SequentiallyMetricsF.Factory[F],
): MeteredSequentiallyF[F, K] = {
apply(sequentially, sequentiallyMetrics(name))
}

def apply[F[_]: Async, K](
def apply[F[_] : Async, K](
sequentially: SequentiallyF[F, K],
metrics: SequentiallyMetricsF[F],
): MeteredSequentiallyF[F, K] = {
Expand All @@ -59,7 +59,7 @@ object MeteredSequentiallyF {
def apply[K]: SequentiallyF[F, K]
}

def apply[F[_]: Async](
def apply[F[_] : Async](
provider: Provider[F],
sequentiallyMetrics: SequentiallyMetricsF.Factory[F],
): Factory[F] = new Factory[F] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object SequentiallyMetricsF {
/** @note Must be singleton as metric names must be unique.
* @see CollectorRegistry#register
*/
def apply[F[_]: Sync](
def apply[F[_] : Sync](
prometheusRegistry: CollectorRegistry,
prefix: String = "sequentially",
): Factory[F] = {
Expand Down Expand Up @@ -54,4 +54,3 @@ object SequentiallyMetricsF {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ class MeteredSequentiallyFSpec extends AnyWordSpec with Matchers with ScalaFutur
val queueCount = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-metrics", "queue")
Array("test-metrics", "queue"),
)
val runCount = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-metrics", "run")
Array("test-metrics", "run"),
)

queueCount.doubleValue() should be >= 0.0
Expand All @@ -116,7 +116,7 @@ class MeteredSequentiallyFSpec extends AnyWordSpec with Matchers with ScalaFutur
val (dispatcher, dispatcherCleanup) = Dispatcher.parallel[IO].allocated.unsafeRunSync()
val metricsFactory = SequentiallyMetricsF.Factory[IO](registry)
val metered = MeteredSequentiallyF(sequentially, "test-apply", metricsFactory)

implicit val disp: Dispatcher[IO] = dispatcher

try {
Expand Down Expand Up @@ -153,12 +153,12 @@ class MeteredSequentiallyFSpec extends AnyWordSpec with Matchers with ScalaFutur
val queueCount = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-apply-metrics", "queue")
Array("test-apply-metrics", "queue"),
)
val runCount = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-apply-metrics", "run")
Array("test-apply-metrics", "run"),
)

queueCount.doubleValue() should be >= 0.0
Expand Down Expand Up @@ -225,12 +225,12 @@ class MeteredSequentiallyFSpec extends AnyWordSpec with Matchers with ScalaFutur
val queueCount = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-multiple", "queue")
Array("test-multiple", "queue"),
)
val runCount = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-multiple", "run")
Array("test-multiple", "run"),
)

queueCount.doubleValue() shouldEqual 10.0
Expand All @@ -255,12 +255,12 @@ class MeteredSequentiallyFSpec extends AnyWordSpec with Matchers with ScalaFutur
val count1 = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("name1", "run")
Array("name1", "run"),
)
val count2 = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("name2", "run")
Array("name2", "run"),
)

count1.doubleValue() should be >= 0.0
Expand Down Expand Up @@ -310,16 +310,16 @@ class MeteredSequentiallyFSpec extends AnyWordSpec with Matchers with ScalaFutur
val registry = new CollectorRegistry()
val (sequentially, cleanup) = SequentiallyF.resource[IO, Int]().allocated.unsafeRunSync()
val metricsFactory = SequentiallyMetricsF.Factory[IO](registry)

val provider = new MeteredSequentiallyF.Factory.Provider[IO] {
override def apply[K]: SequentiallyF[IO, K] = sequentially.asInstanceOf[SequentiallyF[IO, K]]
}

val factory = MeteredSequentiallyF.Factory(provider, metricsFactory)

try {
val metered = factory("test-factory")

val result = metered.applyF(1)(IO.pure("result")).unsafeRunSync()
result shouldEqual "result"

Expand All @@ -332,11 +332,11 @@ class MeteredSequentiallyFSpec extends AnyWordSpec with Matchers with ScalaFutur
val registry = new CollectorRegistry()
val (sequentially, cleanup) = SequentiallyF.resource[IO, Int]().allocated.unsafeRunSync()
val metricsFactory = SequentiallyMetricsF.Factory[IO](registry)

val provider = new MeteredSequentiallyF.Factory.Provider[IO] {
override def apply[K]: SequentiallyF[IO, K] = sequentially.asInstanceOf[SequentiallyF[IO, K]]
}

val factory = MeteredSequentiallyF.Factory(provider, metricsFactory)

try {
Expand All @@ -357,11 +357,11 @@ class MeteredSequentiallyFSpec extends AnyWordSpec with Matchers with ScalaFutur
val registry = new CollectorRegistry()
val (sequentially, cleanup) = SequentiallyF.resource[IO, Int]().allocated.unsafeRunSync()
val metricsFactory = SequentiallyMetricsF.Factory[IO](registry)

val provider = new MeteredSequentiallyF.Factory.Provider[IO] {
override def apply[K]: SequentiallyF[IO, K] = sequentially.asInstanceOf[SequentiallyF[IO, K]]
}

val factory = MeteredSequentiallyF.Factory(provider, metricsFactory)

try {
Expand All @@ -371,7 +371,7 @@ class MeteredSequentiallyFSpec extends AnyWordSpec with Matchers with ScalaFutur
val count = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("factory-test", "run")
Array("factory-test", "run"),
)

count.doubleValue() should be >= 0.0
Expand All @@ -382,4 +382,3 @@ class MeteredSequentiallyFSpec extends AnyWordSpec with Matchers with ScalaFutur
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SequentiallyMetricsFSpec extends AnyWordSpec with Matchers {
val factory = SequentiallyMetricsF.Factory[IO](registry, prefix = "custom_prefix")

factory("test-name")

// Verify metric exists with custom prefix
val metricName = "custom_prefix_time"
val samples = registry.metricFamilySamples().asIterator().asScala.toSeq
Expand All @@ -53,7 +53,7 @@ class SequentiallyMetricsFSpec extends AnyWordSpec with Matchers {
val factory = SequentiallyMetricsF.Factory[IO](registry)

factory("test-name")

// Verify metric exists with default prefix
val metricName = "sequentially_time"
val samples = registry.metricFamilySamples().asIterator().asScala.toSeq
Expand All @@ -70,17 +70,17 @@ class SequentiallyMetricsFSpec extends AnyWordSpec with Matchers {
val metrics = factory("test-queue")

val startNanos = System.nanoTime()

// Record queue time
metrics.queue(startNanos).unsafeRunSync()

// Verify metric was recorded
val summary = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-queue", "queue")
Array("test-queue", "queue"),
)

Option(summary).isDefined shouldBe true
summary.doubleValue() should be >= 0.0
}
Expand All @@ -99,12 +99,12 @@ class SequentiallyMetricsFSpec extends AnyWordSpec with Matchers {
val count1 = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("name1", "queue")
Array("name1", "queue"),
)
val count2 = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("name2", "queue")
Array("name2", "queue"),
)

Option(count1).isDefined shouldBe true
Expand Down Expand Up @@ -139,7 +139,7 @@ class SequentiallyMetricsFSpec extends AnyWordSpec with Matchers {
val count = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-run", "run")
Array("test-run", "run"),
)

Option(count).isDefined shouldBe true
Expand All @@ -159,12 +159,12 @@ class SequentiallyMetricsFSpec extends AnyWordSpec with Matchers {
val count1 = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("name1", "run")
Array("name1", "run"),
)
val count2 = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("name2", "run")
Array("name2", "run"),
)

Option(count1).isDefined shouldBe true
Expand Down Expand Up @@ -202,7 +202,7 @@ class SequentiallyMetricsFSpec extends AnyWordSpec with Matchers {
val count = Option(registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-error", "run")
Array("test-error", "run"),
))

// If exception occurs before flatMap completes, metric may not be recorded
Expand Down Expand Up @@ -251,7 +251,7 @@ class SequentiallyMetricsFSpec extends AnyWordSpec with Matchers {
val summary = registry.getSampleValue(
"sequentially_time_sum",
Array("name", "operation"),
Array("test-duration", "run")
Array("test-duration", "run"),
)

Option(summary).isDefined shouldBe true
Expand All @@ -275,12 +275,12 @@ class SequentiallyMetricsFSpec extends AnyWordSpec with Matchers {
val queueCount = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-integration", "queue")
Array("test-integration", "queue"),
)
val runCount = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-integration", "run")
Array("test-integration", "run"),
)

Option(queueCount).isDefined shouldBe true
Expand All @@ -305,12 +305,12 @@ class SequentiallyMetricsFSpec extends AnyWordSpec with Matchers {
val queueCount = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-multiple", "queue")
Array("test-multiple", "queue"),
)
val runCount = registry.getSampleValue(
"sequentially_time_count",
Array("name", "operation"),
Array("test-multiple", "run")
Array("test-multiple", "run"),
)

Option(queueCount).isDefined shouldBe true
Expand All @@ -320,4 +320,3 @@ class SequentiallyMetricsFSpec extends AnyWordSpec with Matchers {
}
}
}