Created
August 13, 2014 05:46
-
-
Save chuyskywalker/52cc85e943bfb68c3575 to your computer and use it in GitHub Desktop.
Parallel Tasks Spawn Other Tasks As Available
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
require './defs.php'; | |
// Client used both here and (via `global`) inside the completion callback.
// No host/port is passed to addServer(), so the extension's defaults apply.
$gearmanClient = new GearmanClient();
$gearmanClient->addServer();

// Class names of every model a prediction is wanted from.
$modelsToRun = [
    ModelA::class,
    ModelB::class,
    ModelC::class,
    ModelD::class,
    ModelE::class,
];

// $modelSet:         model class name => model instance not yet queued.
// $factSetsRequired: fact set class names needed by at least one model.
$modelSet = [];
$factSetsRequired = [];

foreach ($modelsToRun as $modelClass) {
    /** @var Model $instance */
    $instance = new $modelClass();
    $modelSet[$modelClass] = $instance;
    $factSetsRequired = array_merge($factSetsRequired, $instance->requiredFactSets());
}

// Several models share fact sets; collapse duplicates and reindex from 0.
$factSetsRequired = array_values(array_unique($factSetsRequired));

echo "Models & FactSets\nAttempting to run these models which, in aggregate, require these facts:";
print_r($modelsToRun);
print_r($factSetsRequired);

// Filled in by the completion callback as results come back:
// $modelResults:      model class name    => prediction
// $availableFactSets: fact set class name => gathered FactSet object
$modelResults = [];
$availableFactSets = [];
/**
 * Completion callback for every task submitted through $gearmanClient.
 *
 * Two kinds of task results arrive here:
 *  - 'factSet' results: a serialized FactSet object. It is stored in
 *    $availableFactSets, then every model still waiting in $modelSet is
 *    checked; any model whose required fact sets are now all available is
 *    queued as a 'model' task and removed from $modelSet.
 *  - any other result: treated as a model result, a serialized array with the
 *    keys 'model' (class name) and 'result' (prediction), which is stored in
 *    $modelResults.
 *
 * Payloads that do not unserialize into the expected shape are reported and
 * skipped instead of being written into the result maps.
 *
 * NOTE(review): unserialize() is applied to data returned by workers. That is
 * only safe while the Gearman server and its workers are trusted.
 *
 * @param GearmanTask $task the task that just completed
 * @return void
 */
function taskReturn(GearmanTask $task) {
    global $gearmanClient, $modelResults, $modelSet, $availableFactSets;

    $payload = unserialize($task->data());

    if ($task->functionName() === 'factSet') {
        // Guard against a corrupt or unexpected payload: unserialize() returns
        // false on failure, which must not end up in $availableFactSets.
        if (!($payload instanceof FactSet)) {
            echo "!! Ignoring malformed factSet result\n";
            return;
        }

        $factSetName = get_class($payload);
        $availableFactSets[$factSetName] = $payload;
        echo "<- Facts results from: " . $factSetName . "\n";

        // See if any of the waiting models can be queued now.
        // foreach iterates over a copy, so unsetting entries below is safe.
        foreach ($modelSet as $modelName => $model) { /** @var Model $model */
            $required = $model->requiredFactSets();

            if (array_diff($required, array_keys($availableFactSets)) !== []) {
                echo "-- Model not ready: " . get_class($model) . "\n";
                continue;
            }

            echo "-> Model " . get_class($model) . " now has all required facts, queueing it\n";

            // Only ship the fact sets this model actually asked for.
            $factSets = [];
            foreach ($required as $requiredName) {
                $factSets[$requiredName] = $availableFactSets[$requiredName];
            }

            $gearmanClient->addTask('model', serialize([
                'factSets' => $factSets,
                'model' => get_class($model)
            ]));

            unset($modelSet[$modelName]); // queued, so never consider it again
        }

        return;
    }

    // Anything else is a model result: validate its shape, then save it.
    if (!is_array($payload) || !isset($payload['model']) || !array_key_exists('result', $payload)) {
        echo "!! Ignoring malformed model result\n";
        return;
    }

    echo "<- Model results for: " . $payload['model'] . "\n";
    $modelResults[$payload['model']] = $payload['result'];
}
// Route every completed task (fact set or model) through taskReturn().
$gearmanClient->setCompleteCallback('taskReturn');

// Seed the run with one 'factSet' task per required fact set. The callback
// queues 'model' tasks as soon as a model's fact sets have all arrived.
foreach ($factSetsRequired as $factSetClass) {
    $gearmanClient->addTask('factSet', $factSetClass);
}

echo "Starting Fact Gathering:\n";
$gearmanClient->runTasks();

echo "All facts and models completed, predictions are:\n";
print_r($modelResults);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/**
 * A named group of facts that can be gathered once and then queried by id.
 */
interface FactSet {
    /**
     * Collect the facts and store them on the object so getFact() can
     * return them afterwards.
     */
    public function gatherFacts();

    /**
     * Look up a single gathered fact.
     *
     * @param int|string $factId identifier of the fact (the implementations
     *                           in this file use integer class constants)
     * @return mixed the stored value for $factId
     */
    public function getFact($factId);
}
/**
 * Base class for the demo fact sets: holds the fact storage, a simulated
 * slow gatherFacts(), and the shared lookup logic.
 */
abstract class FactSetTemplate implements FactSet {
    /** @var array fact id => value; the subclasses in this file fill it in gatherFacts() */
    protected $facts = [];

    /**
     * Simulate a slow data source by sleeping for a random 1-5 seconds.
     * Subclasses call this before filling $facts.
     */
    public function gatherFacts() {
        $delayMicroseconds = rand(1000000,5000000); // 1000000 == 1 sec
        usleep($delayMicroseconds);
    }

    /**
     * @param int|string $factId
     * @return mixed the stored fact, or null when it is unset (or stored as null)
     */
    final public function getFact($factId) {
        if (isset($this->facts[$factId])) {
            return $this->facts[$factId];
        }

        return null;
    }
}
/**
 * Randomly generated user attributes (age, height, weight, gender).
 */
class UserInfoFactSet extends FactSetTemplate {
    const AGE = 1;
    const HEIGHT = 2;
    const WEIGHT = 3;
    const GENDER = 4;

    /**
     * Wait out the simulated delay, then fill in random user facts.
     *
     * @return bool always true
     */
    public function gatherFacts() {
        parent::gatherFacts();

        $generated = [];
        $generated[self::AGE] = rand(18,99);
        $generated[self::HEIGHT] = rand(45,84);
        $generated[self::WEIGHT] = rand(120,250);
        $generated[self::GENDER] = rand(0,1);
        $this->facts = $generated;

        return true;
    }
}
/**
 * Randomly generated school attributes (grade, GPA, region).
 */
class SchoolInfoFactSet extends FactSetTemplate {
    const GRADE = 1;
    const GPA = 2;
    const REGION = 3;

    /**
     * Wait out the simulated delay, then fill in random school facts.
     *
     * @return bool always true
     */
    public function gatherFacts() {
        parent::gatherFacts();

        $generated = [];
        $generated[self::GRADE] = rand(4,12);
        $generated[self::GPA] = rand(0,100);
        $generated[self::REGION] = rand(0,6);
        $this->facts = $generated;

        return true;
    }
}
/**
 * Randomly generated click figures per content category.
 */
class ClickInfoFactSet extends FactSetTemplate {
    const SPORTS = 1;
    const GLOBALNEWS = 2;
    const TECHNOLOGY = 3;

    /**
     * Wait out the simulated delay, then fill in a random 0-100 value
     * for each category.
     *
     * @return bool always true
     */
    public function gatherFacts() {
        parent::gatherFacts();

        $generated = [];
        $generated[self::SPORTS] = rand(0,100);
        $generated[self::GLOBALNEWS] = rand(0,100);
        $generated[self::TECHNOLOGY] = rand(0,100);
        $this->facts = $generated;

        return true;
    }
}
/**
 * Randomly generated social-network activity figures.
 */
class SocialFactSet extends FactSetTemplate {
    const FBLIKES = 1;
    const TWEETS = 2;
    const GPLUSES = 3;

    /**
     * Wait out the simulated delay, then fill in a random 0-100 value
     * for each network.
     *
     * @return bool always true
     */
    public function gatherFacts() {
        parent::gatherFacts();

        $generated = [];
        $generated[self::FBLIKES] = rand(0,100);
        $generated[self::TWEETS] = rand(0,100);
        $generated[self::GPLUSES] = rand(0,100);
        $this->facts = $generated;

        return true;
    }
}
/**
 * A predictive model that declares which fact sets it needs and computes a
 * score once they are supplied.
 */
