Created
June 8, 2024 09:25
-
-
Save leobeeson/c7997f4cf3670951d15857ceef0d6b75 to your computer and use it in GitHub Desktop.
Two methods for using an ontology .ttl file to guide data validation and transformations in an ETL pipeline.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rdflib | |
from rdflib.namespace import RDF, RDFS, OWL | |
# Load the ontology: build an empty RDF graph, then parse the Turtle file into it.
ttl_file_path = 'path/to/your/ontology.ttl'

g = rdflib.Graph()
g.parse(source=ttl_file_path, format='ttl')

# Quick sanity check: report how many triples were loaded.
print(f"Graph has {len(g)} statements.")
def extract_classes_and_properties(graph: rdflib.Graph) -> dict:
    """Build a {class URI: {"properties": [...], "required": [...]}} mapping.

    Classes are every ``owl:Class`` in the graph; a property is attached to a
    class when its ``rdfs:domain`` is that class; a property is "required"
    when a minimum-cardinality restriction on the class demands at least one
    value.

    :param graph: parsed ontology graph (rdflib binds the rdf/rdfs/owl
        prefixes by default, so the queries need no PREFIX header).
    :return: mapping of class URI -> {"properties": list, "required": list}.
    """
    data_models = {}

    # Every owl:Class becomes an entry. The variable is named ?cls rather
    # than ?class so the named attribute access below is legal Python.
    class_query = """
        SELECT ?cls
        WHERE {
            ?cls a owl:Class .
        }
    """
    for row in graph.query(class_query):
        class_uri = str(row.cls)
        data_models[class_uri] = {"properties": [], "required": []}

    # OWL ontologies usually declare owl:ObjectProperty / owl:DatatypeProperty
    # rather than plain rdf:Property, so match all three declaration styles.
    property_query = """
        SELECT DISTINCT ?property ?domain
        WHERE {
            { ?property a rdf:Property . }
            UNION { ?property a owl:ObjectProperty . }
            UNION { ?property a owl:DatatypeProperty . }
            ?property rdfs:domain ?domain .
        }
    """
    for row in graph.query(property_query):
        property_uri = str(row.property)
        domain_uri = str(row.domain)
        if domain_uri in data_models:
            data_models[domain_uri]["properties"].append(property_uri)

    # Required properties: a restriction with minimum cardinality > 0.
    # The first branch keeps the original (non-standard) owl:onClass linkage
    # for backward compatibility; the second matches the standard OWL 2
    # shape, where the class is rdfs:subClassOf the restriction and
    # qualified restrictions use owl:minQualifiedCardinality.
    required_property_query = """
        SELECT DISTINCT ?property ?domain
        WHERE {
            {
                ?restriction a owl:Restriction ;
                             owl:onProperty ?property ;
                             owl:minCardinality ?minCardinality ;
                             owl:onClass ?domain .
            }
            UNION
            {
                ?domain rdfs:subClassOf ?restriction .
                ?restriction a owl:Restriction ;
                             owl:onProperty ?property .
                { ?restriction owl:minCardinality ?minCardinality . }
                UNION
                { ?restriction owl:minQualifiedCardinality ?minCardinality . }
            }
            FILTER (?minCardinality > 0)
        }
    """
    for row in graph.query(required_property_query):
        property_uri = str(row.property)
        domain_uri = str(row.domain)
        if domain_uri in data_models:
            data_models[domain_uri]["required"].append(property_uri)

    return data_models
# Materialize the URI-keyed data models from the parsed ontology graph.
data_models: dict = extract_classes_and_properties(g)
print(f"Extracted data models: {data_models}")
# Define validation and transformation functions
def validate_data(data: dict, model: dict) -> bool:
    """Validate a record against an extracted data model.

    A record is valid when every key is a declared property and every
    required property is present with a truthy value.

    :param data: record keyed by property URI/name.
    :param model: {"properties": [...], "required": [...]} (either key may
        be absent, e.g. when the class was not found and ``{}`` is passed).
    :return: True when valid; otherwise prints the first problem and
        returns False.
    """
    properties = model.get('properties', [])
    required = model.get('required', [])

    for key, value in data.items():
        if key not in properties:
            print(f"Invalid property: {key}")
            return False
        if key in required and not value:
            print(f"Missing required property: {key}")
            return False

    # The per-key loop above only inspects keys present in `data`; a
    # required property that is absent entirely must also fail validation.
    for key in required:
        if key not in data:
            print(f"Missing required property: {key}")
            return False

    return True
def transform_data(data: dict, model: dict) -> dict:
    """Return a copy of *data* restricted to properties declared in *model*.

    :param data: record keyed by property URI/name.
    :param model: data model with a 'properties' collection.
    :return: new dict containing only the known properties, in input order.
    """
    known_properties = model['properties']
    return {key: value for key, value in data.items() if key in known_properties}
# Example data to validate and transform. Replace this with your own data, or feed it from your data-ingestion pipeline or an upstream transformation stage.
person_data = {
    "http://example.org/ontology#name": "John Doe",
    "http://example.org/ontology#email": "john.doe@example.com",
}

# Look up the extracted model for the Person class; fall back to an empty
# model when that class is absent from the ontology.
person_model: dict = data_models.get("http://example.org/ontology#Person", {})

# Run validation, then filter the record down to its declared properties.
is_valid: bool = validate_data(person_data, person_model)
print(f"Is data valid? {is_valid}")

