Reliable Redis Request-Reply using service contracts in .NET


In my previous post I wrote about ServiceProxy and how easy it should be to integrate with messaging frameworks and do request/reply with service contracts even if your messaging framework has no support for it.

If you’re not familiar with Redis, take some time to get acquainted with it. There is a lot of material on the Redis website to get you started, and you’ll be amazed at how Redis helps you solve complex problems in a simple, easy and performant way. There are many use cases for Redis, and messaging is certainly one of them. Redis supports queues and pub/sub, which are probably all the messaging capabilities that most systems will ever need.

Request-Reply can be done with Redis in many ways: queues with or without blocking primitives, pub/sub or a combination of pub/sub and queues.

ServiceProxy.Redis

ServiceProxy.Redis is an implementation of ServiceProxy using Redis and the Booksleeve Redis client.

Update (07 April 2014): ServiceProxy.Redis now uses the StackExchange.Redis library, Booksleeve’s successor. The source code changes can be found here.

ServiceProxy.Redis implements an asynchronous and reliable request-reply pattern using queues. There are two ways of dequeuing items in Redis: non-blocking and blocking. The non-blocking operation returns immediately whether or not there is an item to dequeue, returning null when the queue is empty. The blocking operation takes a timeout parameter: when the queue is empty, Redis blocks the client’s connection until an item can be dequeued or the timeout is reached. When multiple connections are blocked on the same queue, Redis does fair queueing among them, so load balancing is trivially simple to do. Since the connection to Redis can be blocked when dequeuing, it is recommended to use a different connection for enqueuing and/or issuing other non-blocking commands. The same principle applies when doing pub/sub (one connection to subscribe to channels, another to publish and/or issue other commands).
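To make the difference between the two dequeue styles concrete, here is a minimal in-memory sketch. It does not talk to Redis at all: a BlockingCollection stands in for a Redis list, and the DequeueDemo class and its method names are hypothetical, made up for this illustration. In Redis itself the corresponding commands would be RPOP (non-blocking) and BRPOP (blocking with a timeout).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; a BlockingCollection
// stands in for a Redis list.
public static class DequeueDemo
{
    // Non-blocking: returns immediately, null when the queue is empty.
    public static string TryDequeue(BlockingCollection<string> queue)
    {
        string item;
        return queue.TryTake(out item) ? item : null;
    }

    // Blocking: waits up to 'timeout' for an item, null when the timeout elapses.
    public static string BlockingDequeue(BlockingCollection<string> queue, TimeSpan timeout)
    {
        string item;
        return queue.TryTake(out item, timeout) ? item : null;
    }

    public static void Main()
    {
        var queue = new BlockingCollection<string>();

        // Empty queue: the non-blocking call returns at once with null.
        Console.WriteLine(TryDequeue(queue) ?? "(null)");

        // A producer enqueues after a short delay while the consumer is blocked.
        var producer = Task.Run(async () =>
        {
            await Task.Delay(100);
            queue.Add("hello");
        });

        Console.WriteLine(BlockingDequeue(queue, TimeSpan.FromSeconds(5)) ?? "(null)");
        producer.Wait();

        // Nothing left: the blocking call gives up once the timeout elapses.
        Console.WriteLine(BlockingDequeue(queue, TimeSpan.FromMilliseconds(50)) ?? "(null)");
    }
}
```

The blocking variant is what makes a receive loop cheap: the consumer does not spin while the queue is empty, but it still wakes up periodically so it can check whether it should stop.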

The core components in ServiceProxy.Redis are the RedisClient and the RedisServer. Each uses two connections to Redis (one that is often blocked waiting for messages and another to send messages).

RedisClient

RedisClient implements ServiceProxy’s IClient interface. Messages sent from RedisClient contain a ServiceProxy RequestData object, a request identifier and the name of the queue where the client is expecting a response. The request identifier is used as a correlation id between the response message and the callback that will set a result on the Task returned by the Request method. ServiceProxy handles request timeouts internally, and when a timeout happens for a given request it notifies the respective IClient by cancelling the CancellationToken. In this case, RedisClient simply removes the callback from the requestCallbacks dictionary, because the response may never be received and we don’t want the requestCallbacks dictionary to cause memory leaks.

Below are the Request and the Receive methods:


public Task<ResponseData> Request(RequestData request, CancellationToken token)
{
    this.EnsureIsReceiving();

    var requestId = this.NextId();

    var redisRequest = new RedisRequest(this.receiveQueues[0], requestId, request);
    var redisRequestBytes = redisRequest.ToBinary();

    // Register the callback before sending, so that a fast response
    // cannot arrive while there is no callback to complete.
    var callback = new TaskCompletionSource<ResponseData>();
    this.requestCallbacks[requestId] = callback;

    this.connection.Sender.Lists.AddFirst(0, this.sendQueue, redisRequestBytes);

    if (token != CancellationToken.None)
    {
        // On timeout, drop the callback so the dictionary does not leak.
        token.Register(() =>
        {
            TaskCompletionSource<ResponseData> _;
            this.requestCallbacks.TryRemove(requestId, out _);
        });
    }

    return callback.Task;
}

private async Task Receive()
{
    while (Interlocked.Read(ref this.receiveState) == 1)
    {
        var rawResponse = await this.connection.Receiver.Lists.BlockingRemoveLast(0, this.receiveQueues, 1);
        if (rawResponse == null)
        {
            continue;
        }

        Task.Run(() =>
        {
            var redisResponseBytes = rawResponse.Item2;
            var redisResponse = RedisResponse.FromBinary(redisResponseBytes);

            TaskCompletionSource<ResponseData> callback;
            if (this.requestCallbacks.TryRemove(redisResponse.RequestId, out callback))
            {
                callback.SetResult(redisResponse.Response);
            }

        });
    }
}

RedisClient starts a long-running task to receive responses and uses a different connection for that. Notice that the Request method uses this.connection.Sender and the Receive method uses this.connection.Receiver. As soon as a response is retrieved from the queue, RedisClient fires off a separate task to process it, so that the Receive loop can keep receiving responses, which reduces the overall latency.
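The correlation between request identifiers and callbacks can also be looked at in isolation. The sketch below is a simplified stand-in and not the library’s code: PendingRequests, Register and Complete are hypothetical names, and plain strings replace RequestData and ResponseData. It assumes, as described above, that a timeout is signalled by cancelling the token.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the callback bookkeeping; not the library's class.
public class PendingRequests
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> callbacks =
        new ConcurrentDictionary<string, TaskCompletionSource<string>>();
    private long lastId;

    public int Count { get { return this.callbacks.Count; } }

    // Registers a callback and returns the id plus the task the caller awaits.
    public Tuple<string, Task<string>> Register(CancellationToken token)
    {
        var id = Interlocked.Increment(ref this.lastId).ToString();
        var callback = new TaskCompletionSource<string>();
        this.callbacks[id] = callback;

        if (token.CanBeCanceled)
        {
            // On timeout, drop the entry so abandoned requests do not accumulate.
            token.Register(() =>
            {
                TaskCompletionSource<string> removed;
                this.callbacks.TryRemove(id, out removed);
            });
        }

        return Tuple.Create(id, callback.Task);
    }

    // Completes the matching task; returns false for unknown or expired ids.
    public bool Complete(string id, string response)
    {
        TaskCompletionSource<string> callback;
        if (!this.callbacks.TryRemove(id, out callback))
        {
            return false;
        }

        callback.SetResult(response);
        return true;
    }
}

public static class CorrelationDemo
{
    public static void Main()
    {
        var pending = new PendingRequests();

        // Normal case: the response id matches a registered callback.
        var first = pending.Register(CancellationToken.None);
        Console.WriteLine(pending.Complete(first.Item1, "pong")); // True
        Console.WriteLine(first.Item2.Result);                    // pong

        // Timeout case: cancelling the token removes the callback.
        var cts = new CancellationTokenSource();
        var second = pending.Register(cts.Token);
        cts.Cancel();
        Console.WriteLine(pending.Complete(second.Item1, "late")); // False
        Console.WriteLine(pending.Count);                          // 0
    }
}
```

A response that arrives after its request timed out simply finds no callback and is dropped, which is the behaviour the TryRemove call in the Receive method relies on.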

RedisServer

RedisServer is a component that receives messages sent from a RedisClient and relies on ServiceProxy’s IServiceFactory to create an IService that will process the message and invoke your real services. The core principles of RedisClient were applied on RedisServer as well. The Receive loop fires tasks to process incoming requests. As soon as a response is ready, it will use a different Redis connection to send the response back to the client. The response message contains the ResponseData and the request identifier, so that clients can return the response using the right callback.

Below is the ReceiveRequests method:


private async Task ReceiveRequests()
{
    while (Interlocked.Read(ref this.receiveState) == 1)
    {
        var rawRequest = await this.connection.Receiver.Lists.BlockingRemoveLast(0, this.serviceQueues, 1);
        if (rawRequest == null)
        {
            continue;
        }

        Task.Run(() =>
        {
            var redisRequestBytes = rawRequest.Item2;
            var redisRequest = RedisRequest.FromBinary(redisRequestBytes);

            var service = this.serviceFactory.CreateService(redisRequest.Request.Service);

            service.Process(redisRequest.Request)
                   .ContinueWith(t =>
                   {
                       var response = t.Result;

                       var redisResponse = new RedisResponse(redisRequest.Id, response);
                       var redisResponseBytes = redisResponse.ToBinary();

                       this.connection.Sender.Lists.AddFirst(0, redisRequest.ReceiveQueue, redisResponseBytes);
                   });

        });
    }
}
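To see how the reply queue name and the request identifier travel together, here is a minimal end-to-end sketch that runs entirely in memory. It is an illustration under assumptions rather than the library’s implementation: BlockingCollection instances stand in for Redis lists, and the Request, Response and RoundTripDemo types, the "client-1" queue name and the id "42" are all hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical message shapes; the real RedisRequest/RedisResponse are serialized to bytes.
public class Request { public string ReplyQueue; public string Id; public string Body; }
public class Response { public string Id; public string Body; }

public static class RoundTripDemo
{
    // Server loop: blocking dequeue with a timeout, process the request, then
    // push the reply to the queue named in the request, echoing the request id.
    public static Task RunServer(
        BlockingCollection<Request> serviceQueue,
        ConcurrentDictionary<string, BlockingCollection<Response>> replyQueues,
        CancellationToken stop)
    {
        return Task.Run(() =>
        {
            while (!stop.IsCancellationRequested)
            {
                Request request;
                if (!serviceQueue.TryTake(out request, TimeSpan.FromMilliseconds(50)))
                {
                    continue; // timed out with nothing to do, check 'stop' and wait again
                }

                // The "service" here just upper-cases the body.
                var response = new Response { Id = request.Id, Body = request.Body.ToUpperInvariant() };
                replyQueues[request.ReplyQueue].Add(response);
            }
        });
    }

    // Client side: enqueue a request, then block on the client's own reply queue.
    public static Response RoundTrip(string body)
    {
        var serviceQueue = new BlockingCollection<Request>();
        var replyQueues = new ConcurrentDictionary<string, BlockingCollection<Response>>();
        replyQueues["client-1"] = new BlockingCollection<Response>();

        using (var stop = new CancellationTokenSource())
        {
            var server = RunServer(serviceQueue, replyQueues, stop.Token);

            serviceQueue.Add(new Request { ReplyQueue = "client-1", Id = "42", Body = body });

            Response response;
            replyQueues["client-1"].TryTake(out response, TimeSpan.FromSeconds(5));

            stop.Cancel();
            server.Wait();
            return response;
        }
    }

    public static void Main()
    {
        var response = RoundTrip("ping");
        Console.WriteLine(response.Id + " " + response.Body); // 42 PING
    }
}
```

Because the server learns the reply queue from each request, any number of clients can share one service queue while each listens only on its own queue.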

Let’s see it in action

I’ve added a basic ServiceProxy.Redis example to the source code. That should get you started pretty quickly. In this example I created a basic client/server request-reply scenario using the IFooService contract and service implementation below:


public interface IFooService
{
    Foo GetFoo(int id);
    Task<Foo> GetFooAsync(int id);

    IEnumerable<Foo> ListFoos();
    Task<IEnumerable<Foo>> ListFoosAsync();

    void UpdateFoo(Foo foo);
    Task UpdateFooAsync(Foo foo);
}

public class FooService : IFooService
{
    public Foo GetFoo(int id)
    {
        return FooDb.Get(id);
    }

    public IEnumerable<Foo> ListFoos()
    {
        return FooDb.All();
    }

    public void UpdateFoo(Foo foo)
    {
        FooDb.Update(foo);
    }

    public Task<Foo> GetFooAsync(int id)
    {
        return Task.FromResult(this.GetFoo(id));
    }

    public Task<IEnumerable<Foo>> ListFoosAsync()
    {
        return Task.FromResult(this.ListFoos());
    }

    public async Task UpdateFooAsync(Foo foo)
    {
        await Task.Delay(100); // let's introduce some latency

        this.UpdateFoo(foo);
    }
}

Below is an example of a client and a server program:


namespace Client
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var client = new RedisClient(new RedisDuplexConnection("localhost"), "ThisIsTheClientQueue", "ThisIsTheServiceQueue"))
            {
                var clientFactory = new ServiceProxy.ServiceClientFactory(client);

                var fooService = clientFactory.CreateServiceClient<IFooService>();

                Console.WriteLine("Press ENTER for GetFooAndUpdate test");
                Console.ReadLine();

                GetFooAndUpdate(fooService);

                Console.WriteLine("Press ENTER for SimpleBenchmark test");
                Console.ReadLine();

                SimpleBenchmark(fooService).Wait();

                Console.WriteLine("Press ENTER to exit");
                Console.ReadLine();

            }
        }

        static void GetFooAndUpdate(IFooService fooService)
        {
            var foo = fooService.GetFoo(5);

            if (foo.Name == "Foo 5")
            {
                foo.Name = "Foo 1337";
                fooService.UpdateFoo(foo);

                var l33t = fooService.GetFoo(5);
                if (l33t.Name == "Foo 1337")
                {
                    Console.WriteLine("Successfully updated Foo 5 name to Foo 1337");
                }
            }
        }

        static async Task SimpleBenchmark(IFooService fooService)
        {
            var nCalls = 10 * 1000;
            var random = new Random();

            var tasksToWait = new ConcurrentBag<Task>();

            var sw = Stopwatch.StartNew();

            Parallel.For(0, nCalls, i =>
            {
                // System.Random is not thread-safe, so guard it with a lock
                int id;
                lock (random)
                {
                    id = random.Next(1, 11);
                }

                //gets a foo with id between 1 and 10, asynchronously
                tasksToWait.Add(fooService.GetFooAsync(id));
            });

            await Task.WhenAll(tasksToWait.ToArray());

            sw.Stop();

            Console.WriteLine("{0} calls completed in {1}", nCalls, sw.Elapsed);
            Console.WriteLine("Avg time per call: {0} ms", (double)sw.ElapsedMilliseconds / nCalls);
            Console.WriteLine("Requests per second: {0}", (double)nCalls / sw.Elapsed.TotalSeconds);
        }
    }
}

namespace Server
{
    class Program
    {
        static void Main(string[] args)
        {
            var serviceFactory = new ServiceProxy.ServiceFactory(new SimpleDependencyResolver());

            using (var server = new RedisServer(new RedisDuplexConnection("localhost"), "ThisIsTheServiceQueue", serviceFactory))
            {
                server.Listen();

                Console.WriteLine("Press ENTER to close");
                Console.ReadLine();
            }
        }
    }

    //This should be an adapter for an IoC container
    class SimpleDependencyResolver : ServiceProxy.IDependencyResolver
    {
        public object Resolve(Type type)
        {
            return new FooService();
        }
    }
}

How fast is ServiceProxy.Redis?

Running Redis, the client program and the server program on my machine, the SimpleBenchmark test can do more than 6k requests per second. With multiple clients and multiple servers this number can be higher, since Redis can handle much more load than that.

I encourage you to download the source code and run this example yourself.

What now?

A more complete example can be found in the source code under examples/MyApp. However, that one uses ServiceProxy.Zmq instead of ServiceProxy.Redis. The main difference is that the Zmq one uses a central Broker, while ServiceProxy.Redis uses a Redis instance for that.

Besides the source code, you can get ServiceProxy.Redis via NuGet and give it a try.

Happy coding!

 
