Module pipelines.rj_cor.meteorologia.satelite.satellite_utils
Useful functions for processing satellite data
Functions
def choose_file_to_download(storage_files_path: list, base_path: str, redis_files: str, ref_filename=None)
Source code
def choose_file_to_download(
    storage_files_path: list, base_path: str, redis_files: str, ref_filename=None
):
    """
    We can treat only one file each run, so we will first eliminate files that
    were already treated and saved on redis and keep only the first one from
    this partition
    """
    # keep only ref_filename if it exists
    if ref_filename is not None:
        # extract this part of the name s_20222911230206_e20222911239514
        ref_date = ref_filename[ref_filename.find("_s") + 1 : ref_filename.find("_e")]
        log(f"\n\n[DEBUG]: ref_date: {ref_date}")
        match_text = re.compile(f".*{ref_date}")
        storage_files_path = list(filter(match_text.match, storage_files_path))
        log(f"\n\n[DEBUG]: storage_files_path: {storage_files_path}")

    # keep the first file if it is not on redis
    storage_files_path.sort()
    destination_file_path, download_file = None, None
    for path_file in storage_files_path:
        filename = path_file.split("/")[-1]
        log(f"\n\nChecking if {filename} is in redis")
        if filename not in redis_files:
            log(f"\n\n {filename} not in redis")
            redis_files.append(filename)
            destination_file_path = os.path.join(base_path, filename)
            download_file = path_file
            # log(f"[DEBUG]: filename to be append on redis_files: {redis_files}")
            break
        log(f"\n{filename} is already in redis")
    return redis_files, destination_file_path, download_file
We can treat only one file per run, so we first eliminate files that were already treated and saved on Redis, keeping only the first remaining file from this partition.
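A minimal usage sketch with hypothetical storage paths and Redis state; it assumes the module is importable from the pipelines package.

# Hypothetical partition listing and previously processed files
from pipelines.rj_cor.meteorologia.satelite.satellite_utils import choose_file_to_download

storage_files_path = [
    "ABI-L2-TPWF/2022/290/19/OR_ABI-L2-TPWF-M6_G16_s20222901900203_e20222901909511_c20222901911473.nc",
    "ABI-L2-TPWF/2022/290/19/OR_ABI-L2-TPWF-M6_G16_s20222901910203_e20222901919511_c20222901921473.nc",
]
redis_files = ["OR_ABI-L2-TPWF-M6_G16_s20222901900203_e20222901909511_c20222901911473.nc"]

redis_files, destination_file_path, download_file = choose_file_to_download(
    storage_files_path=storage_files_path,
    base_path="/tmp/goes16",
    redis_files=redis_files,
)
# Only the second file is new, so it is the one selected for download.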
def convert_julian_to_conventional_day(year: int, julian_day: int)
Source code
def convert_julian_to_conventional_day(year: int, julian_day: int):
    """Convert julian day to conventional

    Parameters
    year (int): 2022 (from 20222901900203)
    julian_day (int): 290 (from 20222901900203)

    Returns
    date_save (str): '20221017' for julian day 290 of 2022
    """
    # Subtracting 1 because the year starts at day "0"
    julian_day = julian_day - 1
    dayconventional = datetime.datetime(year, 1, 1) + datetime.timedelta(julian_day)

    # Format the date according to the strftime directives
    date_save = dayconventional.strftime("%Y%m%d")
    return date_save
Convert a Julian day of year to a conventional calendar date.
Parameters: year (int), e.g. 2022 (from 20222901900203); julian_day (int), e.g. 290 (from 20222901900203)
Returns: date_save (str), e.g. '20221017' (the date in '%Y%m%d' format)
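A standalone sketch of the same conversion using only the standard library.

import datetime

year, julian_day = 2022, 290  # taken from the scan timestamp 20222901900203
date = datetime.datetime(year, 1, 1) + datetime.timedelta(julian_day - 1)
print(date.strftime("%Y%m%d"))  # 20221017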
def converte_timezone(datetime_save: str) ‑> str
Source code
def converte_timezone(datetime_save: str) -> str:
    """
    Get a UTC date-hour in 'YYYYMMDD HHmmss' format and return it in the same
    format but in the São Paulo timezone.
    """
    log(f">>>>>>> datetime_save {datetime_save}")
    datahora = pendulum.from_format(datetime_save, "YYYYMMDD HHmmss")
    log(f">>>>>>> datahora {datahora}")
    datahora = datahora.in_tz("America/Sao_Paulo")
    return datahora.format("YYYYMMDD HHmmss")
Get a UTC date-hour in 'YYYYMMDD HHmmss' format and return it in the same format but in the São Paulo timezone.
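A standalone sketch of the timezone conversion with pendulum, using an example timestamp built from the scan time above.

import pendulum

datetime_save = "20221017 190020"  # 19:00:20 UTC on 2022-10-17
datahora = pendulum.from_format(datetime_save, "YYYYMMDD HHmmss").in_tz("America/Sao_Paulo")
print(datahora.format("YYYYMMDD HHmmss"))  # 20221017 160020 (São Paulo is UTC-3)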
def create_and_save_image(data: xarray.core.dataarray.DataArray, info: dict, variable) ‑> pathlib.Path
Source code
def create_and_save_image(data: xr.DataArray, info: dict, variable) -> Path:
    """
    Create image from xarray and save it as a png file.
    """
    plt.figure(figsize=(10, 10))

    # Use the Geostationary projection in cartopy
    axis = plt.axes(projection=ccrs.PlateCarree())
    extent = info["extent"]
    img_extent = [extent[0], extent[2], extent[1], extent[3]]

    # Define the color scale based on the channel
    colormap = "jet"  # White to black for IR channels
    # colormap = "gray_r"  # White to black for IR channels

    # Plot the image
    img = axis.imshow(data, origin="upper", extent=img_extent, cmap=colormap, alpha=0.8)

    # # Find shapefile file "Limite_Bairros_RJ.shp" across the entire file system
    # for root, dirs, files in os.walk(os.sep):
    #     if "Limite_Bairros_RJ.shp" in files:
    #         log(f"[DEBUG] ROOT {root}")
    #         shapefile_dir = root
    #         break
    # else:
    #     print("File not found.")

    # Add coastlines, borders and gridlines
    shapefile_dir = Path(
        "/opt/venv/lib/python3.9/site-packages/pipelines/utils/shapefiles"
    )
    shapefile_path_neighborhood = shapefile_dir / "Limite_Bairros_RJ.shp"
    shapefile_path_state = shapefile_dir / "Limite_Estados_BR_IBGE.shp"

    log("\nImporting shapefiles")
    fiona.os.environ["SHAPE_RESTORE_SHX"] = "YES"
    reader_neighborhood = shpreader.Reader(shapefile_path_neighborhood)
    reader_state = shpreader.Reader(shapefile_path_state)
    state = [record.geometry for record in reader_state.records()]
    neighborhood = [record.geometry for record in reader_neighborhood.records()]
    log("\nShapefiles imported")

    axis.add_geometries(
        state, ccrs.PlateCarree(), facecolor="none", edgecolor="black", linewidth=0.7
    )
    axis.add_geometries(
        neighborhood,
        ccrs.PlateCarree(),
        facecolor="none",
        edgecolor="black",
        linewidth=0.2,
    )
    # axis.coastlines(resolution='10m', color='black', linewidth=1.0)
    # axis.add_feature(cartopy.feature.BORDERS, edgecolor='black', linewidth=1.0)

    grdln = axis.gridlines(
        crs=ccrs.PlateCarree(),
        color="gray",
        alpha=0.7,
        linestyle="--",
        linewidth=0.7,
        xlocs=np.arange(-180, 180, 1),
        ylocs=np.arange(-90, 90, 1),
        draw_labels=True,
    )
    grdln.top_labels = False
    grdln.right_labels = False

    plt.colorbar(
        img,
        label=variable.upper(),
        extend="both",
        orientation="horizontal",
        pad=0.05,
        fraction=0.05,
    )

    log("\n Start saving image")
    output_image_path = Path(os.getcwd()) / "output" / "images"
    save_image_path = output_image_path / (f"{variable}_{info['datetime_save']}.png")
    if not output_image_path.exists():
        output_image_path.mkdir(parents=True, exist_ok=True)
    plt.savefig(save_image_path, bbox_inches="tight", pad_inches=0, dpi=300)
    log("\n Ended saving image")
    return save_image_path
Create an image from the xarray and save it as a png file.
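A minimal, standalone sketch of the core plotting step, using synthetic data and no shapefiles; it assumes cartopy and matplotlib are available and uses a hypothetical extent around Rio de Janeiro.

import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
import cartopy.crs as ccrs

# Synthetic 2D field over a small box around Rio de Janeiro
lats = np.linspace(-23.5, -22.5, 50)
lons = np.linspace(-44.0, -43.0, 50)
data = xr.DataArray(
    np.random.rand(50, 50), dims=("lat", "lon"), coords={"lat": lats, "lon": lons}
)

extent = [lons.min(), lats.min(), lons.max(), lats.max()]  # [min_lon, min_lat, max_lon, max_lat]
img_extent = [extent[0], extent[2], extent[1], extent[3]]  # imshow wants [west, east, south, north]

axis = plt.axes(projection=ccrs.PlateCarree())
img = axis.imshow(data, origin="upper", extent=img_extent, cmap="jet", alpha=0.8)
plt.colorbar(img, orientation="horizontal", pad=0.05, fraction=0.05)
plt.savefig("example.png", bbox_inches="tight", dpi=150)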
def download_blob(bucket_name: str, source_blob_name: str, destination_file_name: str | pathlib.Path, mode: str = 'prod')
Source code
def download_blob(
    bucket_name: str,
    source_blob_name: str,
    destination_file_name: Union[str, Path],
    mode: str = "prod",
):
    """
    Downloads a blob from the bucket.
    Mode needs to be "prod" or "staging"

    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The ID of your GCS object
    # source_blob_name = "storage-object-name"

    # The path to which the file should be downloaded
    # destination_file_name = "local/path/to/file"
    """
    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)

    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    log(
        f"Downloaded storage object {source_blob_name} from bucket\
        {bucket_name} to local file {destination_file_name}."
    )
Downloads a blob from the bucket. Mode needs to be "prod" or "staging"
bucket_name: the ID of your GCS bucket, e.g. "your-bucket-name"
source_blob_name: the ID of your GCS object, e.g. "storage-object-name"
destination_file_name: the local path to which the file should be downloaded, e.g. "local/path/to/file"
def extract_julian_day_and_hour_from_filename(filename: str)
Source code
def extract_julian_day_and_hour_from_filename(filename: str):
    """
    Extract julian day and hour from satellite filename

    Parameters
    filename (str): 'OR_ABI-L2-TPWF-M6_G16_s20222901900203_e20222901909511_c20222901911473.nc'

    Returns
    year (int): 2022 (from 20222901900203)
    julian_day (int): 290 (from 20222901900203)
    hour_utc (str): '190020' (from 20222901900203)
    """
    # Search for the Scan start in the file name
    start = filename[filename.find("_s") + 2 : filename.find("_e")]

    # Get year
    year = int(start[0:4])
    # Get julian day
    julian_day = int(start[4:7])

    # Time (UTC) as string
    hour_utc = start[7:13]

    # Time of the start of the Scan
    # time = start[7:9] + ":" + start[9:11] + ":" + start[11:13] + " UTC"

    return year, julian_day, hour_utc
Extract the Julian day and hour from a satellite filename.
Parameters: filename (str), e.g. 'OR_ABI-L2-TPWF-M6_G16_s20222901900203_e20222901909511_c20222901911473.nc'
Returns: year (int), e.g. 2022; julian_day (int), e.g. 290; hour_utc (str), e.g. '190020' (all taken from the scan-start field 20222901900203)
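A standalone sketch of the same extraction on the example filename.

filename = "OR_ABI-L2-TPWF-M6_G16_s20222901900203_e20222901909511_c20222901911473.nc"
start = filename[filename.find("_s") + 2 : filename.find("_e")]  # "20222901900203"
year, julian_day, hour_utc = int(start[0:4]), int(start[4:7]), start[7:13]
print(year, julian_day, hour_utc)  # 2022 290 190020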
def get_blob_with_prefix(bucket_name: str, prefix: str, mode: str = 'prod') ‑> str
Source code
def get_blob_with_prefix(bucket_name: str, prefix: str, mode: str = "prod") -> str:
    """
    Lists all the blobs in the bucket that begin with the prefix and returns the
    first one (sorted by name). This can be used to list all blobs in a "folder",
    e.g. "public/". Mode needs to be "prod" or "staging"
    """
    files = [b.name for b in list_blobs_with_prefix(bucket_name, prefix, mode)]
    files.sort()
    return files[0]
Lists all the blobs in the bucket that begin with the prefix and returns the first one (sorted by name). This can be used to list all blobs in a "folder", e.g. "public/". Mode needs to be "prod" or "staging".
def get_files_from_aws(partition_path)
Source code
def get_files_from_aws(partition_path):
    """
    Get all available files from AWS that are inside the partition path
    """
    log("Accessing AWS to get files")
    # Use the anonymous credentials to access public data
    s3_fs = s3fs.S3FileSystem(anon=True)

    # Get all files of GOES-16 data (multiband format) at this hour
    storage_files_path = np.sort(
        np.array(
            s3_fs.find(f"noaa-goes16/{partition_path}")
            # s3_fs.find(f"noaa-goes16/ABI-L2-CMIPF/2022/270/10/OR_ABI-L2-CMIPF-M6C13_G16_s20222701010208_e20222701019528_c20222701020005.nc")
        )
    )
    storage_origin = "aws"
    return storage_files_path, storage_origin, s3_fs
Get all available files from AWS that are inside the partition path.
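A standalone sketch of the anonymous S3 listing, using a hypothetical partition path; it requires s3fs and network access to the public noaa-goes16 bucket.

import s3fs

partition_path = "ABI-L2-TPWF/2022/290/19"  # hypothetical product/year/julian_day/hour
s3_fs = s3fs.S3FileSystem(anon=True)
files = sorted(s3_fs.find(f"noaa-goes16/{partition_path}"))
print(files[:3])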
def get_files_from_gcp(partition_path)
Source code
def get_files_from_gcp(partition_path):
    """
    Get all available files from GCP that are inside the partition path
    """
    log("Accessing GCP to get files")
    bucket_name = "gcp-public-data-goes-16"
    storage_files_path = get_blob_with_prefix(
        bucket_name=bucket_name, prefix=partition_path, mode="prod"
    )
    storage_origin = "gcp"
    return storage_files_path, storage_origin, bucket_name
Get all available files from GCP that are inside the partition path.
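A standalone sketch of an equivalent listing with an anonymous GCS client and a hypothetical partition path; the pipeline itself authenticates through get_blob_with_prefix instead.

from google.cloud import storage

partition_path = "ABI-L2-TPWF/2022/290/19"  # hypothetical product/year/julian_day/hour
client = storage.Client.create_anonymous_client()
blobs = client.list_blobs("gcp-public-data-goes-16", prefix=partition_path)
names = sorted(blob.name for blob in blobs)
print(names[0] if names else "no files for this partition")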
def get_info(path: str) ‑> dict
Source code
def get_info(path: str) -> dict:
    """
    # Getting Information From the File Name (Time, Date,
    # Product Type, Variable and Defining the CMAP)
    """
    year, julian_day, hour_utc = extract_julian_day_and_hour_from_filename(path)

    date_save = convert_julian_to_conventional_day(year, julian_day)
    log(f">>>>>>>>>date_save {date_save}")
    log(f">>>>>>>>> hour_utc {hour_utc}")
    log(f">>>>>>>>>julian_day {julian_day}")

    datetime_save = str(date_save) + " " + str(hour_utc)

    # Convert the date/hour from UTC to São Paulo time
    datetime_save = converte_timezone(datetime_save=datetime_save)

    # =====================================================================
    # Detect the product type
    # =====================================================================
    procura_m = path.find("-M6")
    # If the term "M6" is not found, try to find "M3" and then "M4"
    if procura_m == -1:
        procura_m = path.find("-M3")
    if procura_m == -1:
        procura_m = path.find("-M4")
    product = path[path.find("L2-") + 3 : procura_m]

    # Not every product was added to the product-characteristics dictionary.
    # Check the original file if the product is not listed here
    # https://www.dropbox.com/s/yfopijdrplq5sjr/GNC-A%20Blog%20-%20GOES-R-Level-2-Products%20-%20Superimpose.py?dl=0
    # GNC-A Blog - GOES-R-Level-2-Products - Superimpose
    # https://geonetcast.wordpress.com/2018/06/28/goes-r-level-2-products-a-python-script/
    # https://www.dropbox.com/s/2zylbwfjfkx9a7i/GNC-A%20Blog%20-%20GOES-R-Level-2-Products.py?dl=0

    product_caracteristics = {}
    # ACHAF - Cloud Top Height: 'HT'
    product_caracteristics["ACHAF"] = {
        "variable": ["HT"],
        "vmin": 0,
        "vmax": 15000,
        "cmap": "rainbow",
    }
    # CMIPF - Cloud and Moisture Imagery: 'CMI'
    product_caracteristics["CMIPF"] = {
        "variable": ["CMI"],
        "vmin": -50,
        "vmax": 50,
        "cmap": "jet",
    }
    # ACHAF - Cloud Top Height: 'HT'
    product_caracteristics["ACHAF"] = {
        "variable": ["HT"],
        "vmin": 0,
        "vmax": 15000,
        "cmap": "rainbow",
    }
    # ACHTF - Cloud Top Temperature: 'TEMP'
    product_caracteristics["ACHATF"] = {
        "variable": ["TEMP"],
        "vmin": 180,
        "vmax": 300,
        "cmap": "jet",
    }
    # ACMF - Clear Sky Masks: 'BCM'
    product_caracteristics["ACMF"] = {
        "variable": ["BCM"],
        "vmin": 0,
        "vmax": 1,
        "cmap": "gray",
    }
    # ACTPF - Cloud Top Phase: 'Phase'
    product_caracteristics["ACTPF"] = {
        "variable": ["Phase"],
        "vmin": 0,
        "vmax": 5,
        "cmap": "jet",
    }
    # ADPF - Aerosol Detection: 'Smoke'
    product_caracteristics["ADPF"] = {
        "variable": ["Smoke"],
        "vmin": 0,
        "vmax": 255,
        "cmap": "jet",
    }
    # AODF - Aerosol Optical Depth: 'AOD'
    product_caracteristics["AODF"] = {
        "variable": ["AOD"],
        "vmin": 0,
        "vmax": 2,
        "cmap": "rainbow",
    }
    # CODF - Cloud Optical Depth: 'COD'
    product_caracteristics["CODF"] = {
        "variable": ["CODF"],
        "vmin": 0,
        "vmax": 100,
        "cmap": "jet",
    }
    # CPSF - Cloud Particle Size: 'PSD'
    product_caracteristics["CPSF"] = {
        "variable": ["PSD"],
        "vmin": 0,
        "vmax": 80,
        "cmap": "rainbow",
    }
    # CTPF - Cloud Top Pressure: 'PRES'
    product_caracteristics["CTPF"] = {
        "variable": ["PRES"],
        "vmin": 0,
        "vmax": 1100,
        "cmap": "rainbow",
    }
    # DSIF - Derived Stability Indices: 'CAPE', 'KI', 'LI', 'SI', 'TT'
    product_caracteristics["DSIF"] = {
        "variable": ["LI", "CAPE", "TT", "SI", "KI"],
        "vmin": 0,
        "vmax": 1000,
        "cmap": "jet",
    }
    # FDCF - Fire-Hot Spot Characterization: 'Area', 'Mask', 'Power', 'Temp'
    product_caracteristics["FDCF"] = {
        "variable": ["Area", "Mask", "Power", "Temp"],
        "vmin": 0,
        "vmax": 255,
        "cmap": "jet",
    }
    # LSTF - Land Surface (Skin) Temperature: 'LST'
    product_caracteristics["LSTF"] = {
        "variable": ["LST"],
        "vmin": 213,
        "vmax": 330,
        "cmap": "jet",
    }
    # RRQPEF - Rainfall Rate - Quantitative Prediction Estimate: 'RRQPE'
    product_caracteristics["RRQPEF"] = {
        "variable": ["RRQPE"],
        "vmin": 0,
        "vmax": 50,
        "cmap": "jet",
    }
    # SSTF - Sea Surface (Skin) Temperature: 'SST'
    product_caracteristics["SSTF"] = {
        "variable": ["SST"],
        "vmin": 268,
        "vmax": 308,
        "cmap": "jet",
    }
    # TPWF - Total Precipitable Water: 'TPW'
    product_caracteristics["TPWF"] = {
        "variable": ["TPW"],
        "vmin": 0,
        "vmax": 60,
        "cmap": "jet",
    }
    # MCMIPF - Cloud and Moisture Imagery: 'MCMIP'
    # https://developers.google.com/earth-engine/datasets/catalog/NOAA_GOES_16_MCMIPM#bands
    product_caracteristics["MCMIPF"] = {
        "variable": [
            "CMI_C01",
            "CMI_C02",
            "CMI_C03",
            "CMI_C04",
            "CMI_C05",
            "CMI_C06",
            "CMI_C07",
            "CMI_C08",
            "CMI_C09",
            "CMI_C10",
            "CMI_C11",
            "CMI_C12",
            "CMI_C13",
            "CMI_C14",
            "CMI_C15",
            "CMI_C16",
        ],
    }

    # variable = product_caracteristics[product]['variable']
    # vmin = product_caracteristics[product]['vmin']
    # vmax = product_caracteristics[product]['vmax']
    # cmap = product_caracteristics[product]['cmap']

    product_caracteristics = product_caracteristics[product]
    product_caracteristics["product"] = product
    product_caracteristics["filename"] = path
    product_caracteristics["datetime_save"] = datetime_save

    if product_caracteristics["variable"] == ["CMI"]:
        # Search for the GOES-16 channel in the file name
        pattern = r"M(\d+)C(\d+)_G16"
        match = re.search(pattern, path)
        print(match)
        print(match.group(1))
        product_caracteristics["band"] = match.group(2)
    else:
        product_caracteristics["band"] = np.nan

    log(f"Product Caracteristics: {product_caracteristics}")

    return product_caracteristics
Get information from the file name (time, date, product type, variable) and define the CMAP.
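A standalone sketch of the product and band detection on an example CMIPF filename.

import re

path = "OR_ABI-L2-CMIPF-M6C13_G16_s20222701010208_e20222701019528_c20222701020005.nc"
procura_m = path.find("-M6")
product = path[path.find("L2-") + 3 : procura_m]
band = re.search(r"M(\d+)C(\d+)_G16", path).group(2)
print(product, band)  # CMIPF 13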
def get_variable_values(dfr: pandas.core.frame.DataFrame, variable: str) ‑> xarray.core.dataarray.DataArray
Source code
def get_variable_values(dfr: pd.DataFrame, variable: str) -> xr.DataArray:
    """
    Convert pandas dataframe to a matrix with latitude on rows, longitudes on
    columns and the correspondent values on a xarray DataArray
    """
    log("Fill matrix")
    dfr = dfr.sort_values(by=["latitude", "longitude"], ascending=[False, True])
    matrix_temp = dfr.pivot(index="latitude", columns="longitude", values=variable)
    matrix_temp = matrix_temp.sort_index(ascending=False)
    log(
        f"[DEBUG]: matriz de conversão deve estar com a latitude em ordem descendente\
        e a longitude em ascendente: {matrix_temp.head(10)}"
    )

    # Create a NumPy matrix
    matrix = matrix_temp.values
    longitudes = list(matrix_temp.columns)
    latitudes = list(matrix_temp.index)

    log("Convert to xr dataarray")
    data_array = xr.DataArray(
        matrix, dims=("lat", "lon"), coords={"lon": longitudes, "lat": latitudes}
    )
    log("end")
    return data_array
Convert a pandas DataFrame to a matrix with latitudes on the rows, longitudes on the columns and the corresponding values in an xarray DataArray.
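A standalone sketch of the pivot-to-DataArray conversion with toy data and a hypothetical "tpw" column.

import pandas as pd
import xarray as xr

dfr = pd.DataFrame(
    {
        "latitude": [-22.9, -22.9, -23.0, -23.0],
        "longitude": [-43.2, -43.1, -43.2, -43.1],
        "tpw": [40.1, 41.5, 39.8, 40.7],
    }
)
# Latitude descending on the rows, longitude ascending on the columns
matrix_temp = dfr.pivot(index="latitude", columns="longitude", values="tpw").sort_index(ascending=False)
data_array = xr.DataArray(
    matrix_temp.values,
    dims=("lat", "lon"),
    coords={"lat": list(matrix_temp.index), "lon": list(matrix_temp.columns)},
)
print(data_array)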
def read_netcdf(file_path: str) ‑> pandas.core.frame.DataFrame
Source code
def read_netcdf(file_path: str) -> pd.DataFrame:
    """
    Function to extract data from NetCDF file and convert it to pandas DataFrame
    with the name of columns the same as the variable saved on the filename
    """
    dxr = xr.open_dataset(file_path)

    pattern = r"variable-(.*?)\.nc"
    match = re.search(pattern, file_path)

    if match:
        # Extract the content between "variable-" and ".nc"
        variable = match.group(1)

        dfr = (
            dxr.to_dataframe()
            .reset_index()[["lat", "lon", "Band1"]]
            .rename(
                {
                    "lat": "latitude",
                    "lon": "longitude",
                    "Band1": f"{variable}",
                },
                axis=1,
            )
        )

    return dfr
Extract data from a NetCDF file and convert it to a pandas DataFrame, naming the data column after the variable encoded in the filename.
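A standalone sketch of the filename convention and the extraction step, using a tiny synthetic file with a hypothetical name; it assumes an xarray NetCDF backend such as netCDF4 is installed.

import re
import numpy as np
import xarray as xr

file_path = "RRQPEF_variable-rrqpe.nc"  # hypothetical filename following the convention
xr.Dataset(
    {"Band1": (("lat", "lon"), np.random.rand(2, 2))},
    coords={"lat": [-22.9, -23.0], "lon": [-43.2, -43.1]},
).to_netcdf(file_path)

variable = re.search(r"variable-(.*?)\.nc", file_path).group(1)  # "rrqpe"
dfr = (
    xr.open_dataset(file_path)
    .to_dataframe()
    .reset_index()[["lat", "lon", "Band1"]]
    .rename({"lat": "latitude", "lon": "longitude", "Band1": variable}, axis=1)
)
print(dfr.head())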
def remap_g16(path: str, extent: list, product: str, variable: list)
Source code
def remap_g16(
    path: str,
    extent: list,
    product: str,
    variable: list,
):
    """
    The GOES-16 image is reprojected to the rectangular projection in the extent
    region. If the netcdf file has more than one variable, the remap function will
    save each variable in a different netcdf file inside the temp/ folder.
    """
    n_variables = len(variable)
    print(variable, n_variables)
    remap_path = f"{os.getcwd()}/temp/treated/{product}/"

    # This removing old files step is important to do backfill
    if os.path.exists(remap_path):
        print("Removing old files")
        shutil.rmtree(remap_path)

    os.makedirs(remap_path)
    for i in range(n_variables):
        log(
            f"Starting remap for path: {path}, remap_path: {remap_path}, variable: {variable[i]}"
        )
        remap(path, remap_path, variable[i], extent)
The GOES-16 image is reprojected to the rectangular (lat/lon) projection over the extent region. If the netcdf file has more than one variable, the remap function saves each variable in a separate netcdf file inside the temp/ folder.
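A minimal usage sketch with a hypothetical local file and an extent around Rio de Janeiro; the extent order [min_lon, min_lat, max_lon, max_lat] is assumed from how create_and_save_image unpacks it, and the heavy lifting is done by the remap() helper defined elsewhere in the package.

from pipelines.rj_cor.meteorologia.satelite.satellite_utils import remap_g16

remap_g16(
    path="/tmp/goes16/OR_ABI-L2-TPWF-M6_G16_s20222901900203_e20222901909511_c20222901911473.nc",
    extent=[-44.0, -23.5, -43.0, -22.5],  # assumed [min_lon, min_lat, max_lon, max_lat]
    product="TPWF",
    variable=["TPW"],
)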
def save_data_in_file(product: str, variable: list, datetime_save: str, mode_redis: str = 'prod')
Source code
def save_data_in_file(
    product: str, variable: list, datetime_save: str, mode_redis: str = "prod"
):
    """
    Read all nc or tif files and save them in a unique file inside a partition
    """
    date_save = datetime_save[:8]
    time_save = str(int(datetime_save[9:11]))

    year = date_save[:4]
    month = str(int(date_save[4:6]))
    day = str(int(date_save[6:8]))
    date = year + "-" + month.zfill(2) + "-" + day.zfill(2)
    partitions = os.path.join(
        f"ano_particao={year}",
        f"mes_particao={month}",
        f"data_particao={date}",
        f"hora_particao={time_save}",
    )

    folder_path = f"{os.getcwd()}/temp/treated/{product}/"

    # create the partition folders if they do not exist
    output_path = os.path.join(os.getcwd(), "temp", "output", mode_redis, product)
    partitions_path = os.path.join(output_path, partitions)

    if not os.path.exists(partitions_path):
        os.makedirs(partitions_path)

    # Loop through all NetCDF files in the folder
    data = pd.DataFrame()
    files = [i for i in os.listdir(folder_path) if i.endswith(".nc")]
    for i, file_name in enumerate(files):
        saved_file_path = os.path.join(folder_path, file_name)

        data_temp = read_netcdf(saved_file_path)
        if i == 0:
            data = data_temp.copy()
        else:
            data = data.merge(data_temp, on=["latitude", "longitude"], how="outer")

    # Store the file time in a column
    data["horario"] = pendulum.from_format(
        datetime_save, "YYYYMMDD HHmmss"
    ).to_time_string()
    print(f"Final df: {data.head()}")

    # Fix the column order
    data = data[["longitude", "latitude", "horario"] + [i.lower() for i in variable]]
    print("cols", data.columns)

    file_name = files[0].split("_variable-")[0]
    print(f"\n\n[DEBUG]: Saving {file_name} on {output_path}\n")
    print(f"Data_save: {date_save}, time_save: {time_save}")
    # log(f"\n\n[DEBUG]: Saving {file_name} on {parquet_path}\n\n")
    # log(f"Data_save: {date_save}, time_save: {time_save}")

    file_path = os.path.join(partitions_path, f"{file_name}.csv")
    data.to_csv(file_path, index=False)
    return output_path, file_path
Read all nc or tif files and save them in a single file inside a partition folder.
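A standalone sketch of how the Hive-style partition path is built from datetime_save.

import os

datetime_save = "20221017 160020"  # already converted to São Paulo time
date_save, time_save = datetime_save[:8], str(int(datetime_save[9:11]))
year, month, day = date_save[:4], str(int(date_save[4:6])), str(int(date_save[6:8]))
date = f"{year}-{month.zfill(2)}-{day.zfill(2)}"
partitions = os.path.join(
    f"ano_particao={year}",
    f"mes_particao={month}",
    f"data_particao={date}",
    f"hora_particao={time_save}",
)
print(partitions)  # ano_particao=2022/mes_particao=10/data_particao=2022-10-17/hora_particao=16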
def upload_image_to_api(info: dict, save_image_path: pathlib.Path)
Source code
def upload_image_to_api(info: dict, save_image_path: Path):
    """
    Upload image to api
    """
    username = "your-username"
    password = "your-password"

    image = base64.b64encode(open(save_image_path, "rb").read()).decode()
    response = requests.post(
        "https://api.example.com/upload-image",
        data={"image": image, "timestamp": info["datetime_save"]},
        auth=(username, password),
    )
    if response.status_code == 200:
        print("Image sent to API")
    else:
        print("Problem sending image to API")
Upload the image to the API.