transformed_person_data: dict = transform_data(person_data, person_model)
print(f"Transformed data: {transformed_person_data}")
############## From TTL file to Pydantic models ##############
from rdflib.namespace import RDF, RDFS, OWL, XSD | |
def extract_classes_and_properties(graph: rdflib.Graph) -> dict:
    """Build {class name: {"properties": {name: range URI}, "required": [...]}}.

    Unlike the first variant, keys are URI local names and each property
    records its ``rdfs:range`` URI so it can later be mapped to a Python
    type for code generation.

    :param graph: parsed ontology graph.
    :return: mapping keyed by class local name.
    """

    def local_name(uri: str) -> str:
        # Take the fragment after '#', then the last path segment, so both
        # hash-style (...#Person) and slash-style (.../Person) URIs work;
        # a '#'-only split would return the full URI for slash-style ones.
        return uri.split('#')[-1].rstrip('/').split('/')[-1]

    data_models = {}

    # Every owl:Class becomes an entry (?cls, not ?class, so the named
    # attribute access is legal Python).
    class_query = """
        SELECT ?cls
        WHERE {
            ?cls a owl:Class .
        }
    """
    for row in graph.query(class_query):
        data_models[local_name(str(row.cls))] = {"properties": {}, "required": []}

    # Match owl:ObjectProperty / owl:DatatypeProperty declarations as well
    # as plain rdf:Property, and pull each property's domain and range.
    property_query = """
        SELECT DISTINCT ?property ?domain ?range
        WHERE {
            { ?property a rdf:Property . }
            UNION { ?property a owl:ObjectProperty . }
            UNION { ?property a owl:DatatypeProperty . }
            ?property rdfs:domain ?domain ;
                      rdfs:range ?range .
        }
    """
    for row in graph.query(property_query):
        property_name = local_name(str(row.property))
        domain_name = local_name(str(row.domain))
        range_uri = str(row.range)
        if domain_name in data_models:
            data_models[domain_name]["properties"][property_name] = range_uri

    # Required properties: a restriction with minimum cardinality > 0.
    # First branch keeps the original (non-standard) owl:onClass linkage;
    # second matches the standard OWL 2 shape (class rdfs:subClassOf the
    # restriction; qualified restrictions use owl:minQualifiedCardinality).
    required_property_query = """
        SELECT DISTINCT ?property ?domain
        WHERE {
            {
                ?restriction a owl:Restriction ;
                             owl:onProperty ?property ;
                             owl:minCardinality ?minCardinality ;
                             owl:onClass ?domain .
            }
            UNION
            {
                ?domain rdfs:subClassOf ?restriction .
                ?restriction a owl:Restriction ;
                             owl:onProperty ?property .
                { ?restriction owl:minCardinality ?minCardinality . }
                UNION
                { ?restriction owl:minQualifiedCardinality ?minCardinality . }
            }
            FILTER (?minCardinality > 0)
        }
    """
    for row in graph.query(required_property_query):
        property_name = local_name(str(row.property))
        domain_name = local_name(str(row.domain))
        if domain_name in data_models:
            data_models[domain_name]["required"].append(property_name)

    return data_models
# Rebuild the data models using the local-name / range-aware extractor.
data_models = extract_classes_and_properties(g)
print(f"Extracted data models: {data_models}")
# Map XSD datatype URIs to the Python type names emitted in generated models.
datatype_map = {
    str(xsd_type): python_type
    for xsd_type, python_type in (
        (XSD.string, 'str'),
        (XSD.integer, 'int'),
        (XSD.float, 'float'),
        (XSD.double, 'float'),
        (XSD.boolean, 'bool'),
        (XSD.dateTime, 'datetime'),
        (XSD.date, 'date'),
    )
}
import keyword
from datetime import date, datetime
from typing import List, Optional

from pydantic import BaseModel, Field
def _safe_identifier(name: str) -> str:
    """Coerce *name* into a legal Python identifier for generated code.

    Non-identifier characters become underscores, a leading digit gets a
    '_' prefix, and Python keywords get a trailing underscore. Without
    this, ontology names such as 'My-Class' or 'class' would make the
    generated module syntactically invalid.
    """
    cleaned = ''.join(ch if ch.isalnum() or ch == '_' else '_' for ch in name)
    if not cleaned or cleaned[0].isdigit():
        cleaned = '_' + cleaned
    if keyword.iskeyword(cleaned):
        cleaned += '_'
    return cleaned


def generate_pydantic_models(data_models: dict, datatype_map: dict) -> str:
    """Render Python source defining one Pydantic model per ontology class.

    :param data_models: {class name: {"properties": {name: range URI},
        "required": [names]}} as produced by extract_classes_and_properties.
    :param datatype_map: XSD datatype URI -> Python type name; unknown
        ranges (e.g. object properties) fall back to 'str'.
    :return: complete module source, ready to write to a .py file.
    """
    pieces = [
        "from pydantic import BaseModel, Field\n"
        "from typing import List, Optional\n"
        "from datetime import date, datetime\n\n"
    ]
    for class_name, attributes in data_models.items():
        lines = [f"class {_safe_identifier(class_name)}(BaseModel):"]
        if not attributes["properties"]:
            # A class with no declared properties still needs a body.
            lines.append("    pass")
        else:
            for prop, prop_type in attributes["properties"].items():
                python_type = datatype_map.get(prop_type, 'str')
                field_name = _safe_identifier(prop)
                if prop in attributes["required"]:
                    lines.append(f"    {field_name}: {python_type}")
                else:
                    lines.append(f"    {field_name}: Optional[{python_type}] = None")
        pieces.append("\n".join(lines) + "\n\n")
    return "".join(pieces)
# Generate the model source, echo it, and persist it for import elsewhere.
models_code = generate_pydantic_models(data_models, datatype_map)
print(models_code)

with open('generated_models.py', 'w') as output_file:
    output_file.write(models_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.