-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSearchSyncFunction.cs
More file actions
68 lines (60 loc) · 2.55 KB
/
SearchSyncFunction.cs
File metadata and controls
68 lines (60 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
using Azure.Messaging.EventHubs;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using SearchSync.Common.EventContext.Adapters;
using SearchSync.Common.Services;
using System.Text;
namespace SearchSync.AzureFunctionsEventHub
{
/// <summary>
/// Azure Functions entry point that receives batches of Event Hub events, parses each
/// event and hands the result to the search ingestion service.
/// </summary>
public class EventHubListenerFunction
{
    private readonly ISearchIngestionService _searchIngestionService;
    private readonly IEventParser _eventParser;
    private readonly ILogger<EventHubListenerFunction> _logger;

    /// <summary>
    /// Creates the function instance with its collaborators.
    /// </summary>
    /// <param name="searchIngestionService">Service that receives every parsed event.</param>
    /// <param name="eventParser">Parser that turns an event context into an event type and payload.</param>
    /// <param name="logger">Logger used for diagnostics and failure reporting.</param>
    public EventHubListenerFunction(ISearchIngestionService searchIngestionService, IEventParser eventParser, ILogger<EventHubListenerFunction> logger)
    {
        _searchIngestionService = searchIngestionService;
        _eventParser = eventParser;
        _logger = logger;
    }

    /// <summary>
    /// Processes one batch of Event Hub events. Processing is started for every event and
    /// all of them are awaited together; a failure in one event does not prevent the other
    /// events in the batch from being processed.
    /// </summary>
    /// <param name="events">The batch of events delivered by the Event Hub trigger.</param>
    /// <exception cref="Exception">
    /// Thrown after the whole batch has been attempted when at least one event failed.
    /// The individual failures are written to the log.
    /// </exception>
    [Function(nameof(EventHubListenerFunction))]
    public async Task Run([EventHubTrigger("YOUR_EVENTHUB_NAME_HERE", Connection = "EventHubConnectionString")] EventData[] events)
    {
        _logger.LogDebug("Processing {EventCount} event(s)", events.Length);

        // Each task reports its own outcome, so no state is shared between the
        // concurrently running event handlers.
        var outcomes = await Task.WhenAll(events.Select(TryProcessEventAsync));

        if (outcomes.Any(succeeded => !succeeded))
        {
            throw new Exception("One of the messages in this batch failed, please see logs for more details");
        }
    }

    /// <summary>
    /// Parses and ingests a single event, logging (rather than propagating) any failure.
    /// </summary>
    /// <returns><c>true</c> when the event was processed; <c>false</c> when it failed.</returns>
    private async Task<bool> TryProcessEventAsync(EventData @event)
    {
        LogReceivedEvent(@event);

        try
        {
            var context = new EventHubEventContext(@event);
            var (eventType, payload) = _eventParser.Parse(context);
            await _searchIngestionService.ProcessEvent(eventType, payload);
            return true;
        }
        catch (Exception ex)
        {
            // Swallow the error so the rest of the events in the batch can still be
            // processed; the caller fails the batch once every event has been attempted.
            LogProcessingFailure(ex);
            return false;
        }
    }

    /// <summary>
    /// Writes the event body and properties to the debug log.
    /// </summary>
    private void LogReceivedEvent(EventData @event)
    {
        // Copying and decoding the body is wasted work unless Debug logging is enabled.
        if (!_logger.IsEnabled(LogLevel.Debug))
        {
            return;
        }

        var eventBody = Encoding.UTF8.GetString(@event.Body.ToArray());

        // The properties are passed as a template argument instead of being interpolated:
        // interpolation would call ToString() on the dictionary and log only its type name.
        _logger.LogDebug(
            "Received event with body: {EventBody} and event properties: {EventProperties}",
            eventBody,
            @event.Properties);
    }

    /// <summary>
    /// Logs a processing failure, attaching the custom dimensions supplied by
    /// <see cref="ILoggableException"/> implementations as a logging scope.
    /// </summary>
    private void LogProcessingFailure(Exception ex)
    {
        if (ex is ILoggableException loggable)
        {
            using (_logger.BeginScope(loggable.GetCustomDimensions()))
            {
                // The text is passed as an argument so braces inside it are never
                // interpreted as message-template placeholders.
                _logger.LogError(ex, "{Message}", loggable.GetLogMessage());
            }
        }
        else
        {
            _logger.LogError(ex, "{Message}", ex.Message);
        }
    }
}
}