@mara004
Last active September 11, 2024 00:33
Page number spec parser [Draft]
# SPDX-FileCopyrightText: 2024 geisserml <geisserml@gmail.com>
# SPDX-License-Identifier: MPL-2.0
# Sophisticated parser for a page number spec mini-language
# Technically, this might be a use case for some parser generator like pyparsing or PLY, but this is a manual implementation based on common string operations.
__all__ = ["parse_pagenums"]
import re
import logging
from contextlib import contextmanager
logger = logging.getLogger(__name__)
def _compile_op_group(*operators):
return re.compile("(" + "|".join(re.escape(op) for op in operators) + ")")
# -- Tokens --
# Tokens currently must be non-alphanumeric, single characters; the splitter regexes below treat them as plain single-character alternations.
OPEN = "("
CLOSE = ")"
SEP = ","
RELATIVE = "="
DASH = "-" # backward or range
STEP = "@"
EXCLUDE = "/"
MULTIPLEX = "#"
REVERSE = "~"
EVAL = "!"
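# Example specs (derived from the operator handlers below; cf. also the test samples at the bottom):
#   "1-5@2"  -> every 2nd page of 1..5, i.e. 1, 3, 5
#   "A=2-3"  -> pages 2..3, resolved through the "A" reference mapping
#   "~3-5"   -> pages 3..5 in reverse order
#   "1-10/4" -> pages 1..10, excluding 4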
# -- Token splitter groups --
# Let's hope that `re` internally builds a lookup table or similar.
# Note that EVAL is handled specially and thus not contained in OPERATORS_RE.
SEP_RE = _compile_op_group(SEP)
BRACKETS_RE = _compile_op_group(OPEN, CLOSE)
OPERATORS_RE = _compile_op_group(RELATIVE, DASH, STEP, EXCLUDE, MULTIPLEX, REVERSE)
# -- Variables --
BEGIN = "begin" # may be > 1 in RELATIVE context
END = "end"
ODD = "odd"
EVEN = "even"
ALL = "all"
class Token (str):
def __repr__(self):
return "t" + super().__repr__()
# partition_by_tokens() is an re.split() wrapper with the addition of marking split characters as Token and cleaning up empty parts. An alternative implementation could use re.finditer().
def partition_by_tokens(text, regex):
parts = re.split(regex, text)
for i in range(1, len(parts), 2):
parts[i] = Token(parts[i])
return [p for p in parts if p != ""]
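# e.g. partition_by_tokens("1-3/2", OPERATORS_RE) -> ['1', t'-', '3', t'/', '2']
# (the t prefix marks Token instances, cf. Token.__repr__ above)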
class Group (list):
def __repr__(self):
return f"G({list.__repr__(self)[1:-1]})"
def __str__(self):
return OPEN + "".join(str(p) for p in self) + CLOSE
def shallow_split(self, regex):
partitioned = []
for item in self:
if isinstance(item, str):
partitioned.extend(partition_by_tokens(item, regex))
else: # isinstance(item, Group)
partitioned.append(item)
logger.debug(f"Tokenized for split:\n{' '*4}{partitioned}")
wrapped = []
cursor = 0
sym_indices = (i for i, v in enumerate(partitioned) if isinstance(v, Token))
for i in sym_indices:
wrapped.append(partitioned[cursor:i])
cursor = i+1
wrapped.append(partitioned[cursor:])
return wrapped
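# e.g. Group(['1,2', Group(['3,4']), ',5']).shallow_split(SEP_RE)
#   -> [['1'], ['2', G('3,4')], ['5']]  (splits only at top-level separators)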
class _RawBracketError (ValueError):
def __init__(self, _, err_stack):
self.err_stack = err_stack
super().__init__() # f"Raw bracket error: {err_stack}"
class BracketError (ValueError):
def __init__(self, parts, err_stack):
highlight = ""
for group in err_stack:
highlight += " "*len(str(group)[1:-1]) + "^"
super().__init__(f"Unmatched bracket(s):\n{''.join(parts)}\n{highlight}")
class GroupMaker:
def __init__(self, parts, exc_type=BracketError):
self.parts = parts
self.exc_type = exc_type
self.group = Group()
self.stack = []
def compile(self):
for i, p in enumerate(self.parts):
if isinstance(p, Token):
self.TOKEN_TABLE[p](self, i)
else:
self.group.append(p)
# check against unmatched open brackets
if len(self.stack) > 0:
raise self.exc_type(self.parts, self.stack)
return self.group
def on_open(self, _):
# push current group to stack and open a new group
self.stack.append(self.group)
self.group = Group()
def on_close(self, i):
# check against unmatched closing brackets
if not len(self.stack) > 0:
# recurse over the remaining part to show all errors at once
# use two separate exception types to calculate highlighting only once for the caller-facing exception
err_stack = [self.group]
try:
GroupMaker(self.parts[i+1:], exc_type=_RawBracketError).compile()
except _RawBracketError as e:
err_stack.extend(e.err_stack)
raise self.exc_type(self.parts, err_stack)
# merge current group into nearest group on stack and pop
self.stack[-1].append(self.group)
self.group = self.stack.pop()
TOKEN_TABLE = {
Token(OPEN): on_open,
Token(CLOSE): on_close,
}
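# e.g. GroupMaker(['1,', t'(', '2,3', t')']).compile() -> G('1,', G('2,3'))
# where t'(' / t')' are bracket Tokens as produced by partition_by_tokens()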
class OpPart (list):
def __repr__(self):
return "p" + super().__repr__()
class ResolvedPart (list):
def __repr__(self):
return "r" + super().__repr__()
def _tokenize_part(p):
return partition_by_tokens(p.replace(" ", ""), OPERATORS_RE)
def tokenize(root_group):
splitted = root_group.shallow_split(SEP_RE)
logger.debug(f"Shallow splitted:\n{' '*4}{splitted}")
meta_parts = []
for sep_portion in splitted:
part = []
for i, p in enumerate(sep_portion):
if isinstance(p, str):
if EVAL in p:
a, b = p.split(EVAL, maxsplit=1)
sep_remainder = sep_portion[i+1:]
if not b.replace(" ", "") and sep_remainder and isinstance(sep_remainder[0], Group):
part.extend( [*_tokenize_part(a), Token(EVAL)] )
else:
eval_str = b + "".join(str(x) for x in sep_remainder)
part.extend( [*_tokenize_part(a), Token(EVAL), eval_str] )
break
else:
part.extend(_tokenize_part(p))
else: # isinstance(p, Group)
part.append(p)
meta_parts.append(part[0] if len(part) == 1 else OpPart(part))
# tolerate trailing or multiple commas
return [p for p in meta_parts if p != []]
def _apply_excludes(base, excludes):
not_found = []
for v in excludes:
# note, in case of multiple occurrences, this removes the leftmost item
base.remove(v) if v in base else not_found.append(v)
if len(not_found) > 0:
raise ValueError(f"Unfound excludes: {base} / {not_found}")
def _wrap_to_list(item):
if not isinstance(item, list):
return [item]
return item
def _prios_keyfunc(pair):
_, (prio, _) = pair
return prio
NO_OPERATOR = (0, None)
class OpHandler:
def __init__(self, ref_translator, variables, recursive_pnp):
self.ref_translator = ref_translator
self.variables = variables
self.recursive_pnp = recursive_pnp
self.ctx = None # initial
def __call__(self, token_data):
out = []
for part in token_data:
parsed = self.PART_TABLE[type(part)](self, part)
(out.extend if isinstance(parsed, list) else out.append)(parsed)
return out
def _apply_ops(self, part, prios):
if len(part) < 2: # primitive
value, = part # unpack
return self.PRIMITIVES_TABLE[type(value)](self, value)
else:
i, (prio, func) = max(enumerate(prios), key=_prios_keyfunc)
# assert prio != 0, "Operator expected"
return func(self, part[:i], part[i+1:], prios[:i], prios[i+1:])
def parse_op_part(self, part):
prios = [self.OP_TABLE[x] if isinstance(x, Token) else NO_OPERATOR for x in part]
return self._apply_ops(part, prios)
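    # e.g. for the part ['1', t'-', '5', t'@', '2'], STEP (prio 4) outranks DASH (prio 2),
    # so on_step() receives left = ['1', t'-', '5'] and right = ['2'],
    # and the dash range is resolved recursively before stepping.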
def parse_str(self, value):
if value.isdigit():
return int(value)
else:
var = self.variables[self.ctx][value.lower()]
if isinstance(var, (range, tuple)):
var = list(var)
return var
def parse_group(self, group):
token_data = tokenize(group)
logger.debug(f"Tokenized Operators:\n{' '*4}{token_data}")
return self(token_data)
@contextmanager
def relative_ctx(self, tmp_ctx):
assert self.ctx is None, "Nested relativity is not meaningful"
orig_ctx, self.ctx = self.ctx, tmp_ctx
try:
yield
finally:
self.ctx = orig_ctx
def on_relative(self, left, right, lprios, rprios):
leading_ops, label = left[:-1], left[-1]
with self.relative_ctx(label):
value = _wrap_to_list(self._apply_ops(right, rprios))
output = [self.ref_translator[label][n] for n in value]
if leading_ops:
return self._apply_ops([*leading_ops, ResolvedPart(output)], [*lprios, NO_OPERATOR])
else:
return output
def on_exclude(self, left, right, lprios, rprios):
base = self._apply_ops(left, lprios)
excludes = _wrap_to_list(self._apply_ops(right, rprios))
_apply_excludes(base, excludes) # modifies base in place
return base
def on_reverse(self, left, right, lprios, rprios):
assert not left
value = self._apply_ops(right, rprios)
return value[::-1] # or list(reversed(value))
def on_multiplex(self, left, right, lprios, rprios):
value = self._apply_ops(left, lprios)
count = self._apply_ops(right, rprios)
if isinstance(value, list):
return value * count
else:
return [value] * count
def on_step(self, left, right, lprios, rprios):
value = self._apply_ops(left, lprios)
step = self._apply_ops(right, rprios)
return value[::step]
def on_dash(self, left, right, lprios, rprios):
if not left:
stop = self.variables[self.ctx][END] + 1
value = self._apply_ops(right, rprios)
if isinstance(value, list):
return [stop - n for n in value]
else:
return stop - value
else:
first = self._apply_ops(left, lprios)
last = self._apply_ops(right, rprios)
if first < last:
return list( range(first, last+1) )
else:
return list( range(first, last-1, -1) )
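    # e.g. with END == 20 (a 20-page document), "-3" resolves to 18 (the third-to-last page),
    # while "7-1" yields the descending range [7, 6, 5, 4, 3, 2, 1]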
def on_eval(self, left, right, lprios, rprios):
assert not left
value, = right # unpack
value = str(value) # may be a Group
namespace = {**self.variables[self.ctx], "V": self.variables, "R": self.ref_translator, "pnp": self.recursive_pnp}
out = eval(value, namespace)
if isinstance(out, (tuple, range)):
out = list(out)
return out
PRIMITIVES_TABLE = {
str: parse_str,
Group: parse_group,
ResolvedPart: lambda h, v: v, # passthrough
}
PART_TABLE = PRIMITIVES_TABLE.copy()
PART_TABLE[OpPart] = parse_op_part
# Note that priority must be greater than that of NO_OPERATOR
OP_TABLE = {
# Token(CHAR): (prio, func)
Token(RELATIVE): (7, on_relative),
Token(EXCLUDE): (6, on_exclude),
Token(REVERSE): (5, on_reverse),
Token(STEP): (4, on_step),
Token(MULTIPLEX): (3, on_multiplex),
Token(DASH): (2, on_dash),
Token(EVAL): (1, on_eval),
}
def validate(page_indices, variables):
pass # TODO
def get_variables(base_length, ref_translator):
variables = {}
stop = base_length + 1
variables[None] = {BEGIN: 1, END: base_length, ODD: range(1, stop, 2), EVEN: range(2, stop, 2), ALL: range(1, stop)}
# NOTE this requires an ordered mapping
for label, mapping in ref_translator.items():
odd, even = [], []
for n in mapping.keys():
(odd if n % 2 else even).append(n)
variables[label] = {
BEGIN: min(mapping.keys()), END: max(mapping.keys()),
ODD: tuple(odd), EVEN: tuple(even), ALL: tuple(mapping.keys()),
}
return variables
def parse_pagenums(string, ref_translator=None, variables=None):
logger.debug(f"Input:\n{' '*4}{string!r}")
bracket_parts = partition_by_tokens(string, BRACKETS_RE)
logger.debug(f"Tokenized Brackets:\n{' '*4}{bracket_parts}")
root_group = GroupMaker(bracket_parts).compile()
logger.debug(f"Grouped by Brackets:\n{' '*4}{root_group!r}")
recursive_pnp = lambda s: parse_pagenums(s, ref_translator, variables)
page_indices = OpHandler(ref_translator, variables, recursive_pnp)([root_group])
validate(page_indices, variables)
return page_indices
# testing
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
import time
t_doclen = 20
t_ref_translator = {"A": {2:8, 3:9, 4:10, 5:11, 6:12, 7:13, 10:16}, "1": {1:3}}
t_variables = get_variables(t_doclen, t_ref_translator)
logger.debug("Variables:\n" + '\n'.join(f"{k}: {v}" for k, v in t_variables.items()))
def testcase(string):
out = parse_pagenums(string, t_ref_translator, t_variables)
print(out, end="\n\n")
return out
def run_samples():
# testcase("1,2,(3,4)),(5,(6")
# testcase("A=B=1") # or "A=(B=1)"
starttime = time.time()
# NOTE the test samples aren't particularly meaningful or creative at this time, they're just to traverse the code and test parser functionality
testcase("1, 2, (3, (4, 5)), 6#2, 6-10/8, (~A=2-3)/9, 8-10/A=2, ~3-5")
testcase("-1, !-1, !8-5, !(3-1)-6, A=!(begin, end), 1-8/![x for x in range(2, 5)]")
testcase("end, 1, -end, -1, -odd")
testcase("end-1/(1-2), odd/1, even/2, all/(3,5-7)")
testcase("A=(begin, end, even, odd, all)")
testcase("A=(all/(2, 4)), all/A=(2,4), A=all/(2,4), 1=1")
testcase("1-7@2, 7-1, , 1-5#2@2, ")
testcase("!pnp('begin, end'), ![v for p in zip(pnp('5-10'), pnp('10-5')) for v in p]")
duration = time.time() - starttime
print(f"Duration {duration}s")
if __name__ == "__main__":
run_samples()
mara004 (Author) commented Aug 17, 2024

Possible regex-based token partitioning

The two RE-based approaches (re.split() and the commented-out re.finditer() variant) perform similarly, but are slightly slower than the simple char loop.
On the other hand, RE-based partitioning would be a lot more powerful and flexible: it might allow alphabetic split tokens (with a lookahead check against variable names), multi-char tokens, etc. Not that we need that, though.
That said, actually using these more advanced semantics would probably come at some performance cost.

diff --git a/pnp.py b/pnp.py
index 2db2990..32ac3ab 100644
--- a/pnp.py
+++ b/pnp.py
@@ -6,6 +6,7 @@
 
 __all__ = ["parse_pagenums"]
 
+import re
 import enum
 import logging
 from collections import namedtuple
@@ -34,7 +35,6 @@ VOID = _Void.VOID
 
 # -- Tokens --
 # Tokens currently must be non-alphanumeric, single characters, as we are using a simple loop and set for token partitioning.
-# If an advanced syntax were desired, it might be possible to use regular expressions and re.finditer() in partition_by_tokens().
 OPEN      = "("
 CLOSE     = ")"
 SEP       = ","
@@ -46,11 +46,16 @@ MULTIPLEX = "#"
 REVERSE   = "~"
 EVAL      = "!"
 
-# -- Token groups --
-# The performance of a char set contains check should be roughly equivalent to a direct ord() lookup table.
+# -- Token splitter groups --
 # Note that EVAL is handled specially and thus not contained in OPERATORS.
-BRACKETS  = {OPEN, CLOSE}
-OPERATORS = {RELATIVE, DASH, STEP, EXCLUDE, MULTIPLEX, REVERSE}
+# Let's hope that `re` internally builds a lookup table or something similarly efficient.
+
+def _compile_op_group(*operators):
+    return re.compile("(" + "|".join(re.escape(op) for op in operators) + ")")
+
+SEP_RE = _compile_op_group(SEP)
+BRACKETS_RE  = _compile_op_group(OPEN, CLOSE)
+OPERATORS_RE = _compile_op_group(RELATIVE, DASH, STEP, EXCLUDE, MULTIPLEX, REVERSE)
 
 # -- Variables --
 BEGIN = "begin"  # may be > 1 in RELATIVE context
@@ -64,18 +69,27 @@ class Token (str):
     def __repr__(self):
         return "t" + super().__repr__()
 
-def partition_by_tokens(text, tset):
-    parts = []
-    cursor = 0
+# partition_by_tokens() is an re.split() equivalent with the addition of marking split characters as Token and cleaning up empty parts
+
+def partition_by_tokens(text, regex):
+    parts = re.split(regex, text)
+    for i in range(1, len(parts), 2):
+        parts[i] = Token(parts[i])
+    return [p for p in parts if p != ""]
+
+# def partition_by_tokens(text, regex):
     
-    for i, c in enumerate(text):
-        if c in tset:
-            parts.append(text[cursor:i])
-            parts.append(Token(c))
-            cursor = i+1
+#     parts = []
+#     cursor = 0
     
-    parts.append(text[cursor:])
-    return [p for p in parts if p != ""]
+#     for m in re.finditer(regex, text):
+#         start = m.start()
+#         parts.append(text[cursor:start])
+#         parts.append(Token(m.group()))
+#         cursor = m.end()
+    
+#     parts.append(text[cursor:])
+#     return [p for p in parts if p != ""]
 
 
 class Group (list):
@@ -86,15 +100,15 @@ class Group (list):
     def __str__(self):
         return OPEN + "".join(str(p) for p in self) + CLOSE
     
-    def shallow_split(self, symbol):
+    def shallow_split(self, regex):
         
         partitioned = []
         for item in self:
             if isinstance(item, str):
-                partitioned.extend(partition_by_tokens(item, {symbol}))
+                partitioned.extend(partition_by_tokens(item, regex))
             else:  # isinstance(item, Group)
                 partitioned.append(item)
-        logger.debug(f"Tokenized {symbol!r}:\n{' '*4}{partitioned}")
+        logger.debug(f"Tokenized for split:\n{' '*4}{partitioned}")
         
         wrapped = []
         cursor = 0
@@ -174,10 +188,10 @@ class ResolvedPart (list):
         return "r" + super().__repr__()
 
 def _tokenize_part(p):
-    return partition_by_tokens(p.replace(" ", ""), OPERATORS)
+    return partition_by_tokens(p.replace(" ", ""), OPERATORS_RE)
 
 def tokenize(root_group):
-    splitted = root_group.shallow_split(SEP)
+    splitted = root_group.shallow_split(SEP_RE)
     logger.debug(f"Shallow splitted:\n{' '*4}{splitted}")
     meta_parts = []
     for sep_portion in splitted:
@@ -385,7 +399,7 @@ def get_variables(base_length, ref_translator):
 def parse_pagenums(string, ref_translator=None, variables=None):
     
     logger.debug(f"Input:\n{' '*4}{string!r}")
-    bracket_parts = partition_by_tokens(string, BRACKETS)
+    bracket_parts = partition_by_tokens(string, BRACKETS_RE)
     logger.debug(f"Tokenized Brackets:\n{' '*4}{bracket_parts}")
     root_group = GroupMaker(bracket_parts).compile()
     logger.debug(f"Grouped by Brackets:\n{' '*4}{root_group!r}")

Update: now applied to the above code

mara004 (Author) commented Aug 19, 2024

Note that the PNP (the page number spec parser) is non-iterative: each processing step materializes its whole result in RAM. However, given that the strings involved need little memory, this should be irrelevant for realistic input sizes. It might be possible to rewrite this with peekable iterators, but there is no intent to do so.

mara004 (Author) commented Sep 7, 2024

Fixme: There are currently some unnecessary if-checks in the OpHandler. I want to avoid these:

  1. Instead of applying operators recursively unless the operator asks for raw parts, client functions should always receive the raw parts and invoke recursive parsing on their own. This avoids the if raw: ... check.
  2. Instead of converting empty parts to VOID, just return the empty parts directly. This works thanks to the downstream control over recursion from 1), so the _apply_or_void() wrapper (and VOID) can be removed altogether, avoiding an extra function call.
  3. See if we can avoid the extend-if-list-else-append check by always returning a sequence type (see the sketch below). This should also let us drop some _wrap_to_list() calls, but requires downstream unpacking where a single value is expected.
  4. See if we can avoid casting tuple/range to list in variable parsing and eval, and instead cast to list only on exclude.
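
For illustration, point 3 might end up looking roughly like this (a hypothetical sketch, not the actual implementation): every handler would always return a list, and callers unpack where a single value is expected.

def on_multiplex(self, left, right, lprios, rprios):
    values = self._apply_ops(left, lprios)    # would now always be a list
    count, = self._apply_ops(right, rprios)   # unpack the single count value
    return values * count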

mara004 (Author) commented Sep 10, 2024

I think I've addressed points 1 and 2. The procedure should be more elegant now, though it turned out a bit nasty in terms of code style (longer method signatures and repeated _apply_ops(part, prios) calls).

I'm not sure about 3 and 4 yet, so those are not done.
