Created
February 15, 2013 12:35
-
-
Save jamescasbon/4960150 to your computer and use it in GitHub Desktop.
pyvcf proposal to use funcparserlib to clarify parsing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import os | |
import re | |
import logging | |
import collections | |
import pprint | |
from funcparserlib.lexer import make_tokenizer, Spec | |
from funcparserlib.parser import (maybe, many, eof, skip, fwd, name_parser_vars, SyntaxError) | |
from funcparserlib.contrib.common import const, n, op, op_, sometok | |
# Object specification | |
# -------------------- | |
class _HeaderEntry(object):
    """Base class for header entries.

    An entry wraps the parsed value of a single header line.  ``values`` is
    either a plain value or a mapping of key/value pairs; when it is a
    mapping, its items can also be read as attributes (``entry.ID``).

    Class attributes:
        expected: keys that a mapping-valued entry of this kind should define.
    """

    expected = []

    def __init__(self, values=None, strict=True, name=None):
        """Store ``values`` and, in strict mode, warn about unexpected keys.

        Args:
            values: the parsed value of the header line.  The key check only
                runs when this object has a ``keys`` method.
            strict: when true, log a warning for keys listed in ``expected``
                that are absent, and for keys present but not expected.
            name: explicit entry name; when ``None`` the class name is used.
        """
        self._name = name
        self.values = values
        if strict and hasattr(values, 'keys'):
            present = set(values.keys())
            wanted = set(self.expected)
            missing = wanted - present
            extra = present - wanted
            if missing:
                logging.warning('missing definitions of %s for a %s header entry', missing, self.name)
            if extra:
                logging.warning('extra definitions %s for a %s header entry', extra, self.name)

    @property
    def name(self):
        """The explicit name given at construction, else the class name."""
        if self._name is not None:
            return self._name
        return self.__class__.__name__

    def __repr__(self):
        return "HeaderEntry(type=%s, values=%s)" % (self.name, self.values)

    def __getattr__(self, name):
        """Expose the items of a mapping-valued entry as attributes.

        Raises:
            AttributeError: if ``values`` is unset, is not subscriptable by
                ``name`` (e.g. a plain string or ``None``), or lacks the key.
        """
        # __getattr__ only runs after normal lookup has failed.  Fetch
        # 'values' straight from the instance dict: going through
        # ``self.values`` would re-enter this method forever on an instance
        # whose __init__ has not run (as happens during copy/unpickle).
        values = self.__dict__.get('values')
        try:
            return values[name]
        except (KeyError, TypeError):
            # TypeError covers non-mapping values such as a string or None,
            # so hasattr()/getattr(..., default) keep working for them.
            raise AttributeError(
                '%s object has no attribute %r' % (type(self).__name__, name))
class _HeaderFormatSpec(_HeaderEntry):
    """Base class for header entries used to cast values later on in the file.

    Such an entry is expected to define ``ID``, ``Number``, ``Type`` and
    ``Description``; ``Type`` is what ``cast`` uses to convert raw strings.
    """

    expected = ['ID', 'Number', 'Type', 'Description']

    def cast(self, string):
        """Convert the raw text ``string`` using this entry's ``Type``.

        Args:
            string: the raw text of one value.

        Returns:
            ``None`` when ``string`` is ``'.'``.  Otherwise the result of
            ``Type(string)``.  If ``Type`` is ``int`` but the text only parses
            as a float, the float is returned and a warning is logged.  If
            ``Type`` is not callable the text is returned unchanged.  If the
            text cannot be converted at all, a warning is logged and ``None``
            is returned.
        """
        if string == '.':
            return None

        converter = self.Type
        if not callable(converter):
            # Header._make_value only turns 'Integer', 'String' and 'Float'
            # into callables; any other Type (e.g. 'Flag') stays a plain
            # string and calling it would raise TypeError.
            return string

        try:
            return converter(string)
        except ValueError:
            # The original tested the builtin ``type`` against ``int``, which
            # is never true; the declared Type is what has to be checked.
            if converter is int:
                try:
                    value = float(string)
                except ValueError:
                    pass
                else:
                    logging.warning('int type is actually a float')
                    return value
        logging.warning('unable to parse value %s as type %s for %s', string, self.Type, self.name)
        return None
class Header(list):
    """A collection of header entries.

    The list holds the entries in file order.  On construction the entries
    are also indexed by their ``ID`` into ``infos``, ``filters``, ``formats``
    and ``contigs``, and ``version`` is taken from a leading ``fileformat``
    entry when there is one.
    """

    class FORMAT(_HeaderFormatSpec):
        """Entry created for a header line whose key is ``FORMAT``."""

    class INFO(_HeaderFormatSpec):
        """Entry created for a header line whose key is ``INFO``."""

    class FILTER(_HeaderEntry):
        """Entry created for a header line whose key is ``FILTER``."""
        expected = ['ID', 'Description']

    class contig(_HeaderEntry):
        """Entry created for a header line whose key is ``contig``."""
        expected = ['assembly', 'taxonomy', 'species', 'length', 'ID', 'md5']

    # Header-line key -> class used to represent that line.
    _classes = {
        'INFO': INFO,
        'FORMAT': FORMAT,
        'FILTER': FILTER,
        'contig': contig
    }

    # Textual ``Type`` value -> Python type used for casting.
    _type_map = {
        'Integer': int,
        'String': str,
        'Float': float
    }

    def __init__(self, *entries):
        """Build the header from the same arguments ``list`` accepts."""
        list.__init__(self, *entries)
        self.version = None
        self.infos = {}
        self.filters = {}
        self.formats = {}
        self.contigs = {}
        self._check_version()
        self._create_lookups()

    def _check_version(self):
        """Set ``version`` from a leading ``fileformat`` entry, else warn."""
        if len(self) > 0 and self[0].name == 'fileformat':
            self.version = self[0].values
        else:
            logging.warning('header is missing fileformat specification')

    def _create_lookups(self):
        """Index INFO, FILTER, FORMAT and contig entries by their ``ID``."""
        dicts = {
            'INFO': self.infos,
            'FILTER': self.filters,
            'FORMAT': self.formats,
            'contig': self.contigs
        }
        for entry in self:
            lookup = dicts.get(entry.name)
            if lookup is None:
                continue
            try:
                entry_id = entry.values['ID']
            except (KeyError, TypeError):
                # A known entry kind without an ID (or with a non-mapping
                # value) cannot be indexed; warn and keep it in the list only
                # instead of aborting construction of the whole header.
                logging.warning('%s header entry has no ID and is not indexed', entry.name)
                continue
            lookup[entry_id] = entry

    @classmethod
    def _make_entry(cls, a):
        """Build the entry object for a parsed ``(key, values)`` header line.

        Keys present in ``_classes`` get their dedicated class; any other key
        yields a generic ``_HeaderEntry`` named after the key.
        """
        key, vals = a[0], a[1]
        # Look the class up first so that a KeyError raised while building
        # the entry is never mistaken for "unknown header key".
        entry_class = cls._classes.get(key)
        if entry_class is None:
            return _HeaderEntry(name=key, values=vals)
        return entry_class(vals)

    @classmethod
    def _make_value(cls, kv):
        """Handle typing of a ``(key, value)`` pair.

        ``Type`` values 'Integer', 'String' and 'Float' become ``int``,
        ``str`` and ``float``; other ``Type`` values are kept as given.
        ``Number`` and ``length`` values become ``int`` when they parse as
        one.  Everything else is returned unchanged.
        """
        # Unpack in the body: tuple parameters in the signature are a syntax
        # error on Python 3, and this form works on Python 2 as well.
        k, v = kv
        if k == 'Type':
            return k, cls._type_map.get(v, v)
        if k in ('Number', 'length'):
            try:
                return k, int(v)
            except ValueError:
                return k, v
        return k, v

    @classmethod
    def _make_ordered_dict(cls, el):
        """Turn a ``(first_pair, [more_pairs])`` result into an OrderedDict.

        ``el[0]`` is the first key/value pair; ``el[1]``, when present, is
        the list of the remaining pairs.
        """
        pairs = [el[0]]
        if len(el) > 1:
            pairs = pairs + el[1]
        return collections.OrderedDict(pairs)
# Lexer specification | |
# ------------------- | |
# Token specifications, tried in order at each position.
specs = [
    # Double-quoted text (no embedded double quotes).
    Spec('description', r'"([^"]*)"'),
    # Single-character punctuation, including the line terminator.
    Spec('op', r'[#<>=,\n]'),
    # A run of digits that is a whole word.  The lookahead rejects digits
    # that are immediately followed by another name character, so a
    # digit-leading word (an md5 such as 0a1b..., or 4.1) is lexed as one
    # 'name' token instead of being split into 'number' + 'name'.
    Spec('number', r'(\d+)(?![A-Za-z_0-9\.:\/-])'),
    # Any other bare word: letters, digits and the characters _ . : / -
    Spec('name', r'[A-Za-z_0-9\.:\/-]+'),
]
def tokenize(s):
    """Lex the string ``s`` with the module-level ``specs``; return a list of tokens."""
    lexer = make_tokenizer(specs)
    return [token for token in lexer(s)]
# Grammar specification | |
# --------------------- | |
# A header value is a single token: a name, a number or a quoted string.
value = sometok('name') | sometok('number') | sometok('description')

# A key/value pair is ``name = value``; the pair is typed by Header._make_value.
kv = (sometok('name') + op_('=') + value) >> Header._make_value

# A key/value list is one pair followed by any number of ``, pair`` repeats.
kvlist = (kv + many(op_(',') + kv)) >> Header._make_ordered_dict

# A header line is ``## name = (value | < kvlist >)``.
header_line = (
    op_('#') + op_('#') + sometok('name') + op_('=')
    + (value | (op_('<') + kvlist + op_('>')))
) >> Header._make_entry

# A header is any number of header lines, each one followed by a newline.
# NOTE(review): a final line with no trailing '\n' does not match this
# rule -- confirm whether callers always supply newline-terminated text.
header = many(header_line + op_('\n')) >> Header
# Example VCF header used by the demo below; the lines are joined with '\n'
# and the text has no trailing newline.
exf = '\n'.join((
    '##fileformat=VCFv4.1',
    '##fileDate=20090805',
    '##source=myImputationProgramV3.1',
    '##reference=file:///seq/references/1000GenomesPilot-NCBI36.fasta',
    '##contig=<ID=20,length=62435964,assembly=B36,md5=f126cdf8a6e0c7f379d618ff66beb2da,species="Homo sapiens",taxonomy=x>',
    '##phasing=partial',
    '##INFO=<ID=NS,Number=1,Type=Integer,Description="Number of Samples With Data">',
    '##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">',
    '##INFO=<ID=AF,Number=A,Type=Float,Description="Allele Frequency">',
    '##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele">',
    '##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">',
    '##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">',
    '##FILTER=<ID=q10,Description="Quality below 10">',
    '##FILTER=<ID=s50,Description="Less than 50% of samples have data">',
    '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">',
    '##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">',
    '##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">',
    '##FORMAT=<ID=HQ,Number=2,Type=Integer,Description="Haplotype Quality">',
))
if __name__ == '__main__':
    # Demo: parse the example header and pretty-print the result.
    # The grammar requires a '\n' after every header line, and the example
    # text does not end with one, so its last line would not be matched.
    # Terminate the text before tokenizing so every line is parsed.
    text = exf if exf.endswith('\n') else exf + '\n'
    pprint.pprint(header.parse(tokenize(text)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment