Created
June 9, 2016 20:16
-
-
Save sourcesimian/2131c9ab8f3900984349a9489e592391 to your computer and use it in GitHub Desktop.
Parser to interpret tabulated text
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
class TextTable(object):
    """
    Parser to interpret whitespace-aligned (fixed-width) tabulated text.

    Column boundaries are inferred from the text itself: a character
    position belongs to a column if any retained line has a non-space
    character there, and every run of such positions forms one column.
    A cell that is blank on a given line is reported as ``None``; all
    other cells are returned as stripped strings.

    Arguments:
        text          -- the table as a single string.  Leading blank lines
                         and trailing whitespace are discarded; the
                         indentation of every remaining line is preserved
                         because it carries the column alignment.
        heading_lines -- number of leading (non-ignored) lines that form
                         the headings.  ``None`` or ``0`` means ``1``.
        keys          -- callable receiving the list of heading cells of a
                         column and returning that column's key.  By default
                         the non-blank heading cells are joined by a space.
        ignore        -- iterable of regular expressions; lines matched
                         (``re.match``) by any of them are dropped before
                         the columns are detected.

    If two columns produce the same key, ``column()`` returns the last one.

    Example:

    >>> text = '''-----------------------------------------------------------------------------
    ...    PRES   HGHT   TEMP   DWPT   RELH   MIXR   DRCT   SKNT   THTA   THTE   THTV
    ...     hPa      m      C      C      %   g/kg    deg   knot      K      K      K
    ... -----------------------------------------------------------------------------
    ...  1006.0     61   29.4   10.4     31           320      9  302.0  325.9  303.5
    ...             77   29.0    9.7     30   7.56    315     19  301.8  324.6  303.2
    ...  1000.0    108   28.2            28   6.86    315     19
    ...   953.0    530   24.0    7.3     34   6.77    320     25  301.3
    ...   925.0    791   21.6    5.6     35   6.20    305     28  301.4  320.2  302.5
    ...   896.0   1064                   36   5.67    290     34  302.0  319.3  303.0
    ...   850.0   1516                   36   4.86    280     37  302.9  317.9  303.8
    ...   816.0   1857   13.2   -1.8     35   4.14    270     44  303.5  316.4  304.3
    ...   807.0          12.5   -2.5     35   3.96    270     45  303.7  316.1  304.4
    ...   744.0   2629    7.0   -8.0                                     313.9  305.4
    ...   674.0   3431    4.2  -22.8     12
    ...          4582   -4.1  -31.1     10   0.49    280     38  313.9  315.7  314.0'''
    >>> t = TextTable(text, heading_lines=2, ignore=('^-+$',), keys=lambda h: "%s (%s)" % (h[0], h[1]))
    >>> print(t.keys())
    ['PRES (hPa)', 'HGHT (m)', 'TEMP (C)', 'DWPT (C)', 'RELH (%)', 'MIXR (g/kg)', 'DRCT (deg)', 'SKNT (knot)', 'THTA (K)', 'THTE (K)', 'THTV (K)']
    >>> print(t.headings(0))
    ['PRES', 'HGHT', 'TEMP', 'DWPT', 'RELH', 'MIXR', 'DRCT', 'SKNT', 'THTA', 'THTE', 'THTV']
    >>> print(t.headings(1))
    ['hPa', 'm', 'C', 'C', '%', 'g/kg', 'deg', 'knot', 'K', 'K', 'K']
    >>> print(t.row(2))
    ['1000.0', '108', '28.2', None, '28', '6.86', '315', '19', None, None, None]
    >>> print(t.column('TEMP (C)'))
    ['29.4', '29.0', '28.2', '24.0', '21.6', None, None, '13.2', '12.5', '7.0', '4.2', '-4.1']
    >>> for row in t.rows():
    ...     print(row)
    ...     break
    (0, ['1006.0', '61', '29.4', '10.4', '31', None, '320', '9', '302.0', '325.9', '303.5'])
    >>> for col in t.columns():
    ...     print(col)
    ...     break
    ('PRES (hPa)', ['1006.0', None, '1000.0', '953.0', '925.0', '896.0', '850.0', '816.0', '807.0', '744.0', '674.0', None])
    """

    # List of per-column dicts: {'heading': [...], 'values': [...], 'key': ...}
    _data = None
    # Mapping of column key -> the same per-column dict held in _data
    _keys = None

    def __init__(self, text, heading_lines=None, keys=None, ignore=None):
        lines = self._split_lines(text, ignore or ())
        self._setup_column_ranges(lines)
        self._parse_table(lines,
                          heading_lines or 1,
                          keys or self._default_key)

    @staticmethod
    def _default_key(heading):
        """Join the non-blank heading cells of a column with single spaces."""
        # Blank heading cells are None and would make a plain join() raise.
        return ' '.join(part for part in heading if part)

    def keys(self):
        """Return the column keys, in left-to-right column order."""
        return [col['key'] for col in self._data]

    def headings(self, i):
        """Return the cells of heading line ``i`` (0-based), one per column."""
        return [col['heading'][i] for col in self._data]

    def row(self, index):
        """Return the cells of data row ``index`` (0-based), one per column."""
        return [col['values'][index] for col in self._data]

    def rows(self):
        """Yield ``(index, cells)`` for every data row, top to bottom."""
        # The star is essential: it transposes the per-column value lists
        # into per-row tuples.  Without it zip() just walks the columns.
        per_column = [col['values'] for col in self._data]
        for i, cells in enumerate(zip(*per_column)):
            yield i, list(cells)

    def column(self, key):
        """Return the data cells of the column named ``key``.

        Raises KeyError if no column has that key.
        """
        return self._keys[key]['values']

    def columns(self):
        """Yield ``(key, cells)`` for every column, left to right."""
        for col in self._data:
            yield col['key'], col['values']

    def _parse_table(self, lines, heading_lines, keys):
        """Fill ``_data`` and ``_keys`` from the already filtered lines.

        The first ``heading_lines`` lines feed each column's 'heading'
        list, the remaining lines feed its 'values' list, and ``keys`` is
        then called once per column with its heading list.
        """
        cols = [{'heading': [], 'values': []} for _ in self._ranges]

        for line in lines[:heading_lines]:
            for col, heading in zip(cols, self._split_columns(line)):
                col['heading'].append(heading)

        for line in lines[heading_lines:]:
            for col, value in zip(cols, self._split_columns(line)):
                col['values'].append(value)

        for col in cols:
            col['key'] = keys(col['heading'])

        self._data = cols
        self._keys = {col['key']: col for col in cols}

    def _split_lines(self, text, ignore):
        """Split ``text`` into lines and drop those matching ``ignore``.

        Leading blank lines and trailing whitespace are removed, but the
        indentation of the first remaining line is kept: stripping it
        would shift that line relative to the others and break the
        column alignment.
        """
        skip = [re.compile(pattern) for pattern in ignore]
        lines = text.rstrip().splitlines()

        first = 0
        while first < len(lines) and not lines[first].strip():
            first += 1

        return [line for line in lines[first:]
                if not any(s.match(line) for s in skip)]

    def _setup_column_ranges(self, lines):
        """Compute ``_ranges``: one ``(begin, end)`` slice per column.

        A position is "occupied" if any line has a character other than a
        space there; each maximal run of occupied positions is a column.
        """
        width = max(len(line) for line in lines) if lines else 0
        occupied = [False] * width
        for line in lines:
            for i, ch in enumerate(line):
                if ch != ' ':
                    occupied[i] = True

        ranges = []
        begin = None
        for i, used in enumerate(occupied):
            if used:
                if begin is None:
                    begin = i
            elif begin is not None:
                # Compare against None, not truthiness: a column may
                # legitimately begin at index 0.
                ranges.append((begin, i))
                begin = None
        if begin is not None:
            # The right-most column runs up to the end of the longest line
            # and therefore has no trailing space to close it.
            ranges.append((begin, width))
        self._ranges = ranges

    def _split_columns(self, line):
        """Cut ``line`` at the column ranges; blank cells become ``None``."""
        return [line[b:e].strip() or None for b, e in self._ranges]
if __name__ == "__main__":
    # When executed as a script, run the examples embedded in the
    # docstrings of this module as a self-test.
    from doctest import testmod
    testmod()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment