Build a minimal console application that produces, merges, and processes work using Hugo channels, wait groups, and Result<T>. By the end of the tutorial you will have an end-to-end pipeline that fans in multiple producers, validates messages, and shuts down gracefully on cancellation.
- Two producers feeding bounded channels.
- A fan-in relay that merges producers into a single channel.
- A consumer loop that transforms messages with result pipelines.
- Deterministic shutdown using WaitGroup, CancellationToken, and Defer.
Estimated time: 10–15 minutes.
- .NET 10 SDK installed (dotnet --version should report 9.* or 10.*).
- A terminal and editor.
- A new folder for the sample (mkdir HugoQuickstart if you prefer to create it manually).
Tip: Run dotnet nuget locals all --clear if a previous restore fails; the tutorial relies on the official Hugo NuGet package.
dotnet new console -n HugoQuickstart
cd HugoQuickstart
dotnet add package Hugo
Restore completion confirms the package is available; fix restore issues before continuing.
Replace Program.cs with the following template:
using Hugo;
using static Hugo.Go;
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var metrics = MakeChannel<int>(capacity: 8);
var jobs = MakeChannel<string>(capacity: 8);
var merged = MakeChannel<object>(capacity: 16);
var workers = new WaitGroup();
Highlights:
- CancellationTokenSource enforces a five-second deadline for the demo.
- Two bounded channels (metrics, jobs) feed a shared destination (merged).
- WaitGroup tracks background work so you can block on completion deterministically.
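To see what "bounded" buys you, here is a minimal sketch that uses System.Threading.Channels from the base class library as a stand-in. It assumes that a Hugo channel created with MakeChannel behaves similarly once its buffer is full; it is not Hugo's own implementation.

```csharp
using System;
using System.Threading.Channels;

// BCL stand-in for a bounded channel with room for two items (not Hugo's MakeChannel).
var channel = Channel.CreateBounded<int>(capacity: 2);

// The first two writes fit in the buffer.
var first = channel.Writer.TryWrite(1);
var second = channel.Writer.TryWrite(2);

// The buffer is full, so a non-blocking write is rejected.
// WriteAsync would instead wait until a reader frees a slot.
var third = channel.Writer.TryWrite(3);

// Reading one item frees a slot, and the next write succeeds.
channel.Reader.TryRead(out var item);
var fourth = channel.Writer.TryWrite(4);

Console.WriteLine($"{first} {second} {third} {fourth} read={item}");
```

The sketch prints True True False True read=1. A full buffer is what applies backpressure: producers that await WriteAsync slow down to the pace of the consumer instead of growing memory without limit.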
Append two WaitGroup.Go calls that populate the source channels and guarantee completion by deferring TryComplete:
workers.Go(async () =>
{
    using var complete = Defer(() => metrics.Writer.TryComplete());
    for (var i = 0; i < 3; i++)
    {
        await metrics.Writer.WriteAsync(i, cts.Token).ConfigureAwait(false);
    }
});
workers.Go(async () =>
{
    using var complete = Defer(() => jobs.Writer.TryComplete());
    foreach (var name in new[] { "build", "deploy", "notify" })
    {
        await jobs.Writer.WriteAsync(name, cts.Token).ConfigureAwait(false);
    }
});
Defer mirrors Go's defer keyword and ensures the channel completes even if an exception occurs mid-loop.
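The guarantee is roughly what a try/finally block gives you. The sketch below uses only BCL channels and a deliberately failing producer (the ProduceAsync helper is hypothetical) to show that the reader still terminates when the writer is completed in finally. It assumes Defer provides the same effect through using; Hugo's actual implementation may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// Hypothetical producer that fails mid-loop; the finally block plays the role of Defer.
async Task ProduceAsync()
{
    try
    {
        for (var i = 0; i < 3; i++)
        {
            if (i == 2) throw new InvalidOperationException("producer failed");
            await channel.Writer.WriteAsync(i);
        }
    }
    finally
    {
        // Runs on success, exception, or cancellation.
        channel.Writer.TryComplete();
    }
}

var failed = false;
try { await ProduceAsync(); }
catch (InvalidOperationException) { failed = true; }

// The read loop ends because the writer was completed despite the exception.
var received = new List<int>();
await foreach (var value in channel.Reader.ReadAllAsync())
{
    received.Add(value);
}

Console.WriteLine($"failed={failed} received={received.Count}");
```

Without the finally block, the await foreach loop would wait forever for a completion signal that never arrives.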
Merge the source readers into merged.Writer:
var relay = FanInAsync(
    sources: new[] { metrics.Reader, jobs.Reader },
    destination: merged.Writer,
    completeDestination: true,
    cancellationToken: cts.Token);
- Passing completeDestination: true guarantees the merged channel completes once every source finishes.
- FanInAsync returns a task that resolves to Result<Unit>; you can inspect it later for failures.
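If you are curious what a fan-in relay does conceptually, the following sketch builds one from BCL channels. PumpAsync and RelayAsync are hypothetical helpers, not Hugo APIs, and the sketch omits the cancellation handling and Result<Unit> reporting that FanInAsync provides.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var numbers = Channel.CreateBounded<object>(4);
var names = Channel.CreateBounded<object>(4);
var merged = Channel.CreateBounded<object>(8);

// Copy one source into the shared destination until that source completes.
async Task PumpAsync(ChannelReader<object> source)
{
    await foreach (var payload in source.ReadAllAsync())
    {
        await merged.Writer.WriteAsync(payload);
    }
}

// One pump per source; complete the destination only after every pump finishes.
async Task RelayAsync(ChannelReader<object>[] sources)
{
    await Task.WhenAll(sources.Select(reader => PumpAsync(reader)));
    merged.Writer.TryComplete();
}

var relay = RelayAsync(new[] { numbers.Reader, names.Reader });

numbers.Writer.TryWrite(1);
numbers.Writer.TryWrite(2);
names.Writer.TryWrite("build");
numbers.Writer.TryComplete();
names.Writer.TryComplete();

await relay;

// Draining the merged channel terminates because the relay completed it.
var seen = new List<object>();
await foreach (var merge in merged.Reader.ReadAllAsync())
{
    seen.Add(merge);
}

Console.WriteLine($"merged {seen.Count} items");
```

Items from different sources may interleave in any order; only the order within a single source is preserved.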
Add a result-aware consumer loop after the relay:
var messages = new List<string>();
await merged.Reader
    .ReadAllAsync(cts.Token)
    .Select(payload => payload switch
    {
        int sample => Ok(sample)
            .Ensure(static value => value >= 0, value =>
                Error.From($"negative sample {value}", "error.validation"))
            .Map(static value => $"metric={value}"),
        string job => Ok(job)
            .Ensure(static value => !string.IsNullOrWhiteSpace(value))
            .Map(static value => $"job={value}"),
        _ => Err<string>("unsupported payload", "error.validation")
    })
    .ForEachAsync(async (result, ct) =>
    {
        var handled = await result
            .TapAsync(static (value, token) =>
            {
                Console.WriteLine($"processed {value}");
                return ValueTask.CompletedTask;
            }, ct);
        if (handled.IsSuccess)
        {
            messages.Add(handled.Value);
        }
        else
        {
            Console.WriteLine($"skipped: {handled.Error}");
        }
        return Result.Ok(Go.Unit.Value);
    },
    cts.Token);
- Ensure validates data without throwing.
- Map transforms successful values, keeping the pipeline linear.
- TapAsync instruments successes without altering the result; switch to TapValueTaskAsync when the instrumentation already returns ValueTask.
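The idea behind Ensure and Map can be sketched without the library. The example below models a result as a plain tuple and defines local Ok, Ensure, and Map helpers. These are hypothetical stand-ins written for illustration, not Hugo's Result<T> API, and Describe is likewise an invented helper.

```csharp
using System;

// Hypothetical stand-in for Result<T>: success flag, value, and error text.
(bool IsSuccess, T Value, string Error) Ok<T>(T value) => (true, value, "");

// Keep a failure as-is; turn a success into a failure when the predicate rejects it.
(bool IsSuccess, T Value, string Error) Ensure<T>(
    (bool IsSuccess, T Value, string Error) result,
    Func<T, bool> predicate,
    string error)
{
    if (!result.IsSuccess || predicate(result.Value))
    {
        return result;
    }
    return (false, result.Value, error);
}

// Transform the value of a success; pass the error of a failure through untouched.
(bool IsSuccess, TOut Value, string Error) Map<T, TOut>(
    (bool IsSuccess, T Value, string Error) result,
    Func<T, TOut> map)
{
    if (result.IsSuccess)
    {
        return (true, map(result.Value), "");
    }
    return (false, default(TOut)!, result.Error);
}

string Describe(int sample)
{
    var result = Map(
        Ensure(Ok(sample), value => value >= 0, $"negative sample {sample}"),
        value => $"metric={value}");
    return result.IsSuccess ? result.Value : $"skipped: {result.Error}";
}

Console.WriteLine(Describe(2));   // metric=2
Console.WriteLine(Describe(-1));  // skipped: negative sample -1
```

Because a failed validation travels through the same return path as a success, the caller handles both with an ordinary branch rather than a try/catch.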
Finish the program with coordinated shutdown:
var fanInResult = await relay;
if (fanInResult.IsFailure)
{
    Console.WriteLine($"fan-in failed: {fanInResult.Error}");
}
await workers.WaitAsync(cts.Token);
Console.WriteLine(string.Join(", ", messages));
WaitAsync observes the same cancellation token as the consumer loop, so the app exits cleanly when the deadline expires.
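The effect of waiting with a token can be reproduced with the BCL alone. This sketch uses Task.WaitAsync (available since .NET 6) against a worker that outlives a short deadline. It assumes nothing about how Hugo's WaitGroup.WaitAsync reports cancellation, which may surface it differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// A short deadline so the sketch finishes quickly.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));

// A worker that outlives the deadline and ignores the token.
var slowWorker = Task.Delay(TimeSpan.FromSeconds(30));

var outcome = "completed";
try
{
    // WaitAsync stops waiting when the token fires; the worker itself is not stopped.
    await Task.WhenAll(slowWorker).WaitAsync(cts.Token);
}
catch (OperationCanceledException)
{
    outcome = "canceled";
}

Console.WriteLine(outcome);
```

The wait is abandoned at the deadline, which is why the producers in this tutorial also pass cts.Token to WriteAsync: the token has to reach the work itself for that work to stop.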
dotnet run
Expected output:
processed metric=0
processed metric=1
processed metric=2
processed job=build
processed job=deploy
processed job=notify
metric=0, metric=1, metric=2, job=build, job=deploy, job=notify
Try shortening the timeout or hitting Ctrl+C to see how Error.Canceled flows through the result pipeline without throwing.
using Hugo.Diagnostics.OpenTelemetry;
using Microsoft.Extensions.Hosting;
// Assumption: builder is a generic-host application builder.
var builder = Host.CreateApplicationBuilder(args);
builder.Services
    .AddOpenTelemetry()
    .AddHugoDiagnostics(options =>
    {
        options.ServiceName = "hugo-quickstart";
        options.AddPrometheusExporter = true;
    });
Run the sample with a Prometheus collector (see Publish metrics and traces to OpenTelemetry) to inspect counters such as waitgroup.additions and channel.select.latency.
- Every async call accepted a CancellationToken.
- Producers always completed their writers, even when cancellation fired.
- The consumer treated validation failures as data, not exceptions.
- Shutdown awaited both the fan-in relay and the worker group.
If any of these checks surprised you, revisit the corresponding step before moving on.
- Coordinate fan-in workflows for more advanced fan-in scenarios.
- Publish metrics and traces to OpenTelemetry to light up observability.
- Explore the Hugo.WorkerSample project for a full worker that combines queues, retries, and diagnostics.