Created
May 2, 2014 02:31
-
-
Save buzzedword/625393303a3d6506a1dd to your computer and use it in GitHub Desktop.
Our initial proof of concept hack for parallelized jobs without rabbitmq.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Application\CronBundle\Services; | |
use Symfony\Component\Process\Process; | |
class CommandAsyncHelper { | |
private $processes; | |
public function __construct() { | |
$this->processes = array(); | |
} | |
public function getProcesses() { | |
return $this->processes; | |
} | |
public function addCommand($command) { | |
$this->processes[] = new Process($command); | |
return $this; | |
} | |
public function async($processes, $queueId) { | |
$safety = count($processes); | |
$pidRegister = array(); | |
echo "Entering async loop: ". $queueId ."\n\r"; | |
while (count($processes) > 0) { | |
foreach ($processes as $i => $process) { | |
$pid = $queueId . ':' . $i; | |
if (!$process->isStarted() && in_array($pid, $pidRegister) == false) { | |
echo "Process id: ". $pid." starts\n"; | |
$pidRegister[] = $pid; | |
$process->start(); | |
continue; | |
} | |
echo $process->getIncrementalOutput(); | |
echo $process->getIncrementalErrorOutput(); | |
if (!$process->isRunning()) { | |
echo "Process id: ".$pid." stopped\n"; | |
$safety--; | |
unset($processes[$i]); | |
echo count($processes) . " processes remain.\n"; | |
} | |
} | |
if ($safety == 0) { | |
break; | |
} | |
sleep(1); | |
} | |
return true; | |
} | |
public function executeQueueWithSize($size) { | |
$queues = array_chunk($this->processes, $size); | |
echo "Total size: ". count($this->processes) . "\n"; | |
echo "Broken down into " . count($queues) . " queues of " . $size . "\n"; | |
foreach ($queues as $i => $queue) { | |
if ($this->async($queue, $i, $total)) { | |
unset($queues[$i]); | |
echo "Exiting async loop: " . $i ."\n"; | |
} | |
echo count($queues) . " queues remain. \n"; | |
} | |
return true; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
// We follow an orchestrator pattern for our imports, where one command compiles a list of all items to act on, | |
// and another command carries out the action. This is so we have a "batch" command, and a "singular" one with minimal effort. | |
//... | |
foreach (item as $i) { | |
$command = array( | |
'php', | |
'app/console', | |
'framework:action:import', | |
'--no-debug', | |
'--', | |
$i["id"], | |
); | |
if(!is_null($i["other_id"])) { | |
$command[] = $i["location_id"]; | |
} | |
$command = implode(' ', $command); // We've already found there's easier ways to do this. | |
$async->addCommand($command); | |
} | |
// Tell the queueing service how large to chunk your processes | |
if ($async->executeQueueWithSize(7)) { | |
echo "Done!"; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment