When working with large, high-volume, low-latency systems, processing data sequentially often becomes detrimental to the system's health. If we allow only one process to work on our data, we run into several challenges:
- Our process may fall behind to the point where it can never catch up.
- Our singleton process could crash and leave our system in a degraded state.
- The average latency of data processing could be dramatically affected by outlying cases.
For these reasons, we wish to design a system that allows N processes to work on a single data set. To arrive at a possible solution, let me outline some assumptions about the system:
- Data store containing items to be processed.
- Data store with atomic updates or conditional puts.
- Horizontally scalable platform that contains homogeneous environment variables.
Our approach distributes the workload over N processors. Each processor coordinates with a centralized data store to obtain an integer-based identity, so each processor is identified by an integer from 0 to N-1. The processor uses its identity to find work exclusively: we assign each item of work an integer value, and a processor selects an item if the item's value modulo N equals the processor's identity. Let us explore the details of our approach.
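As a quick illustration of the modulo rule, here is a minimal Ruby sketch. The helper name `mine?` and the sample values are assumptions made for this example only; they are not part of any library.

```ruby
# Hypothetical helper: true when the item with the given integer value
# belongs to the processor with identity `id`, out of `n` processors.
def mine?(item_value, id, n)
  item_value % n == id
end

n = 3
items = (0...10).to_a

# Group every item under the identity that would select it.
assignment = items.group_by { |value| value % n }

assignment.each do |id, values|
  puts "processor #{id}: #{values.inspect}"
end
```

Every item lands in exactly one group, so no two processors ever select the same item as long as they hold distinct identities.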
Each processor should have access to N, the maximum number of processors. N can be an environment variable defined in each processor's environment. Upon initialization, each processor successively tries the numbers from 0 to N-1 until it can globally lock one as its identity. To lock an identity, a processor must request a lock on that identity from a central data store. Tools such as lock-smith provide a convenient way to acquire a global lock. The following code snippet is an example of identity coordination in Ruby:
def acquire_lock
  ENV["N"].to_i.times do |i|
    Locksmith::Dynamodb.lock("my-process-#{i}") do
      yield(i) # critical section: this processor holds identity i
    end
  end
end
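To make the coordination step easier to experiment with, the sketch below simulates it without any external service. `LockTable` and `acquire_identity` are hypothetical names, and the in-memory table only stands in for a store with conditional puts; it is not how lock-smith is implemented.

```ruby
# In-memory stand-in for a store with conditional puts (an assumption for
# illustration; a real deployment would use DynamoDB, PostgreSQL, etc.).
class LockTable
  def initialize
    @owners = {}
    @mutex = Mutex.new
  end

  # Atomically claim `name` for `owner`; returns true only if it was free.
  def try_lock(name, owner)
    @mutex.synchronize do
      return false if @owners.key?(name)
      @owners[name] = owner
      true
    end
  end

  # Release `name`, but only if `owner` is the current holder.
  def unlock(name, owner)
    @mutex.synchronize { @owners.delete(name) if @owners[name] == owner }
  end
end

# Try identities 0..n-1 in order and return the first one claimed, or nil
# when every identity is already held.
def acquire_identity(table, owner, n)
  (0...n).find { |i| table.try_lock("my-process-#{i}", owner) }
end

table = LockTable.new
n = 2
ids = %w[a b c].map { |owner| acquire_identity(table, owner, n) }
p ids # the third processor finds no free identity and gets nil
```

A processor that receives `nil` has no identity yet; it can wait and retry, which is what lets it take over when another processor releases its lock.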
How each processor selects its data depends on the data store containing the items to be processed. Data stores that evaluate predicates on the server (e.g. SQL databases) allow the processor to query for data using our modulo predicate. Data stores like DynamoDB require the processor to scan data into memory and apply the predicate locally. Take your data's size into consideration when choosing the store for your items: scanning the whole table into memory may not be feasible. An example of both approaches in Ruby:
SQL
acquire_lock do |partition|
  sql = "select * from items_to_be_processed where MOD(id, ?) = ?"
  DB.exec(sql, Integer(ENV["N"]), partition)
end
In-Memory Scan
acquire_lock do |partition|
  DB.scan.select do |item|
    item.id % Integer(ENV["N"]) == partition
  end
end
One caveat with the previous examples: it may not always be possible to have an integer-based identity on your items to be processed. In these cases we can use the CRC-32 algorithm to produce a checksum of the item's bytes and use the checksum in our modulo computation.
require "zlib"
acquire_lock do |partition|
  DB.scan.select do |item|
    # CRC-32 turns a non-integer id (e.g. a string) into an integer
    Zlib.crc32(item.id.to_s) % Integer(ENV["N"]) == partition
  end
end
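The checksum step can be tried on its own. The sketch below uses Ruby's standard `Zlib.crc32`; the helper `partition_for` and the sample keys are made up for illustration.

```ruby
require "zlib"

# Hypothetical helper: map any key (string, UUID, ...) to a partition in 0...n.
def partition_for(key, n)
  Zlib.crc32(key.to_s) % n
end

n = 4
keys = %w[order-a1 order-b2 order-c3 order-d4 order-e5]

keys.each do |key|
  puts "#{key} -> partition #{partition_for(key, n)}"
end
```

Because the checksum depends only on the key's bytes, every processor computes the same partition for the same item without any coordination. The spread across partitions is only roughly even, so a small data set may load some processors more than others.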
To maximize the availability of our processors, we need only keep redundant processors online. If a processor fails, its lock is released, allowing a redundant processor to acquire that lock during its identity coordination phase. For critical systems, keeping 2*N processors should be sufficient.
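How a failed processor's lock gets released depends on the locking tool. One common mechanism, assumed here purely for illustration, is a lease that expires unless its holder renews it. `LeaseTable` is a hypothetical, single-threaded sketch that takes the current time as an argument so the takeover can be followed without waiting.

```ruby
# Hypothetical lease table: each identity maps to an owner and an expiry time.
class LeaseTable
  def initialize(ttl)
    @ttl = ttl
    @leases = {}
  end

  # Claim or renew identity `id` for `owner` at time `now` (in seconds).
  # Succeeds when the identity is free, expired, or already held by `owner`.
  def claim(id, owner, now)
    holder, expires_at = @leases[id]
    return false if holder && holder != owner && expires_at > now
    @leases[id] = [owner, now + @ttl]
    true
  end
end

leases = LeaseTable.new(10)
leases.claim(0, "primary", 0)   # primary holds identity 0 until t=10
leases.claim(0, "standby", 5)   # => false, the lease is still live
leases.claim(0, "standby", 11)  # => true, primary stopped renewing
```

In this sketch the standby simply keeps retrying the coordination phase; once the crashed processor's lease lapses, the standby inherits its identity and therefore its share of the work.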
Process partitioning provides a way to process large amounts of data in parallel. It offers a simple design that can be implemented in any language on a variety of data stores. This approach is a great alternative to the commonly seen queue-based approaches. In fact, there are many cases in which it provides a greater level of concurrency, allowing a more robust data processing system. The proof of the concurrency improvements is left as an exercise for the reader.
- lock-smith A locking toolkit for Ruby.
- ddbsync DynamoDB based locking mechanism for Go.
- pg_advisory_lock
- MySQL GET_LOCK
- 2012-10-14 - Generalize article. Discuss the solution in more generality.
- 2012-04-12 - First draft. Addresses only Ruby + PostgreSQL
Ryan Smith
@ryandotsmith, ace hacker, builds distributed systems at heroku.
This article was motivated by many successes and failures experienced with production systems at Heroku.
I think this is effectively N queues (instead of using a single shared queue). It has the benefit of not contending over jobs; instead you contend to process a particular queue. The downside is that if any particular queue falls behind, there's no one to help it catch up.
As such, I would guess that it performs better overall than a single shared queue when N is large (>100?) since there's less contention, but it may cause larger perc95+ times.