Here is another update on TonicsCloud. This update is packed with best practices on background processes (jobs) and how to maintain law and order among them.
After my last update on TonicsCloud, I knew I needed to improve the job implementation ASAP: it should be able to handle worst-case scenarios; most importantly, job execution should be organized and synchronized (where applicable); and lastly, it should be ruggedly designed for concurrent and parallel processing.
High-Level Scenario
So...Here is a high-level scenario of what I envision before we go into the implementation and demo:
Imagine TonicsCloud as a society, where the background processes represent the diligent individuals working behind the scenes.
To maintain order and efficiency, each process or job has a default priority (like a point): higher priority numbers are processed first, followed by lower ones. If processes share the same priority number, they are processed based on which one was enqueued first in the JobTable (or whatever job transport layer is in use), akin to first come, first served.
The following are other rules in the TonicsCloud society:
- A process must run whenever it is directed to do so.
- When multiple processes run concurrently, they should be synchronized.
- If a process had been previously initiated but didn't complete, its priority should gradually decrease (continuously diminishing until it times out). This ensures that new jobs have a higher chance of being processed and are not neglected because of some rogue process that isn't handling things appropriately (a sketch of this rule follows the list).
- In cases where processes are nested, they must behave themselves and patiently wait for their turn; sibling processes (sharing the same parent), however, can act independently and proceed to the next stage once all their respective processes are finished.
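As a sketch of the priority-decay rule, here is a hypothetical maintenance query against the queue table we will create later in this guide; the decrement and the interval are arbitrary numbers for illustration, not TonicsCloud's actual values:

UPDATE queue_table
SET job_queue_priority = job_queue_priority - 1
WHERE job_queue_status = 'in_progress'
  AND job_queue_priority > 0
  AND updated_at < NOW() - INTERVAL 5 MINUTE;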
To implement and oversee this scenario effectively, there should be an enforcer that acts as a guardian and ensures all background processes are treated fairly and impartially. This is carried out by the ProcessManager (or JobManager); it ensures we can run processes efficiently, which in turn keeps the system balanced and optimized.
Hey, the scenario is not perfect, but it is better than just processing jobs one after the other.
Note: I sometimes use job and process interchangeably; however, depending on the context, there are subtle differences.
A job contains information about how the task would be performed, e.g., (sending email). It defines what needs to be done, and not where it is done.
Where it is done is determined by the process, it represents the actual space where the job's task is executed. Think of it as a designated area where the job can be carried out, separate from other tasks or jobs.
I might still use one for the other, but at least, you get the idea now.
Backgrounding a Background Process
Most of the time, the bottleneck in a process is that whatever it is doing takes time. Think about making an API call that might take a significant amount of time to complete (ideally an API that supports background processing itself and provides a means to check the success or failure of a job).
In the API scenario, what people typically do is make the call and wait until it completes, or, in some cases, the API response returns an operation address you can poll for its status, so you call and wait on that until it's completed. Now imagine you have to do this for thousands of processes; it would take forever.
We should do things differently, so here is a proposed idea involving two jobs: a parent job and a child job. The parent job handles the API call, while the child job monitors the status of the API-side job.
In this setup, the parent job initiates the API call; upon successfully completing the call, the parent job marks itself as processed and exits.
The child job then verifies whether the job on the API end is successful or done processing; if so, the task is considered accomplished.
However, if the child job can't get back a valid response, or the job on the API end hasn't been processed, the child job won't wait around or keep running in the background.
Instead, the child job is placed back in the queue, awaiting its turn for execution at a later time (with a well-tuned retry-after time).
This concept led me to term it "Backgrounding a Background Process."
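To make the parent/child idea concrete, here is a sketch of how the two jobs could be enqueued, using the queue table defined later in this guide; the job names and payloads are made up for illustration:

-- Parent: performs the API call
INSERT INTO queue_table (job_queue_name, job_queue_data)
VALUES ('CallRemoteAPI', JSON_OBJECT('endpoint', 'https://api.example.com/import'));

-- Child: checks the status of the API-side job; it is not eligible to run
-- until its parent has been marked as processed
INSERT INTO queue_table (job_queue_parent_job_id, job_queue_name, job_queue_data)
VALUES (LAST_INSERT_ID(), 'CheckRemoteAPIStatus', JSON_OBJECT('operation', 'import'));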
Note: There are valid use cases for why a process would take a long time to complete aside from the API call scenario; some examples are compressing large files, moving a large file to cloud storage, etc. In such cases, you create a separate manager that handles long-running processes, or use another compute instance; this way, it won't interfere with the short-burst processes.
How Many Processes Can You Run?
Before we move on, I want to do some quick math. Do note that these calculations assume ideal conditions, where all processes can run simultaneously without any interruptions or contention for resources; in the real world, factors such as system overhead, scheduling delays, and resource limitations will affect the actual number of processes that can be executed.
80 Processes Concurrently, A Sec To Complete & 10MB RAM Per Process
- If a process consumes 10MB of RAM, then 80 processes would consume 800MB of RAM when run concurrently
- If it takes 1 sec for a process to complete, then you can run...
- (60 sec ÷ 1 sec per process ) × 80 processes = 4,800 processes per minute
- 4,800 × 60 = 288,000 processes per hour
- 288,000 × 24 = 6,912,000 processes per day
Let's scale up
500 Processes Concurrently, A Sec To Complete & 10MB RAM Per Process
- If a process consumes 10MB of RAM, then 500 processes would consume 5000MB (~5GB) of RAM when run concurrently
- If it takes 1 sec for a process to complete, then you can run...
- (60 sec ÷ 1 sec per process ) × 500 processes = 30,000 processes per minute
- 30,000 × 60 = 1,800,000 processes per hour
- 1,800,000 × 24 = 43,200,000 processes per day
Just update the number of processes you want to run concurrently and do your calculation based on that; for example, to run about 1.7 billion processes per day, you'll need roughly 20,000 processes running concurrently.
Note: This is just a hypothetical calculation in an ideal scenario. There is a limit to how many processes you can fork; however, there are workarounds, which are beyond the scope of this guide.
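If you'd rather not do the arithmetic by hand, here is a trivial helper (not from the TonicsCloud codebase) that reproduces the estimates above:

<?php

// Estimates throughput under the ideal conditions described above
function estimateThroughput(int $concurrent, float $secPerProcess, int $ramPerProcessMB): array
{
    return [
        'ram_mb'     => $concurrent * $ramPerProcessMB,
        'per_minute' => (int) ((60 / $secPerProcess) * $concurrent),
        'per_day'    => (int) ((86400 / $secPerProcess) * $concurrent),
    ];
}

print_r(estimateThroughput(500, 1.0, 10)); // ram_mb: 5000, per_minute: 30000, per_day: 43200000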
Building The ProcessManager
I'll try as much as possible to keep everything short!
The first step in building the ProcessManager is creating a Data Transporter. The purpose of the Transporter is to move data from one place to another; we can have a Redis Transporter, an RDBMS Transporter, and the like.
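Before diving into a concrete one, here is a rough sketch of what a Transporter contract might look like; the interface and method names are my assumptions for illustration, not TonicsCloud's actual API:

<?php

// A minimal Transporter contract: anything that can move job data around
interface TransporterInterface
{
    // Push a job into the transport layer (database table, Redis list, etc.)
    public function enqueue(object $job): void;

    // Pull the next eligible job, or null if none is ready
    public function getNextJob(): ?object;

    // Mark a job as done so it is not handed out again
    public function markProcessed(object $job): void;
}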
The RDBMS Transporter
I have only tested the features listed in this guide with an RDBMS, so I don't know whether they would work with other Transporters. Anyway, let's get started by creating the job queue table:
CREATE TABLE IF NOT EXISTS `queue_table` (
`job_queue_id` INT AUTO_INCREMENT PRIMARY KEY,
`job_queue_parent_job_id` INT,
`job_queue_name` VARCHAR(255) NOT NULL,
`job_queue_status` ENUM('queued', 'in_progress', 'failed', 'processed') DEFAULT 'queued',
`job_queue_priority` TINYINT UNSIGNED NOT NULL DEFAULT 100,
`job_queue_data` JSON DEFAULT NULL,
`created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
`updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP() ON UPDATE CURRENT_TIMESTAMP(),
`job_retry_after` timestamp NULL DEFAULT NULL,
`job_attempts` tinyint(3) unsigned NOT NULL DEFAULT 30,
KEY `job_queue_status_idx` (`job_queue_status`),
KEY `job_queue_parent_job_id_idx` (`job_queue_parent_job_id`),
KEY idx_priority_id_covering (job_queue_priority DESC, job_queue_id ASC),
KEY (`job_queue_name`),
CONSTRAINT `job_queue_child_to_parent_foreign` FOREIGN KEY (`job_queue_parent_job_id`) REFERENCES `queue_table` (`job_queue_id`) ON UPDATE CASCADE ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Here are quick explanations (listing the major ones for brevity):

- `job_queue_parent_job_id`: A column that stores the ID of the parent job if there is any. It represents a relationship between jobs in a hierarchical structure.
- `job_queue_status`: An ENUM column that represents the status of the job. It can have one of four values: 'queued', 'in_progress', 'failed', or 'processed'. The DEFAULT value is 'queued'.
- `job_queue_priority`: A TINYINT UNSIGNED column that stores the priority of the job. It represents the importance or urgency of the job.
- `job_queue_data`: A JSON column that stores additional data related to the job. It has a DEFAULT value of NULL.
- `job_retry_after`: A nullable TIMESTAMP column that represents the time after which the job can be retried if it fails.
- `job_attempts`: A TINYINT UNSIGNED column that stores the number of attempts left for the job to retry; 0 means it can no longer be retried. It has a DEFAULT value of 30.

It has a few indexes, but the most important one is the following:

- `idx_priority_id_covering`: An index on the `job_queue_priority` and `job_queue_id` columns. It is a covering index that allows efficient sorting and retrieval of data based on both the priority (in descending order) and the ID of the job (in ascending order).

It has the following constraint:

- `job_queue_child_to_parent_foreign`: A foreign key constraint that ensures the `job_queue_parent_job_id` column references the `job_queue_id` column in the same table (queue_table). It enables cascading updates and deletes, meaning if the parent job is updated or deleted, the corresponding child jobs will be updated or deleted accordingly.
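As a quick illustration of the cascade (the ID here is hypothetical):

-- Deleting a parent job...
DELETE FROM queue_table WHERE job_queue_id = 1;
-- ...also deletes every row whose job_queue_parent_job_id referenced job 1,
-- so no orphaned child jobs are left behind.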
Retrieving The Next Job From The Transporter
To carry out the job execution, we need a way to retrieve the job from the Transporter. Recall I said the job contains information on how the task should be carried out, while the process provides an abode for the job's execution.
The following is an example of retrieving the job from the transporter:
SELECT j.*
FROM queue_table j
WHERE j.job_queue_status = 'queued'
AND (j.job_queue_parent_job_id IS NULL OR j.job_queue_parent_job_id NOT IN (
SELECT job_queue_id
FROM queue_table
WHERE job_queue_status != 'processed'
))
AND (j.job_retry_after IS NULL OR j.job_retry_after <= NOW())
AND j.job_attempts > 0
ORDER BY j.job_queue_priority DESC, j.job_queue_id ASC
LIMIT 1;
It retrieves a job from the queue_table where the job's status is 'queued' and it either has no parent job or its parent job has a status of 'processed'.
The condition `(j.job_retry_after IS NULL OR j.job_retry_after <= NOW())` checks whether the retry-after value is either NULL or has passed the current timestamp, indicating that the job can be processed.
The condition `j.job_attempts > 0` checks whether the job can still be attempted.
The ORDER BY clause (j.job_queue_priority DESC, j.job_queue_id ASC) prioritizes the jobs based on their priority values in descending order. If multiple jobs have the same priority, it selects the one with the lowest job ID, following a FIFO (first in, first out) or first come, first served approach.
To improve the query's performance, a covering index was added based on the ORDER BY condition during the creation of the table. This covering index includes the columns involved in the ORDER BY clause (job_queue_priority and job_queue_id). By having this index, the database can retrieve the required information directly from the index itself without needing to access the underlying table data.
Finally, it limits the result to only one job, though there is nothing stopping you from fetching more than one.
On a table with 5 million rows (which is not much, by the way), it took 2ms to fetch the record; anything from 1ms to 100ms is okay for me personally.
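If you want to confirm the covering index is doing its job, you can prepend EXPLAIN to the query (standard MySQL/MariaDB, shown here on a simplified form of the query):

EXPLAIN
SELECT j.*
FROM queue_table j
WHERE j.job_queue_status = 'queued'
ORDER BY j.job_queue_priority DESC, j.job_queue_id ASC
LIMIT 1;
-- Look for idx_priority_id_covering under "key", ideally with no "Using filesort" in "Extra"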
Process Synchronization
Process synchronization is very important, at least to me, for properly coordinating concurrent processes. When you run multiple processes at the same time, you will most likely hit a situation where several processes have access to the same job data, leading to unwanted results. Here is an example using a bank account:
BANK ACCOUNT ($500)
Process 1 +---------------------------+ Process 2
| |
withdraw withdraw
$100 $100
| |
balance balance
$400 $400
Here's a step-by-step explanation of what happens:
- Both Process 1 and Process 2 start with a balance of $500.
- Process 1 initiates the withdrawal of $100.
- Before Process 1 can update the balance, Process 2 also initiates a withdrawal of $100.
- Since both Processes are executing concurrently, it is possible that they both read the initial balance of $500 before any updates are made.
- Both Processes proceed to subtract $100 from the initial balance independently.
- Process 1 updates the balance to $400.
- Process 2, unaware of the change made by Process 1, also updates the balance to $400.
- Both Processes complete their operations, resulting in a final balance of $400 for both.
This scenario demonstrates a race condition, where the interleaved execution of concurrent processes leads to an unexpected outcome. Without proper process synchronization, the final balance becomes incorrect.
I have had situations where I would receive duplicate email messages; this was probably due to the lack of proper synchronization.
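To see the race with your own eyes, here is a contrived sketch (not TonicsCloud code) that reproduces the lost update with two forked processes and a balance stored in a file; it requires the pcntl extension:

<?php

// Seed a "bank account" with $500 in a temp file
$file = tempnam(sys_get_temp_dir(), 'balance');
file_put_contents($file, '500');

for ($i = 0; $i < 2; $i++) {
    if (pcntl_fork() === 0) {
        // Child process: read-modify-write with no synchronization
        $balance = (int) file_get_contents($file); // both children may read 500
        usleep(100000);                            // widen the race window
        file_put_contents($file, (string) ($balance - 100));
        exit(0);
    }
}

while (pcntl_wait($status) > 0); // reap both children

echo file_get_contents($file), PHP_EOL; // likely prints 400, not the correct 300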
There are several ways we can solve this, starting with...
FOR UPDATE SKIP LOCKED (Not Recommended)
We can use the FOR UPDATE SKIP LOCKED statement to lock a row and skip any rows that are already locked by other transactions.
In theory, what this means is, once a row is locked, a new transaction would get a new row, and the locked ones would be skipped.
This pattern continues until all eligible rows have been processed or skipped.
Here is an update to the job retrieval statements using the FOR UPDATE SKIP LOCKED:
SELECT j.*
FROM queue_table j
WHERE j.job_queue_status = 'queued'
AND (j.job_queue_parent_job_id IS NULL OR j.job_queue_parent_job_id NOT IN (
SELECT job_queue_id
FROM queue_table
WHERE job_queue_status != 'processed'
))
AND (j.job_retry_after IS NULL OR j.job_retry_after <= NOW())
AND j.job_attempts > 0
ORDER BY j.job_queue_priority DESC, j.job_queue_id ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;
Problem solved, right?
Well, No, here are some of the issues I faced with this method:
- If you are running too many concurrent processes, say around 50 to over 200, some processes would get access to the same job, and whatever that job does would be executed that many times (not good). It appears I am not the only one with that problem, and I do not have a good solution; by the way, it would be worse at scale
- More work for the database
- The behavior differs across RDBMSs; in short, it is not consistent
It is not recommended for large job volumes, but for simple job management, it is fine.
FILE LOCK (Flock) (Not Recommended)
Here you acquire a lock on a file and do all sorts of wizardry to get the synchronization in place. I didn't even bother with this method because I don't have the time for weird edge cases; it can get very messy for complex scenarios, so I am only adding it as an option.
SEMAPHORE (IPC) (Recommended)
Here is the method I enjoyed the most: if managed properly, it is also very fast, and to put the icing on the cake, you can use it alongside SysvSharedMemory, where you get a portion of a defined memory region which you can further use to share data between processes.
If you know anything about forking processes, you know sharing data between processes isn't straightforward; SysvSharedMemory solves that.
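As a taste of the underlying primitive, here is a bare-bones sketch with PHP's sysvshm functions; TonicsCloud's SharedMemory class (in the gist linked further down) wraps this kind of API:

<?php

// Attach a 1MB System V shared memory segment keyed off this file
$shm = shm_attach(ftok(__FILE__, 'm'), 1024 * 1024);

shm_put_var($shm, 1, 'written by the parent'); // visible to forked children
echo shm_get_var($shm, 1), PHP_EOL;            // readable from any attached process

shm_detach($shm);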
In the book Beginning Linux Programming by Neil Matthew and Richard Stones, they give a theoretical example of how a semaphore works:
...
Suppose you have two processes proc1 and proc2, both of which need exclusive access to a database at some point in their execution.
You define a single binary semaphore, sv, which starts with the value 1 and can be accessed by both processes.
Both processes then need to perform the same processing to access the critical section of code; indeed, the two processes could simply be different invocations of the same program.
The two processes share the sv semaphore variable.
Once one process has executed P(sv), it has obtained the semaphore and can enter the critical section. The second process is prevented from entering the critical section because when it attempts to execute P(sv), it's made to wait until the first process has left the critical section and executed V(sv) to release the semaphore.
...
If that seems complicated, here is a quick analogy of how it works:
Suppose you have two people, A and B, who both need to use a special room. However, only one person can be inside the room at a time to avoid confusion and conflicts.
To handle this problem, they employ a system called semaphore which acts like a sign indicating whether the room is available or not.
The semaphore starts with a "green" (value 1) sign, indicating that the room is available. Both person A and B can see the sign and know whether they can enter or not.
When person A wants to use the room, he checks the semaphore indicator. If it shows a green sign, he knows it's his turn. He flips the sign to "red," (value 0) indicating that he's inside the room and others should wait.
If person B also wants to use the room while person A is inside, he checks the semaphore indicator. Since it shows a red sign, B understands that he needs to wait until the sign turns green again.
After A finishes his work in the room, he flips the sign back to green, signaling to B that it's now his turn to enter.
That's the idea, and this relates to a binary semaphore which is where we would limit the scope in this guide.
Now, if you are wondering what is stopping race conditions from occurring here, you are on track. The point is, no matter how fast the processes run concurrently, only one is guaranteed to get access to the semaphore; acquisition is an atomic operation that relies on low-level mechanisms and operating-system support to enforce its behavior.
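Here is a minimal sketch of that binary semaphore using PHP's sysvsem extension; TonicsCloud's SharedMemory class builds on primitives like these, but this is not its exact code:

<?php

$key = ftok(__FILE__, 'j');  // derive a System V IPC key
$sem = sem_get($key, 1);     // binary semaphore: at most one holder at a time

if (sem_acquire($sem)) {     // P(sv): blocks until the semaphore is free
    // Critical section: e.g., read the balance, subtract, write it back
    sem_release($sem);       // V(sv): let the next waiting process in
}

sem_remove($sem); // clean up the semaphore when it is no longer needed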
So, here is an update on the bank account scenario:
- Process 1 and Process 2 both attempt to access the balance simultaneously.
- Process 1 arrives first and tries to acquire the semaphore. Since the semaphore is initially available (set to 1), Process 1 successfully acquires it and proceeds to the critical section.
- Process 2 arrives slightly later and also attempts to acquire the semaphore. However, since the semaphore is already acquired by Process 1, Process 2 is blocked and put into a waiting state.
- Process 1, now having exclusive access to the critical section, reads the initial balance of $500, subtracts $100, and updates the balance to $400.
- After Process 1 completes its operation, it releases the semaphore, making it available again.
- Process 2, which has been waiting for the semaphore, now gets the chance to acquire it.
- Process 2 reads the updated balance of $400 (as modified by Process 1), subtracts $100, and updates the balance to $300.
Even if you flip it so that Process 2 is given access first, the final balance would still be $300; you get the idea.
Here is an implementation I created in a gist: https://gist.github.com/devsrealm/d15146011a5b28bbd709c086595d25f1
It is heavily commented, so I won't explain much; I would be replacing the job retrieval the following way:
return $this->sharedMemory->ensureAtomicity(function (SharedMemory $sharedMemory) {
    $nextJob = null;
    db(onGetDB: function (TonicsQuery $db) use (&$nextJob) {
        $table = $this->getTable();
        $nextJob = $db->row(<<<SQL
        SELECT j.*
        FROM $table j
        WHERE j.job_queue_status = ?
        AND (j.job_queue_parent_job_id IS NULL OR j.job_queue_parent_job_id NOT IN (
            SELECT job_queue_id
            FROM $table
            WHERE job_queue_status != ?
        ))
        AND (j.job_retry_after IS NULL OR j.job_retry_after <= NOW())
        AND j.job_attempts > 0
        ORDER BY j.job_queue_priority DESC, j.job_queue_id ASC
        LIMIT 1;
        SQL, Job::JobStatus_Queued, Job::JobStatus_Processed);

        # Since we have gotten access to the semaphore, let's use this opportunity to quickly update the job status;
        # this completely prevents different processes from stepping on each other's toes for concurrent jobs
        if ($nextJob) {
            $this->infoMessage("Running TonicsCloud job queue $nextJob->job_queue_name with an id of $nextJob->job_queue_id");
            # Mark the job as in progress and set its retry-after window
            $this->getJobObject($nextJob, function (AbstractJobInterface $jobObject) use ($nextJob, $db) {
                $update = ['job_queue_status' => Job::JobStatus_InProgress, 'job_retry_after' => $this->retryAfter($jobObject)];
                $db->Q()->FastUpdate($this->getTable(), $update, db()->WhereEquals('job_queue_id', $nextJob->job_queue_id));
            });
        }
    });

    # Since we are done, we should remove the semaphore; if we do not, it would be impossible to
    # kill the child process, which in theory might have completed, but because its semaphore is not
    # released, it isn't considered completed. By removing it, the child can close with ease, or the
    # respective SIGCHLD signal handler can handle the child zombie cleaning
    $sharedMemory->detachSemaphore();
    $sharedMemory->removeSemaphore();
    return $nextJob;
});
The `ensureAtomicity` method ensures the operation is atomic.
We can also share data across processes.
Adding a value is as simple as:
$this->sharedMemory->add('integer', 55);
// atomic add is as follows
$this->sharedMemory->atomAdd('integer', 55);
In a foreign process, you get the value using:
$sharedMemory->get('integer'); // holds 55
// or for some reason, you want to get it atomically, simply do:
$sharedMemory->atomGet('integer'); // holds 55
The same syntax applies to `remove`.
This post is already too long. In future updates, I would be doing some benchmarking; on my slow HDD, I am pushing around 10k jobs per minute, and there is still lots of room for improvement, so, in the future, I'll take time to benchmark on an SSD and discuss potential optimizations.
Hire Me
If you like what I do, and want to work with me, I am available for hire, my email address is on my Hire Me Page: https://tonics.app/posts/f7b4f201bc4a91bb/hire-me
Thank you.