C# Timed Hosted Service

This is a Hosted Service that runs on a timer, meaning the task runs at intervals, for example polling a service for changes every 60 seconds. Hosted Service is a generic term Microsoft uses; as they state, “Hosted Services are services/logic that you host within your host/application/microservice.” The Microsoft project template for this is Worker Service (using the BackgroundService base class). Other common names I’ve seen are Bots, Worker Processes, Workers and my personal favorite, Daemon 😈

“non-HTTP workloads like messaging, background tasks, console applications etc. These types of applications don’t have the notion of an incoming HTTP request like a traditional ASP.NET/ASP.NET Core Web Application” - docs.microsoft.com

Story

Consider the following events feed for a shopping cart. The events are the CRUD actions CART_CREATE, CART_UPDATE and CART_DELETE. The feed is served by a json-server instance and queried with the following query string parameters:

  • The watermark is set with id_gte=42, which means events.id >= 42
  • Pagination is set with _page=3, which means only page 3 is returned
  • The page size is set with _limit=25, i.e. a page contains at most 25 events

Verb  URI example
GET   http://localhost:3331/events?id_gte=42&_page=3&_limit=25

{
  "events": [
    {
      "id": 42,
      "event_type": "CART_CREATE",
      "user_id": "985e732f-bf00-4999-a1b6-873973f9b8ff",
      "date_utc": "2021-09-08T04:56:19Z",
      "items": [
        {
          "id": "0760c3e1-1ceb-4a00-aede-328c66f729c7",
          "quantity": 4,
          "unit_cost": 1.99
        },
        {
          "id": "117e986e-c08f-4483-8a8f-d5e1c0512650",
          "quantity": 9,
          "unit_cost": 0.29
        }
      ]
    },
    {
      "id": 43,
      "event_type": "CART_UPDATE",
      "user_id": "61af172f-ec9e-4789-848c-9a2bf3f16ba3",
      "date_utc": "2021-09-08T04:59:30Z",
      "items": [
        {
          "id": "f4e0cc62-5be9-4ecc-82b4-4c90968a293c",
          "quantity": 2,
          "unit_cost": 3.25
        }
      ]
    },
    {
      "id": 45,
      "event_type": "CART_DELETE",
      "user_id": "5d9e7c82-0341-4d1c-aba4-f62789d77e6f"
    }
  ]
}
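
To make the query string parameters above concrete, here is a minimal sketch of building the feed URI from a watermark, page and page size. The EventFeedUri class and its Build method are names I made up for illustration; only the id_gte, _page and _limit parameters come from the feed described above, and I am assuming pages are numbered from 1.

```csharp
using System;

// Hypothetical helper (not from the demo repo) that builds the feed URI.
public static class EventFeedUri
{
    /// Builds "<base>/events?id_gte=<watermark>&_page=<page>&_limit=<limit>".
    public static Uri Build(Uri baseUri, long watermark, int page, int limit)
    {
        // Assumption: pages are numbered from 1 and a page holds at least one event.
        if (page < 1) throw new ArgumentOutOfRangeException(nameof(page));
        if (limit < 1) throw new ArgumentOutOfRangeException(nameof(limit));

        // Invariant formatting keeps the numbers free of culture-specific symbols.
        var relative = FormattableString.Invariant(
            $"events?id_gte={watermark}&_page={page}&_limit={limit}");

        // Resolves the relative part against the base address.
        return new Uri(baseUri, relative);
    }
}
```

Calling EventFeedUri.Build(new Uri("http://localhost:3331"), 42, 3, 25) produces the example URI shown above. Note that the relative part is resolved against the base address, so a base URI with its own path should end in a slash.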

Task

Create a hosted service that fulfills the following acceptance criteria:

  • consumes this feed and persists each CRUD event to a database as a CREATE, UPDATE or DELETE
  • a watermark needs to be used so events are not processed again
  • use page and size parameters
  • the service should run indefinitely until the process is interrupted with Ctrl+C
  • the solution should be extensible

Solution

This is the simplest no-frills solution; from here it could be adapted to include message queues and better separation of concerns. The code for this solution can be found at https://github.com/carlpaton/TimedHostedServiceDemo

Timed Hosted Service Simple

TimedHostedService.Worker

  1. Create a timed background task per the Microsoft documentation using the IHostedService interface. Note that BackgroundService is a base class for implementing a long-running IHostedService, so it would also work, but I’ve used the interface instead as that is the example for a timed hosted service at docs.microsoft.com.

  2. In Program.cs use HostBuilder to configure the startup pipeline. Use ConfigureServices to register TimedHostedService with AddHostedService. Additionally, add console logging with ConfigureLogging and configuration with ConfigureAppConfiguration.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class TimedHostedService : IHostedService, IDisposable
{
    private readonly ILogger<TimedHostedService> _logger;
    private Timer _timer;

    public TimedHostedService(ILogger<TimedHostedService> logger)
    {
        _logger = logger;
    }

    public Task StartAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("STARTASYNC RUN");

        // Fire immediately (TimeSpan.Zero), then every 42 seconds.
        _timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(42));

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("STOPASYNC RUN");

        // Stop further callbacks; the timer itself is released in Dispose.
        _timer?.Change(Timeout.Infinite, 0);

        return Task.CompletedTask;
    }

    public void Dispose()
    {
        _timer?.Dispose();
    }

    // Timer callback, runs on a thread pool thread.
    private void DoWork(object state)
    {
        _logger.LogInformation("EVENT BROKER RUNS AGAIN IN 42 SECONDS.");
    }
}
  3. Running the app now produces a timed event every 42 seconds, and this is shown in the logs until Ctrl+C is pressed.

Beauty!

info: TimedHostedService.Worker.TimedHostedService[0]
      STARTASYNC RUN
info: TimedHostedService.Worker.TimedHostedService[0]
      EVENT BROKER RUNS AGAIN IN 42 SECONDS.
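
The Program.cs wiring from step 2 is not shown above, so here is a rough sketch of what it could look like. It assumes the Microsoft.Extensions.Hosting package (which brings in the configuration and console logging extensions) and that the TimedHostedService class from step 1 lives in the same namespace. The appsettings.json file name is my assumption, not something taken from the demo repo.

```csharp
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace TimedHostedService.Worker
{
    public static class Program
    {
        public static async Task Main(string[] args)
        {
            var host = new HostBuilder()
                .ConfigureAppConfiguration((context, config) =>
                {
                    // Assumed configuration sources; adjust to suit the project.
                    config.AddJsonFile("appsettings.json", optional: true);
                    config.AddEnvironmentVariables();
                })
                .ConfigureLogging(logging => logging.AddConsole())
                .ConfigureServices((context, services) =>
                {
                    // Registers the timed service so the host starts and stops it.
                    services.AddHostedService<TimedHostedService>();
                })
                .UseConsoleLifetime() // Ctrl+C triggers a graceful shutdown
                .Build();

            await host.RunAsync();
        }
    }
}
```

RunAsync only returns once the host has shut down, which is what keeps the process alive until Ctrl+C is pressed.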

Alternative to Timed Hosted

Instead of using a Timer, it’s also acceptable to just use a loop that exits based on a CancellationToken. Here it’s more common to use the BackgroundService base class and only override ExecuteAsync.

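using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

public class Worker : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // worker things!

            // Pause between iterations so the loop does not spin the CPU;
            // the delay ends early when the token is cancelled.
            await Task.Delay(TimeSpan.FromSeconds(42), token);
        }
    }
}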
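
For a loop that ticks at a steady interval, one option is PeriodicTimer. This is only a sketch and assumes .NET 6 or later; the PollingLoop class and its RunAsync method are hypothetical names, and the same body could sit inside ExecuteAsync.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs `work` once per interval until the token is cancelled.
public static class PollingLoop
{
    public static async Task RunAsync(
        Func<CancellationToken, Task> work, TimeSpan interval, CancellationToken token)
    {
        using var timer = new PeriodicTimer(interval);
        try
        {
            while (!token.IsCancellationRequested)
            {
                await work(token);

                // Completes on the next tick, or throws once the token is cancelled.
                await timer.WaitForNextTickAsync(token);
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Cancellation is the expected way out of the loop.
        }
    }
}
```

Unlike a fixed delay after each iteration, the ticks here are scheduled at the interval regardless of how long the work takes, and the work never overlaps because the next tick is only awaited after the current run finishes.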

TimedHostedService.Worker.Domain

Event Broker (CartEvents)

This orchestrates the flow. It doesn’t have to be called a Broker; it could be a Processor or anything that makes sense to your team. It will be called by DoWork in TimedHostedService and only needs one method, ProcessAsync.

using System.Threading.Tasks;

namespace TimedHostedService.Worker.Domain.CartEvents
{
    public interface IEventBroker
    {
        /// <summary>
        /// Orchestrates the following:
        ///
        /// - Check watermark.
        /// - Collect new events from HTTP events feed.
        /// - Dispatch events.
        /// </summary>
        Task ProcessAsync();
    }
}
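
Since the Timer callback is a synchronous void method and ProcessAsync returns a Task, the call from DoWork needs a little care. The sketch below is one possible approach, not necessarily what the demo repo does: TimedRunner is a hypothetical class that starts the async work from the callback, skips a tick if the previous run is still busy, and catches exceptions so one bad cycle does not kill later ticks.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper around System.Threading.Timer for async work.
public sealed class TimedRunner : IDisposable
{
    private readonly Func<Task> _processAsync;
    private Timer? _timer;
    private int _running; // 0 = idle, 1 = a run is in progress

    public TimedRunner(Func<Task> processAsync) => _processAsync = processAsync;

    public void Start(TimeSpan interval)
    {
        // The callback is synchronous, so the task is started but not awaited here;
        // TickAsync handles its own exceptions.
        _timer = new Timer(state => { _ = TickAsync(); }, null, TimeSpan.Zero, interval);
    }

    /// Runs one cycle; returns false when skipped because the previous cycle is still running.
    public async Task<bool> TickAsync()
    {
        if (Interlocked.CompareExchange(ref _running, 1, 0) != 0)
        {
            return false;
        }

        try
        {
            await _processAsync();
        }
        catch (Exception ex)
        {
            // Report and carry on so that one failed cycle does not stop later ticks.
            Console.Error.WriteLine($"Cycle failed: {ex.Message}");
        }
        finally
        {
            Interlocked.Exchange(ref _running, 0);
        }

        return true;
    }

    public void Dispose() => _timer?.Dispose();
}
```

With something like this, the hosted service could pass () => eventBroker.ProcessAsync() into the runner, where eventBroker is whatever IEventBroker instance was injected.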

Event Feed Service (Services)

This will create the DTO entities based on the JSON events feed. Tools like json2csharp.com can help quickly generate these classes.

using System.Collections.Generic;
using System.Threading.Tasks;

namespace TimedHostedService.Worker.Domain.Services
{
    public interface IEventFeedService
    {
        /// <summary>
        /// Get a collection of events from the feed.
        /// </summary>
        /// <param name="watermark"></param>
        /// <returns></returns>
        Task<IEnumerable<EventDto>> GetEventsAsync(long watermark);
    }
}
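
As a rough idea of what the DTOs for this feed might look like, here is a sketch using System.Text.Json. The property names mirror the JSON shown in the Story section, but the class shapes (EventFeedDto, ItemDto, the EventFeedParser helper) are my assumptions and may differ from the classes in the demo repo.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Assumed envelope: the sample response wraps the events in an "events" array.
public sealed class EventFeedDto
{
    [JsonPropertyName("events")]
    public List<EventDto> Events { get; set; } = new();
}

public sealed class EventDto
{
    [JsonPropertyName("id")]
    public long Id { get; set; }

    [JsonPropertyName("event_type")]
    public string EventType { get; set; } = "";

    [JsonPropertyName("user_id")]
    public Guid UserId { get; set; }

    // Nullable because CART_DELETE in the sample carries no date.
    [JsonPropertyName("date_utc")]
    public DateTimeOffset? DateUtc { get; set; }

    // Stays empty when the event has no "items" property.
    [JsonPropertyName("items")]
    public List<ItemDto> Items { get; set; } = new();
}

public sealed class ItemDto
{
    [JsonPropertyName("id")]
    public Guid Id { get; set; }

    [JsonPropertyName("quantity")]
    public int Quantity { get; set; }

    [JsonPropertyName("unit_cost")]
    public decimal UnitCost { get; set; }
}

// Hypothetical helper that turns a response body into DTOs.
public static class EventFeedParser
{
    public static IReadOnlyList<EventDto> Parse(string json)
    {
        var feed = JsonSerializer.Deserialize<EventFeedDto>(json);
        return feed?.Events ?? new List<EventDto>();
    }
}
```

An IEventFeedService implementation could fetch the response body with HttpClient and hand it to a parser like this one.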

The domain events are used to explicitly implement side effects across multiple aggregates. In C#, a domain event is simply a data-holding structure or class, like a DTO.

Cart Mapper (CartEvents/Mappers)

We will need to map the EventDto to the domain event CartEvent.

using System.Collections.Generic;

namespace TimedHostedService.Worker.Domain.CartEvents.Mappers
{
    public interface ICartMapper
    {
        /// <summary>
        /// Maps `EventDto` to domain event `CartEvent`
        /// </summary>
        /// <param name="eventDtos"></param>
        /// <returns></returns>
        IEnumerable<CartEvent> MapCartEvent(IEnumerable<EventDto> eventDtos);

        /// <summary>
        /// Maps `CartEvent` to domain model `Cart`
        /// </summary>
        /// <param name="cartEvent"></param>
        /// <returns></returns>
        Cart MapCart(CartEvent cartEvent);
    }
}

Our EventType values are based on the feed.

public enum EventType
{
    CART_CREATE = 1,
    CART_UPDATE = 2,
    CART_DELETE = 3
}
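
Because the feed delivers event_type as a string, the mapper has to turn it into an EventType somewhere. A small sketch of one way to do that safely follows; EventTypeParser is a hypothetical helper and the enum is repeated only so the snippet stands on its own.

```csharp
using System;

public enum EventType
{
    CART_CREATE = 1,
    CART_UPDATE = 2,
    CART_DELETE = 3
}

// Hypothetical helper for converting the feed's event_type string.
public static class EventTypeParser
{
    public static bool TryParse(string value, out EventType eventType)
    {
        // Enum.TryParse also accepts numeric strings such as "7",
        // so IsDefined is used to reject values that are not named members.
        return Enum.TryParse(value, ignoreCase: false, out eventType)
            && Enum.IsDefined(typeof(EventType), eventType);
    }
}
```

Returning false for unknown values lets the caller decide whether to skip the event or fail loudly when the feed starts sending a type the service does not know yet.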

Event Handler (CartEvents/Events/Handlers)

To separate out the concerns we have handlers for each event type.

using System.Threading.Tasks;

namespace TimedHostedService.Worker.Domain.CartEvents.Handlers
{
    public interface IEventHandler
    {
        /// <summary>
        /// Handles the given event based on the action type in the event.
        /// </summary>
        /// <param name="cartEvent"></param>
        /// <returns></returns>
        Task HandleAsync(CartEvent cartEvent);

        /// <summary>
        /// Matches the handler from the injected collection based on EventType
        /// </summary>
        /// <param name="eventType"></param>
        /// <returns></returns>
        bool IsMatch(EventType eventType);
    }
}
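
To show how IsMatch can be used to pick a handler from the injected collection, here is a minimal self-contained sketch. The CartEvent record is a trimmed stand-in for the real domain event, and CartDeleteHandler, EventDispatcher and the deleteCartForUser callback are hypothetical names of mine, not taken from the demo repo.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public enum EventType { CART_CREATE = 1, CART_UPDATE = 2, CART_DELETE = 3 }

// Trimmed, hypothetical stand-in for the domain event.
public sealed record CartEvent(long Id, EventType EventType, Guid UserId);

public interface IEventHandler
{
    Task HandleAsync(CartEvent cartEvent);
    bool IsMatch(EventType eventType);
}

public sealed class CartDeleteHandler : IEventHandler
{
    // Hypothetical persistence hook; a real handler would call a repository.
    private readonly Action<Guid> _deleteCartForUser;

    public CartDeleteHandler(Action<Guid> deleteCartForUser) => _deleteCartForUser = deleteCartForUser;

    public bool IsMatch(EventType eventType) => eventType == EventType.CART_DELETE;

    public Task HandleAsync(CartEvent cartEvent)
    {
        _deleteCartForUser(cartEvent.UserId);
        return Task.CompletedTask;
    }
}

public sealed class EventDispatcher
{
    private readonly IReadOnlyList<IEventHandler> _handlers;

    // With dependency injection, every registered IEventHandler arrives here.
    public EventDispatcher(IEnumerable<IEventHandler> handlers) => _handlers = handlers.ToList();

    public Task DispatchAsync(CartEvent cartEvent)
    {
        var handler = _handlers.FirstOrDefault(h => h.IsMatch(cartEvent.EventType));
        if (handler is null)
        {
            throw new InvalidOperationException($"No handler registered for {cartEvent.EventType}.");
        }

        return handler.HandleAsync(cartEvent);
    }
}
```

The nice part of this shape is that supporting a new event type means adding one more IEventHandler implementation and registering it; the dispatcher itself does not change.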

TimedHostedService.Worker.Infrastructure

This has Repositories for persistent storage of data; the interfaces are defined in TimedHostedService.Worker.Domain/Interfaces.

ICartItemRepository and ICartRepository are very similar.

namespace TimedHostedService.Worker.Domain.Interfaces
{
    public interface ICartItemRepository
    {
        /// <summary>
        /// Insert a new cart_item row
        /// </summary>
        /// <param name="cartItem"></param>
        /// <returns></returns>
        long Add(CartItem cartItem);

        /// <summary>
        /// Updates the existing cart_item row
        /// </summary>
        /// <param name="cartItem"></param>
        void Update(CartItem cartItem);

        /// <summary>
        /// Deletes the cart_item row
        /// </summary>
        /// <param name="cartItemId"></param>
        void Delete(int cartItemId);
    }
}

IWatermarkRepository is used to keep track of the watermark in a local .ini file.

namespace TimedHostedService.Worker.Domain.Interfaces
{
    public interface IWatermarkRepository
    {
        /// <summary>
        /// Selects the current watermark value
        /// Only items greater than or equal to this value will be requested from the event feed
        /// </summary>
        /// <returns></returns>
        long Get();

        /// <summary>
        /// Sets the watermark to the given value
        /// </summary>
        /// <param name="watermark"></param>
        void Update(long watermark);
    }
}
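
A file-backed implementation of this interface can be quite small. The sketch below is an assumption about how it might look, not the code from the demo repo: IniWatermarkRepository, the watermark= key and the fallback to an initial value when the file is missing are all my own choices.

```csharp
using System;
using System.Globalization;
using System.IO;

public interface IWatermarkRepository
{
    long Get();
    void Update(long watermark);
}

// Hypothetical implementation storing a single "watermark=<n>" line in a file.
public sealed class IniWatermarkRepository : IWatermarkRepository
{
    private const string Key = "watermark";
    private readonly string _path;
    private readonly long _initialWatermark;

    public IniWatermarkRepository(string path, long initialWatermark = 0)
    {
        _path = path;
        _initialWatermark = initialWatermark;
    }

    public long Get()
    {
        // Assumption: a missing file means nothing has been processed yet.
        if (!File.Exists(_path))
        {
            return _initialWatermark;
        }

        foreach (var line in File.ReadAllLines(_path))
        {
            var parts = line.Split('=', 2);
            if (parts.Length == 2
                && parts[0].Trim() == Key
                && long.TryParse(parts[1].Trim(), NumberStyles.Integer, CultureInfo.InvariantCulture, out var value))
            {
                return value;
            }
        }

        return _initialWatermark;
    }

    public void Update(long watermark)
    {
        // Overwrites the whole file; it only ever holds this one key.
        File.WriteAllText(_path, Key + "=" + watermark.ToString(CultureInfo.InvariantCulture) + Environment.NewLine);
    }
}
```

Because the value lives on the local disk, it survives restarts of the process but is not shared between instances.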

At the start of each ProcessAsync in the EventBroker the watermark is read from the ini file.

After each event is dispatched and awaited, the watermark is updated with +1, so the next id_gte query no longer returns that event. This implementation of the watermark would NOT scale well if the hosted service ran as multiple instances, but this is fine for a POC.
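
Put together, the watermark flow could be sketched like this. It is a simplified illustration and not the broker from the demo repo: the feed and the handlers are replaced by two delegates, CartEvent is a trimmed stand-in, InMemoryWatermarkRepository exists only so the snippet runs on its own, and I am reading "+1" as "last handled event id plus one".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Trimmed, hypothetical stand-in for the domain event.
public sealed record CartEvent(long Id, string EventType);

public interface IWatermarkRepository
{
    long Get();
    void Update(long watermark);
}

// In-memory stand-in for the .ini backed repository, for demonstration only.
public sealed class InMemoryWatermarkRepository : IWatermarkRepository
{
    private long _value;
    public InMemoryWatermarkRepository(long initial) => _value = initial;
    public long Get() => _value;
    public void Update(long watermark) => _value = watermark;
}

public sealed class EventBroker
{
    private readonly IWatermarkRepository _watermarks;
    private readonly Func<long, Task<IReadOnlyList<CartEvent>>> _fetchAsync; // stands in for feed + mapper
    private readonly Func<CartEvent, Task> _dispatchAsync;                   // stands in for the handlers

    public EventBroker(
        IWatermarkRepository watermarks,
        Func<long, Task<IReadOnlyList<CartEvent>>> fetchAsync,
        Func<CartEvent, Task> dispatchAsync)
    {
        _watermarks = watermarks;
        _fetchAsync = fetchAsync;
        _dispatchAsync = dispatchAsync;
    }

    public async Task ProcessAsync()
    {
        // 1. Check watermark.
        var watermark = _watermarks.Get();

        // 2. Collect new events (id >= watermark) from the feed.
        var events = await _fetchAsync(watermark);

        // 3. Dispatch in id order, moving the watermark past each handled event.
        foreach (var cartEvent in events.OrderBy(e => e.Id))
        {
            await _dispatchAsync(cartEvent);

            // Assumption: "+1" means last handled id + 1, so id_gte skips it next time.
            _watermarks.Update(cartEvent.Id + 1);
        }
    }
}
```

Updating the watermark only after a successful dispatch means a failing event is retried on the next run instead of being silently skipped.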

That’s it, the hosted service will continue to poll for new events and handle them. When new event types are needed, they can be implemented with IEventHandler and injected as part of the collection using dependency injection.

References