Module pipelines.rj_escritorio.inea.tasks
Tasks for INEA.
Functions
def convert_vol_file(downloaded_file: str,
                     output_format: str = 'HDF5',
                     convert_params: str = '-k=ODIM2.1 -M=All') -> List[str]
@task
def convert_vol_file(
    downloaded_file: str,
    output_format: str = "HDF5",
    convert_params: str = "-k=ODIM2.1 -M=All",
) -> List[str]:
    """
    Convert VOL files to NetCDF using the `volconvert` CLI tool.

    For output_format = "NetCDF", convert_params must be
    "-f=Whole -k=CFext -r=Short -p=Radar -M=All -z".
    For output_format = "HDF5", convert_params must be "-k=ODIM2.1 -M=All"
    for all products.

    Args:
        output_format (str): "NetCDF" or "HDF5"
    """
    # Run volconvert
    log(f"Converting file {downloaded_file} to {output_format}...")
    command = (
        f'/opt/edge/bin/volconvert {downloaded_file} "{output_format}.'
        + "{"
        + convert_params
        + '}"'
    )
    log(f"Running command: {command}")
    child = pexpect.spawn(command)
    try:
        log(f"before expect {str(child)}")
        # Look for the "OutFiles:..." row and get only that row
        child.expect("OutFiles:(.*)\n")
        # Get the output file name
        log(f"after expect {str(child)}")
        converted_file = child.match.group(1).decode("utf-8").strip()
        log(f"after match.group expect {str(child)}")
        # Log the output file name
        log(f"Output file: {converted_file}")
        # Go to the end of the command log
        child.expect(pexpect.EOF)
    except Exception as exc:
        # Log the error
        log(child.before.decode("utf-8"))
        raise exc
    # Delete the VOL file
    Path(downloaded_file).unlink()
    # Return the name of the converted file
    return converted_file
Convert VOL files to NetCDF using the `volconvert` CLI tool.

For output_format = "NetCDF", convert_params must be
"-f=Whole -k=CFext -r=Short -p=Radar -M=All -z".
For output_format = "HDF5", convert_params must be "-k=ODIM2.1 -M=All"
for all products.

Args
    output_format : str
        "NetCDF" or "HDF5"
def execute_shell_command(command: str,
                          stdout_callback: Callable = <function log>,
                          stderr_callback: Callable = functools.partial(<function log>, level='error'))
@task
def execute_shell_command(
    command: str,
    stdout_callback: Callable = log,
    stderr_callback: Callable = partial(log, level="error"),
):
    """
    Executes a shell command and logs output
    """
    # pylint: disable=consider-using-with
    popen = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        universal_newlines=True,
        stderr=subprocess.PIPE,
    )
    for stdout_line in iter(popen.stdout.readline, ""):
        stdout_callback(stdout_line)
    for stderr_line in iter(popen.stderr.readline, ""):
        stderr_callback(stderr_line)
    popen.stdout.close()
    return_code = popen.wait()
    if return_code:
        log(f"Command {command} failed with return code {return_code}", "error")
    else:
        log(f"Command {command} executed successfully")
Executes a shell command and logs output
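A minimal usage sketch, assuming the Prefect 1.x flow context used by these tasks; the flow name and shell command are hypothetical examples.

from prefect import Flow

from pipelines.rj_escritorio.inea.tasks import execute_shell_command

# Hypothetical flow: run a shell command and stream its stdout/stderr
# through the default log callbacks.
with Flow("inea_shell_example") as flow:
    execute_shell_command(command="ls -lh /var/opt/edge/vols")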
def fetch_vol_file(remote_file: str,
                   radar: str,
                   output_directory: str = '/var/escritoriodedados/temp/')
@task(
    max_retries=2,
    retry_delay=timedelta(seconds=30),
)
def fetch_vol_file(
    remote_file: str,
    radar: str,
    output_directory: str = "/var/escritoriodedados/temp/",
):
    """
    Fetch files from INEA server

    Args:
        remote_file (str): Remote file to be fetched
        radar (str): Radar name. Must be `gua` or `mac`
        output_directory (str): Directory where the files will be saved
    """
    # Create vars based on radar name
    if radar == "gua":
        env_variable = "INEA_SSH_PASSWORD"
        hostname = "a9921"
    elif radar == "mac":
        env_variable = "INEA_MAC_SSH_PASSWORD"
        hostname = "a9915"
    # TODO: remove this debug log
    log(f"Radar: {radar} env {env_variable}")
    # Get SSH password from env
    ssh_password = getenv(env_variable)
    # Open SSH client
    ssh_client = SSHClient()
    ssh_client.load_system_host_keys()
    ssh_client.connect(
        hostname=hostname,
        username="root",
        password=ssh_password,
        timeout=300,
        auth_timeout=300,
        banner_timeout=300,
    )
    # Open SCP client
    scp = SCPClient(ssh_client.get_transport(), sanitize=lambda x: x)
    # Fetch VOL file
    scp.get(remote_file, local_path=str(output_directory))
    # Close connection
    scp.close()
    # Return local file path
    return Path(output_directory) / remote_file.split("/")[-1]
Fetch files from INEA server

Args
    remote_file : str
        Remote file to be fetched
    radar : str
        Radar name. Must be `gua` or `mac`
    output_directory : str
        Directory where the files will be saved
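The task returns the local path of the copied file, which is the remote filename joined to `output_directory`. A minimal sketch of that derivation; the remote path below is a hypothetical example.

from pathlib import Path

# Illustrative only: how fetch_vol_file derives the local path it returns.
remote_file = "/var/opt/edge/vols/9921GUA-PPIVol-20220930-121010-0004.vol"
output_directory = "/var/escritoriodedados/temp/"

local_path = Path(output_directory) / remote_file.split("/")[-1]
print(local_path)
# /var/escritoriodedados/temp/9921GUA-PPIVol-20220930-121010-0004.vol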
def list_vol_files(bucket_name: str,
                   prefix: str,
                   radar: str,
                   product: str,
                   date: str = None,
                   greater_than: str = None,
                   get_only_last_file: bool = True,
                   mode: str = 'prod',
                   output_directory: str = '/var/escritoriodedados/temp/',
                   vols_remote_directory: str = '/var/opt/edge/vols') -> Tuple[List[str], str]
@task(
    nout=2,
    max_retries=2,
    retry_delay=timedelta(seconds=10),
)
# pylint: disable=too-many-arguments,too-many-locals,too-many-branches
def list_vol_files(
    bucket_name: str,
    prefix: str,
    radar: str,
    product: str,
    date: str = None,
    greater_than: str = None,
    get_only_last_file: bool = True,
    mode: str = "prod",
    output_directory: str = "/var/escritoriodedados/temp/",
    vols_remote_directory: str = "/var/opt/edge/vols",
) -> Tuple[List[str], str]:
    """
    List files from INEA server

    Args:
        product (str): "ppi"
        date (str): Date of the files to be fetched (e.g. 20220125)
        greater_than (str): Fetch files with a date greater than this one
        less_than (str): Fetch files with a date less than this one
        output_directory (str): Directory where the files will be saved
        radar (str): Radar name. Must be `gua` or `mac`
        get_only_last_file (bool): Treat only the last file available

    How to use:
        To get real-time data:
            Leave `greater_than` and `date` as None and set `get_only_last_file` to True.
            This prevents the flow from getting stuck processing every file when something
            happened and stopped the flow; otherwise it would take a long time to work
            through all files and catch up to real time.
        To fill missing files up to two days ago:
            Leave `greater_than` and `date` as None and set `get_only_last_file` to False.
        For backfill, or to fill missing files for dates more than two days ago:
            Set a `greater_than` date, leave `date` as None and set `get_only_last_file` to False.
        To get all files for one day:
            Leave `greater_than` as None, set `get_only_last_file` to False and fill `date`.
    """
    # If none of `date`, `greater_than` are provided, find blob with the latest date
    if date is None:
        # First, we build the search prefix
        search_prefix = f"{prefix}/radar={radar}/produto={product}"
        if greater_than is None:
            log("No date or greater_than provided. Finding latest blob...")
            # Then, we add the current date partition
            current_date = datetime.now()
            current_date_str = current_date.strftime("%Y-%m-%d")
            today_blobs = list_blobs_with_prefix(
                bucket_name=bucket_name,
                prefix=f"{search_prefix}/data_particao={current_date_str}",
                mode=mode,
            )
            log(
                f"Searched for blobs with prefix {search_prefix}/data_particao={current_date_str}"
            )
            # Next, we get past day blobs
            past_date = current_date - timedelta(days=1)
            past_date_str = past_date.strftime("%Y-%m-%d")
            past_blobs = list_blobs_with_prefix(
                bucket_name=bucket_name,
                prefix=f"{search_prefix}/data_particao={past_date_str}",
                mode=mode,
            )
            log(
                f"Searched for blobs with prefix {search_prefix}/data_particao={past_date_str}"
            )
            # Then, we merge the two lists
            blobs = today_blobs + past_blobs
        else:
            blobs = list_blobs_with_prefix(
                bucket_name=bucket_name,
                prefix=f"{search_prefix}/",
                mode=mode,
            )
        # Now, we sort it by `blob.name`
        blobs.sort(key=lambda blob: blob.name)
        # Get only the filenames
        datalake_files = [blob.name.split("/")[-1] for blob in blobs]
        # Format of the name is 9921GUA-20221017-070010-PPIVol-0000.nc.gz
        # We need to join 20221017 and 070010
        # Format of the name is 9921GUA-PPIVol-20220930-121010-0004.hdf
        # We need to join 20220930 and 121010
        datalake_files = [fname.replace("-PPIVol", "") for fname in datalake_files]
        datalake_files = [
            fname.split("-")[1] + fname.split("-")[2] for fname in datalake_files
        ]
        # Define greater_than depending if you want last file or fill missing files
        if greater_than is None:
            if get_only_last_file:
                # Finally, we get the latest date
                greater_than = datalake_files[-1][:8]
            else:
                # Keep all files since last day to be compared with remote files
                greater_than = past_date_str.replace("-", "")
        log(f"Latest blob date: {greater_than}")

    # Creating temporary directory
    if date:
        output_directory_path = Path(output_directory) / date
    else:
        output_directory_path = Path(output_directory) / f"greaterthan-{greater_than}"
    output_directory_path.mkdir(parents=True, exist_ok=True)
    log(f"Temporary directory created: {output_directory_path}")

    # Create vars based on radar name
    if radar == "gua":
        env_variable = "INEA_SSH_PASSWORD"
        hostname = "a9921"
        startswith = "9921GUA"
    elif radar == "mac":
        dicionario = get_vault_secret("inea_mac_ssh_password")
        env_variable = dicionario["data"]["password"]
        hostname = "a9915"
        startswith = "9915MAC"

    # Get SSH password from env
    ssh_password = getenv(env_variable)

    # Open SSH client
    ssh_client = SSHClient()
    ssh_client.load_system_host_keys()
    ssh_client.connect(
        hostname=hostname,
        username="root",
        password=ssh_password,
        timeout=300,
        auth_timeout=300,
        banner_timeout=300,
    )

    # List remote files
    log(f"Listing remote files for radar {startswith}...")
    if date:
        _, stdout, _ = ssh_client.exec_command(
            f"find {vols_remote_directory} -name '{startswith}{date}*.vol'"
        )
        remote_files = stdout.read().decode("utf-8").splitlines()
        if len(remote_files) == 0:
            _, stdout, _ = ssh_client.exec_command(
                f"find {vols_remote_directory} -name '*{startswith}*.vol'"
            )
            remote_files = stdout.read().decode("utf-8").splitlines()
            remote_files = [i[26:34] for i in remote_files]
            remote_files.sort()
            remote_files = set(remote_files)
            log(
                f"Remote dates identified when specified date was not found: {remote_files}"
            )
            skip = Skipped(f"No files were found for date {date}")
            raise ENDRUN(state=skip)
        log(f"Remote files identified: {remote_files}")
    else:
        _, stdout, _ = ssh_client.exec_command(
            f"find {vols_remote_directory} -name '{startswith}*.vol'"
        )
        all_files = stdout.read().decode("utf-8").splitlines()
        # Adjust greater_than if the user didn't give hour, minutes and seconds
        greater_than = greater_than.ljust(14, "0")
        remote_files = [
            file
            for file in all_files
            if file.split("/")[-1][: len(greater_than) + 7]
            >= f"{startswith}{greater_than}"
        ]
        log(f"Remote files identified: {remote_files}")
        if get_only_last_file:
            remote_files.sort()
            remote_files = [remote_files[-1]]
            log(f"Last remote file: {remote_files}")
        else:
            # Remove from remote_files files that are already on datalake
            remote_files = [
                item1
                for item1 in remote_files
                if not any(item2 in item1 for item2 in datalake_files)
            ]
            log(f"Remote files missing on datalake: {remote_files}")

    # Stop flow if there is no new file
    if len(remote_files) == 0:
        skip = Skipped("No new available files")
        raise ENDRUN(state=skip)

    # Filter files with same filename
    filenames = set()
    filtered_remote_files = []
    for file in remote_files:
        filename = file.split("/")[-1]
        log(f"filename split: {filename}")
        if filename not in filenames:
            filtered_remote_files.append(file)
            filenames.add(filename)
    remote_files = filtered_remote_files

    log(f"Found {len(remote_files)} files to be treated.")
    log(f"Remote files to be treated: {remote_files}")

    return remote_files, output_directory_path
List files from INEA server

Args
    product : str
        "ppi"
    date : str
        Date of the files to be fetched (e.g. 20220125)
    greater_than : str
        Fetch files with a date greater than this one
    less_than : str
        Fetch files with a date less than this one
    output_directory : str
        Directory where the files will be saved
    radar : str
        Radar name. Must be `gua` or `mac`
    get_only_last_file : bool
        Treat only the last file available
How to use
    To get real-time data:
        Leave `greater_than` and `date` as None and set `get_only_last_file` to True.
        This prevents the flow from getting stuck processing every file when something
        happened and stopped the flow; otherwise it would take a long time to work
        through all files and catch up to real time.
    To fill missing files up to two days ago:
        Leave `greater_than` and `date` as None and set `get_only_last_file` to False.
    For backfill, or to fill missing files for dates more than two days ago:
        Set a `greater_than` date, leave `date` as None and set `get_only_last_file` to False.
    To get all files for one day:
        Leave `greater_than` as None, set `get_only_last_file` to False and fill `date`.
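A minimal sketch of these four modes, assuming the Prefect 1.x flow context used by these tasks; the bucket name, prefix and dates are hypothetical examples.

from prefect import Flow

from pipelines.rj_escritorio.inea.tasks import list_vol_files

with Flow("inea_list_example") as flow:
    # Real-time data: only the newest remote file
    remote_files, output_dir = list_vol_files(
        bucket_name="my-bucket", prefix="raw/inea_radar",
        radar="gua", product="ppi",
    )
    # Fill missing files up to two days ago
    recent_files, _ = list_vol_files(
        bucket_name="my-bucket", prefix="raw/inea_radar",
        radar="gua", product="ppi", get_only_last_file=False,
    )
    # Backfill from a given date onwards
    backfill_files, _ = list_vol_files(
        bucket_name="my-bucket", prefix="raw/inea_radar",
        radar="gua", product="ppi",
        greater_than="20220101", get_only_last_file=False,
    )
    # All files for a single day
    day_files, _ = list_vol_files(
        bucket_name="my-bucket", prefix="raw/inea_radar",
        radar="gua", product="ppi",
        date="20220125", get_only_last_file=False,
    )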
def print_environment_variables()
@task
def print_environment_variables():
    """
    Print all environment variables
    """
    log("Environment variables:")
    for key, value in environ.items():
        log(f"{key}={value}")
Print all environment variables
def upload_file_to_gcs(converted_file: str,
                       bucket_name: str,
                       prefix: str,
                       radar: str,
                       product: str,
                       mode='prod',
                       task_mode='partitioned',
                       unlink: bool = True)
@task(
    max_retries=3,
    retry_delay=timedelta(seconds=30),
)
# pylint: disable=too-many-arguments,too-many-locals
def upload_file_to_gcs(
    converted_file: str,
    bucket_name: str,
    prefix: str,
    radar: str,
    product: str,
    mode="prod",
    task_mode="partitioned",
    unlink: bool = True,
):
    """
    Upload files to GCS
    """
    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(bucket_name)

    file = Path(converted_file)
    if file.is_file():
        if task_mode == "partitioned":
            # Converted file path is in the format:
            # /var/opt/edge/.../YYYYMMDD/<filename>.nc.gz
            # We need to get the datetime for the file
            date_str = file.parent.name
            date = datetime.strptime(date_str, "%Y%m%d").strftime("%Y-%m-%d")
            blob_name = f"{prefix}/radar={radar}/produto={product}/data_particao={date}/{file.name}"
            blob_name = blob_name.replace("//", "/")
        elif task_mode == "raw":
            blob_name = f"{prefix}/{file.name}"
        else:
            raise ValueError(f"Invalid task_mode: {task_mode}")
        log(f"Uploading file {file} to GCS...")
        log(f"Blob name will be {blob_name}")
        blob = bucket.blob(blob_name)
        blob.upload_from_filename(file)
        log(f"File {file} uploaded to GCS.")
        if unlink:
            file.unlink()
Upload files to GCS
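In `partitioned` mode the blob name is derived from the converted file's parent directory, which is expected to be a `YYYYMMDD` partition. A minimal sketch of that derivation; the local path and prefix below are hypothetical examples.

from datetime import datetime
from pathlib import Path

# Illustrative only: how upload_file_to_gcs builds the partitioned blob name.
converted_file = "/var/escritoriodedados/temp/20221017/9921GUA-20221017-070010-PPIVol-0000.nc.gz"
prefix = "raw/inea_radar"
radar, product = "gua", "ppi"

file = Path(converted_file)
date = datetime.strptime(file.parent.name, "%Y%m%d").strftime("%Y-%m-%d")
blob_name = f"{prefix}/radar={radar}/produto={product}/data_particao={date}/{file.name}"
print(blob_name.replace("//", "/"))
# raw/inea_radar/radar=gua/produto=ppi/data_particao=2022-10-17/9921GUA-20221017-070010-PPIVol-0000.nc.gz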