Created
March 21, 2024 13:35
-
-
Save Object905/8e1bc379d10227405d099b083b41989a to your computer and use it in GitHub Desktop.
Map tile server downloader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Download tiles from a tile server the "smart" way: fetch the lowest zoom level
(largest tiles) first, then descend to the next zoom level only beneath tiles
that contained some data.
"""
from __future__ import annotations | |
import asyncio | |
import io | |
from itertools import batched | |
import math | |
import zlib | |
from collections import defaultdict | |
from dataclasses import dataclass, field | |
import logging | |
from io import BytesIO | |
from pathlib import Path | |
from typing import NewType, TypeAlias | |
import aioboto3 | |
import aiometer | |
import httpx | |
from asgiref.sync import sync_to_async | |
from django.conf import settings | |
from PIL import Image | |
from coverage.models import TileServer | |
from zones.models.models import ThisCountry | |
logger = logging.getLogger("tdl") | |
Z = NewType("Z", int) | |
X = NewType("X", int) | |
Y = NewType("Y", int) | |
Lat = NewType("Lat", float) | |
Lon = NewType("Lon", float) | |
PointDeg: TypeAlias = tuple[Lat, Lon] | |
PointTile: TypeAlias = tuple[X, Y] | |
Dest: TypeAlias = str | |
DEFAULT_USER_AGENT = ( | |
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36" | |
) | |
@dataclass
class UploadHandler:
    """Base strategy for downloading tiles and persisting them somewhere.

    Subclasses must implement ``get_destination`` and ``store``; the other
    hooks have no-op or local-filesystem defaults.
    """

    # HTTP client used by ``fetch``; set by the caller (e.g. TileScraper.rotate_client).
    client: httpx.AsyncClient | None = None
    # (byte length, CRC32) of raw image bytes -> "image is empty".  Lets identical
    # blank tiles skip decoding.  Only empty results are recorded so the cache
    # does not grow with every distinct data tile.
    _empty_files_cache: dict[tuple[int, int], bool] = field(default_factory=dict)

    def get_destination(self, p: PointTile, zoom: Z) -> Dest:
        """Return where tile ``p`` at ``zoom`` should be stored."""
        raise NotImplementedError(f"{self.__class__.__qualname__}.get_destination not implemented")

    async def destination_exists(self, dest: Dest) -> bool:
        """Return whether ``dest`` is already stored; the base class always says no."""
        return False

    async def store(self, buf: BytesIO, dest: Dest):
        """Persist the image bytes in ``buf`` at ``dest``."""
        raise NotImplementedError(f"{self.__class__.__qualname__}.store not implemented")

    async def after_download(self, zoom: Z):
        """Hook invoked once the tiles of ``zoom`` have been downloaded; no-op here."""

    async def on_miss(self, dest: Dest):
        """Handle a missing/empty tile by deleting any stale file at ``dest``."""
        Path(dest).unlink(missing_ok=True)

    async def fetch(self, request: httpx.Request, retry=3) -> tuple[BytesIO | None, int]:
        """Download ``request`` and return ``(buffer, request.idx)``.

        The buffer is ``None`` when the server answers 404 or another non-200
        status, or when the image is empty (see ``filter_image_buf``).
        Transport errors and HTTP 429 are retried up to ``retry`` more times.

        ``request`` must carry an ``idx`` attribute set by the caller; it is
        returned unchanged so results can be matched back to their tiles.

        Raises:
            httpx.TransportError: if the transport still fails after all retries.
        """
        logger.debug(f"GET {request.url}")
        try:
            resp = await self.client.send(request, stream=True, follow_redirects=True)
        except (httpx.TransportError, httpx.TimeoutException):
            if not retry:
                raise
            await asyncio.sleep(2)
            return await self.fetch(request, retry=retry - 1)

        buf = None
        try:
            if resp.status_code == 200:
                logger.debug(f"Download {resp.url}")
                buf = BytesIO()
                async for chunk in resp.aiter_bytes(chunk_size=io.DEFAULT_BUFFER_SIZE):
                    buf.write(chunk)
        finally:
            # Release the connection on every path, including a failed body read.
            await resp.aclose()

        if resp.status_code == 429 and retry:
            logger.info(f"Rate limited {request.url}")
            await asyncio.sleep(3)
            return await self.fetch(request, retry=retry - 1)
        if resp.status_code == 404:
            return None, request.idx
        if resp.status_code != 200:
            logger.error(f"Response error code: {resp.status_code}. At {request.url}")
            return None, request.idx

        buf.seek(0)
        filtered_buf = self.filter_image_buf(buf)
        if filtered_buf is not None:
            return filtered_buf, request.idx
        logger.debug(f"{resp.url} is an empty image, skipping")
        return None, request.idx

    def filter_image_buf(self, buf: BytesIO) -> BytesIO | None:
        """Return ``buf`` rewound to its start, or ``None`` if the image is empty.

        Emptiness is decided by ``is_image_empty``; empty results are cached by
        (size, CRC32) of the raw bytes so repeated blank tiles are not decoded.
        """
        with buf.getbuffer() as view:
            key = len(view), zlib.crc32(view)
        is_empty = self._empty_files_cache.get(key)
        if is_empty is None:
            buf.seek(0)
            is_empty = self.is_image_empty(Image.open(buf))
            if is_empty:
                self._empty_files_cache[key] = True
        if is_empty:
            return None
        buf.seek(0)
        return buf

    @staticmethod
    def is_image_empty(img: Image.Image) -> bool:
        """Return True if every band of ``img`` holds one constant value."""
        return UploadHandler._extrema_is_uniform(img.getextrema())

    @staticmethod
    def _extrema_is_uniform(extrema) -> bool:
        """Return True if all ``(min, max)`` pairs in ``extrema`` have ``min == max``."""
        # ``getextrema`` returns ``(min, max)`` for single-band images but a tuple
        # of per-band ``(min, max)`` pairs for multi-band ones (RGB, RGBA, ...).
        bands = extrema if isinstance(extrema[0], tuple) else (extrema,)
        return all(low == high for low, high in bands)
@dataclass
class FileUploadHandler(UploadHandler):
    """Store tiles on the local filesystem as ``<folder>/<zoom>/<x>/<y>.png``."""

    # Root directory of the stored tile tree.
    folder: Path = Path("~/tiles/t2_lte450").expanduser()

    def get_destination(self, p: PointTile, zoom: Z) -> str:
        """Return the file path for tile ``p`` (``(x, y)``) at ``zoom``."""
        result = self.folder / str(zoom) / str(p[0]) / str(p[1])
        return str(result.with_suffix(".png"))

    async def destination_exists(self, dest: str) -> bool:
        """Return whether a tile file already exists at ``dest``."""
        return Path(dest).exists()

    async def store(self, buf: BytesIO, dest: str):
        """Write the bytes of ``buf`` to ``dest``, creating parent directories.

        The data is first written to a sibling ``.part`` file and then renamed
        over ``dest``. An interrupted run therefore never leaves a truncated
        tile that ``destination_exists`` would later report as already present.
        """
        target = Path(dest)
        target.parent.mkdir(exist_ok=True, parents=True)
        tmp = target.with_name(target.name + ".part")
        with open(tmp, "wb") as f:
            f.write(buf.getbuffer())
        tmp.replace(target)
@dataclass
class TileScraper:
    """Walk a tile server's pyramid from ``min_zoom`` up to ``max_zoom``.

    At each zoom after the first, only children of tiles that were fetched
    (or already stored) at the previous zoom are requested.
    """

    upload_handler: UploadHandler
    # When True, tiles whose destination already exists are not downloaded again.
    skip_existing: bool = True
    # zoom -> (tile, destination) pairs of tiles known to contain data.
    fetched: dict[Z, list[tuple[PointTile, Dest]]] = field(default_factory=lambda: defaultdict(list))

    def get_possible_tiles_by_country(self, zoom: Z) -> list[PointTile]:
        """Return all tiles at ``zoom`` spanned by the country's convex hull.

        Synchronous; ``run`` calls it through ``sync_to_async``, presumably
        because ``ThisCountry.get_solo()`` queries the database.
        """
        country_convex_hull = ThisCountry.get_solo().geometry.convex_hull
        points = []
        for ring in country_convex_hull:
            for point in ring:
                # Swap the coordinate order into the (lat, lon) order of PointDeg;
                # the geometry appears to store (lon, lat) — the original note says
                # the order does not match PostGIS.
                points.append((point[1], point[0]))
        return points_to_tiles(points, zoom)

    async def _close_client(self) -> None:
        """Close the upload handler's HTTP client, if there is one, and unset it."""
        client = self.upload_handler.client
        if client is not None:
            self.upload_handler.client = None
            await client.__aexit__(None, None, None)

    async def rotate_client(self, tile_server) -> httpx.AsyncClient:
        """Replace the upload handler's HTTP client with a fresh one.

        The previous client, if any, is closed first. The new client sends the
        server's ``user_agent`` (or ``DEFAULT_USER_AGENT``) and is both stored on
        ``self.upload_handler.client`` and returned.
        """
        await self._close_client()
        client = httpx.AsyncClient(
            http2=True,
            timeout=60,
            headers={"User-Agent": tile_server.user_agent or DEFAULT_USER_AGENT},
            follow_redirects=True,
            limits=httpx.Limits(
                max_connections=50,
                max_keepalive_connections=25,
                keepalive_expiry=10,
            ),
        )
        await client.__aenter__()
        self.upload_handler.client = client
        return client

    async def run(self, tile_server: TileServer):
        """Download every tile of ``tile_server`` that contains data.

        Starts from all tiles covering the country at ``min_zoom``. Each later
        zoom requests only the children of tiles recorded in ``self.fetched``
        for the previous zoom. The HTTP client is closed when the run ends,
        whether it succeeds or fails.
        """
        try:
            min_zoom_tiles = await sync_to_async(self.get_possible_tiles_by_country)(tile_server.min_zoom)
            await self.download_tiles(
                tiles=min_zoom_tiles,
                zoom=tile_server.min_zoom,
                tile_server=tile_server,
            )
            prev_zoom = tile_server.min_zoom
            for zoom in range(tile_server.min_zoom + 1, tile_server.max_zoom + 1):
                possible_tiles = sorted(
                    {
                        child
                        for prev_zoom_tile, _ in self.fetched[prev_zoom]
                        for child in zoom_out_tiles(prev_zoom_tile, prev_zoom, zoom)
                    }
                )
                prev_zoom = zoom
                if not possible_tiles:
                    continue
                logger.info(f"Downloading {len(possible_tiles)} on zoom {zoom}")
                await self.download_tiles(
                    tiles=possible_tiles,
                    zoom=zoom,
                    tile_server=tile_server,
                )
        finally:
            # download_tiles opens a new client per zoom; the last one would leak.
            await self._close_client()

    async def download_tiles(
        self,
        tiles: list[PointTile],
        zoom: Z,
        tile_server: TileServer,
        skip_non_existing=False,
    ):
        """Download ``tiles`` at ``zoom`` and record those with data in ``self.fetched``.

        Args:
            tiles: tile coordinates to request.
            zoom: zoom level of ``tiles``.
            tile_server: supplies the URL template and the rate limits.
            skip_non_existing: if True nothing is downloaded. Only tiles whose
                destination already exists (checked when ``skip_existing`` is
                set) are recorded.
        """
        await self.rotate_client(tile_server)
        handler = self.upload_handler
        idx_to_req = {}
        idx_to_dest = {}
        idx_to_tile = {}
        for idx, tile in enumerate(tiles):
            idx_to_dest[idx] = handler.get_destination(tile, zoom)
            idx_to_tile[idx] = tile
            url = tile_server.url_template.format(z=zoom, x=tile[0], y=tile[1])
            request = httpx.Request("GET", url)
            # fetch() echoes this index back so results can be matched to tiles.
            request.idx = idx
            idx_to_req[idx] = request

        existing_idx = []
        if self.skip_existing:
            exists = await asyncio.gather(*(handler.destination_exists(dest) for dest in idx_to_dest.values()))
            for idx, found in enumerate(exists):
                if found:
                    existing_idx.append(idx)
                    del idx_to_req[idx]
        if skip_non_existing:
            idx_to_req = {}

        if idx_to_req:
            for batch in batched(idx_to_req.values(), 50):
                tasks = []
                async with aiometer.amap(
                    handler.fetch,
                    args=batch,
                    max_at_once=tile_server.req_max_at_once,
                    max_per_second=tile_server.req_max_per_second,
                ) as results:
                    async for buf, idx in results:
                        destination = idx_to_dest[idx]
                        if buf is None:
                            tasks.append(asyncio.create_task(handler.on_miss(destination)))
                            continue
                        tasks.append(asyncio.create_task(handler.store(buf, destination)))
                        self.fetched[zoom].append((idx_to_tile[idx], destination))
                await asyncio.gather(*tasks)
                await asyncio.sleep(1)
            await handler.after_download(zoom)

        for idx in existing_idx:
            self.fetched[zoom].append((idx_to_tile[idx], idx_to_dest[idx]))
        self.fetched[zoom].sort(key=lambda item: item[0])
# ref https://wiki.openstreetmap.org/wiki/Slippy_map_tilenames
def coords_to_tile(point_deg: PointDeg, zoom: Z) -> PointTile:
    """Return the ``(x, y)`` tile containing ``(lat, lon)`` at ``zoom``.

    Uses the Web Mercator slippy-map formulas. The result is not clamped, so
    out-of-range coordinates produce out-of-range tile indices.
    """
    lat, lon = point_deg
    tiles_per_axis = 1 << zoom
    mercator_y = math.asinh(math.tan(math.radians(lat)))
    column = int((lon + 180.0) / 360.0 * tiles_per_axis)
    row = int((1.0 - mercator_y / math.pi) / 2.0 * tiles_per_axis)
    return column, row
def tile_to_coords(point_tile: PointTile, zoom: Z) -> PointDeg:
    """Return ``(lat, lon)`` of the north-west corner of tile ``(x, y)`` at ``zoom``."""
    col, row = point_tile
    scale = 1 << zoom
    lon = col / scale * 360.0 - 180.0
    lat = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * row / scale))))
    return lat, lon
def points_to_tiles(points: list[PointDeg], zoom: Z) -> list[PointTile]:
    """Return every tile in the bounding rectangle of the tiles containing ``points``.

    Tiles are listed x outer, y inner. The rectangle is clamped to the valid
    index range ``0 .. 2**zoom - 1`` on both axes. Points on the antimeridian
    (lon = 180) or beyond the Mercator latitude limit therefore never yield
    non-existent tiles. Column/row 0 are valid tiles and are kept.
    An empty ``points`` yields an empty list.
    """
    tiles = {coords_to_tile(point, zoom) for point in points}
    if not tiles:
        return []
    last = (1 << zoom) - 1
    xs = [x for x, _ in tiles]
    ys = [y for _, y in tiles]
    min_x, max_x = max(min(xs), 0), min(max(xs), last)
    min_y, max_y = max(min(ys), 0), min(max(ys), last)
    return [(x, y) for x in range(min_x, max_x + 1) for y in range(min_y, max_y + 1)]
def tile_to_bb(point_tile: PointTile, zoom: Z) -> tuple[PointDeg, PointDeg, PointDeg, PointDeg]:
    """Return the ``(lat, lon)`` corners of a tile in NW, NE, SE, SW order."""
    x, y = point_tile
    # Index offsets of each corner relative to the tile's own (north-west) corner.
    corner_offsets = ((0, 0), (1, 0), (1, 1), (0, 1))
    return tuple(tile_to_coords((x + dx, y + dy), zoom) for dx, dy in corner_offsets)
def zoom_out_tiles(tile: PointTile, old_zoom: Z, new_zoom: Z) -> list[PointTile]:
    """Return the tiles at ``new_zoom`` that cover ``tile`` from ``old_zoom``.

    For a deeper (or equal) ``new_zoom`` these are exactly the descendants of
    ``tile``, i.e. ``4 ** (new_zoom - old_zoom)`` tiles listed x outer, y inner.
    For a shallower ``new_zoom`` the result is the single ancestor tile.

    Pure integer arithmetic is used. Converting the tile's corners to degrees
    and back would land the east/south corners on the neighbouring tiles'
    boundary. That round trip usually adds a ring of outside tiles
    (9 instead of 4 per zoom step).
    """
    x, y = tile
    if new_zoom < old_zoom:
        shift = old_zoom - new_zoom
        return [(x >> shift, y >> shift)]
    factor = 1 << (new_zoom - old_zoom)
    return [
        (child_x, child_y)
        for child_x in range(x * factor, (x + 1) * factor)
        for child_y in range(y * factor, (y + 1) * factor)
    ]
def tele2_lte():
    """Scrape the Tele2 LTE coverage tiles (zoom 4-12) into ``~/tiles/t2_lte``."""
    tile_server = TileServer(
        name="Tele2 LTE",
        key="t2_lte",
        url_template="https://msk.tele2.ru/maps/_4g/{z}/{x}/{y}.png",
        min_zoom=4,
        max_zoom=12,
        # Identical to the module-wide default; reuse it rather than duplicating it.
        user_agent=DEFAULT_USER_AGENT,
        req_max_at_once=10,
        req_max_per_second=20,
    )
    upload_handler = FileUploadHandler(folder=Path("~/tiles/").expanduser() / tile_server.key)
    scraper = TileScraper(upload_handler=upload_handler)
    asyncio.run(scraper.run(tile_server), debug=True)
# Run when executed as a script or from Django's ``shell`` management command.
_RUN_FROM = ("__main__", "django.core.management.commands.shell")
if __name__ in _RUN_FROM:
    logging.basicConfig(level=logging.INFO)
    logger.setLevel(logging.DEBUG)
    tele2_lte()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment