Created
June 7, 2010 14:37
-
-
Save durdn/428729 to your computer and use it in GitHub Desktop.
parser.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Prefer lxml (it provides XMLSchema validation); fall back to the standard
# library ElementTree, which can parse but cannot validate against an XSD.
try:
    from lxml import etree
except ImportError:
    from xml.etree import ElementTree as etree
from datetime import datetime
class Record(object):
    """Store data about an artifact in TMS.

    Every keyword argument given to the constructor becomes an attribute
    of the instance.
    """

    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        """Return a ``Record(name=value, ...)`` form useful while debugging."""
        pairs = ', '.join('%s=%r' % item for item in self.__dict__.items())
        return '%s(%s)' % (type(self).__name__, pairs)

    def __str__(self):
        """Return one tab-indented ``name = repr(value)`` line per attribute."""
        state = ["\t%s = %r" % (attribute, value)
                 for (attribute, value) in self.__dict__.items()]
        return '\n'.join(state)
class Author(object):
    """Store data about an author in TMS.

    Every keyword argument given to the constructor becomes an attribute
    of the instance.
    """

    def __init__(self, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        """Return an ``Author(name=value, ...)`` form useful while debugging."""
        pairs = ', '.join('%s=%r' % item for item in self.__dict__.items())
        return '%s(%s)' % (type(self).__name__, pairs)

    def __str__(self):
        """Return one tab-indented ``name = repr(value)`` line per attribute."""
        state = ["\t%s = %r" % (attribute, value)
                 for (attribute, value) in self.__dict__.items()]
        return '\n'.join(state)
class TMSParser(object):
    """Interpret messages coming back from a TMS server conforming to the
    Dublin Core standard as customized by Fabrique.

    Check the tms/xsd folder to get a feel for the xml format.
    The parse method reads an xml response and creates a list of Record(s).
    """

    # Tag groups, keyed by the handler that processes them in _parse_record.
    _TRANSLATED_TAGS = ('title', 'description',
                        'temporal', 'background', 'objectname')
    _SIMPLE_TAGS = ('type', 'spatial', 'resources', 'credits',
                    'format', 'source', 'subject', 'departement', 'century',
                    'identifier')
    _DATE_TAGS = ('date', 'dateSubmitted')
    _LIST_TAGS = ('relation', 'dimensions')
    _TRANSLATED_LIST_TAGS = ('instructionalMethod', 'hasPart', 'coverage',
                             'bibliographicCitation', 'alternative')

    def __init__(self, disable_validation=False):
        self.disable_validation = bool(disable_validation)

    @staticmethod
    def _load_schema(schema):
        """Parse the XSD at path *schema* and return an XMLSchema object.

        Uses ``etree.XMLSchema``, so this needs an etree implementation
        that provides it (lxml does).
        """
        # A context manager closes the schema file even if parsing fails.
        with open(schema, 'rb') as schema_file:
            return etree.XMLSchema(etree.parse(schema_file))

    def validate(self, xml_file, schema='apps/tms/xsd/fabrique-dc.cached.xsd'):
        """Return whether *xml_file* validates against the XSD *schema*.

        Always returns True when validation was disabled in the constructor.
        """
        if self.disable_validation:
            return True
        return self._xml_parse_and_validate(xml_file, schema)[0]

    def _xml_parse_and_validate(self, xml_file, schema='apps/tms/xsd/fabrique-dc.cached.xsd'):
        """Return ``(validation result, parsed document)`` for *xml_file*."""
        xmlschema = self._load_schema(schema)
        doc = etree.parse(xml_file)
        return xmlschema.validate(doc), doc

    def _xml_parse(self, xml_file, schema='apps/tms/xsd/fabrique-dc.cached.xsd'):
        """Parse *xml_file* without validating; *schema* is accepted but unused."""
        return etree.parse(xml_file)

    @staticmethod
    def _remove_ns_attribute(attribute):
        """Return a dict copy of *attribute* whose keys have no ``{namespace}`` prefix."""
        stripped = {}
        for key in attribute.keys():
            prefix, brace, local = key.partition('}')
            # Only strip when a closing brace exists past position 0.
            stripped[local if (brace and prefix) else key] = attribute[key]
        return stripped

    @staticmethod
    def _remove_namespace(tag):
        """Return *tag* without its ``{namespace}`` prefix.

        Values that are not strings are returned untouched.
        """
        try:
            prefix, brace, local = tag.partition('}')
        except AttributeError:
            return tag
        if brace and prefix:
            return local
        return tag

    @staticmethod
    def _stripped_text(element):
        """Return the text of *element* as a stripped text string.

        The text is converted with the text type, so an element without
        text (``None``) yields the string ``'None'``.
        """
        # type(u'') is unicode on Python 2 and str on Python 3.
        return type(u'')(element.text).strip()

    def _handle_single_with_translation(self, tags, element, record):
        """Store the element text in a ``{lang: text}`` dict on *record*.

        Elements whose tag is not in *tags*, or that carry no ``lang``
        attribute, are ignored.
        """
        tag = self._remove_namespace(element.tag)
        if tag not in tags:
            return record
        try:
            lang = self._remove_ns_attribute(element.attrib)['lang']
        except KeyError:
            # No language given: skip the element (best effort).
            return record
        text = self._stripped_text(element)
        try:
            translations = getattr(record, tag)
        except AttributeError:
            setattr(record, tag, {lang: text})
        else:
            translations[lang] = text
        return record

    def _handle_location(self, element, record):
        """Store the location text and its ``type`` attribute on *record*.

        Raises KeyError when the element has no ``type`` attribute.
        """
        tag = self._remove_namespace(element.tag)
        if tag == 'location':
            setattr(record, tag, self._stripped_text(element))
            setattr(record, 'location_type',
                    self._remove_ns_attribute(element.attrib)['type'])
        return record

    def _handle_simple(self, tags, element, record):
        """Store the stripped element text on *record* under the tag name."""
        tag = self._remove_namespace(element.tag)
        if tag in tags:
            setattr(record, tag, self._stripped_text(element))
        return record

    def _collect_urls(self, container, record):
        """Append one ``[url, {lang: title}, ...]`` entry per child of *container*.

        The entries are stored in a list attribute of *record* named after
        the raw ``container.tag`` (namespace included, if there is one).
        Titles without a ``lang`` attribute are filed under ``'nl'``.
        """
        for url in container:
            entry = [self._stripped_text(url)]
            try:
                entries = getattr(record, container.tag)
            except AttributeError:
                entries = []
                setattr(record, container.tag, entries)
            entries.append(entry)
            for title in url:
                if title.tag == 'title' and title.text:
                    # Resolve the language first so every title gets its own
                    # dict, also when the attribute is missing.
                    lang = self._remove_ns_attribute(title.attrib).get('lang', 'nl')
                    entry.append({lang: title.text.strip()})

    def _handle_link(self, element, record):
        """Collect the urls (and their titles) found below a link element."""
        tag = self._remove_namespace(element.tag)
        if tag == 'link':
            self._collect_urls(element, record)
        return record

    def _handle_media(self, element, record):
        """Store the primary image url and the remaining media urls on *record*.

        Parsing of the media element stops when it is empty, when its first
        child is not ``primary-image``, or when the primary image has no
        child holding the url (``primary_image`` is then set to ``''``).
        """
        tag = self._remove_namespace(element.tag)
        if tag != 'media' or len(element) == 0:
            return record
        primary = element[0]
        if primary.tag != 'primary-image':
            # No primary-image for this item: stop parsing the media element.
            return record
        try:
            url = self._stripped_text(primary[0])
        except IndexError:
            setattr(record, 'primary_image', '')
            return record
        setattr(record, 'primary_image', url)
        for rest in element[1:]:
            self._collect_urls(rest, record)
        return record

    def _handle_date(self, tags, element, record):
        """Store the element text parsed as a ``%Y-%m-%d`` datetime on *record*."""
        tag = self._remove_namespace(element.tag)
        if tag in tags:
            setattr(record, tag, datetime.strptime(element.text, "%Y-%m-%d"))
        return record

    def _handle_creator(self, element, record):
        """Append a ``{'name', 'role', 'lang'}`` dict to ``record.creator``.

        Raises KeyError when the ``role`` or ``lang`` attribute is missing.
        """
        tag = self._remove_namespace(element.tag)
        if tag == 'creator':
            try:
                creators = getattr(record, tag)
            except AttributeError:
                creators = []
                setattr(record, tag, creators)
            creators.append({'name': self._stripped_text(element),
                             # 'role' is read from the raw attribute name,
                             # 'lang' from the namespace-stripped one.
                             'role': element.attrib['role'],
                             'lang': self._remove_ns_attribute(element.attrib)['lang']})
        return record

    def _handle_list(self, tags, element, record):
        """Append the stripped element text to a list attribute on *record*."""
        tag = self._remove_namespace(element.tag)
        if tag in tags:
            text = self._stripped_text(element)
            try:
                values = getattr(record, tag)
            except AttributeError:
                setattr(record, tag, [text])
            else:
                values.append(text)
        return record

    def _handle_list_with_translation(self, tags, element, record):
        """Append the raw element text to ``record.<tag>[lang]``.

        Raises KeyError when the element has no ``lang`` attribute.
        """
        tag = self._remove_namespace(element.tag)
        if tag in tags:
            lang = self._remove_ns_attribute(element.attrib)['lang']
            try:
                per_language = getattr(record, tag)
            except AttributeError:
                per_language = {}
                setattr(record, tag, per_language)
            per_language.setdefault(lang, []).append(element.text)
        return record

    def _parse_record(self, xml_record):
        """Build a Record from one entry of the result set."""
        record = Record()
        # The data elements live under the first child of the second child.
        nested_record = xml_record[1][0]
        for element in nested_record:
            # Every handler ignores elements whose tag it does not own.
            record = self._handle_media(element, record)
            record = self._handle_link(element, record)
            record = self._handle_single_with_translation(
                self._TRANSLATED_TAGS, element, record)
            record = self._handle_simple(self._SIMPLE_TAGS, element, record)
            record = self._handle_date(self._DATE_TAGS, element, record)
            record = self._handle_list(self._LIST_TAGS, element, record)
            record = self._handle_creator(element, record)
            record = self._handle_location(element, record)
            record = self._handle_list_with_translation(
                self._TRANSLATED_LIST_TAGS, element, record)
        return record

    def parse(self, xml_file):
        """Convert an xml response from TMS into a list of Record(s).

        The texts of the second, third and fourth child of the request
        element are attached to the first record as ``meta``.
        """
        root = self._xml_parse(xml_file).getroot()
        resultset = root[2]
        request = root[1]
        meta = [request[1].text, request[2].text, request[3].text]
        result = [self._parse_record(r) for r in resultset]
        if result:
            result[0].meta = meta
        return result
class TMSAuthorParser(object):
    """Interpret messages coming back from a TMS server describing
    Authors and Creators.

    Check the tms/sample folder to get a feel for the xml format.
    The parse method reads an xml response and creates a list of Author(s).
    """

    # Tag groups, keyed by the handler that processes them in _parse_author.
    _TRANSLATED_TAGS = ('biografie',)
    _SIMPLE_TAGS = ('ConstituentID', 'ccidentifier', 'DisplayDate',
                    'DisplayName', 'BeginDate', 'EndDate',
                    'FirstName', 'LastName')

    def __init__(self, disable_validation=False):
        self.disable_validation = bool(disable_validation)

    @staticmethod
    def _load_schema(schema):
        """Parse the XSD at path *schema* and return an XMLSchema object.

        Uses ``etree.XMLSchema``, so this needs an etree implementation
        that provides it (lxml does).
        """
        # A context manager closes the schema file even if parsing fails.
        with open(schema, 'rb') as schema_file:
            return etree.XMLSchema(etree.parse(schema_file))

    def validate(self, xml_file, schema='apps/tms/xsd/fabrique-dc.cached.xsd'):
        """Return whether *xml_file* validates against the XSD *schema*.

        Always returns True when validation was disabled in the constructor.
        """
        if self.disable_validation:
            return True
        return self._xml_parse_and_validate(xml_file, schema)[0]

    def _xml_parse_and_validate(self, xml_file, schema='apps/tms/xsd/fabrique-dc.cached.xsd'):
        """Return ``(validation result, parsed document)`` for *xml_file*."""
        xmlschema = self._load_schema(schema)
        doc = etree.parse(xml_file)
        return xmlschema.validate(doc), doc

    def _xml_parse(self, xml_file, schema='apps/tms/xsd/fabrique-dc.cached.xsd'):
        """Parse *xml_file* without validating; *schema* is accepted but unused."""
        return etree.parse(xml_file)

    @staticmethod
    def _remove_ns_attribute(attribute):
        """Return a dict copy of *attribute* whose keys have no ``{namespace}`` prefix."""
        stripped = {}
        for key in attribute.keys():
            prefix, brace, local = key.partition('}')
            # Only strip when a closing brace exists past position 0.
            stripped[local if (brace and prefix) else key] = attribute[key]
        return stripped

    @staticmethod
    def _remove_namespace(tag):
        """Return *tag* without its ``{namespace}`` prefix.

        Values that are not strings are returned untouched.
        """
        try:
            prefix, brace, local = tag.partition('}')
        except AttributeError:
            return tag
        if brace and prefix:
            return local
        return tag

    @staticmethod
    def _stripped_text(element):
        """Return the text of *element* as a stripped text string.

        The text is converted with the text type, so an element without
        text (``None``) yields the string ``'None'``.
        """
        # type(u'') is unicode on Python 2 and str on Python 3.
        return type(u'')(element.text).strip()

    def _handle_single_with_translation(self, tags, element, author):
        """Store the element text in a ``{lang: text}`` dict on *author*.

        Elements whose tag is not in *tags* are ignored.  Raises KeyError
        when a matching element has no ``lang`` attribute.
        """
        tag = self._remove_namespace(element.tag)
        if tag in tags:
            lang = self._remove_ns_attribute(element.attrib)['lang']
            text = self._stripped_text(element)
            try:
                translations = getattr(author, tag)
            except AttributeError:
                setattr(author, tag, {lang: text})
            else:
                translations[lang] = text
        return author

    def _handle_simple(self, tags, element, author):
        """Store the stripped element text on *author* under the tag name."""
        tag = self._remove_namespace(element.tag)
        if tag in tags:
            setattr(author, tag, self._stripped_text(element))
        return author

    def _parse_author(self, xml_author):
        """Build an Author from one entry of the result set."""
        author = Author()
        # The data elements live under the first child of the second child.
        nested_author = xml_author[1][0]
        for element in nested_author:
            # Every handler ignores elements whose tag it does not own.
            author = self._handle_single_with_translation(
                self._TRANSLATED_TAGS, element, author)
            author = self._handle_simple(self._SIMPLE_TAGS, element, author)
        return author

    def parse(self, xml_file):
        """Convert an xml response from TMS into a list of Author(s)."""
        # The result set is the third child of the document root.
        resultset = self._xml_parse(xml_file).getroot()[2]
        return [self._parse_author(r) for r in resultset]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment