Module pipelines.rj_escritorio.inea.tasks
Tasks for INEA.
Functions
def convert_vol_file(downloaded_file: str, output_format: str = 'HDF5', convert_params: str = '-k=ODIM2.1 -M=All') -> List[str]
Convert VOL files to NetCDF or HDF5 using the volconvert CLI tool.
For output_format = "NetCDF", convert_params must be "-f=Whole -k=CFext -r=Short -p=Radar -M=All -z".
For output_format = "HDF5", convert_params must be "-k=ODIM2.1 -M=All" for all products.
Args
output_format : str - "NetCDF" or "HDF5"
def execute_shell_command(command: str, stdout_callback: Callable = <function log>, stderr_callback: Callable = functools.partial(<function log>, level='error'))
Execute a shell command and log its output.
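A minimal sketch, assuming the default log callbacks are acceptable; the command itself is only illustrative:
from pipelines.rj_escritorio.inea.tasks import execute_shell_command

# Streams the command's stdout/stderr through the default log callbacks.
execute_shell_command("ls -lh /var/escritoriodedados/temp/")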
def fetch_vol_file(remote_file: str, radar: str, output_directory: str = '/var/escritoriodedados/temp/')
Fetch files from the INEA server.
Args
remote_file : str - Remote file to be fetched
radar : str - Radar name. Must be "gua" or "mac"
output_directory : str - Directory where the files will be saved
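A minimal sketch; the remote filename is hypothetical, placed in the default remote vols directory referenced by list_vol_files:
from pipelines.rj_escritorio.inea.tasks import fetch_vol_file

# The remote filename is hypothetical; /var/opt/edge/vols is the default
# remote vols directory used by list_vol_files.
fetch_vol_file(
    remote_file="/var/opt/edge/vols/9921GUA-20220125-120000.vol",
    radar="gua",
    output_directory="/var/escritoriodedados/temp/",
)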
def list_vol_files(bucket_name: str, prefix: str, radar: str, product: str, date: str = None, greater_than: str = None, get_only_last_file: bool = True, mode: str = 'prod', output_directory: str = '/var/escritoriodedados/temp/', vols_remote_directory: str = '/var/opt/edge/vols') -> Tuple[List[str], str]
List files from the INEA server.
Args
product : str - "ppi"
date : str - Date of the files to be fetched (e.g. 20220125)
greater_than : str - Fetch files with a date greater than this one
less_than : str - Fetch files with a date less than this one
output_directory : str - Directory where the files will be saved
radar : str - Radar name. Must be "gua" or "mac"
get_only_last_file : bool - Treat only the last file available
How to use:
- To get real-time data: leave greater_than and date as None and set get_only_last_file to True, as shown in the sketch below. This prevents the flow from getting stuck processing every file when something fails and stops the flow; otherwise the flow would take a long time to process all files before catching up to real time.
- To fill missing files up to two days ago: leave greater_than and date as None and set get_only_last_file to False.
- For backfill, or to fill missing files for dates more than two days ago: set greater_than to a date and leave date as None, with get_only_last_file set to False.
- To get all files for one day: leave greater_than as None, set get_only_last_file to False, and fill date.
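A minimal sketch of the real-time scenario above, assuming the function can be called directly; bucket_name and prefix values are hypothetical:
from pipelines.rj_escritorio.inea.tasks import list_vol_files

# Real-time mode: no date filters, process only the most recent file.
# The return value is a tuple (list of remote files, str), per the
# signature above; the second element is not used here.
remote_files, _ = list_vol_files(
    bucket_name="my-radar-bucket",
    prefix="raw/inea_radar",
    radar="gua",
    product="ppi",
    date=None,
    greater_than=None,
    get_only_last_file=True,
    mode="prod",
)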
def print_environment_variables()
Print all environment variables.
def upload_file_to_gcs(converted_file: str, bucket_name: str, prefix: str, radar: str, product: str, mode='prod', task_mode='partitioned', unlink: bool = True)
Upload files to GCS.
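A minimal sketch with hypothetical file and bucket names; the comment on unlink is an assumption based on the parameter name:
from pipelines.rj_escritorio.inea.tasks import upload_file_to_gcs

# The converted file path, bucket_name and prefix are hypothetical.
# unlink=True presumably removes the local copy after the upload.
upload_file_to_gcs(
    converted_file="/var/escritoriodedados/temp/9921GUA-20220125-120000.hdf",
    bucket_name="my-radar-bucket",
    prefix="raw/inea_radar",
    radar="gua",
    product="ppi",
    mode="prod",
    task_mode="partitioned",
    unlink=True,
)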