Created
December 5, 2019 06:49
-
-
Save vikas-git/22aa3ca5164c37737f020389950a85bf to your computer and use it in GitHub Desktop.
Find the distance between two points using the Google Maps API and plot it on a folium map.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# check attachment files | |
# find_distance.py | |
import os | |
from datetime import datetime | |
import googlemaps | |
from django.conf import settings | |
from packages.common_func import get_or_none, convert_float | |
from network.models import Distance, LatLong | |
class FindDistance(object):
    """Resolve the distance in kilometres between two network nodes.

    A stored ``Distance`` record is used when one exists for the pair;
    otherwise the nodes' coordinates are read from ``LatLong``, the Google
    Maps Directions API is queried through ``GeoDist`` and the answer is
    stored so the next lookup does not need the API.
    """

    def __init__(self):
        pass

    @staticmethod
    def _parse_distance_km(distance):
        """Parse a distance such as ``"1,234 km"`` or ``"500 m"`` into km.

        Parameters:
            distance: text with a ``" km"`` or ``" m"`` suffix, a bare
                numeric string, or a number (taken to be km already).

        Return:
            The distance in kilometres as a float; ``0.0`` when the value
            cannot be parsed.
        """
        if isinstance(distance, (int, float)):
            return float(distance)
        # Thousands separators ("1,234 km") would break float().
        text = str(distance).strip().replace(',', '')
        try:
            # Suffix tests instead of str.find(): find() returns -1 (truthy)
            # when the unit is absent, which sent every metre value down the
            # kilometre branch.
            if text.endswith(' km'):
                return float(text[:-len(' km')])
            if text.endswith(' m'):
                return float(text[:-len(' m')]) / 1000
            return float(text)
        except ValueError as e:
            print(e)
            return 0.0

    def convert_distance_in_km(self, distance):
        """Return ``distance`` in kilometres, passed through ``convert_float``.

        ``convert_float`` is a project helper; it is presumably a tolerant
        float conversion -- TODO confirm it accepts a float argument.
        """
        return convert_float(self._parse_distance_km(distance))

    def _lookup_lat_long(self, node):
        """Return ``(latitude, longitude)`` for ``node``, or None if unknown."""
        record = get_or_none(LatLong, node=node, status=1)
        if not record:
            return None
        latitude = record.get('latitude')
        longitude = record.get('longitude')
        # Compare against None so a coordinate of 0 is still accepted.
        if latitude is None or longitude is None:
            return None
        return (latitude, longitude)

    def _store_distance(self, origin, destination, distance, duration):
        """Persist a freshly computed distance; failures are only reported."""
        try:
            Distance(
                id=self.get_next_document_id(),
                origin=origin,
                destination=destination,
                distance=distance,
                duration=duration,
            ).save()
        except Exception as e:
            # Caching is best-effort: the computed distance is still returned.
            print(e)

    def get_distance(self, origin, destination):
        '''
        Return the distance in km between origin and destination.

        The stored records are searched first; if no record exists for the
        pair, the Google API is queried and the result is stored.

        Parameters:
            origin: node identifier used to look up ``Distance``/``LatLong``.
            destination: node identifier used the same way.

        Return:
            The distance in km, or 0 when it cannot be determined (missing
            coordinates, API failure, unparsable response).
        '''
        distance_in_km = 0
        try:
            search_data = get_or_none(Distance, origin=origin, destination=destination)
            if search_data:
                return convert_float(search_data.get('distance', 0.0))

            origin_lat_long = self._lookup_lat_long(origin)
            destination_lat_long = self._lookup_lat_long(destination)
            if origin_lat_long is None or destination_lat_long is None:
                # Without both coordinates there is nothing to ask the API.
                return distance_in_km

            geo_dist = GeoDist(settings.GOOGLE_MAPS_API_KEY)
            response = geo_dist.find_dist_and_time(origin_lat_long, destination_lat_long)
            if response.get('distance'):
                distance = self.convert_distance_in_km(response.get('distance'))
                if distance > 0:
                    # Record the result before caching so a failed save
                    # cannot discard a valid API answer.
                    distance_in_km = distance
                    self._store_distance(
                        origin, destination, distance, response.get('duration')
                    )
        except Exception as e:
            print(e)
        return distance_in_km

    def get_next_document_id(self):
        """Return the next free ``Distance`` id (highest existing id + 1).

        ``order_by('-id').first()`` yields None on an empty table, whereas
        ``latest('id')`` raises there, which made the initial id of 1
        unreachable.

        NOTE(review): max+1 allocation is not safe under concurrent writers.
        """
        dis_obj = Distance.objects.order_by('-id').first()
        if dis_obj:
            return int(dis_obj.id) + 1
        return 1
class GeoDist:
    '''
    Get distance and duration from the Google Maps Directions API.
    '''

    def __init__(self, key='XXX'):
        """Create the ``googlemaps.Client`` used for every request.

        Parameters:
            key: Google Maps API key; the 'XXX' default is a placeholder.
        """
        self.gmaps = googlemaps.Client(key=key)
        # Kept as an instance attribute; find_dist_and_time() takes its own
        # timestamp for each request.
        self.now = datetime.now()

    def find_dist_and_time(self, source, destination, mode='driving', transit_mode='bus'):
        """Return the distance and duration text for a trip.

        Parameters:
            source: origin passed straight to ``gmaps.directions``.
            destination: destination passed straight to ``gmaps.directions``.
            mode: travel mode forwarded to the API, default 'driving'.
            transit_mode: accepted for interface compatibility; it is not
                forwarded to the API.

        Return:
            ``{'distance': <text>, 'duration': <text>}`` taken from the last
            leg of the last route returned. Both values are None when the
            API returns no route or no leg.
        """
        now = datetime.now()
        directions_result = self.gmaps.directions(
            source, destination, mode=mode, departure_time=now
        )

        last_leg = None
        for route in directions_result or ():
            for leg in route['legs']:
                last_leg = leg

        if last_leg is None:
            # No route found: give the caller a falsy distance rather than
            # failing on variables that were never assigned.
            return {'distance': None, 'duration': None}
        return {
            'distance': last_leg['distance']['text'],
            'duration': last_leg['duration']['text'],
        }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# geovis.py | |
# -*- coding: utf-8 -*- | |
""" | |
Created on Tuesday 23-July-2019 | |
@author: Vikas shrivastava | |
For more info go to official documentation of folium https://python-visualization.github.io/folium/modules.html | |
""" | |
import random | |
import logging | |
import folium | |
import pandas as pd | |
import numpy as np | |
from collections import namedtuple | |
logger = logging.getLogger('TaskLog') | |
class GeoVis:
    """Fluent helper for drawing points, connections and arrows on a folium map.

    The ``folium.Map`` is available as ``self.m``. Drawing methods return
    ``self`` so calls can be chained. Colour, size, marker type and line
    thickness can each be driven by a data column: the column value is scaled
    to 0-100 against the column's min/max and matched against a list of
    filter dicts (``min_val``, ``max_val``, ``attr_value``).
    """

    _LOG_WHERE = "() in packages/planning_script/geovis.py file : "

    def __init__(self, center_lat=23, center_lon=77, zoom=5, width='100%', height='100%'):
        '''
        Create the map centred on the given position.

        Parameters:
            center_lat: default(23),
            center_lon: default(77),
            zoom: int default(5),
            width: default('100%'),
            height: default('100%'),
        '''
        self.m = folium.Map(
            location=[center_lat, center_lon],
            zoom_start=zoom,
            width=width,
            height=height
        )

    def mark_points(self, df, lat_col='latitude', lon_col='longitude', color='green', **other_constraint_params):
        """Draw one point per row of ``df``.

        Parameters:
            df: DataFrame holding the coordinates and any constraint columns.
            lat_col, lon_col: names of the coordinate columns.
            color: colour used when no colour constraint applies.
            other_constraint_params: optional keys ``label_fields``,
                ``node_type_col``/``node_type_filters``,
                ``color_node``/``color_filters``,
                ``size_node``/``size_filters``.

        Return:
            self. Any exception is logged and swallowed.
        """
        try:
            label_fields = other_constraint_params.get('label_fields', [])
            node_type_col = other_constraint_params.get('node_type_col')
            color_col = other_constraint_params.get('color_node')
            size_col = other_constraint_params.get('size_node')
            if not label_fields:
                label_fields = list(df.columns)
            response = self.apply_constraints(df, other_constraint_params)
            df.apply(
                lambda row: self.mark_point(
                    row[lat_col], row[lon_col],
                    self.make_html_for_tooltip(row, label_fields),
                    radius=4.5,
                    color=color,
                    node_type_params=response.get('node_type_params', {}),
                    node_type_col_val=row[node_type_col] if node_type_col else None,
                    color_params=response.get('color_params', {}),
                    color_col_val=row[color_col] if color_col else None,
                    size_params=response.get('size_params', {}),
                    size_col_val=row[size_col] if size_col else None,
                ),
                axis=1)
        except Exception as e:
            logger.error("mark_points" + self._LOG_WHERE + str(e))
        return self

    def _pick_by_scale(self, params, col_val, default, caller, coerce_default=None):
        """Resolve one display attribute from a column value.

        Returns ``default`` untouched when there are no constraint params or
        the value is missing. Otherwise delegates to manage_various_values()
        on a *copy* of ``params`` so the dict shared between rows is never
        modified. Failures are logged under ``caller`` and yield ``default``.
        """
        if not params or col_val is None:
            return default
        if isinstance(col_val, str):
            if not col_val:
                return default
        elif pd.isna(col_val):
            return default
        try:
            fallback = coerce_default(default) if coerce_default else default
            return self.manage_various_values(
                **dict(params, col_val=float(col_val), default=fallback)
            )
        except Exception as e:
            logger.error(caller + self._LOG_WHERE + str(e))
            return default

    def mark_point(self, lat, lon, label='DC', radius=1.5, color='blue', weight=0, opacity=0.7, point_type='circle_marker', **other_params):
        """Draw a single point; rows with a missing coordinate are skipped.

        Parameters:
            lat, lon: coordinates of the point.
            label: tooltip content.
            radius, color, weight: drawing options.
            opacity: accepted but not used by this method.
            point_type: 'circle', 'circle_marker' or 'marker'; any other
                value draws nothing.
            other_params: ``node_type_params``/``node_type_col_val``,
                ``color_params``/``color_col_val`` and
                ``size_params``/``size_col_val``, which may override
                point_type, color and radius respectively.

        Return:
            self (also when the point is skipped, so chaining keeps working).
        """
        if pd.isna(lat) or pd.isna(lon):
            return self

        point_type = self._pick_by_scale(
            other_params.get('node_type_params', {}),
            other_params.get('node_type_col_val'),
            point_type, "mark_point")
        color = self._pick_by_scale(
            other_params.get('color_params', {}),
            other_params.get('color_col_val'),
            color, "mark_point")
        radius = self._pick_by_scale(
            other_params.get('size_params', {}),
            other_params.get('size_col_val'),
            radius, "mark_point", coerce_default=float)

        params = {
            'location': [lat, lon],
            # 'popup': label,
            'tooltip': label,
            'color': color,
            'line_color': False,
            'fill': True,
            'fill_color': color,
            'fill_opacity': 0.3,
            'weight': weight,
            'radius': radius
        }
        try:
            if point_type == 'circle':
                folium.Circle(**params).add_to(self.m)
            elif point_type == 'circle_marker':
                folium.CircleMarker(**params).add_to(self.m)
            elif point_type == 'marker':
                params['icon'] = folium.Icon(color=color)
                folium.Marker(**params).add_to(self.m)
        except Exception as e:
            logger.error("mark_point" + self._LOG_WHERE + str(e))
        return self

    def connection_establish(self, points, color="#FF0000", weight=5, opacity=0.5):
        '''
        Connect a sequence of lat/long pairs with one polyline.

        Parameters :
            points : [(lat, long), (lat2, long2)]
            color : hexadecimal code(example: #CD5C5C), default: #FF0000
            weight : line weight, default 5
            opacity : line opacity, default 0.5
        '''
        folium.PolyLine(
            points,
            color=color,
            weight=weight,
            opacity=opacity
        ).add_to(self.m)
        return self

    def draw_connection(self, lat1, lon1, lat2, lon2, color='blue', weight=3, opacity=0.5, **other_params):
        """Draw one line between two points; skipped if a coordinate is missing.

        Parameters:
            lat1, lon1, lat2, lon2: end points of the line.
            color, weight, opacity: drawing options.
            other_params: ``tooltip`` (shown as the line's popup),
                ``cate_color_var`` (when truthy the given color is kept and
                colour scaling is skipped), ``color_params``/``color_col_val``
                and ``thickness_params``/``thickness_col_val``.

        Return:
            self (also when the line is skipped).
        """
        if pd.isna(lat1) or pd.isna(lon1) or pd.isna(lat2) or pd.isna(lon2):
            return self

        if not other_params.get('cate_color_var'):
            color = self._pick_by_scale(
                other_params.get('color_params', {}),
                other_params.get('color_col_val'),
                color, "draw_connection")
        weight = self._pick_by_scale(
            other_params.get('thickness_params', {}),
            other_params.get('thickness_col_val'),
            weight, "draw_connection")

        folium.PolyLine(
            [[lat1, lon1], [lat2, lon2]],
            color=color,
            weight=weight,
            opacity=opacity,
            # tooltip= other_params.get('tooltip', '')
            popup=other_params.get('tooltip', '')
        ).add_to(self.m)
        return self

    def connections(self, df, lat_col1='from_latitude', lon_col1='from_longitude', lat_col2='to_latitude',
                    lon_col2='to_longitude', color='blue', weight=2, opacity=0.5, **other_constraint_params):
        """Draw one line per row of ``df``.

        Parameters:
            df: DataFrame with the four coordinate columns.
            lat_col1, lon_col1, lat_col2, lon_col2: coordinate column names.
            color, weight, opacity: defaults for every line.
            other_constraint_params: optional keys ``label_fields``,
                ``color_node``/``color_filters``,
                ``thickness_node``/``thickness_filters`` and
                ``cate_color_var`` (when truthy each row's colour is read
                from its 'cate_color_code' column).

        Return:
            self
        """
        label_fields = other_constraint_params.get('label_fields', [])
        if not label_fields:
            label_fields = list(df.columns)
        color_col = other_constraint_params.get('color_node')
        thickness_col = other_constraint_params.get('thickness_node')
        response = self.apply_constraints(df, other_constraint_params)
        cate_color_var = other_constraint_params.get('cate_color_var')
        df.apply(
            lambda row: self.draw_connection(
                row[lat_col1], row[lon_col1],
                row[lat_col2], row[lon_col2],
                color=row['cate_color_code'] if cate_color_var else color,
                weight=weight,
                opacity=opacity,
                tooltip=self.make_html_for_tooltip(row, label_fields),
                color_params=response.get('color_params', {}),
                color_col_val=row[color_col] if color_col else None,
                thickness_params=response.get('thickness_params', {}),
                thickness_col_val=row[thickness_col] if thickness_col else None,
                cate_color_var=cate_color_var
            ),
            axis=1)
        return self

    def get_bearing(self, p1, p2):
        '''
        Returns compass bearing from p1 to p2

        Parameters
            p1 : namedtuple with lat lon
            p2 : namedtuple with lat lon
        Return
            compass bearing in degrees, in the range [0, 360)
        '''
        long_diff = np.radians(p2.lon - p1.lon)
        lat1 = np.radians(p1.lat)
        lat2 = np.radians(p2.lat)
        x = np.sin(long_diff) * np.cos(lat2)
        y = (np.cos(lat1) * np.sin(lat2) - (np.sin(lat1) * np.cos(lat2) * np.cos(long_diff)))
        bearing = np.degrees(np.arctan2(x, y))
        # arctan2 yields (-180, 180]; shift negatives into compass range.
        if bearing < 0:
            return bearing + 360
        return bearing

    def get_arrows(self, locations, color='blue', size=3, n_arrows=3):
        '''
        Get a list of evenly spaced markers to be plotted along a line.

        Parameters
            locations : list of lists of lat lons that represent the
                        start and end of the line.
                        eg [[41.1132, -96.1993],[41.3810, -95.8021]]
            color : fill colour of the markers, default is 'blue'
            size : marker radius, default is 3
            n_arrows : number of markers to create. default is 3
        Return
            list of folium.RegularPolygonMarker (not yet added to the map)
        '''
        Point = namedtuple('Point', field_names=['lat', 'lon'])
        p1 = Point(locations[0][0], locations[0][1])
        p2 = Point(locations[1][0], locations[1][1])

        # Rotation is fixed at 0: the bearing-based rotation is disabled.
        # rotation = self.get_bearing(p1, p2) - 90

        # Evenly spaced positions; the first and last are dropped because
        # the end points are drawn separately.
        arrow_lats = np.linspace(p1.lat, p2.lat, n_arrows + 2)[1:n_arrows + 1]
        arrow_lons = np.linspace(p1.lon, p2.lon, n_arrows + 2)[1:n_arrows + 1]

        return [
            folium.RegularPolygonMarker(location=points,
                                        fill_color=color, number_of_sides=1,
                                        radius=size, rotation=0, weight=1)
            for points in zip(arrow_lats, arrow_lons)
        ]

    def manage_arrows(self, lat1, lon1, lat2, lon2):
        """Add flow markers between two points; skipped if a coordinate is missing.

        Return:
            self (also when skipped).
        """
        if pd.isna(lat1) or pd.isna(lon1) or pd.isna(lat2) or pd.isna(lon2):
            return self
        for arrow in self.get_arrows([[lat1, lon1], [lat2, lon2]]):
            arrow.add_to(self.m)
        return self

    def connection_flow_arrow(self, df, lat_col1='from_latitude', lon_col1='from_longitude', lat_col2='to_latitude',
                              lon_col2='to_longitude'):
        """Add flow markers for every row of ``df``; returns self."""
        df.apply(
            lambda row: self.manage_arrows(
                row[lat_col1], row[lon_col1],
                row[lat_col2], row[lon_col2]
            ),
            axis=1)
        return self

    def manage_arrows_without_df(self, points):
        """Add flow markers between each consecutive pair in ``points``; returns self."""
        for start, end in zip(points, points[1:]):
            for arrow in self.get_arrows([start, end]):
                arrow.add_to(self.m)
        return self

    # local function
    def make_html_for_tooltip(self, row, fields):
        """Build the tooltip HTML: one ``<div>`` per field with its row value.

        Field names and values are converted with ``str`` and inserted as-is
        (no HTML escaping).
        """
        return ''.join(
            '<div><span><b>' + str(field) + '</b>: <label>' + str(row[field]) + '</label></br></span></div>'
            for field in fields
        )

    def apply_constraints(self, df, other_constraint_params=None):
        """Collect scaling parameters for every constraint column requested.

        Parameters:
            df: DataFrame the constraint columns are read from.
            other_constraint_params: dict that may name a column under
                'node_type_col', 'color_node', 'size_node' or
                'thickness_node', with the matching '*_filters' list.

        Return:
            dict with any of 'node_type_params', 'color_params',
            'size_params', 'thickness_params'; empty if none requested.
        """
        if other_constraint_params is None:
            other_constraint_params = {}
        # (column key, filters key, key in the response)
        constraint_keys = (
            ('node_type_col', 'node_type_filters', 'node_type_params'),
            ('color_node', 'color_filters', 'color_params'),
            ('size_node', 'size_filters', 'size_params'),
            ('thickness_node', 'thickness_filters', 'thickness_params'),
        )
        response = {}
        for col_key, filters_key, out_key in constraint_keys:
            col_name = other_constraint_params.get(col_key)
            if not col_name:
                continue
            des_col = dict(df[col_name].describe())
            response[out_key] = self.prepare_constraint_data(
                col_name, other_constraint_params.get(filters_key), des_col)
        return response

    def prepare_constraint_data(self, col_name, _filters, des_col):
        """Bundle a column's max/min (from ``describe()``) with its filters.

        Falls back to '100' / '1' when ``des_col`` has no 'max' / 'min'.
        Returns an empty dict when ``col_name`` is falsy.
        """
        if not col_name:
            return {}
        return {
            'col_max_val': des_col.get('max', '100'),
            'col_min_val': des_col.get('min', '1'),
            '_filters': _filters
        }

    def manage_various_values(self, col_max_val, col_min_val, _filters, col_val, default):
        """Map ``col_val`` to the ``attr_value`` of the first matching filter.

        ``col_val`` is scaled to a percentage of the [col_min_val,
        col_max_val] range and compared (inclusive) with each filter's
        ``min_val`` (default 0) and ``max_val`` (default 100).

        Return:
            The matching filter's ``attr_value``, or ``default`` when no
            filter matches, the range is empty, or the inputs are unusable.
        """
        try:
            # float() also accepts the string fallbacks produced by
            # prepare_constraint_data().
            low = float(col_min_val)
            span = float(col_max_val) - low
            if span == 0:
                # All values identical: no meaningful percentage exists.
                return default
            x = ((float(col_val) - low) / span) * 100
            for _filter in _filters or ():
                if float(_filter.get('min_val', 0.0)) <= x <= float(_filter.get('max_val', 100.0)):
                    return _filter.get('attr_value')
        except (TypeError, ValueError, AttributeError) as e:
            logger.error("manage_various_values" + self._LOG_WHERE + str(e))
        return default
# def manage_color_for_connection(self, col_max_val, col_min_val, _filters, col_val): | |
# color_code = '#a94442' | |
# try: | |
# x = ((col_val - col_min_val)/(col_max_val-col_min_val)) * 100 | |
# for _filter in _filters: | |
# if float(_filter.get('min_val', 0.0)) <= x <= float(_filter.get('max_val', 100.0)): | |
# color_code = _filter.get('color_code') | |
# break | |
# except Exception as e: | |
# print(e) | |
# return color_code | |
# def manage_marker_type(self, col_max_val, col_min_val, _filters, col_val): | |
# point_type = 'circle_marker' | |
# try: | |
# x = ((col_val - col_min_val)/(col_max_val-col_min_val)) * 100 | |
# for _filter in _filters: | |
# if float(_filter.get('min_val', 0.0)) <= x <= float(_filter.get('max_val', 100.0)): | |
# point_type = _filter.get('icon_type') | |
# break | |
# except Exception as e: | |
# print(e) | |
# return point_type |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment