Scaling background jobs


Most systems have recurring background jobs that run at regular intervals: database maintenance, importing data from other systems, refreshing caches, sending newsletters, and so on. In a multi-tenant platform you usually have these recurring jobs per tenant, which can mean hundreds or thousands of concurrent jobs. Some of them may be long-running, while others may be very CPU-intensive.

Lots of open source schedulers allow you to configure (dynamically or not) when your jobs run, but they often have to run in-process, i.e. inside the scheduler's process, typically by implementing an IJob interface that executes the job's code:


public interface IJob
{
    Task Execute(JobRequest job);
}

This is required so that the scheduler can measure total execution time and ensure the interval between executions is respected, but also to know whether the job has completed or failed, in order to retry it. Scaling schedulers often requires distributed locks to synchronize executions and splitting jobs amongst multiple scheduler instances, while at the same time executing the actual jobs. When you have thousands of concurrent jobs this becomes a serious problem: you often end up with hundreds of threads in your scheduler processes and/or massive CPU/memory usage, putting a lot of strain on the scheduler and letting some jobs affect the stability of others. This happens because you're mixing two different concerns: scheduling vs. processing of the jobs.
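To make the problem concrete, here is a minimal sketch of what a scheduler does around each IJob. The names (SchedulerWorker, the retry comment) and the interval policy are simplified assumptions for illustration, not any particular scheduler's API:

```csharp
public class SchedulerWorker
{
    readonly IJob job;
    readonly JobRequest request;
    readonly TimeSpan interval;

    public SchedulerWorker(IJob job, JobRequest request, TimeSpan interval)
    {
        this.job = job;
        this.request = request;
        this.interval = interval;
    }

    public async Task RunAsync(CancellationToken cancellation)
    {
        while (!cancellation.IsCancellationRequested)
        {
            var started = DateTime.UtcNow;
            try
            {
                // the scheduler is tied up here until the job completes,
                // which is exactly why long-running or CPU-heavy jobs
                // hurt the scheduler when they run in its process
                await job.Execute(request);
            }
            catch (Exception)
            {
                // job failed: a real scheduler would apply its retry policy here
            }

            // respect the configured interval between executions
            var delay = interval - (DateTime.UtcNow - started);
            if (delay > TimeSpan.Zero)
                await Task.Delay(delay, cancellation);
        }
    }
}
```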

In order to achieve higher scalability you need to offload jobs and run them outside the scheduler. The most common solution is to use message queues, where you typically only need to worry about scaling your message handlers, by simply adding more machines/processes. If your queue becomes a bottleneck, check whether your queuing technology supports partitioning; if it doesn't, you can use multiple queues (perhaps one per tenant, or per group of tenants).
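The multiple-queues idea can be sketched as a small router that hashes the tenant id to pick a queue. IQueueClient and JobRequest.TenantId are assumed names here, and note that in a real system you'd want a stable hash (string.GetHashCode is not stable across processes on modern .NET):

```csharp
public class TenantQueueRouter
{
    readonly IReadOnlyList<IQueueClient> queues;

    public TenantQueueRouter(IReadOnlyList<IQueueClient> queues)
    {
        this.queues = queues;
    }

    public Task Enqueue(JobRequest job)
    {
        // a stable hash keeps each tenant's jobs on the same queue,
        // preserving per-tenant ordering while spreading load across queues
        var index = Math.Abs(job.TenantId.GetHashCode()) % queues.Count;
        return queues[index].Send(job);
    }
}
```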

The scheduler still needs to be notified about job completions, in order to ensure proper intervals and avoid concurrent executions of the same job. If your scheduler has an API where you can report job completion status from the message handlers, great. Otherwise, don't despair: it's relatively easy to implement one using messaging, with a jobId as the correlation id, and have your scheduler's IJob await until the actual job completes.

The code samples below merely illustrate the approach and should be treated as pseudo-code.


public class MyLongRunningJob : IJob
{
    static readonly TimeSpan Timeout = TimeSpan.FromMinutes(30);

    readonly IJobQueue jobQueue;
    readonly IJobNotificationService jobNotificationService;

    public MyLongRunningJob(IJobQueue jobQueue, IJobNotificationService jobNotificationService)
    {
        this.jobQueue = jobQueue;
        this.jobNotificationService = jobNotificationService;
    }

    public async Task Execute(JobRequest job)
    {
        // offload the actual work to a message queue
        await jobQueue.Enqueue(job);

        // await the completion notification, so the scheduler
        // still sees when the job starts and ends
        await jobNotificationService.WhenCompleted(job.Id, Timeout);
    }
}

public class MyLongRunningJobHandler : IHandler<JobRequest>
{
    readonly IJobNotificationService jobNotificationService;

    public MyLongRunningJobHandler(IJobNotificationService jobNotificationService)
    {
        this.jobNotificationService = jobNotificationService;
    }

    public async Task Handle(JobRequest job)
    {
        // do long running stuff
        // ...

        // report completion back to the scheduler
        await jobNotificationService.MarkAsCompleted(job.Id, status: true);
    }
}

public class JobNotificationService : IJobNotificationService
{
    readonly ConcurrentDictionary<string, TaskCompletionSource<bool>> pendingJobs =
        new ConcurrentDictionary<string, TaskCompletionSource<bool>>();

    public async Task WhenCompleted(string jobId, TimeSpan? timeout = null)
    {
        var completion = pendingJobs[jobId] =
            new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        if (timeout == null)
        {
            await completion.Task;
            return;
        }

        // Task.WhenAny never throws; check which task finished first
        var winner = await Task.WhenAny(completion.Task, Task.Delay(timeout.Value));
        if (winner != completion.Task)
        {
            pendingJobs.TryRemove(jobId, out _);
            throw new TimeoutException($"Job {jobId} did not complete within {timeout}");
        }

        await completion.Task; // propagates the failure, if any
    }

    // called when a message regarding job completion is received
    private void OnJobCompleted(string jobId, bool status)
    {
        if (pendingJobs.TryRemove(jobId, out var completion))
        {
            if (status)
                completion.TrySetResult(true);
            else
                completion.TrySetException(new Exception($"Job {jobId} failed"));
        }
    }

    public Task MarkAsCompleted(string jobId, bool status)
    {
        // sends a message to the scheduler,
        // which triggers the OnJobCompleted handler
        return Task.CompletedTask; // transport-specific, left as pseudo-code
    }
}

The JobNotificationService simply holds a dictionary of tasks that complete when the job notification is received. When you have thousands of concurrent jobs, this is much more efficient than executing them inside the scheduler's process: it's fairly cheap to have a million tasks waiting for completion, but very expensive to have even a few hundred threads, even if they're blocked waiting for I/O.

There are several messaging technologies you can use to implement a generic job notification service. I'm a big fan of Redis, so if you have it in your toolbox, you can leverage either Redis publish/subscribe or Redis lists used as queues to send messages from the job handlers to the job schedulers.
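As an example, a Redis pub/sub transport could look roughly like this, using the StackExchange.Redis client. The channel name and the "jobId:status" message format are made-up conventions for this sketch:

```csharp
public class RedisJobNotificationTransport
{
    const string Channel = "jobs:completed";

    readonly ISubscriber subscriber;

    public RedisJobNotificationTransport(IConnectionMultiplexer redis)
    {
        subscriber = redis.GetSubscriber();
    }

    // handler side: publish the completion message
    public Task PublishCompletion(string jobId, bool succeeded)
    {
        return subscriber.PublishAsync(Channel, $"{jobId}:{succeeded}");
    }

    // scheduler side: forward incoming messages to the notification service
    public void Subscribe(Action<string, bool> onJobCompleted)
    {
        subscriber.Subscribe(Channel, (channel, message) =>
        {
            var parts = ((string)message).Split(':');
            onJobCompleted(parts[0], bool.Parse(parts[1]));
        });
    }
}
```

Note that plain pub/sub is fire-and-forget: if a scheduler instance is down when the message is published, the notification is lost, which is where Redis lists (or another durable queue) come in.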

I hope this helps. It would be nice to hear your stories about scheduler scalability!
