NDomain, a new framework to simplify DDD, CQRS and Event Sourcing development

In my last post I talked about how I ended up creating a framework to simplify DDD, CQRS and Event Sourcing development, as well as to help me better understand these concepts on the low-level side of things.

NDomain

NDomain’s source code repository can be found on GitHub. Here’s what you can expect from NDomain:

  • Robust EventStore implementation, where events can be stored and published using different technologies
  • Base aggregate class, whose state can be rebuilt from stored events or from a snapshot
  • Repository to load/save aggregates, so you get true persistence ignorance in your domain layer
  • Brokerless message bus with transports for multiple technologies including Redis and Azure Queues
  • CommandBus and EventBus built on top of the message bus, as well as Command and Event handlers
  • Integration with your logging and IoC container
  • Fully async to its core, leveraging non-blocking IO operations and keeping resource usage to a minimum
  • Naming-convention based, meaning you don’t need to implement interfaces for each command/event handler; it just works and it’s fast!
  • No reflection to invoke command/event handlers or to rebuild aggregates; everything is wired up using compiled lambda expression trees created on startup
  • In-proc implementations for all components, so you can decide to move to a distributed architecture later without having to refactor your whole solution.
  • A straightforward fluent configuration API that lets you choose the implementation of each component
  • A suite of base unit test classes, so that all different implementations for a given component are tested in the same way

Great, how does it work?

Here are some basics to get you started, and you can also check the samples.

Configuring the DomainContext

The DomainContext is NDomain’s container, where all components are accessible and message processors can be started and stopped.


var context = DomainContext.Configure()
                           .EventSourcing(c => c.WithAzureTableStorage(azureAccount, "events"))
                           .Logging(c => c.WithNLog())
                           .IoC(c => c.WithAutofac(container))
                           .Bus(c => c.WithAzureQueues(azureAccount)
                                      .WithRedisSubscriptionStore(redisConnection)
                                      .WithRedisSubscriptionBroker(redisConnection)
                                      .WithProcessor(p => p.Endpoint("background-worker")
                                                           .RegisterHandler<CommandHandlerThatUpdatesSomeAggregate>()
                                                           .RegisterHandler<EventHandlerThatUpdatesAReadModel>()
                                                           .RegisterHandler<EventHandlerThatUpdatesAnotherReadModel>()))
                           .Start();

DomainContext exposes an ICommandBus, IEventBus, IEventStore and IAggregateRepository that you can use either by passing the DomainContext around or, if you use an IoC container, by configuring it so your components can depend on them directly.
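
For example, an application service or API controller could depend on ICommandBus and send commands to the background worker. The sketch below is a minimal example: the command envelope and the exact ICommandBus.Send signature are assumptions here, so check the samples; PlaceOrder and Order are the sample types shown further down.


public class SaleOrdersService
{
    readonly ICommandBus commandBus;

    public SaleOrdersService(ICommandBus commandBus)
    {
        this.commandBus = commandBus;
    }

    public Task PlaceOrderAsync(string saleId, Order order)
    {
        // wrap the payload in a command envelope and send it; any processor
        // with a handler registered for PlaceOrder will receive it
        // (the envelope constructor is an assumption, see the samples)
        var command = new Command<PlaceOrder>(saleId, new PlaceOrder { SaleId = saleId, Order = order });

        return commandBus.Send(command);
    }
}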

Creating aggregates

Here’s a sample aggregate, enforcing domain rules by checking its state properties and firing state change events:


public class Sale : Aggregate<SaleState>
{
    public Sale(string id, SaleState state) : base(id, state)  { }

    public bool CanPlaceOrder(Order order)
    {
        return State.AvailableStock >= order.Quantity;
    }

    public void PlaceOrder(Order order)
    {
        if (State.PendingOrders.ContainsKey(order.Id))
        {
            // idempotency
            return;
        }

        if (!CanPlaceOrder(order))
        {
            // return error code or throw exception
            throw new InvalidOperationException("not enough quantity");
        }

        this.On(new OrderPlaced { SaleId = this.Id, Order = order});
    }

    public void CancelOrder(string orderId)
    {
        if (!State.PendingOrders.ContainsKey(orderId))
        {
            // idempotency
            return;
        }

        this.On(new OrderCancelled { SaleId = this.Id, OrderId = orderId });
    }

    // check OpenStore samples for complete example
}
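
The Create operation used by the command handler further down isn’t shown above. Based on the fields SaleState consumes, it could look roughly like this inside the Sale class (a sketch; the real version is in the OpenStore samples):


public void Create(string sellerId, Item item, decimal price, int stock)
{
    if (State.SellerId != null)
    {
        // idempotency: the sale was already created
        return;
    }

    this.On(new SaleCreated { SellerId = sellerId, Item = item, Price = price, Stock = stock });
}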

An aggregate’s State is changed when events are fired from the aggregate, and it can be rebuilt by applying all past events loaded from the IEventStore.


public class SaleState : State
{
    public string SellerId { get; set; }
    public Item Item { get; set; }
    public decimal Price { get; set; }
    public int Stock { get; set; }
    public int AvailableStock { get; set; }

    public Dictionary<string, Order> PendingOrders { get; set; }

    public SaleState()
    {
        this.PendingOrders = new Dictionary<string, Order>();
    }

    private void On(SaleCreated ev)
    {
        this.SellerId = ev.SellerId;
        this.Item = ev.Item;
        this.Price = ev.Price;
        this.Stock = this.AvailableStock = ev.Stock;
    }

    private void On(OrderPlaced ev)
    {
        AvailableStock -= ev.Order.Quantity;
        PendingOrders[ev.Order.Id] = ev.Order;
    }

    private void On(OrderCancelled ev)
    {
        AvailableStock += PendingOrders[ev.OrderId].Quantity;
        PendingOrders.Remove(ev.OrderId);
    }

    private void On(OrderCompleted ev)
    {
        var order = PendingOrders[ev.OrderId];
        
        Stock -= order.Quantity;
        PendingOrders.Remove(ev.OrderId);
    }
    
    // check OpenStore samples for complete example
}
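
The events themselves are plain POCOs. Their exact definitions live in the OpenStore samples, but judging from the fields used above they are shaped roughly like this:


public class SaleCreated
{
    public string SellerId { get; set; }
    public Item Item { get; set; }
    public decimal Price { get; set; }
    public int Stock { get; set; }
}

public class OrderPlaced
{
    public string SaleId { get; set; }
    public Order Order { get; set; }
}

public class OrderCancelled
{
    public string SaleId { get; set; }
    public string OrderId { get; set; }
}

public class OrderCompleted
{
    public string SaleId { get; set; }
    public string OrderId { get; set; }
}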

Aggregates are loaded and saved by an IAggregateRepository, which persists their state change events using the IEventStore. As events are persisted, they are also published on the IEventBus.

CQRS handlers and processors

A command handler processes commands sent through the ICommandBus, updating aggregates and persisting their state changes:


public class SaleCommandHandler
{
    readonly IAggregateRepository<Sale> repository;
    
    public SaleCommandHandler(IAggregateRepository<Sale> repository)
    {
        this.repository = repository;
    }

    public async Task Handle(ICommand<CreateSale> command)
    {
        var cmd = command.Payload;

        await repository.CreateOrUpdate(cmd.SaleId,
                                        s => s.Create(cmd.SellerId, cmd.Item, cmd.Price, cmd.Stock));
    }

    public async Task Handle(ICommand<PlaceOrder> command)
    {
        var cmd = command.Payload;

        await repository.Update(cmd.SaleId, s => s.PlaceOrder(cmd.Order));
    }

    // other commands
}
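
The commands are plain POCOs as well. Again, the actual definitions are in the OpenStore samples; inferred from the handlers above, they look roughly like this:


public class CreateSale
{
    public string SaleId { get; set; }
    public string SellerId { get; set; }
    public Item Item { get; set; }
    public decimal Price { get; set; }
    public int Stock { get; set; }
}

public class PlaceOrder
{
    public string SaleId { get; set; }
    public Order Order { get; set; }
}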

An event handler reacts to published events and updates the read models used by your queries:


public class SaleEventHandler
{
    
    public async Task On(IEvent<OrderCompleted> @event)
    {
        var ev = @event.Payload;

        // do something with it
    }

    // .. other events
}
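
To make that concrete, here’s a sketch of an event handler keeping a denormalized view up to date. ISalesReadModelStore is a hypothetical query-side store, standing in for whatever storage you use for reads:


// hypothetical query-side store, not part of NDomain
public interface ISalesReadModelStore
{
    Task DecrementAvailableStock(string saleId, string orderId, int quantity);
}

public class SaleReadModelEventHandler
{
    readonly ISalesReadModelStore readModels;

    public SaleReadModelEventHandler(ISalesReadModelStore readModels)
    {
        this.readModels = readModels;
    }

    public async Task On(IEvent<OrderPlaced> @event)
    {
        var ev = @event.Payload;

        // update the denormalized view used by queries;
        // keep this idempotent, since messages can be redelivered
        await readModels.DecrementAvailableStock(ev.SaleId, ev.Order.Id, ev.Order.Quantity);
    }
}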

As you can see, NDomain tries to be as unobtrusive in your code as possible: you don’t need to implement message handler interfaces, as long as you follow the naming conventions (Handle(ICommand<T>) methods for command handlers, On(IEvent<T>) methods for event handlers).

Message processing is transactional, so if a message handler fails or times out, the message goes back to the queue to be retried. It is therefore important to design your aggregates, command handlers and event handlers to be idempotent, to avoid side effects from redelivered messages; the PendingOrders checks in the Sale aggregate above are an example of this.

A processor has an endpoint address (internally, a queue) where you can register message handlers, usually for commands and events, though really any POCO can be used as a message. When you register handlers, message subscriptions are created based on the message’s Type name, and whenever a message is sent, each subscription (in this case, a processor/handler) gets its own copy of it.

Your command and event handlers can scale horizontally, as multiple processors using the same endpoint address will process messages from the same input queue in a competing-consumers fashion.
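
In practice that just means running the same processor configuration in more than one process. Here’s a trimmed sketch reusing the configuration API from above (logging, IoC and the Redis subscription options are omitted for brevity; azureAccount is the same placeholder used earlier):


// run this same configuration in two or more worker processes;
// both share the "background-worker" endpoint, so its queue is
// consumed in a competing-consumers fashion and the load is split
var context = DomainContext.Configure()
                           .EventSourcing(c => c.WithAzureTableStorage(azureAccount, "events"))
                           .Bus(c => c.WithAzureQueues(azureAccount)
                                      .WithProcessor(p => p.Endpoint("background-worker")
                                                           .RegisterHandler<SaleCommandHandler>()
                                                           .RegisterHandler<SaleEventHandler>()))
                           .Start();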

Contributing

If you would like support for other technologies, please take a look at the existing implementations and feel free to implement your own and submit a pull request. NDomain’s source code is very clean and simple; let’s keep it that way!

My journey on DDD, CQRS and Event Sourcing

This post is not about what DDD, CQRS and Event Sourcing are, but rather about how I’ve been using them.

Over the last year I’ve been developing a collaborative social app (web and mobile) in my spare time, where you can have user groups with activities, polls, discussions, feeds, and more.

As I’m targeting a mostly mobile audience, I wanted to support disconnected clients and let offline users work on cached data; once they’re back online, I can synchronize their changes. This is easier to accomplish with task-based UIs (where user actions map to commands in CQRS), and one user’s action doesn’t really need to be immediately visible to all members of the group, since it’s very likely that they’re offline and will only see the changes later. However, I wanted to be able to track and list the changes other users have made, and not just show the final version of the data, giving a better feeling of collaboration even though clients can be disconnected most of the time.

This app could potentially scale to millions of users and I wanted to keep it free of ads, so I needed the backend to be fast, scalable, cloud-hosted and as cheap as possible. I’m currently using Azure, but the original plan was to use AWS. On Azure I can implement my messaging infrastructure on top of Azure Queues and use Table Storage for my EventStore; on AWS I could use SQS for messaging and DynamoDB for my EventStore. The key point is that, with the right set of abstractions, my architecture doesn’t get tied to any particular database, messaging technology or cloud provider.

Below is an overview of my current architecture. Non-blue boxes are components that can be hosted in separate processes or machines, but there’s no obligation to do so. My current setup is one worker role for the API and another worker role for the command and event handlers.

Backend architecture overview
