Module pipelines.rj_cor.meteorologia.radar.precipitacao.tasks
Tasks for setting rain dashboard using radar data.
Functions
def change_predict_rain_specs(files_to_model: list, destination_path: str) ‑> None
-
Expand source code
@task()
def change_predict_rain_specs(files_to_model: list, destination_path: str) -> None:
    """
    Update the radar filenames listed in the model's predict-specs JSON file.

    Args:
        files_to_model: storage blob paths of the radar files to feed the model.
        destination_path: local directory prefix prepended to each basename.
    """
    json_file = (
        "pipelines/rj_cor/meteorologia/radar/precipitacao/src/predict_rain_specs.json"
    )
    # Explicit encoding so behavior does not depend on the platform default
    with open(json_file, "r", encoding="utf-8") as file:
        predict_specs = json.load(file)
    log("load")

    # Keep only each blob's basename and anchor it at destination_path
    filenames = [destination_path + i.split("/")[-1] for i in files_to_model]
    log(f"filenames to save on json file: {filenames}")
    predict_specs["radar_ppi_hdfs"] = filenames
    log(f"predict_specs : {predict_specs}")
    with open(json_file, "w", encoding="utf-8") as file:
        json.dump(predict_specs, file)

    # Re-read the file to confirm the update was persisted
    with open(json_file, "r", encoding="utf-8") as file:
        predict_specs_valida = json.load(file)
    log(f"predict_specs after updates: {predict_specs_valida}")
Change name of radar files inside json file
def download_files_storage(bucket_name: str, files_to_download: list, destination_path: str) ‑> None
-
Expand source code
@task()
def download_files_storage(
    bucket_name: str, files_to_download: list, destination_path: str
) -> None:
    """
    Download every blob listed in files_to_download from the given bucket
    into destination_path.
    """
    os.makedirs(destination_path, exist_ok=True)
    for blob_path in files_to_download:
        # Local filename is the blob's basename inside destination_path
        local_file = Path(destination_path, blob_path.split("/")[-1])
        download_blob(bucket_name, blob_path, local_file)
Realiza o download dos arquivos listados em files_to_download no bucket especificado
def get_filenames_storage(bucket_name: str = 'rj-escritorio-dev',
radar: str = 'gua',
files_saved_redis: list = []) ‑> list
Expand source code
@task()
def get_filenames_storage(
    bucket_name: str = "rj-escritorio-dev",
    radar: str = "gua",
    files_saved_redis: list = None,
) -> list:
    """
    List the three most recent radar PPI files on storage.

    Looks at today's partition (and yesterday's, when the last-30-minutes
    window crosses midnight UTC), keeps the three newest blob names, and
    skips the flow when they are identical to the files already registered
    on Redis.

    Args:
        bucket_name: GCS bucket to list.
        radar: radar identifier used in the partition path.
        files_saved_redis: blob names previously processed (from Redis).

    Returns:
        Sorted list with the last three blob names found on storage.

    Raises:
        ENDRUN: with a Skipped state when storage has no new files.
    """
    # Fix for the mutable-default pitfall: the previous `[]` default was
    # sorted in place below, leaking state across task invocations.
    files_saved_redis = [] if files_saved_redis is None else list(files_saved_redis)

    last_30min = pendulum.now("UTC").subtract(minutes=30).to_datetime_string()
    today = pendulum.now("UTC").format("YYYY-MM-DD")

    # Get data from yesterday if the date from last_30min is different from today
    log(f"day of last 30 min: {last_30min[:10]}")
    if today == last_30min[:10]:
        filter_partition_days = [today]
    else:
        filter_partition_days = [last_30min[:10], today]
    log(f"filter_partition_days: {filter_partition_days}")

    files_on_storage_list = []
    for i in filter_partition_days:
        base_path = "raw/meio_ambiente_clima/inea_radar_hdf5/"
        prefix = base_path + f"radar={radar}/produto=ppi/data_particao={i}/"
        log(f"DEBUG prefix {prefix}")
        files_on_storage = list_blobs_with_prefix(bucket_name, prefix, delimiter=None)
        log(f"debug files on storage {files_on_storage}")
        files_on_storage_list.extend([blob.name for blob in files_on_storage])
        log(f"debug files on storage list {files_on_storage_list[-3:]}")

    # Deduplicate, sort chronologically (names embed the timestamp) and keep
    # only the three newest files.
    files_on_storage_list = list(set(files_on_storage_list))
    files_on_storage_list.sort()
    files_on_storage_list = files_on_storage_list[-3:]

    files_saved_redis.sort()

    log(f"[DEBUG] Last radar files: {files_on_storage_list}")
    log(f"[DEBUG] Last redis files: {files_saved_redis}")

    # if we have the same files on redis and on radar, skip flow
    if files_on_storage_list == files_saved_redis:
        log("No available files on API")
        skip = Skipped("No available files on API")
        raise ENDRUN(state=skip)

    return files_on_storage_list
List the most recent radar files available on storage.
def run_model(wait=None) ‑> Tuple[pandas.core.frame.DataFrame, str | pathlib.Path]
-
Expand source code
@task(
    nout=2,
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def run_model(wait=None) -> Tuple[pd.DataFrame, Union[str, Path]]:
    """
    Call a shell task to run model
    https://github.com/BioBD/sgwfc-gene-python/blob/7dadf7b854a7a37405ee203331671f8cd61b114b/workflow/modules.py
    """
    log("[DEBUG] Start runing model")

    base_path = "pipelines/rj_cor/meteorologia/radar/precipitacao"
    data_path = f"{base_path}/predictions/"

    # Make sure the predictions directory exists before the model writes to it
    if Path(data_path).exists():
        log("DEBUG predictions path exists")
    else:
        os.makedirs(data_path, exist_ok=True)

    dfr = run_model_prediction(base_path=base_path)
    log("[DEBUG] End runing model")
    return dfr
Call a shell task to run model https://github.com/BioBD/sgwfc-gene-python/blob/7dadf7b854a7a37405ee203331671f8cd61b114b/workflow/modules.py
def save_data(dfr: pandas.core.frame.DataFrame) ‑> str | pathlib.Path
-
Expand source code
@task
def save_data(dfr: pd.DataFrame) -> Union[str, Path]:
    """
    Persist the treated data as CSV files partitioned by measurement date.

    Returns the directory where the partitioned files were written.
    """
    output_dir = Path("/tmp/precipitacao_radar/")
    output_dir.mkdir(parents=True, exist_ok=True)

    date_col = "data_medicao"
    dataframe, partitions = parse_date_columns(dfr, date_col)
    # Suffix every file with the most recent measurement timestamp
    suffix = pd.to_datetime(dataframe[date_col]).max().strftime("%Y%m%d%H%M%S")

    # Write one CSV per date partition
    to_partitions(
        data=dataframe,
        partition_columns=partitions,
        savepath=output_dir,
        data_type="csv",
        suffix=suffix,
    )
    log(f"[DEBUG] Files saved on {output_dir}")
    return output_dir
Save treated data in csv partitioned by date