interface Model {
    /**
     * Class names of the fact sets this model must receive in run().
     *
     * @return array
     */
    public function requiredFactSets();

    /**
     * Compute the model's prediction.
     *
     * @param FactSet[] $factSets gathered fact sets, keyed by fact set class name
     * @return float
     */
    public function run($factSets);
}
/**
 * Demo model driven purely by user information.
 */
class ModelA implements Model {
    /**
     * @return array needs only the user info fact set
     */
    public function requiredFactSets() {
        return [UserInfoFactSet::class];
    }

    /**
     * Sleeps a random 1-5 seconds to simulate work, then returns
     * age * 5 + height * .5 - weight * 2.
     *
     * @param FactSet[] $factSets keyed by fact set class name
     * @return int|float
     */
    public function run($factSets) {
        usleep(rand(1000000,5000000)); // 1000000 == 1 sec

        $user = $factSets[UserInfoFactSet::class];

        $ageTerm = $user->getFact(UserInfoFactSet::AGE) * 5;
        $heightTerm = $user->getFact(UserInfoFactSet::HEIGHT) * .5;
        $weightTerm = $user->getFact(UserInfoFactSet::WEIGHT) * 2;

        return $ageTerm + $heightTerm - $weightTerm;
    }
}
/**
 * Demo model combining user and school information.
 */
class ModelB implements Model {
    /**
     * @return array needs the user info and school info fact sets
     */
    public function requiredFactSets() {
        return [UserInfoFactSet::class, SchoolInfoFactSet::class];
    }

    /**
     * Sleeps a random 1-5 seconds to simulate work, then returns
     * GPA * 5 + age * 1.5 - region * 2.
     *
     * @param FactSet[] $factSets keyed by fact set class name
     * @return int|float
     */
    public function run($factSets) {
        usleep(rand(1000000,5000000)); // 1000000 == 1 sec

        $school = $factSets[SchoolInfoFactSet::class];
        $user = $factSets[UserInfoFactSet::class];

        $gpaTerm = $school->getFact(SchoolInfoFactSet::GPA) * 5;
        $ageTerm = $user->getFact(UserInfoFactSet::AGE) * 1.5;
        $regionTerm = $school->getFact(SchoolInfoFactSet::REGION) * 2;

        return $gpaTerm + $ageTerm - $regionTerm;
    }
}
/**
 * Demo model combining school and click information.
 */
class ModelC implements Model {
    /**
     * @return array needs the school info and click info fact sets
     */
    public function requiredFactSets() {
        return [SchoolInfoFactSet::class, ClickInfoFactSet::class];
    }

    /**
     * Sleeps a random 1-5 seconds to simulate work, then returns
     * GPA * 5 + global-news clicks * 7.2 - region * 2.
     *
     * @param FactSet[] $factSets keyed by fact set class name
     * @return int|float
     */
    public function run($factSets) {
        usleep(rand(1000000,5000000)); // 1000000 == 1 sec

        $school = $factSets[SchoolInfoFactSet::class];
        $clicks = $factSets[ClickInfoFactSet::class];

        $gpaTerm = $school->getFact(SchoolInfoFactSet::GPA) * 5;
        $newsTerm = $clicks->getFact(ClickInfoFactSet::GLOBALNEWS) * 7.2;
        $regionTerm = $school->getFact(SchoolInfoFactSet::REGION) * 2;

        return $gpaTerm + $newsTerm - $regionTerm;
    }
}
/**
 * Demo model combining school, click and social information.
 */
class ModelD implements Model {
    /**
     * @return array needs the school info, click info and social fact sets
     */
    public function requiredFactSets() {
        return [SchoolInfoFactSet::class, ClickInfoFactSet::class, SocialFactSet::class];
    }

    /**
     * Sleeps a random 1-5 seconds to simulate work, then returns
     * GPA * 5 + global-news clicks * 7.2 - region * 2 - FB likes * 2.
     *
     * @param FactSet[] $factSets keyed by fact set class name
     * @return int|float
     */
    public function run($factSets) {
        usleep(rand(1000000,5000000)); // 1000000 == 1 sec

        $school = $factSets[SchoolInfoFactSet::class];
        $clicks = $factSets[ClickInfoFactSet::class];
        $social = $factSets[SocialFactSet::class];

        $gpaTerm = $school->getFact(SchoolInfoFactSet::GPA) * 5;
        $newsTerm = $clicks->getFact(ClickInfoFactSet::GLOBALNEWS) * 7.2;
        $regionTerm = $school->getFact(SchoolInfoFactSet::REGION) * 2;
        $likesTerm = $social->getFact(SocialFactSet::FBLIKES) * 2;

        return $gpaTerm + $newsTerm - $regionTerm - $likesTerm;
    }
}
/**
 * Demo model driven purely by social information.
 */
class ModelE implements Model {
    /**
     * @return array needs only the social fact set
     */
    public function requiredFactSets() {
        return [SocialFactSet::class];
    }

    /**
     * Sleeps a random 1-5 seconds to simulate work, then returns
     * FB likes * 2.
     *
     * @param FactSet[] $factSets keyed by fact set class name
     * @return int|float
     */
    public function run($factSets) {
        usleep(rand(1000000,5000000)); // 1000000 == 1 sec

        $social = $factSets[SocialFactSet::class];
        $likes = $social->getFact(SocialFactSet::FBLIKES);

        return $likes * 2;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
require './defs.php'; | |
// Worker process for 'factSet' jobs. No host/port is passed to addServer(),
// so the extension's defaults apply.
$worker = new GearmanWorker();
$worker->addServer();

/*
 * 'factSet' job: the workload is the class name of a FactSet implementation.
 * The worker instantiates it, gathers its facts and returns the populated
 * object in serialized form.
 */
$worker->addFunction('factSet', function (GearmanJob $job) {
    echo "Got job\n";

    // The workload is a plain class name (not serialized data).
    $factSetClass = $job->workload();

    // Never instantiate an arbitrary class named by the job queue: accept only
    // concrete classes that implement FactSet, and fail the job otherwise
    // instead of letting a fatal error take the whole worker down.
    $isUsable = class_exists($factSetClass)
        && is_subclass_of($factSetClass, 'FactSet')
        && (new ReflectionClass($factSetClass))->isInstantiable();

    if (!$isUsable) {
        echo "Rejected job: workload is not an instantiable FactSet class\n";
        $job->sendFail();
        return;
    }

    /** @var FactSet $factSetObj */
    $factSetObj = new $factSetClass();
    $factSetObj->gatherFacts();

    return serialize($factSetObj);
});

// Process jobs until work() reports failure.
while ($worker->work());
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
require './defs.php'; | |
// Worker process for 'model' jobs. No host/port is passed to addServer(),
// so the extension's defaults apply.
$worker = new GearmanWorker();
$worker->addServer();

/*
 * 'model' job: the workload is a serialized array with
 *   'factSets' => gathered FactSet objects keyed by class name
 *   'model'    => class name of the Model to run
 * The worker runs the model and returns a serialized array with the model
 * name and its prediction.
 */
$worker->addFunction('model', function (GearmanJob $job) {
    echo "Got job\n";

    // NOTE(review): unserialize() on queue data is only safe while everything
    // able to submit jobs to this Gearman server is trusted.
    $todo = unserialize($job->workload());

    // unserialize() returns false on a corrupt workload; also make sure the
    // payload has the shape documented above before touching its keys.
    $hasShape = is_array($todo)
        && isset($todo['factSets'], $todo['model'])
        && is_array($todo['factSets'])
        && is_string($todo['model']);

    if (!$hasShape) {
        echo "Rejected job: malformed workload\n";
        $job->sendFail();
        return;
    }

    $factSets = $todo['factSets'];
    $modelName = $todo['model'];

    // Only instantiate concrete classes that implement Model; anything else
    // fails the job instead of crashing the worker.
    $isUsable = class_exists($modelName)
        && is_subclass_of($modelName, 'Model')
        && (new ReflectionClass($modelName))->isInstantiable();

    if (!$isUsable) {
        echo "Rejected job: workload does not name an instantiable Model class\n";
        $job->sendFail();
        return;
    }

    /** @var Model $model */
    $model = new $modelName();
    $prediction = $model->run($factSets);

    return serialize([
        'model' => $modelName,
        'result' => $prediction
    ]);
});

// Process jobs until work() reports failure.
while ($worker->work());
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment