Module pipelines.rj_cor.meteorologia.satelite.satellite_utils

Funções úteis no tratamento de dados de satélite

Functions

def choose_file_to_download(storage_files_path: list, base_path: str, redis_files: str, ref_filename=None)
Expand source code
def choose_file_to_download(
    storage_files_path: list, base_path: str, redis_files: str, ref_filename=None
):
    """
    We can treat only one file each run, so we will first eliminate files that were
    already trated and saved on redis and keep only the first one from this partition
    """
    # keep only ref_filename if it exists
    if ref_filename is not None:
        # extract this part of the name s_20222911230206_e20222911239514
        ref_date = ref_filename[ref_filename.find("_s") + 1 : ref_filename.find("_e")]
        log(f"\n\n[DEBUG]: ref_date: {ref_date}")
        match_text = re.compile(f".*{ref_date}")
        storage_files_path = list(filter(match_text.match, storage_files_path))

    log(f"\n\n[DEBUG]: storage_files_path: {storage_files_path}")

    # keep the first file if it is not on redis
    storage_files_path.sort()
    destination_file_path, download_file = None, None

    for path_file in storage_files_path:
        filename = path_file.split("/")[-1]

        log(f"\n\nChecking if {filename} is in redis")
        if filename not in redis_files:
            log(f"\n\n {filename} not in redis")
            redis_files.append(filename)
            destination_file_path = os.path.join(base_path, filename)
            download_file = path_file
            # log(f"[DEBUG]: filename to be append on redis_files: {redis_files}")
            break
        log(f"\n{filename} is already in redis")

    return redis_files, destination_file_path, download_file

We can treat only one file each run, so we will first eliminate files that were already trated and saved on redis and keep only the first one from this partition

def convert_julian_to_conventional_day(year: int, julian_day: int)
Expand source code
def convert_julian_to_conventional_day(year: int, julian_day: int):
    """Convert julian day to conventional

    Parameters
    year (int): 2022 (20222901900203)
    julian_day (int): 290 (20222901900203)

    Returns
    date_save (str): 19 (20222901900203)"""

    # Subtracting 1 because the year starts at day "0"
    julian_day = julian_day - 1
    dayconventional = datetime.datetime(year, 1, 1) + datetime.timedelta(julian_day)

    # Format the date according to the strftime directives
    date_save = dayconventional.strftime("%Y%m%d")

    return date_save

Convert julian day to conventional

Parameters year (int): 2022 (20222901900203) julian_day (int): 290 (20222901900203)

Returns date_save (str): 19 (20222901900203)

def converte_timezone(datetime_save: str) ‑> str
Expand source code
def converte_timezone(datetime_save: str) -> str:
    """
    Get UTC date-hour on 'YYYYMMDD HHmm' format and returns im the same format but
    on São Paulo timezone.
    """
    log(f">>>>>>> datetime_save {datetime_save}")
    datahora = pendulum.from_format(datetime_save, "YYYYMMDD HHmmss")
    log(f">>>>>>> datahora {datahora}")
    datahora = datahora.in_tz("America/Sao_Paulo")
    return datahora.format("YYYYMMDD HHmmss")

Get UTC date-hour on 'YYYYMMDD HHmm' format and returns im the same format but on São Paulo timezone.

def create_and_save_image(data: xarray.core.dataarray.DataArray, info: dict, variable) ‑> pathlib.Path
Expand source code
def create_and_save_image(data: xr.DataArray, info: dict, variable) -> Path:
    """
    Create image from xarray ans save it as png file.
    """

    plt.figure(figsize=(10, 10))

    # Use the Geostationary projection in cartopy
    axis = plt.axes(projection=ccrs.PlateCarree())

    extent = info["extent"]
    img_extent = [extent[0], extent[2], extent[1], extent[3]]

    # Define the color scale based on the channel
    colormap = "jet"  # White to black for IR channels
    # colormap = "gray_r" # White to black for IR channels

    # Plot the image
    img = axis.imshow(data, origin="upper", extent=img_extent, cmap=colormap, alpha=0.8)

    # # Find shapefile file "Limite_Bairros_RJ.shp" across the entire file system
    # for root, dirs, files in os.walk(os.sep):
    #     if "Limite_Bairros_RJ.shp" in files:
    #         log(f"[DEBUG] ROOT {root}")
    #         shapefile_dir = root
    #         break
    # else:
    #     print("File not found.")

    # Add coastlines, borders and gridlines
    shapefile_dir = Path(
        "/opt/venv/lib/python3.9/site-packages/pipelines/utils/shapefiles"
    )
    shapefile_path_neighborhood = shapefile_dir / "Limite_Bairros_RJ.shp"
    shapefile_path_state = shapefile_dir / "Limite_Estados_BR_IBGE.shp"

    log("\nImporting shapefiles")
    fiona.os.environ["SHAPE_RESTORE_SHX"] = "YES"
    reader_neighborhood = shpreader.Reader(shapefile_path_neighborhood)
    reader_state = shpreader.Reader(shapefile_path_state)
    state = [record.geometry for record in reader_state.records()]
    neighborhood = [record.geometry for record in reader_neighborhood.records()]
    log("\nShapefiles imported")
    axis.add_geometries(
        state, ccrs.PlateCarree(), facecolor="none", edgecolor="black", linewidth=0.7
    )
    axis.add_geometries(
        neighborhood,
        ccrs.PlateCarree(),
        facecolor="none",
        edgecolor="black",
        linewidth=0.2,
    )
    # axis.coastlines(resolution='10m', color='black', linewidth=1.0)
    # axis.add_feature(cartopy.feature.BORDERS, edgecolor='black', linewidth=1.0)
    grdln = axis.gridlines(
        crs=ccrs.PlateCarree(),
        color="gray",
        alpha=0.7,
        linestyle="--",
        linewidth=0.7,
        xlocs=np.arange(-180, 180, 1),
        ylocs=np.arange(-90, 90, 1),
        draw_labels=True,
    )
    grdln.top_labels = False
    grdln.right_labels = False

    plt.colorbar(
        img,
        label=variable.upper(),
        extend="both",
        orientation="horizontal",
        pad=0.05,
        fraction=0.05,
    )

    log("\n Start saving image")
    output_image_path = Path(os.getcwd()) / "output" / "images"

    save_image_path = output_image_path / (f"{variable}_{info['datetime_save']}.png")

    if not output_image_path.exists():
        output_image_path.mkdir(parents=True, exist_ok=True)

    plt.savefig(save_image_path, bbox_inches="tight", pad_inches=0, dpi=300)
    log("\n Ended saving image")
    return save_image_path

Create image from xarray ans save it as png file.

def download_blob(bucket_name: str,
source_blob_name: str,
destination_file_name: str | pathlib.Path,
mode: str = 'prod')
Expand source code
def download_blob(
    bucket_name: str,
    source_blob_name: str,
    destination_file_name: Union[str, Path],
    mode: str = "prod",
):
    """
    Downloads a blob from the bucket.
    Mode needs to be "prod" or "staging"

    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The ID of your GCS object
    # source_blob_name = "storage-object-name"

    # The path to which the file should be downloaded
    # destination_file_name = "local/path/to/file"
    """

    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)

    bucket = storage_client.bucket(bucket_name)

    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    log(
        f"Downloaded storage object {source_blob_name} from bucket\
        {bucket_name} to local file {destination_file_name}."
    )

Downloads a blob from the bucket. Mode needs to be "prod" or "staging"

The ID of your GCS bucket

bucket_name = "your-bucket-name"

The ID of your GCS object

source_blob_name = "storage-object-name"

The path to which the file should be downloaded

destination_file_name = "local/path/to/file"

def extract_julian_day_and_hour_from_filename(filename: str)
Expand source code
def extract_julian_day_and_hour_from_filename(filename: str):
    """
    Extract julian day and hour from satelite filename

    Parameters
    filename (str): 'OR_ABI-L2-TPWF-M6_G16_s20222901900203_e20222901909511_c20222901911473.nc'

    Returns
    year (int): 2022 (20222901900203)
    julian_day (int): 290 (20222901900203)
    hour_utc (str): 1900 (20222901900203)
    """
    # Search for the Scan start in the file name
    start = filename[filename.find("_s") + 2 : filename.find("_e")]
    # Get year
    year = int(start[0:4])
    # Get julian day
    julian_day = int(start[4:7])

    # Time (UTC) as string
    hour_utc = start[7:13]

    # Time of the start of the Scan
    # time = start[7:9] + ":" + start[9:11] + ":" + start[11:13] + " UTC"

    return year, julian_day, hour_utc

Extract julian day and hour from satelite filename

Parameters filename (str): 'OR_ABI-L2-TPWF-M6_G16_s20222901900203_e20222901909511_c20222901911473.nc'

Returns year (int): 2022 (20222901900203) julian_day (int): 290 (20222901900203) hour_utc (str): 1900 (20222901900203)

def get_blob_with_prefix(bucket_name: str, prefix: str, mode: str = 'prod') ‑> str
Expand source code
def get_blob_with_prefix(bucket_name: str, prefix: str, mode: str = "prod") -> str:
    """
    Lists all the blobs in the bucket that begin with the prefix.
    This can be used to list all blobs in a "folder", e.g. "public/".
    Mode needs to be "prod" or "staging"
    """
    files = [b.name for b in list_blobs_with_prefix(bucket_name, prefix, mode)]
    files.sort()
    return files[0]

Lists all the blobs in the bucket that begin with the prefix. This can be used to list all blobs in a "folder", e.g. "public/". Mode needs to be "prod" or "staging"

def get_files_from_aws(partition_path)
Expand source code
def get_files_from_aws(partition_path):
    """
    Get all available files from aws that is inside the partition path
    """
    log("Acessing AWS to get files")
    # Use the anonymous credentials to access public data
    s3_fs = s3fs.S3FileSystem(anon=True)

    # Get all files of GOES-16 data (multiband format) at this hour
    storage_files_path = np.sort(
        np.array(
            s3_fs.find(f"noaa-goes16/{partition_path}")
            # s3_fs.find(f"noaa-goes16/ABI-L2-CMIPF/2022/270/10/OR_ABI-L2-CMIPF-M6C13_G16_s20222701010208_e20222701019528_c20222701020005.nc")
        )
    )
    storage_origin = "aws"

    return storage_files_path, storage_origin, s3_fs

Get all available files from aws that is inside the partition path

def get_files_from_gcp(partition_path)
Expand source code
def get_files_from_gcp(partition_path):
    """
    Get all available files from gcp that is inside the partition path
    """
    log("Acessing GCP to get files")
    bucket_name = "gcp-public-data-goes-16"
    storage_files_path = get_blob_with_prefix(
        bucket_name=bucket_name, prefix=partition_path, mode="prod"
    )
    storage_origin = "gcp"
    return storage_files_path, storage_origin, bucket_name

Get all available files from gcp that is inside the partition path

def get_info(path: str) ‑> dict
Expand source code
def get_info(path: str) -> dict:
    """
    # Getting Information From the File Name  (Time, Date,
    # Product Type, Variable and Defining the CMAP)
    """
    year, julian_day, hour_utc = extract_julian_day_and_hour_from_filename(path)

    date_save = convert_julian_to_conventional_day(year, julian_day)
    log(f">>>>>>>>>date_save {date_save}")
    log(f">>>>>>>>> hour_utc {hour_utc}")
    log(f">>>>>>>>>julian_day  {julian_day}")
    datetime_save = str(date_save) + " " + str(hour_utc)

    # Converte data/hora de UTC para horário de São Paulo
    datetime_save = converte_timezone(datetime_save=datetime_save)

    # =====================================================================
    # Detect the product type
    # =====================================================================
    procura_m = path.find("-M6")
    # Se não encontra o termo "M6" tenta encontrar "M3" e depois "M4"
    if procura_m == -1:
        procura_m = path.find("-M3")
    if procura_m == -1:
        procura_m = path.find("-M4")
    product = path[path.find("L2-") + 3 : procura_m]

    # Nem todos os produtos foram adicionados no dicionário de características
    # dos produtos. Olhar arquivo original caso o produto não estaja aqui
    # https://www.dropbox.com/s/yfopijdrplq5sjr/GNC-A%20Blog%20-%20GOES-R-Level-2-Products%20-%20Superimpose.py?dl=0
    # GNC-A Blog - GOES-R-Level-2-Products - Superimpose
    # https://geonetcast.wordpress.com/2018/06/28/goes-r-level-2-products-a-python-script/
    # https://www.dropbox.com/s/2zylbwfjfkx9a7i/GNC-A%20Blog%20-%20GOES-R-Level-2-Products.py?dl=0
    product_caracteristics = {}

    # ACHAF - Cloud Top Height: 'HT'
    product_caracteristics["ACHAF"] = {
        "variable": ["HT"],
        "vmin": 0,
        "vmax": 15000,
        "cmap": "rainbow",
    }
    # CMIPF - Cloud and Moisture Imagery: 'CMI'
    product_caracteristics["CMIPF"] = {
        "variable": ["CMI"],
        "vmin": -50,
        "vmax": 50,
        "cmap": "jet",
    }
    # ACHAF - Cloud Top Height: 'HT'
    product_caracteristics["ACHAF"] = {
        "variable": ["HT"],
        "vmin": 0,
        "vmax": 15000,
        "cmap": "rainbow",
    }
    # ACHTF - Cloud Top Temperature: 'TEMP'
    product_caracteristics["ACHATF"] = {
        "variable": ["TEMP"],
        "vmin": 180,
        "vmax": 300,
        "cmap": "jet",
    }
    # ACMF - Clear Sky Masks: 'BCM'
    product_caracteristics["ACMF"] = {
        "variable": ["BCM"],
        "vmin": 0,
        "vmax": 1,
        "cmap": "gray",
    }
    # ACTPF - Cloud Top Phase: 'Phase'
    product_caracteristics["ACTPF"] = {
        "variable": ["Phase"],
        "vmin": 0,
        "vmax": 5,
        "cmap": "jet",
    }
    # ADPF - Aerosol Detection: 'Smoke'
    product_caracteristics["ADPF"] = {
        "variable": ["Smoke"],
        "vmin": 0,
        "vmax": 255,
        "cmap": "jet",
    }
    # AODF - Aerosol Optical Depth: 'AOD'
    product_caracteristics["AODF"] = {
        "variable": ["AOD"],
        "vmin": 0,
        "vmax": 2,
        "cmap": "rainbow",
    }
    # CODF - Cloud Optical Depth: 'COD'
    product_caracteristics["CODF"] = {
        "variable": ["CODF"],
        "vmin": 0,
        "vmax": 100,
        "cmap": "jet",
    }
    # CPSF - Cloud Particle Size: 'PSD'
    product_caracteristics["CPSF"] = {
        "variable": ["PSD"],
        "vmin": 0,
        "vmax": 80,
        "cmap": "rainbow",
    }
    # CTPF - Cloud Top Pressure: 'PRES'
    product_caracteristics["CTPF"] = {
        "variable": ["PRES"],
        "vmin": 0,
        "vmax": 1100,
        "cmap": "rainbow",
    }
    # DSIF - Derived Stability Indices: 'CAPE', 'KI', 'LI', 'SI', 'TT'
    product_caracteristics["DSIF"] = {
        "variable": ["LI", "CAPE", "TT", "SI", "KI"],
        "vmin": 0,
        "vmax": 1000,
        "cmap": "jet",
    }
    # FDCF - Fire-Hot Spot Characterization: 'Area', 'Mask', 'Power', 'Temp'
    product_caracteristics["FDCF"] = {
        "variable": ["Area", "Mask", "Power", "Temp"],
        "vmin": 0,
        "vmax": 255,
        "cmap": "jet",
    }
    # LSTF - Land Surface (Skin) Temperature: 'LST'
    product_caracteristics["LSTF"] = {
        "variable": ["LST"],
        "vmin": 213,
        "vmax": 330,
        "cmap": "jet",
    }
    # RRQPEF - Rainfall Rate - Quantitative Prediction Estimate: 'RRQPE'
    product_caracteristics["RRQPEF"] = {
        "variable": ["RRQPE"],
        "vmin": 0,
        "vmax": 50,
        "cmap": "jet",
    }
    # SSTF - Sea Surface (Skin) Temperature: 'SST'
    product_caracteristics["SSTF"] = {
        "variable": ["SST"],
        "vmin": 268,
        "vmax": 308,
        "cmap": "jet",
    }
    # TPWF - Total Precipitable Water: 'TPW'
    product_caracteristics["TPWF"] = {
        "variable": ["TPW"],
        "vmin": 0,
        "vmax": 60,
        "cmap": "jet",
    }
    # MCMIPF - Cloud and Moisture Imagery: 'MCMIP'
    # https://developers.google.com/earth-engine/datasets/catalog/NOAA_GOES_16_MCMIPM#bands

    product_caracteristics["MCMIPF"] = {
        "variable": [
            "CMI_C01",
            "CMI_C02",
            "CMI_C03",
            "CMI_C04",
            "CMI_C05",
            "CMI_C06",
            "CMI_C07",
            "CMI_C08",
            "CMI_C09",
            "CMI_C10",
            "CMI_C11",
            "CMI_C12",
            "CMI_C13",
            "CMI_C14",
            "CMI_C15",
            "CMI_C16",
        ],
    }

    # variable = product_caracteristics[product]['variable']
    # vmin = product_caracteristics[product]['vmin']
    # vmax = product_caracteristics[product]['vmax']
    # cmap = product_caracteristics[product]['cmap']
    product_caracteristics = product_caracteristics[product]
    product_caracteristics["product"] = product
    product_caracteristics["filename"] = path
    product_caracteristics["datetime_save"] = datetime_save

    if product_caracteristics["variable"] == ["CMI"]:
        # Search for the GOES-16 channel in the file name
        pattern = r"M(\d+)C(\d+)_G16"
        match = re.search(pattern, path)
        print(match)
        print(match.group(1))
        product_caracteristics["band"] = match.group(2)
    else:
        product_caracteristics["band"] = np.nan

    log(f"Product Caracteristics: {product_caracteristics}")

    return product_caracteristics

Getting Information From the File Name (Time, Date,

Product Type, Variable and Defining the CMAP)

def get_variable_values(dfr: pandas.core.frame.DataFrame, variable: str) ‑> xarray.core.dataarray.DataArray
Expand source code
def get_variable_values(dfr: pd.DataFrame, variable: str) -> xr.DataArray:
    """
    Convert pandas dataframe to a matrix with latitude on rows, longitudes on
    columns and the correspondent values on a xarray DataArray
    """

    log("Fill matrix")
    dfr = dfr.sort_values(by=["latitude", "longitude"], ascending=[False, True])
    matrix_temp = dfr.pivot(index="latitude", columns="longitude", values=variable)
    matrix_temp = matrix_temp.sort_index(ascending=False)
    log(
        f"[DEBUG]: matriz de conversão deve estar com a latitude em ordem descendente\
             e a longitude em ascendente: {matrix_temp.head(10)}"
    )

    # Create a NumPy matriz NumPy
    matrix = matrix_temp.values

    longitudes = list(matrix_temp.columns)
    latitudes = list(matrix_temp.index)

    log("Convert to xr dataarray")
    data_array = xr.DataArray(
        matrix, dims=("lat", "lon"), coords={"lon": longitudes, "lat": latitudes}
    )
    log("end")
    return data_array

Convert pandas dataframe to a matrix with latitude on rows, longitudes on columns and the correspondent values on a xarray DataArray

def read_netcdf(file_path: str) ‑> pandas.core.frame.DataFrame
Expand source code
def read_netcdf(file_path: str) -> pd.DataFrame:
    """
    Function to extract data from NetCDF file and convert it to pandas DataFrame
    with the name of columns the same as the variable saved on the filename
    """
    dxr = xr.open_dataset(file_path)

    pattern = r"variable-(.*?)\.nc"

    match = re.search(pattern, file_path)

    if match:
        # Extract the content between "variable-" and ".nc"
        variable = match.group(1)

        dfr = (
            dxr.to_dataframe()
            .reset_index()[["lat", "lon", "Band1"]]
            .rename(
                {
                    "lat": "latitude",
                    "lon": "longitude",
                    "Band1": f"{variable}",
                },
                axis=1,
            )
        )

    return dfr

Function to extract data from NetCDF file and convert it to pandas DataFrame with the name of columns the same as the variable saved on the filename

def remap_g16(path: str, extent: list, product: str, variable: list)
Expand source code
def remap_g16(
    path: str,
    extent: list,
    product: str,
    variable: list,
):
    """
    the GOES-16 image is reprojected to the rectangular projection in the extent region.
    If netcdf file has more than one variable remap function will save each variable in
    a different netcdf file inside temp/ folder.
    """

    n_variables = len(variable)
    print(variable, n_variables)
    remap_path = f"{os.getcwd()}/temp/treated/{product}/"

    # This removing old files step is important to do backfill
    if os.path.exists(remap_path):
        print("Removing old files")
        shutil.rmtree(remap_path)

    os.makedirs(remap_path)
    for i in range(n_variables):
        log(
            f"Starting remap for path: {path}, remap_path: {remap_path}, variable: {variable[i]}"
        )
        remap(path, remap_path, variable[i], extent)

the GOES-16 image is reprojected to the rectangular projection in the extent region. If netcdf file has more than one variable remap function will save each variable in a different netcdf file inside temp/ folder.

def save_data_in_file(product: str, variable: list, datetime_save: str, mode_redis: str = 'prod')
Expand source code
def save_data_in_file(
    product: str, variable: list, datetime_save: str, mode_redis: str = "prod"
):
    """
    Read all nc or tif files and save them in a unique file inside a partition
    """
    date_save = datetime_save[:8]
    time_save = str(int(datetime_save[9:11]))

    year = date_save[:4]
    month = str(int(date_save[4:6]))
    day = str(int(date_save[6:8]))
    date = year + "-" + month.zfill(2) + "-" + day.zfill(2)
    partitions = os.path.join(
        f"ano_particao={year}",
        f"mes_particao={month}",
        f"data_particao={date}",
        f"hora_particao={time_save}",
    )

    folder_path = f"{os.getcwd()}/temp/treated/{product}/"
    # cria pasta de partições se elas não existem
    output_path = os.path.join(os.getcwd(), "temp", "output", mode_redis, product)
    partitions_path = os.path.join(output_path, partitions)

    if not os.path.exists(partitions_path):
        os.makedirs(partitions_path)

    # Loop through all NetCDF files in the folder
    data = pd.DataFrame()
    files = [i for i in os.listdir(folder_path) if i.endswith(".nc")]
    for i, file_name in enumerate(files):
        saved_file_path = os.path.join(folder_path, file_name)
        data_temp = read_netcdf(saved_file_path)
        if i == 0:
            data = data_temp.copy()
        else:
            data = data.merge(data_temp, on=["latitude", "longitude"], how="outer")

    # Guarda horário do arquivo na coluna
    data["horario"] = pendulum.from_format(
        datetime_save, "YYYYMMDD HHmmss"
    ).to_time_string()

    print(f"Final df: {data.head()}")
    # Fixa ordem das colunas
    data = data[["longitude", "latitude", "horario"] + [i.lower() for i in variable]]
    print("cols", data.columns)

    file_name = files[0].split("_variable-")[0]
    print(f"\n\n[DEGUB]: Saving {file_name} on {output_path}\n")
    print(f"Data_save: {date_save}, time_save: {time_save}")
    # log(f"\n\n[DEGUB]: Saving {file_name} on {parquet_path}\n\n")
    # log(f"Data_save: {date_save}, time_save: {time_save}")
    file_path = os.path.join(partitions_path, f"{file_name}.csv")
    data.to_csv(file_path, index=False)
    return output_path, file_path

Read all nc or tif files and save them in a unique file inside a partition

def upload_image_to_api(info: dict, save_image_path: pathlib.Path)
Expand source code
def upload_image_to_api(info: dict, save_image_path: Path):
    """
    Upload image to api
    """
    username = "your-username"
    password = "your-password"

    image = base64.b64encode(open(save_image_path, "rb").read()).decode()

    response = requests.post(
        "https://api.example.com/upload-image",
        data={"image": image, "timestamp": info["datetime_save"]},
        auth=(username, password),
    )

    if response.status_code == 200:
        print("Image sent to API")
    else:
        print("Problem senting imagem to API")

Upload image to api