Last active
January 22, 2016 14:29
-
-
Save roryk/4258845bc3389f7548cf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import subprocess | |
from argparse import ArgumentParser | |
def find_parallel_environment(queue): | |
"""Find an SGE/OGE parallel environment for running multicore jobs in specified queue. | |
""" | |
base_queue = os.path.splitext(queue)[0] | |
queue = base_queue + ".q" | |
available_pes = [] | |
for name in subprocess.check_output(["qconf", "-spl"]).strip().split(): | |
if name: | |
for line in subprocess.check_output(["qconf", "-sp", name]).split("\n"): | |
print name, line | |
if _has_parallel_environment(line): | |
if (_queue_can_access_pe(name, queue) or _queue_can_access_pe(name, base_queue)): | |
available_pes.append(name) | |
if len(available_pes) == 0: | |
raise ValueError("Could not find an SGE environment configured for parallel execution. " | |
"See %s for SGE setup instructions." % | |
"https://blogs.oracle.com/templedf/entry/configuring_a_new_parallel_environment") | |
else: | |
return _prioritize_pes(available_pes) | |
def _has_parallel_environment(line): | |
if line.startswith("allocation_rule"): | |
if line.find("$pe_slots") >= 0 or line.find("$fill_up") >= 0: | |
return True | |
return False | |
def _prioritize_pes(choices): | |
"""Prioritize and deprioritize paired environments based on names. | |
We're looking for multiprocessing friendly environments, so prioritize ones with SMP | |
in the name and deprioritize those with MPI. | |
""" | |
# lower scores = better | |
ranks = {"smp": -1, "mpi": 1} | |
sort_choices = [] | |
for n in choices: | |
# Identify if it fits in any special cases | |
special_case = False | |
for k, val in ranks.items(): | |
if n.lower().find(k) >= 0: | |
sort_choices.append((val, n)) | |
special_case = True | |
break | |
if not special_case: # otherwise, no priority/de-priority | |
sort_choices.append((0, n)) | |
sort_choices.sort() | |
return sort_choices[0][1] | |
def _parseSGEConf(data): | |
"""Handle SGE multiple line output nastiness. | |
From: https://github.com/clovr/vappio/blob/master/vappio-twisted/vappio_tx/load/sge_queue.py | |
""" | |
lines = data.split('\n') | |
multiline = False | |
ret = {} | |
for line in lines: | |
line = line.strip() | |
if line: | |
if not multiline: | |
key, value = line.split(' ', 1) | |
value = value.strip().rstrip('\\') | |
ret[key] = value | |
else: | |
# Making use of the fact that the key was created | |
# in the previous iteration and is stil lin scope | |
ret[key] += line | |
multiline = (line[-1] == '\\') | |
return ret | |
def _queue_can_access_pe(pe_name, queue): | |
"""Check if a queue has access to a specific parallel environment, using qconf. | |
""" | |
try: | |
queue_config = _parseSGEConf(subprocess.check_output(["qconf", "-sq", queue])) | |
except: | |
print "Error parsing SGEConfig." | |
return False | |
for test_pe_name in queue_config["pe_list"].split(): | |
print "SGEConfig parsed, checking %s for a match to %s." %(test_pe_name, pe_name) | |
test_pe_name = test_pe_name.split(",")[0].strip() | |
if test_pe_name == pe_name: | |
return True | |
return False | |
if __name__ == "__main__": | |
parser = ArgumentParser() | |
parser.add_argument("queue", help="Name of queue to use.") | |
args = parser.parse_args() | |
find_parallel_environment(args.queue) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment