Source code for wrfup.download

# download.py in wrfup
import os
import requests
import zipfile
import io
import logging
import rasterio
from tqdm.auto import tqdm
from rasterio.merge import merge

[docs]def merge_tiles(tile_paths, output_path): """ Merge multiple tiles into a single mosaic and save as a compressed GeoTIFF. Args: tile_paths (list): List of file paths for the tiles to be merged. output_path (str): Path to save the compressed merged file. """ # Open the raster files src_files_to_mosaic = [rasterio.open(tile) for tile in tile_paths] # Merge the files into one mosaic mosaic, out_trans = merge(src_files_to_mosaic) # Define metadata for the output file, based on one of the inputs out_meta = src_files_to_mosaic[0].meta.copy() # Update the metadata to reflect the mosaic's properties and add compression out_meta.update({ "driver": "GTiff", "height": mosaic.shape[1], "width": mosaic.shape[2], "transform": out_trans, "compress": "lzw" # Use LZW compression }) # Close all the source files after merging for src in src_files_to_mosaic: src.close() # Write the compressed merged file to the output path with rasterio.open(output_path, "w", **out_meta) as dest: dest.write(mosaic) logging.info(f"Compressed merged file saved at {output_path}")
[docs]def download_tiles(tile_names, save_dir, download_url): """ Download the urban fraction or URB_PARAM tiles for the given tile names. Args: tile_names (list): List of tile names to download. save_dir (str): Directory to save the downloaded tiles. download_url (str): Base URL for downloading the tiles. """ if not os.path.exists(save_dir): os.makedirs(save_dir) tile_paths = [] # Calculate the total size of all the tiles to be downloaded total_size_in_bytes, unknown_size = get_total_download_size(tile_names, download_url) total_size_in_mb = total_size_in_bytes / (1024 * 1024) # Ask the user for confirmation before downloading if unknown_size: confirm = input(f"Total download size is at least {total_size_in_mb:.2f} MB, but some sizes are unknown. Do you want to proceed? (y/n): ").lower() else: confirm = input(f"Total download size is {total_size_in_mb:.2f} MB. Do you want to proceed? (y/n): ").lower() if confirm != 'y': logging.info("Download canceled by the user.") return # Use tqdm to display the progress of the download for file_num, tile_name in enumerate(tile_names): file_name = f"{tile_name}.zip" zip_file_url = f"{download_url}/{file_name}" download_and_extract_zip(zip_file_url, save_dir, file_num + 1, len(tile_names)) tile_paths.append(os.path.join(save_dir, f"{tile_name}.tif")) # If multiple tiles were downloaded, merge them if len(tile_paths) > 1: output_path = os.path.join(save_dir, "merged_tiles.tif") merge_tiles(tile_paths, output_path) else: logging.info("Only one tile downloaded. No merging required.") output_path = tile_paths[0] return output_path
[docs]def lat_lon_to_tile_indices(lat, lon, grid_rows=16, grid_cols=16): """ Convert latitude and longitude to grid tile index based on zoom level. Args: lat (float): Latitude in degrees. lon (float): Longitude in degrees. grid_rows (int): Number of rows in the grid. grid_cols (int): Number of columns in the grid. Returns: (int, int): Row and column index of the tile. """ lat_min = -60 lat_max = 84 lat_relative = (lat - lat_max) / (lat_min - lat_max) lon_relative = (lon + 180) / 360 row_index = int(lat_relative * grid_rows) col_index = int(lon_relative * grid_cols) return min(max(row_index, 0), grid_rows - 1), min(max(col_index, 0), grid_cols - 1)
[docs]def get_tile_names_in_aoi(lat_min, lat_max, lon_min, lon_max, field): """ Get the list of tile names for the area of interest (AOI) based on latitude and longitude, with different naming conventions for FRC_URB2D and URB_PARAM fields. Args: lat_min (float): Minimum latitude of AOI. lat_max (float): Maximum latitude of AOI. lon_min (float): Minimum longitude of AOI. lon_max (float): Maximum longitude of AOI. field (str): The field type ('FRC_URB2D' or 'URB_PARAM'). Returns: list: List of tile names. """ tile_names = set() for lat in [lat_min, lat_max]: for lon in [lon_min, lon_max]: row_idx, col_idx = lat_lon_to_tile_indices(lat, lon) if field == 'FRC_URB2D': tile_name = f"{row_idx:02d}_{col_idx:02d}_zoom4_urban_fraction_100m_int8" elif field == 'URB_PARAM': tile_name = f"{row_idx:02d}_{col_idx:02d}_zoom4_URB_PARAM_100m" else: raise ValueError(f"Unknown field type: {field}") tile_names.add(tile_name) return list(tile_names)
[docs]def download_and_extract_zip(zip_url, extraction_path, file_num, total_files): """Download and extract a zip file from a URL with real-time progress tracking.""" response = requests.get(zip_url, stream=True) if response.status_code == 200: # Get the total file size from the headers total_size_in_bytes = int(response.headers.get('content-length', 0)) block_size = 1024 # 1 Kilobyte chunks # Create a progress bar using tqdm progress_bar = tqdm(total=total_size_in_bytes, unit='iB', unit_scale=True, desc=f"Downloading file {file_num} of {total_files}") # Create a buffer to store the downloaded file buffer = io.BytesIO() for data in response.iter_content(block_size): progress_bar.update(len(data)) buffer.write(data) progress_bar.close() # Once the file is fully downloaded, we can extract it buffer.seek(0) # Reset buffer position to the start with zipfile.ZipFile(buffer) as thezip: thezip.extractall(extraction_path) logging.info(f"File downloaded and extracted: {zip_url}") else: logging.error(f"Failed to download the file: {zip_url}")
[docs]def get_total_download_size(tile_names, download_url): """ Calculate the total size of all tiles to be downloaded. Args: tile_names (list): List of tile names. download_url (str): Base URL to check file size. Returns: int: Total size in bytes. """ total_size = 0 unknown_size = False for tile_name in tile_names: file_name = f"{tile_name}.zip" zip_file_url = f"{download_url}/{file_name}" # Try a GET request to fetch the content-length header try: response = requests.get(zip_file_url, stream=True) if response.status_code == 200: file_size = int(response.headers.get('content-length', 0)) if file_size > 0: total_size += file_size else: logging.warning(f"Could not retrieve size for {zip_file_url}. Marking as unknown size.") unknown_size = True else: logging.warning(f"Could not retrieve size for {zip_file_url}. Skipping...") except requests.RequestException as e: logging.error(f"Error fetching file size for {zip_file_url}: {e}") unknown_size = True return total_size, unknown_size
# def download_and_extract_zip(zip_url, extraction_path): # """Download and extract a zip file from a URL with real-time progress tracking.""" # response = requests.get(zip_url, stream=True) # if response.status_code == 200: # total_size_in_bytes = int(response.headers.get('content-length', 0)) # block_size = 1024 # 1 Kilobyte chunks # # Create a progress bar using tqdm # progress_bar = tqdm(total=total_size_in_bytes, unit='iB', unit_scale=True, desc=f"Downloading {zip_url}") # buffer = io.BytesIO() # for data in response.iter_content(block_size): # progress_bar.update(len(data)) # buffer.write(data) # progress_bar.close() # # Extract the downloaded zip file # buffer.seek(0) # Reset buffer position to the start # with zipfile.ZipFile(buffer) as thezip: # thezip.extractall(extraction_path) # logging.info(f"File downloaded and extracted: {zip_url}") # else: # logging.error(f"Failed to download the file: {zip_url}")