Last active
May 4, 2018 22:24
-
-
Save vicmortelmans/9b3b7d7ae148deb2a4d86b7f5628526a to your computer and use it in GitHub Desktop.
QGIS script that takes a CRAB address layer and a table defining parishes by address ranges as inputs, and outputs the CRAB layer with a parish attribute added.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
##crab=vector | |
##parishes=table | |
##crab_with_parishes=output vector | |
from qgis.core import * | |
from PyQt4.QtCore import * | |
import processing | |
import re | |
import csv | |
import datetime | |
#import pdb | |
#pyqtRemoveInputHook() | |
#pdb.set_trace() | |
# Field names used in the parishes table.

# Columns identifying the street a range belongs to, and the parish it maps to.
STREET = "Straat"
POSTAL_CODE = "Postcode"
PARISH = "Parochie"
CITY = "Gemeente"

# Columns holding the house-number bounds, kept separately for odd and even
# house numbers.
START_ODD = "begin_oneven"
END_ODD = "einde_oneven"
START_EVEN = "begin_even"
END_EVEN = "einde_even"

# Column whose value is appended to the parish name in the output attribute.
ID = "organisatie_id"
def is_odd(number):
    """Return True when the integer part of *number* is odd, else False.

    *number* is passed through ``int()`` first, so a float is truncated
    towards zero and a numeric string is accepted.
    """
    return int(number) % 2 != 0
def write_list_of_lists_to_csv(list_of_lists):
    """Write *list_of_lists* to a new ``errors-YYYYmmdd-HHMMSS.csv`` file.

    The file name is derived from the current local time and is relative to
    the current working directory. Each inner list becomes one CSV row.

    Unicode cells are encoded as UTF-8; every other cell is converted with
    ``str()``. Byte strings are deliberately NOT re-encoded: on Python 2,
    calling ``.encode('utf-8')`` on a byte string first decodes it as ASCII
    and raises ``UnicodeDecodeError`` as soon as it holds a non-ASCII byte.

    Returns None.
    """
    filename = "errors-{:%Y%m%d-%H%M%S}.csv".format(datetime.datetime.now())
    # The Python 2 csv module expects a file opened in binary mode and
    # byte-string cells.
    with open(filename, "wb") as csv_file:
        writer = csv.writer(csv_file)
        for row in list_of_lists:
            encoded_row = [
                cell.encode('utf-8') if isinstance(cell, unicode) else str(cell)
                for cell in row
            ]
            writer.writerow(encoded_row)
def get_matching_ranges_request(city, street, parishes_table):
    """Build a feature request selecting the parish ranges of one street.

    The request itself is returned rather than the matching features,
    because the caller wants to iterate over the matches more than once and
    a feature iterator is closed once its last element has been fetched.

    The filter compares the lower-cased CITY and STREET fields with *city*
    and *street* using the ``levenshtein`` expression function. The allowed
    distance starts at 0 and is raised by one each time nothing matches,
    up to and including 6.

    Returns the request when every matching row carries the same
    city/street combination. Returns None when rows for more than one
    combination match, or when nothing matches at any allowed distance.
    """
    quoted_city = city.replace("'", r"\'")
    quoted_street = street.replace("'", r"\'")
    for max_distance in range(7):
        exp_string = u"levenshtein(lower({}), lower('{}')) <= {} AND levenshtein(lower({}), lower('{}')) <= {}".format(CITY, quoted_city, max_distance, STREET, quoted_street, max_distance)
        request = QgsFeatureRequest(QgsExpression(exp_string))
        print(u"Finding ranges matching {}".format(exp_string))

        first_name = None
        ambiguous = False
        for feature in parishes_table.getFeatures(request):
            name = feature[CITY] + ' ' + feature[STREET]
            if first_name is None:
                first_name = name
            elif name != first_name:
                # A second, different city/street matched: stop looking.
                ambiguous = True
                break

        if ambiguous:
            print("Problem: found more than one matching range")
            return None
        if first_name is not None:
            print("Success: found unique matching range")
            return request
        # Nothing matched at this distance; retry with a looser one.

    # Something's definitely wrong: even the loosest distance matched nothing.
    print("Problem: found no matching range")
    return None
def get_addresses(city, street, address_layer):
    """Return a feature iterator over the addresses of *street* in *city*.

    Only features whose APPTNR is NULL are requested: addresses with
    apartments have multiple entities, but it looks like there is always
    one with no APPTNR value.
    """
    quoted_city = city.replace("'", r"\'")
    quoted_street = street.replace("'", r"\'")
    filter_text = u"GEMEENTE = '{}' AND STRAATNM = '{}' AND APPTNR IS NULL".format(quoted_city, quoted_street)
    return address_layer.getFeatures(QgsFeatureRequest(QgsExpression(filter_text)))
def get_number_as_float(number_string):
    """Convert a house number, optionally with a letter suffix, to a float.

    The first run of digits is the integer part. When it is directly
    followed by a letter, that letter's position in the alphabet becomes
    the hundredths, e.g. ``34A`` -> ``34.01``. Irregular suffixes such as
    ``bis`` are truncated to their first character (``7bis`` -> ``7.02``).

    *number_string* is converted with ``str()`` first, so numeric values
    are accepted as well.

    Raises ValueError when the value contains no digits at all.
    """
    match = re.search(r"([0-9]+)([a-zA-Z])?", str(number_string))
    if match is None:
        raise ValueError("No house number found in {!r}".format(number_string))
    number = float(match.group(1))
    suffix = match.group(2)
    if not suffix:
        return number
    print("Addendum {}".format(suffix))
    # ord('a') is 97, so 'a' -> 1 ... 'z' -> 26. Dividing by 100.0 (not 100)
    # forces float division; on Python 2 the integer division always gave 0
    # and silently dropped the suffix.
    return number + (ord(suffix.lower()) - 96) / 100.0
def get_number_from_address_as_float(address):
    """Return the HUISNR attribute of *address* converted by get_number_as_float."""
    return get_number_as_float(address['HUISNR'])
def _range_bound(value, default, open_markers=()):
    """Return *value* parsed as a house number, or *default* when it is open.

    A bound counts as open when it is empty (falsy) or equal to one of
    *open_markers*.
    """
    if not value or value in open_markers:
        return default
    return get_number_as_float(value)


def address_in_range(address, range):
    """Tell whether the house number of *address* falls inside *range*.

    *range* is a row of the parishes table. It holds separate inclusive
    start/end bounds for odd and for even house numbers; the pair matching
    the parity of the address number is the one that is applied.

    An empty start bound means 0.0. An empty end bound, or the literal
    value ``'ev'``, means "no upper limit" (9999). Previously a range with
    a start but an empty end was handed to the number parser as-is and
    failed there.

    Returns True or False.
    """
    MAX = 9999
    number = get_number_from_address_as_float(address)

    # All four bounds are parsed, as before, so a malformed value in the
    # table surfaces regardless of the parity of this particular address.
    odd_min = _range_bound(range[START_ODD], 0.0)
    odd_max = _range_bound(range[END_ODD], MAX, ('ev',))
    even_min = _range_bound(range[START_EVEN], 0.0)
    even_max = _range_bound(range[END_EVEN], MAX, ('ev',))

    if is_odd(number):
        return odd_min <= number <= odd_max
    return even_min <= number <= even_max
def error_range(range):
    """Return a one-line summary of the odd and even bounds of *range*."""
    bounds = [range[key] for key in (START_ODD, END_ODD, START_EVEN, END_EVEN)]
    return u"odd {}-{} even {}-{}".format(*bounds)
def assign_parish_to_addresses(crab_obj, parishes_obj, fields_iterator, writer):
    """Write every address that maps to exactly one parish to *writer*.

    Addresses are processed per (city, street) group. For each group the
    matching parish ranges are looked up once, and every address of the
    group is then tested against each of those ranges.

    An address is written only when exactly one range applies. Its output
    feature keeps the input geometry and attributes and gets one extra
    attribute, ``<parish>#<organisatie_id>``. Addresses with no ranges, no
    applicable range or several applicable ranges are only reported on
    standard output.

    Parameters:
        crab_obj: address layer; read through ``getFeatures()``.
        parishes_obj: parishes table; read through ``getFeatures(request)``.
        fields_iterator: field definitions used to create output features.
        writer: object receiving the output via ``addFeature()``.
    """
    # Collect the distinct (city, street) pairs in first-seen order. The set
    # keeps the membership test cheap, and tuple keys (instead of a
    # "city_street" string) stay correct when a city name contains "_".
    seen_pairs = set()
    city_street_pairs = []
    for address in crab_obj.getFeatures():
        pair = (address['GEMEENTE'], address['STRAATNM'])
        if pair not in seen_pairs:
            seen_pairs.add(pair)
            city_street_pairs.append(pair)

    # Iterate per group of city and street.
    for city, street in city_street_pairs:
        print(u"Processing {}".format(city + "_" + street))
        addresses = get_addresses(city, street, crab_obj)
        request = get_matching_ranges_request(city, street, parishes_obj)
        for address in addresses:
            parish = ''
            any_ranges = False
            multiple_ranges_match = False
            if request:
                # A fresh iterator per address: an exhausted one cannot be
                # reused.
                for parish_range in parishes_obj.getFeatures(request):
                    print(u"Assessing range {}".format(error_range(parish_range)))
                    any_ranges = True
                    if address_in_range(address, parish_range):
                        print(u"Address in range {}".format(get_number_from_address_as_float(address)))
                        if parish:
                            # A second range applies: the result is ambiguous.
                            multiple_ranges_match = True
                            break
                        parish = parish_range[PARISH]
                        organisatie_id = parish_range[ID]

            if not any_ranges:
                print("No ranges found")
            elif multiple_ranges_match:
                print("Multiple ranges apply")
            elif not parish:
                print("No range applies")
            else:
                # Success: copy the address and append the parish value.
                out_address = QgsFeature(fields_iterator)
                out_address.setGeometry(address.geometry())
                attrs = address.attributes()
                attrs.append(parish + "#" + organisatie_id)  # this value will go into the field 'parish'
                out_address.setAttributes(attrs)
                writer.addFeature(out_address)
# Resolve the script inputs declared in the ## header lines into objects.
crab_obj = processing.getObject(crab)
parishes_obj = processing.getObject(parishes)

# Read the data provider and the fields of the input layer.
crab_provider = crab_obj.dataProvider()
fields_iterator = crab_obj.fields()

# Add a field for the aggregated string containing parish and
# organisatie_id, unless a field with that name is already present.
# Note that shapefile field names have a 10 character size limit.
field_names = [field.name() for field in fields_iterator]
if 'parish' not in field_names:
    print("adding field 'parish'")
    fields_iterator.append(QgsField("parish", QVariant.String))

# Get a features iterator.
features_iterator = crab_obj.getFeatures()

# Create the output with the same geometry type and CRS as the input layer.
# There is no output layer object, only a writer.
settings = QSettings()
systemEncoding = settings.value('/UI/encoding', 'System')
writer = QgsVectorFileWriter(
    crab_with_parishes,
    systemEncoding,
    fields_iterator,
    crab_provider.geometryType(),
    crab_provider.crs(),
)

assign_parish_to_addresses(crab_obj, parishes_obj, fields_iterator, writer)

# Drop the last reference to the writer now that all features are added.
del writer
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment