Scaling background jobs

Most systems have recurring background jobs that execute at regular intervals to perform database maintenance, import data from other systems, cache data, send newsletters, and so on. On a multi-tenant platform you usually have these recurring jobs per tenant, which can mean hundreds or thousands of concurrent jobs. Some jobs may be long-running, while others may be very CPU-intensive.

Lots of open source schedulers allow you to configure (dynamically or not) when your jobs run, but often the jobs have to run in-proc, that is, in the scheduler’s process, by implementing an IJob interface that executes the job’s code.

public interface IJob
{
    Task Execute(JobRequest job);
}

This is required so that the scheduler can measure total execution time and ensure the interval between executions is respected, but also so it knows whether the job completed or failed and needs to be retried. Scaling schedulers often requires distributed locks to synchronize executions and to split jobs amongst multiple scheduler instances, all while executing the actual jobs. When you have thousands of concurrent jobs this is a serious problem: you often end up with hundreds of threads in your scheduler processes and/or massive CPU/memory usage, putting a lot of strain on the scheduler and letting some jobs affect the stability of others. This happens because you’re mixing two different concerns: scheduling and processing of the jobs.

In order to achieve higher scalability you need to offload jobs and run them outside the scheduler. The most common solution is to use message queues, where you typically only need to worry about scaling your message handlers, simply by adding more machines/processes. If your queue becomes a bottleneck, check whether your queuing technology supports partitioning; if it doesn’t, you can use multiple queues (perhaps one per tenant or per group of tenants).

The scheduler still needs to be notified about job completions, to be able to ensure proper intervals and avoid concurrent executions of the same job. If your scheduler has an API where you can report job completion status from the message handlers, great. Otherwise, don’t despair: it’s relatively easy to implement one using messaging, with a jobId as the correlation id, and have your scheduler’s IJob await until the actual job completes, as in the sketch below.
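For illustration, here’s a minimal sketch of that idea: the scheduler-side IJob enqueues a message describing the work and awaits a TaskCompletionSource keyed by jobId, which gets completed when a completion message arrives from the handlers. IMessageQueue, RunJobMessage and the JobRequest.Id/Data properties are hypothetical stand-ins for your own scheduler and queue types.

using System.Collections.Concurrent;
using System.Threading.Tasks;

public class QueuedJob : IJob
{
    // pending completions, keyed by jobId (the correlation id)
    static readonly ConcurrentDictionary<string, TaskCompletionSource<bool>> pending =
        new ConcurrentDictionary<string, TaskCompletionSource<bool>>();

    readonly IMessageQueue queue; // hypothetical queue abstraction

    public QueuedJob(IMessageQueue queue)
    {
        this.queue = queue;
    }

    public async Task Execute(JobRequest job)
    {
        var completion = new TaskCompletionSource<bool>();
        pending[job.Id] = completion;

        // the actual work runs in a message handler, in another process
        await queue.Enqueue(new RunJobMessage { JobId = job.Id, Data = job.Data });

        // the scheduler's IJob only completes when the handler reports back,
        // so execution intervals and concurrency checks remain correct
        await completion.Task;
    }

    // invoked by the messaging infrastructure when a job completion
    // message arrives, correlated by jobId
    public static void OnJobCompleted(string jobId)
    {
        TaskCompletionSource<bool> completion;
        if (pending.TryRemove(jobId, out completion))
            completion.SetResult(true);
    }
}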


NDomain, a new framework to simplify DDD, CQRS and Event Sourcing development

In my last post I talked about how I ended up creating a framework to simplify DDD, CQRS and Event Sourcing development, as well as to help me better understand these concepts on the low-level side of things.


NDomain’s source code repository can be found on GitHub. Here’s what you can expect from NDomain:

  • Robust EventStore implementation, where events can be stored and published using different technologies
  • Base aggregate class, whose state can be rebuilt from stored events or from a snapshot
  • Repository to load/save aggregates, so you get true persistence ignorance in your domain layer
  • Brokerless message bus with transports for multiple technologies including Redis and Azure Queues
  • CommandBus and EventBus built on top of the message bus, as well as Command and Event handlers
  • Integration with your logging framework and IoC container
  • Fully async to its core, leveraging non blocking IO operations and keeping resource usage to a minimum
  • Naming convention based, meaning you don’t need to implement interfaces for each command/event handler, it just works and it’s fast!
  • No reflection to invoke command/event handlers or to rebuild aggregates; everything is wired up with compiled lambda expression trees created on startup
  • In-proc implementations for all components, so you can decide to move to a distributed architecture later without having to refactor your whole solution.
  • A straightforward Fluent configuration API, to let you choose the implementation of each component
  • A suite of base unit test classes, so that all different implementations for a given component are tested in the same way

Great, how does it work?

Here are some basics to get you started; you can also check the samples.

Configuring the DomainContext

The DomainContext is NDomain’s container, where all components are accessible and message processors can be started and stopped.

var context = DomainContext.Configure()
                           .EventSourcing(c => c.WithAzureTableStorage(azureAccount, "events"))
                           .Logging(c => c.WithNLog())
                           .IoC(c => c.WithAutofac(container))
                           .Bus(c => c.WithAzureQueues(azureAccount)
                                      .WithProcessor(p => p.Endpoint("background-worker")
                                                           /* ..register handlers.. */));

DomainContext exposes an ICommandBus, IEventBus, IEventStore and IAggregateRepository that you can use either by passing the DomainContext around or, if you use an IoC container, by configuring it and depending on them directly.
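Once the context is configured, dispatching a command could look something like the snippet below. Take it as a hedged sketch: the Command<T> wrapper and the exact Send signature are assumptions here, so check the samples for the real API.

// assumption: commands are wrapped with an id and a payload before being
// sent through the ICommandBus; 'item' would be defined elsewhere
var createSale = new CreateSale
{
    SaleId = "sale-1",
    SellerId = "seller-7",
    Item = item,
    Price = 10m,
    Stock = 5
};

await context.CommandBus.Send(new Command<CreateSale>(createSale.SaleId, createSale));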

Creating aggregates

A sample Aggregate, enforcing domain rules by checking its state properties and firing state change events

public class Sale : Aggregate<SaleState>
{
    public Sale(string id, SaleState state) : base(id, state) { }

    public bool CanPlaceOrder(Order order)
    {
        return State.AvailableStock >= order.Quantity;
    }

    public void PlaceOrder(Order order)
    {
        if (State.PendingOrders.ContainsKey(order.Id))
            return; // idempotency

        if (!CanPlaceOrder(order))
            throw new InvalidOperationException("not enough quantity"); // or return an error code

        this.On(new OrderPlaced { SaleId = this.Id, Order = order });
    }

    public void CancelOrder(string orderId)
    {
        if (!State.PendingOrders.ContainsKey(orderId))
            return; // idempotency

        this.On(new OrderCancelled { SaleId = this.Id, OrderId = orderId });
    }

    // check OpenStore samples for a complete example
}

An aggregate’s State is changed when the aggregate fires events, and it can be rebuilt by applying all past events loaded from the IEventStore.

public class SaleState : State
{
    public string SellerId { get; set; }
    public Item Item { get; set; }
    public decimal Price { get; set; }
    public int Stock { get; set; }
    public int AvailableStock { get; set; }

    public Dictionary<string, Order> PendingOrders { get; set; }

    public SaleState()
    {
        this.PendingOrders = new Dictionary<string, Order>();
    }

    private void On(SaleCreated ev)
    {
        this.SellerId = ev.SellerId;
        this.Item = ev.Item;
        this.Price = ev.Price;
        this.Stock = this.AvailableStock = ev.Stock;
    }

    private void On(OrderPlaced ev)
    {
        AvailableStock -= ev.Order.Quantity;
        PendingOrders[ev.Order.Id] = ev.Order;
    }

    private void On(OrderCancelled ev)
    {
        AvailableStock += PendingOrders[ev.OrderId].Quantity;
        PendingOrders.Remove(ev.OrderId); // remove it, so a repeated cancel has no effect
    }

    private void On(OrderCompleted ev)
    {
        var order = PendingOrders[ev.OrderId];
        Stock -= order.Quantity;
        PendingOrders.Remove(ev.OrderId); // the order is no longer pending
    }

    // check OpenStore samples for a complete example
}

Aggregates are loaded and saved by an IAggregateRepository, which persists their state change events using the IEventStore. As events are persisted, they are also published on the IEventBus.

CQRS handlers and processors

A command handler processes commands sent by the ICommandBus, updates aggregates and persists state changes

public class SaleCommandHandler
{
    readonly IAggregateRepository<Sale> repository;

    public SaleCommandHandler(IAggregateRepository<Sale> repository)
    {
        this.repository = repository;
    }

    public async Task Handle(ICommand<CreateSale> command)
    {
        var cmd = command.Payload;

        await repository.CreateOrUpdate(cmd.SaleId,
                                        s => s.Create(cmd.SellerId, cmd.Item, cmd.Price, cmd.Stock));
    }

    public async Task Handle(ICommand<PlaceOrder> command)
    {
        var cmd = command.Payload;

        await repository.Update(cmd.SaleId, s => s.PlaceOrder(cmd.Order));
    }

    // other commands
}

An event handler reacts to published events and updates the read models used in your queries

public class SaleEventHandler
{
    public async Task On(IEvent<OrderCompleted> @event)
    {
        var ev = @event.Payload;

        // do something with it, e.g. update a read model
    }

    // .. other events
}

As you can see, NDomain tries to be as unintrusive in your code as possible: you don’t need to implement message handler interfaces, as long as you keep to the naming conventions.

Message processing is transactional, so if a message handler fails or times out, the message goes back to the queue to be retried. It is therefore important to design your aggregates, command handlers and event handlers to be idempotent, to avoid side effects.

A processor has an endpoint address (internally, a queue) where you can register message handlers, usually for commands and events, although any POCO can be used as a message. When you register handlers, message subscriptions are created based on the message’s Type name, and whenever a message is sent, each subscription (in this case, a processor/handler) gets a copy of it.

Your command/event handlers can scale horizontally, as multiple processors using the same endpoint address will process messages from the shared input queue in a competing-consumers fashion.


If you would like support for other technologies, please take a look at the existing implementations and feel free to implement your own and submit a pull request. NDomain’s source code is very clean and simple; let’s keep it that way!

My journey on DDD, CQRS and Event Sourcing

This post is not about what DDD, CQRS and Event Sourcing are, but rather about how I’ve been using them.

Over the last year I’ve been developing a collaborative social app (web and mobile) in my spare time, where you can have user groups with activities, polls, discussions, feeds, and more.

As I’m targeting a mostly mobile audience, I wanted to support disconnected clients and let offline users work on cached data; once they’re back online, I can synchronize their changes. This is easier to accomplish with task-based UIs (where user actions map to commands in CQRS), and it’s clear that one user’s action doesn’t need to be immediately visible to all members of the group, since it’s very likely that they’re offline and will only see the changes later. However, I wanted to be able to track and list the changes other users have made, rather than just show the final version of the data, giving a better feeling of collaboration even though clients can be disconnected most of the time.

This app could possibly scale to millions of users and I wanted to keep it free of ads, so I needed the backend to be fast, scalable, cloud hosted and as cheap as possible. I’m currently using Azure, but the original plan was to use AWS. On Azure I can implement my messaging infrastructure on top of Azure Queues and use Table Storage for my EventStore; on AWS I could use SQS for messaging and DynamoDB for my EventStore. The key point is that, with the right set of abstractions, my architecture doesn’t get tied to any particular service, database or cloud provider.

Below is an overview of my current architecture. Non-blue boxes are components that can be hosted in separate processes/machines, but there’s no obligation to do so. My current setup is one worker role for the API and another worker role for the command and event handlers.

Backend architecture overview


Reliable Redis Request-Reply using service contracts in .NET

In my previous post I wrote about ServiceProxy and how easy it should be to integrate with messaging frameworks and do request/reply with service contracts even if your messaging framework has no support for it.

If you’re not familiar with Redis, you should take some time to get acquainted with it. There is a lot of material on the Redis website to get you started, and you’ll be amazed at how Redis helps you solve complex problems in a simple, easy and performant way. There are many use cases for Redis, and one is certainly messaging. Redis supports queues and pub/sub, which are probably all the messaging capabilities most systems will ever need.

Request-reply can be done with Redis in many ways: queues with or without blocking primitives, pub/sub, or a combination of pub/sub and queues.


ServiceProxy.Redis is an implementation of ServiceProxy using Redis and the Booksleeve Redis client.

Update (07 April 2014): ServiceProxy.Redis now uses the StackExchange.Redis library, Booksleeve’s successor. The source code changes can be found here.

ServiceProxy.Redis uses an asynchronous and reliable request-reply pattern built on queues. There are two ways of dequeuing items in Redis: non-blocking and blocking. The non-blocking operation returns immediately whether there is an item to be dequeued or not, returning null in the latter case. The blocking operation takes a timeout parameter, and Redis will block the client’s connection while the queue is empty, until an item can be dequeued or the timeout is reached. When multiple connections are blocked on the same queue, Redis does fair queueing among them, so load balancing is trivially simple. Since the connection to Redis can be blocked when dequeuing, it is recommended to use a different connection for enqueuing and/or issuing other non-blocking commands. The same principle applies to pub/sub (one connection to subscribe to channels, another to publish and/or issue other commands).
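To make the pattern concrete, here’s a minimal request-reply sketch over Redis lists using StackExchange.Redis. This is not ServiceProxy.Redis’s actual implementation: the queue names and the pipe-delimited message format are assumptions for illustration, and both roles run in one process for brevity.

using System;
using System.Threading.Tasks;
using StackExchange.Redis;

class RedisRequestReplySample
{
    static void Main()
    {
        var redis = ConnectionMultiplexer.Connect("localhost");
        var db = redis.GetDatabase();

        // requester: enqueue a request carrying a correlation id and the
        // name of the queue where the reply should be dropped
        var correlationId = Guid.NewGuid().ToString("N");
        var replyQueue = "replies:" + correlationId;
        db.ListLeftPush("requests", correlationId + "|" + replyQueue + "|ping");

        // responder (normally a separate process): dequeue, handle, reply
        var request = (string)db.ListRightPop("requests");
        var parts = request.Split('|');
        db.ListLeftPush(parts[1], parts[0] + "|pong");

        // requester: poll the reply queue; StackExchange.Redis multiplexes
        // a single connection, so blocking pops (BLPOP) are not exposed
        RedisValue reply;
        while ((reply = db.ListRightPop(replyQueue)).IsNull)
            Task.Delay(10).Wait();

        Console.WriteLine(reply); // "<correlationId>|pong"
    }
}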


Asynchronous request-reply messaging using service contracts and ServiceProxy

Typically, messaging frameworks support the request/reply pattern by sending messages between client and server, using a correlation id to map response callbacks, and the client’s address or inbound channel to know whom the response should be sent to. There is usually a Send method that takes a message in, and a Reply method that takes a response message and the address/channel of the client. Other frameworks simply rely on queues and have no concept of responses at all, so you have to define queues for the client and tell the server to drop the response message there. This works well for message oriented architectures with very well defined messages.

When you have a lot of different requests/responses, you’ll have to create a lot of classes to represent your messages and a lot of message handlers to handle them and send responses back. This model can lead to a very bloated architecture if you start creating messages like GetCustomerByName, GetCustomerById, ListCustomers, UpdateCustomer and such, or a generic CustomerQueryMessage with lots of properties and one single message handler. In these cases it is much cleaner to have your application components interact through service interfaces that group related operations.
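To illustrate, the four messages above collapse into a single contract. The interface below is just a sketch reusing the hypothetical operation names from the previous paragraph; Customer is a stand-in for your own type.

using System.Collections.Generic;
using System.Threading.Tasks;

// one service contract grouping related operations, instead of one message
// class plus one handler per request type; Task-based signatures keep the
// request/reply asynchronous
public interface ICustomerService
{
    Task<Customer> GetCustomerById(string id);
    Task<Customer> GetCustomerByName(string name);
    Task<List<Customer>> ListCustomers(int pageIndex, int pageSize);
    Task UpdateCustomer(Customer customer);
}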

Enter ServiceProxy

ServiceProxy is a lightweight asynchronous proxy for .NET that allows you to use service contracts in a request/reply manner with any messaging framework. It is open source and is the result of my past work with service oriented architectures using high performance messaging frameworks that had no support for services (e.g. ZeroMQ, Redis).


It’s been a long time

It’s been a long time since my last post and a lot has happened since then.

First, I’d like to apologize to everyone who wanted me to continue the Silverlight LOB application series. When I realized that Silverlight was going to die, I moved on to other technologies. Around that time I also started my master’s thesis and got a new job at a start-up doing realtime online marketing; my free time was close to zero and I had to put my blog aside.

Since 2011 I’ve been working on highly performant, scalable systems, and I’ve developed an even deeper interest in backend technologies, messaging patterns and scalable architectures. I’ve learned lots of cool technologies such as ZeroMQ, Redis, NodeJS, MongoDB, Golang and others, and I plan to start writing about what I’ve learned, hoping it will be as useful for others as it has been for me. I still work mainly in .NET, so most of my future posts will be based on .NET.

See you all soon!

Architecting Silverlight LOB applications (Part 6) – Building an MVVM Framework

Hello again! In this post I’m going to talk about building an MVVM framework.

As I said in the previous post, this post should be about the OrderView. This view should allow users to pick items from a list of products, add them to the order, choose quantities, view the current total price and submit the order. The user should also be able to filter, sort and paginate the product list. I’m sure you’ve seen enough blog posts from other people on this subject: get the selected product, add it to another collection or create an order detail based on it, update some other data on the UI and finally submit the changes back to the server. The thing is, everyone has their own way of programming, and when you end up in a team you may find that two people coded the same thing in different ways, one with a bug in situation A and the other with a bug in situation B. Having a good MVVM framework with a well-defined methodology is a must to prevent these situations. In this post I want to talk about the essential components you must have in an MVVM framework. Later, I’ll describe an MVVM framework I’ve been working on, which was based on WCF RIA Services but doesn’t really depend on it.

Since we’re following best practices, we know that with a good MVVM architecture we can come up with a solution whose fetching, filtering, sorting and paging logic is entirely separated from the view, which also allows different views for the same view model. For example, we can start by using a DataGrid and a DataPager to display our items, and later provide a new view that uses comboboxes to select the sort options, an album-like listbox to show the items and custom buttons for paging. We should also be able to separate all this logic from the actual data access logic, so we can use mock objects for our model and unit test our view models. That’s not an easy task, but that’s what I want to achieve from now on.

Well, to start, .NET / Silverlight already offers us some classes and interfaces that are very handy for MVVM scenarios.

  • INotifyPropertyChanged – Used to raise an event when a property changes. The WPF / Silverlight binding framework uses this interface to update the view when a property changes (see the sketch after this list).
  • INotifyCollectionChanged – Used to raise an event when an insert, remove, clear or replace operation has been performed on a collection. WPF / Silverlight controls that have an ItemsSource property usually use this interface to create or delete visual items in a container; for example, ListBoxes display new ListBoxItems and DataGrids display new DataGridRows.
  • ICollectionView – Used to provide filtering, sort descriptions, group descriptions and item selection for an IEnumerable collection, so that the view displays only the filtered items, sorted according to the sort descriptions, and highlights the selected item. (It has more features, but these are the most relevant for the sake of this post.)
  • IPagedCollectionView – Used to provide paging options for an IEnumerable collection. It is used mostly by DataPagers, which call the MoveToPage(int pageIndex) method and allow us to register for the PageChanging event and fetch a new page of entities to be displayed.
  • There are other important interfaces, like IEditableObject and IEditableCollectionView, but I’m not going to cover those in this post. They are used to update an object’s property values in an atomic fashion.
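To make this concrete, here’s a minimal sketch of the property-change plumbing an MVVM framework typically builds on; the class and helper names are my own, not from any particular framework, and CallerMemberName is avoided on purpose since it isn’t available on older Silverlight targets.

using System.ComponentModel;

public abstract class ViewModelBase : INotifyPropertyChanged
{
    public event PropertyChangedEventHandler PropertyChanged;

    protected void OnPropertyChanged(string propertyName)
    {
        var handler = PropertyChanged;
        if (handler != null)
            handler(this, new PropertyChangedEventArgs(propertyName));
    }

    // sets the backing field and raises PropertyChanged only when the value changes
    protected bool SetProperty<T>(ref T field, T value, string propertyName)
    {
        if (object.Equals(field, value))
            return false;

        field = value;
        OnPropertyChanged(propertyName);
        return true;
    }
}

A view model property then becomes as simple as: public string Name { get { return name; } set { SetProperty(ref name, value, "Name"); } }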
