Skip to content

Instantly share code, notes, and snippets.

@piyueh
Created April 8, 2022 18:07
Show Gist options
  • Save piyueh/403e40995c0b000e0dd0272a847c789e to your computer and use it in GitHub Desktop.
Save piyueh/403e40995c0b000e0dd0272a847c789e to your computer and use it in GitHub Desktop.
A simple OpenFOAM data reader with double-precision floats
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2022 Pi-Yueh Chuang <pychuang@pm.me>
#
# Distributed under terms of the BSD 3-Clause license.
"""Test.
"""
import re
import struct
import pathlib
import numpy
import pyvista
import vtk
def parse_lists(content, datatype, factor=1):
"""Find and parse lists in a binary string.
"""
dtlen = struct.calcsize(datatype)
# reading all numbers of arrays
results = re.finditer(b"^(\d+)\n\(", content, re.DOTALL | re.MULTILINE)
assert results is not None, "Failed to read arrays."
# `meta` contains tuples. Each tuple represents the following info of a list:
# 1st element: number of elements in a list
# 2nd element: beginning index of the list; offset by 2 due to leading "\n("
meta = list(map(lambda inp: (int(inp.group(1)), inp.end(1)+2), results))
# read all lists
lists = map(
lambda inp: struct.unpack(
f"{inp[0]*factor}{datatype}",
content[inp[1]:inp[1]+inp[0]*factor*dtlen]
),
meta
)
return list(lists)
def parse_lists_in_file(filename, datatype, factor=1):
"""Find and parse lists in a binary file.
"""
filename = pathlib.Path(filename).expanduser().resolve()
with open(filename, "rb") as fobj:
content = fobj.read()
return parse_lists(content, datatype, factor)
def read_points(casedir):
"""Read points' coordinates.
"""
casedir = pathlib.Path(casedir).expanduser().resolve()
data = parse_lists_in_file(casedir.joinpath("constant", "polyMesh", "points"), "d", 3)
assert len(data) == 1, f"Number of lists in file `points` shoule be 1. Found {len(data)}."
data = numpy.array(data[0]).reshape(-1, 3)
return data
def read_faces(casedir):
"""Read faces' definitions.
"""
casedir = pathlib.Path(casedir).expanduser().resolve()
data = parse_lists_in_file(casedir.joinpath("constant", "polyMesh", "faces"), "i")
assert len(data) == 2, f"Number of lists in file `faces` shoule be 2. Found {len(data)}."
data = [data[1][bg:ed] for bg, ed in zip(data[0][:-1], data[0][1:])] # reshape to 2D
return data
def read_owner(casedir):
"""Read faces' owners.
"""
casedir = pathlib.Path(casedir).expanduser().resolve()
data = parse_lists_in_file(casedir.joinpath("constant", "polyMesh", "owner"), "i")
assert len(data) == 1, f"Number of lists in file `owner` shoule be 1. Found {len(data)}."
return data[0]
def read_neighbour(casedir):
"""Read faces' neighbours.
Note the spelling of "neighbour".
"""
casedir = pathlib.Path(casedir).expanduser().resolve()
data = parse_lists_in_file(casedir.joinpath("constant", "polyMesh", "neighbour"), "i")
assert len(data) == 1, f"Number of lists in file `neighbour` shoule be 1. Found {len(data)}."
return data[0]
def read_boundary(casedir):
"""Read faces' neighbours.
"""
casedir = pathlib.Path(casedir).expanduser().resolve()
with open(casedir.joinpath("constant", "polyMesh", "boundary"), "rb") as fobj:
content = fobj.read()
# reading the number of boundaries
result = re.search(b"^(\d+)\n\(", content, re.DOTALL | re.MULTILINE) # should be only 1 match
assert result is not None, "Failed to read the number of boundaries."
nbc = int(result.group(1))
content = content[result.end(1)+2:]
# read each boundary
boundaries = {}
results = re.finditer(b"(\w+)\s+\{(.*?)}\s", content, re.DOTALL | re.MULTILINE)
boundaries = {result.group(1).decode(): result.group(2).decode() for result in results}
for name, boundary in boundaries.items(): # note that boundary is pure str; not binary str!
results = re.findall("(\w+)\s+(.*?);$", boundary, re.DOTALL | re.MULTILINE)
boundaries[name] = {key: val for key, val in results}
# convert integers
boundaries[name]["nFaces"] = int(boundaries[name]["nFaces"])
boundaries[name]["startFace"] = int(boundaries[name]["startFace"])
return boundaries
def read_cellzones(casedir):
"""Read cellzones' definitions.
"""
casedir = pathlib.Path(casedir).expanduser().resolve()
with open(casedir.joinpath("constant", "polyMesh", "cellZones"), "rb") as fobj:
content = fobj.read()
# get the number of zones (re.search guarantees the result is the first match)
result = re.search(b"^(\d+)\s+\(", content, re.DOTALL | re.MULTILINE)
assert result is not None, "Failed to get the number of zones."
# number of zones and the content after the number of zones
nzones = int(result.group(1))
content = content[result.end(1)+2:] # offset 2 characters due to "\n("
# get the names of each zone
names = re.findall(b"^(\w+)\s+\{", content, re.DOTALL | re.MULTILINE)
# get the lists from the content
arrys = parse_lists(content, "i")
result = {name.decode(): arry for name, arry in zip(names, arrys)}
return result
def read_times(casedir):
"""Read all time values of this simulation case.
"""
casedir = pathlib.Path(casedir).expanduser().resolve()
times = []
for pth in casedir.iterdir():
result = re.fullmatch(r"^(\d+(?:$|\.\d+$))", pth.name)
if result is None: continue
if result.group(1) == "0":
times.append("0")
continue
tfile = list(pth.glob("**/time"))
assert len(tfile) == 1
tfile = tfile[0]
with open(tfile, "r") as fobj: # time file seems to always be ASCII format
content = fobj.read()
result = re.search(r"\sname\s+\"(\d+(?:\.\d+)?)\";", content)
assert str(pth.relative_to(casedir)) == result.group(1)
times.append(str(pth.relative_to(casedir)))
return times
def read_snapshot(casedir, time, field):
"""Read in simulation at a specific simulation time.
"""
# in case we forget to use a str sometimes
assert isinstance(time, str), "Please make the time value a str."
casedir = pathlib.Path(casedir).expanduser().resolve()
solnfile = casedir.joinpath(time, field)
assert solnfile.exists() and solnfile.is_file(), f"{solnfile} did not exist or is not a file."
with open(solnfile, "rb") as fobj:
content = fobj.read()
# regex patterns (use plain str first)
floatptn = r"(?:\d+(?:\.\d+)?)"
vectorptn = rf"\({floatptn}\s+{floatptn}\s+{floatptn}\)"
listptn = r"List<(\S+)>\s+.*?\)\s?"
ptn = rf"\sinternalField\s+(\S+)\s+({floatptn}|{vectorptn}|{listptn});\s"
# get the type of value for internalField
result = re.search(ptn.encode(), content, re.DOTALL)
assert result is not None, f"Error parsing solution file {solnfile}"
# type
field_type = result.group(1).decode() # convert binary str to a plain str
if "nonuniform" in field_type:
if b"scalar" in result.group(2):
soln = numpy.array(
parse_lists(content[result.start(2):], "d")[0],
dtype=float
)
elif b"vector" in result.group(2):
soln = numpy.array(
parse_lists(content[result.start(2):], "d", 3)[0],
dtype=float
).reshape(-1, 3)
else:
raise NotImplementedError(f"{result.group(2)} has not been implemented.")
elif "uniform" in field_type:
try:
soln = float(result.group(2))
except ValueError as err:
if not str(err).startswith("could not convert"):
raise
soln = numpy.array([float(val) for val in result.group(2).strip(b"() \t").split()])
else:
raise RuntimeError("I don't know there're other field types.")
return soln
def get_cell_faces(ncells, owners, neighbour):
"""Calculate the face IDs of cells.
"""
# get info from the file `owner`
face_owners = numpy.array(owners)
face_idxs = face_owners.argsort().tolist()
face_owners = face_owners[face_idxs].tolist()
cell_w_faces, cell_face_counts = numpy.unique(face_owners, return_counts=True)
cum_cell_face_counts = numpy.zeros(ncells+1, dtype=int)
cum_cell_face_counts[cell_w_faces+1] = cell_face_counts
cum_cell_face_counts = numpy.cumsum(cum_cell_face_counts)
# merge the data from `neighbour` and `owner`
cell_faces = list(map(
lambda inp: face_idxs[inp[0]:inp[1]],
zip(cum_cell_face_counts[:-1], cum_cell_face_counts[1:])
))
# slow for-loop; need imporvement
for face, cell in enumerate(neighbour):
cell_faces[cell].append(face)
return cell_faces
def get_cell_vertices(cell_faces, faces):
"""Calculate the vertex IDs of cells (unordered).
"""
cell_verts = []
# slow; but I can live w/ it for now
for vals in cell_faces:
verts = set()
for val in vals:
verts.update(faces[val])
verts = list(verts)
verts.sort()
cell_verts.append(verts)
return cell_verts
def get_cell_data_vtk(cell_faces, faces, points):
"""Calculate the vertex IDs of cells in preparation for VTK data objects.
"""
cell_verts = []
cell_types = []
# slow; but I can live w/ it for now
for vals in cell_faces:
local_faces = [list(faces[val]) for val in vals]
verts = get_hexahedron_nodes(local_faces)
verts = reorder_nodes(verts, points[verts])
nverts = len(set(verts))
assert nverts == 8, f"Currently only support hexahedrons. Got {nverts} vertices: {verts}"
cell_types.append(vtk.VTK_HEXAHEDRON)
cell_verts.append(nverts)
cell_verts.extend(verts)
return numpy.array(cell_types, dtype=int), numpy.array(cell_verts, dtype=int)
def get_vtk_unstructuredgrid(casedir, times=None, fields=("U", "p")):
"""Get a pyvista.UnstructuredGrid.
"""
casedir = pathlib.Path(casedir).expanduser().resolve()
print("Reading mesh vertices")
points = read_points(casedir)
print("Reading faces' definition")
faces = read_faces(casedir)
print("Reading faces' owners")
owners = read_owner(casedir)
print("Reading faces' neighbors")
neighbour = read_neighbour(casedir)
print("Calculating cells' definitions w.r.t. faces")
cell_faces = get_cell_faces(max(owners)+1, owners, neighbour)
print("Calculating cells' definitions w.r.t. vertices")
cell_types, cell_verts = get_cell_data_vtk(cell_faces, faces, points)
print("Generating pyvista.UnstructuredGrid")
mesh = pyvista.UnstructuredGrid(cell_verts, cell_types, points)
# get all available times if users did not provide any
if times is None:
times = read_times(casedir)
elif numpy.isscalar(times):
times = [str(times)]
elif isinstance(times, str):
times = [times]
elif not hasattr(times, "__iter__"):
raise ValueError("`times` got the wrong type of value.")
for time in times:
for field in fields:
print(f"Reading field {field} at time {time}")
data = read_snapshot(casedir, time, field)
if isinstance(data, float):
data = numpy.full(mesh.n_cells, data, dtype=float)
elif data.ndim == 1 and data.size ==3 and mesh.n_cells != 3:
data = numpy.tile(data, (mesh.n_cells, 1))
mesh.cell_data[f"{time}/{field}"] = data
if data.ndim == 1:
mesh.set_active_scalars(f"{time}/{field}", "cell")
elif data.ndim == 2:
mesh.set_active_vectors(f"{time}/{field}", "cell")
else:
raise NotImplementedError("Haven't implemented tensors or higher rank data format")
return mesh
def get_hexahedron_nodes(faces):
"""Given the faces of a hexahedron, return an arbitrary pair of face that do share an edge.
"""
assert len(faces) == 6, f"A hexahedron only has 6 faces. Got {len(faces)}."
assert all(len(face) == 4 for face in faces), f"Some faces don't have exactly 4 nodes: {faces}."
# let the first face encountered as the first face in the pair
face_1 = faces[0]
# check the disjoint status against other faces
disjoint = [set(face_1).isdisjoint(set(face)) for face in faces[1:]]
assert disjoint.count(False) == 4, f"A face doesn't have 4 neighbors: {faces}"
# the only one with True is the face 2
face_2_idx = disjoint.index(True)+1
face_2 = faces[face_2_idx]
# pop out the two faces for convenience and use sets
faces = [set(face) for face in faces]
faces[0] = set()
faces[face_2_idx] = set()
# new lists to store sorted nodes for face 1
face_1_new = [face_1.pop(0)]
# sort face_1
counter = 0
while len(face_1) != 0:
node_1 = face_1_new[-1]
node_2 = face_1.pop(0)
results = [{node_1, node_2} < face for face in faces]
if not any(results):
face_1.append(node_2)
continue
if results.count(True) != 1:
raise RuntimeError(f"Shouldn't be > 1. Got {results}.")
face_1_new.append(node_2)
counter += 1
if counter == 1000:
raise RuntimeError("Infinite loop.")
face_2_new = []
# sort face_2 following face_1's order
for node_1 in face_1_new:
counter = 0
while len(face_2) != 0:
counter += 1
if counter == 1000:
raise RuntimeError("Infinite loop.")
node_2 = face_2.pop(0)
results = [{node_1, node_2} < face for face in faces]
if not any(results) or results.count(True) == 1:
face_2.append(node_2)
continue
if results.count(True) > 2:
raise RuntimeError(f"Shouldn't be > 2. Got {results}.")
face_2_new.append(node_2)
break
return face_1_new + face_2_new
def reorder_nodes(cell, points):
"""Reorder nodes so that the node 0 to 3 corresponds to the face with inward normal vector.
Note: this function assumes the nodes in the cell are sorted by get_hexahedron_nodes.
"""
face_1 = cell[:4]
face_2 = cell[4:]
points_1 = points[:4]
points_2 = points[4:]
normal_1 = numpy.cross(points_1[1]-points_1[0], points_1[2]-points_1[0])
normal_2 = numpy.cross(points_2[1]-points_2[0], points_2[2]-points_2[0])
assert normal_1.dot(normal_2) > 0, f"\n{points_1}\n{points_2}"
vec04 = points_2[0] - points_1[0]
proj = vec04.dot(normal_1)
if proj < 0.:
face = face_2 + face_1
return face
def coplanar_nodes_argsort(nodes):
"""Ruturn sorted indices that sort coplanar nodes either clockwise or counter clockwise.
"""
assert nodes.shape == (4, 3), f"Nodes' shape should be (4, 3). Got {nodes.shape}"
center = numpy.sum(nodes, 0)
vecs = nodes - center
lengths = numpy.linalg.norm(vecs, axis=1)
x = vecs[0] / lengths[0]
z = numpy.cross(x, vecs[1]) / lengths[1]
y = numpy.cross(z, x)
assert abs(numpy.linalg.norm(y)-1.0) <= 1e-10
assert abs(numpy.linalg.norm(z)-1.0) <= 1e-10
assert numpy.allclose(numpy.dot(vecs, z), 0.0, 1e-10, 1e-10)
lx = numpy.dot(vecs, x)
ly = numpy.dot(vecs, y)
theta = numpy.arctan2(ly, lx)
return numpy.argsort(theta)
if __name__ == "__main__":
import pprint
loc = pathlib.Path.home().joinpath("pipeflow", "run", "airflow-OF-case-16-8-600-binary")
mesh = get_vtk_unstructuredgrid(loc)
print(mesh)
# print(mesh.cell_data)
# mesh.save("./test.vtk")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment