Module pipelines.rj_escritorio.inea.tasks

Tasks for INEA.

Functions

def convert_vol_file(downloaded_file: str, output_format: str = 'HDF5', convert_params: str = '-k=ODIM2.1 -M=All') ‑> List[str]

Convert VOL files to NetCDF or HDF5 using the volconvert CLI tool. For output_format = "NetCDF", convert_params must be "-f=Whole -k=CFext -r=Short -p=Radar -M=All -z". For output_format = "HDF5", convert_params must be "-k=ODIM2.1 -M=All" for all products.

Args

output_format : str
"NetCDF" or "HDF5"
def execute_shell_command(command: str, stdout_callback: Callable = <function log>, stderr_callback: Callable = functools.partial(<function log>, level='error'))

Execute a shell command and log its output through the stdout and stderr callbacks.
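
A minimal sketch of the callback pattern this task describes, assuming a subprocess-based runner; it is not the module's actual implementation, and output is buffered rather than streamed for brevity.

```python
# Minimal sketch of a shell runner that forwards output lines to callbacks.
# Assumed pattern, not the module's actual code.
import subprocess
from functools import partial
from typing import Callable


def run_shell_command(
    command: str,
    stdout_callback: Callable = print,
    stderr_callback: Callable = partial(print, "ERROR:"),
) -> int:
    process = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    # Buffer the full output, then hand each line to the matching callback.
    stdout, stderr = process.communicate()
    for line in stdout.splitlines():
        stdout_callback(line)
    for line in stderr.splitlines():
        stderr_callback(line)
    return process.returncode
```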

def fetch_vol_file(remote_file: str, radar: str, output_directory: str = '/var/escritoriodedados/temp/')

Fetch files from the INEA server

Args

remote_file : str
Remote file to be fetched
radar : str
Radar name. Must be "gua" or "mac"
output_directory : str
Directory where the files will be saved
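
A hypothetical invocation; the remote path is a placeholder (modelled on the default vols_remote_directory shown in list_vol_files), and calling the task directly outside the flow is an assumption.

```python
# Hypothetical invocation; the remote path is a placeholder and direct calls
# outside the flow are an assumption.
from pipelines.rj_escritorio.inea.tasks import fetch_vol_file

fetch_vol_file(
    remote_file="/var/opt/edge/vols/example.vol",  # placeholder remote path
    radar="gua",
    output_directory="/var/escritoriodedados/temp/",
)
```
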
def list_vol_files(bucket_name: str, prefix: str, radar: str, product: str, date: str = None, greater_than: str = None, get_only_last_file: bool = True, mode: str = 'prod', output_directory: str = '/var/escritoriodedados/temp/', vols_remote_directory: str = '/var/opt/edge/vols') ‑> Tuple[List[str], str]

List files from the INEA server

Args

product : str
"ppi"
date : str
Date of the files to be fetched (e.g. 20220125)
greater_than : str
Fetch files with a date greater than this one
less_than : str
Fetch files with a date less than this one
output_directory : str
Directory where the files will be saved
radar : str
Radar name. Must be "gua" or "mac"
get_only_last_file : bool
Process only the most recent file available

How to use:
- To get real-time data: leave greater_than and date as None and set get_only_last_file to True. This prevents the flow from getting stuck processing every pending file after an interruption; otherwise it would take a long time to work through the backlog before catching up to real time.
- To fill missing files from up to two days ago: leave greater_than and date as None and set get_only_last_file to False.
- For backfills, or to fill missing files for dates more than two days ago: set greater_than, leave date as None, and set get_only_last_file to False.
- To get all files for a single day: set date, leave greater_than as None, and set get_only_last_file to False.
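
Hypothetical calls covering the scenarios above; bucket_name, prefix, and the example dates are placeholders, and calling the task directly outside the flow is an assumption.

```python
# Hypothetical calls for each scenario above; bucket, prefix, and dates are
# placeholders and direct calls outside the flow are an assumption.
from pipelines.rj_escritorio.inea.tasks import list_vol_files

common = dict(
    bucket_name="my-bucket",    # placeholder
    prefix="path/to/prefix",    # placeholder
    radar="gua",
    product="ppi",
)

# Real time: only the most recent file.
files, _ = list_vol_files(**common, date=None, greater_than=None, get_only_last_file=True)

# Fill missing files from up to two days ago.
files, _ = list_vol_files(**common, date=None, greater_than=None, get_only_last_file=False)

# Backfill, or fill missing files for dates more than two days ago.
files, _ = list_vol_files(**common, date=None, greater_than="20220101", get_only_last_file=False)

# All files for a single day.
files, _ = list_vol_files(**common, date="20220125", greater_than=None, get_only_last_file=False)
```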

def print_environment_variables()

Print all environment variables

def upload_file_to_gcs(converted_file: str, bucket_name: str, prefix: str, radar: str, product: str, mode='prod', task_mode='partitioned', unlink: bool = True)

Upload files to GCS
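
A minimal sketch of the upload step using the google-cloud-storage client. The blob path layout, function name, and unlink behavior mirror the parameters above but are assumptions, not the task's documented behavior.

```python
# Minimal sketch of a GCS upload; the partitioned blob path below is an
# assumption about layout, not the task's documented scheme.
from pathlib import Path

from google.cloud import storage


def upload_to_gcs(converted_file: str, bucket_name: str, prefix: str,
                  radar: str, product: str, unlink: bool = True) -> None:
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    # Hypothetical partition scheme: <prefix>/radar=<radar>/produto=<product>/<filename>
    blob_path = f"{prefix}/radar={radar}/produto={product}/{Path(converted_file).name}"
    bucket.blob(blob_path).upload_from_filename(converted_file)
    if unlink:
        Path(converted_file).unlink()  # remove the local copy after upload
```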