Module pipelines.rj_escritorio.dump_ftp_inea.tasks
Tasks to dump data from an INEA FTP to BigQuery
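These tasks are meant to be composed into a Prefect flow defined elsewhere in the repository, not in this module. As orientation only, the sketch below shows one way they might be chained; it assumes Prefect 1.x (suggested by the @task decorators and the ENDRUN/Skipped usage) and uses placeholder bucket, prefix and Redis values, not the real flow configuration.

from prefect import Flow, Parameter, unmapped

# Minimal sketch of a flow wiring these tasks together; names and values are placeholders.
with Flow("INEA: dump radar files from FTP to GCS (sketch)") as flow:
    radar = Parameter("radar", default="gua")
    client = get_ftp_client()
    ftp_files = get_files_from_ftp(client=client, radar=radar)
    datalake_files = get_files_datalake(
        bucket_name="my-bucket",   # placeholder
        prefix="raw/inea",         # placeholder
        radar=radar,
        product="ppi",
    )
    to_download = select_files_to_download(
        files=ftp_files,
        redis_files=[],            # in the real flow this list comes from Redis
        datalake_files=datalake_files,
    )
    downloaded = download_files(client=client, files=to_download, radar=radar)
    upload_file_to_gcs.map(
        file_to_upload=downloaded,
        bucket_name=unmapped("my-bucket"),
        prefix=unmapped("raw/inea"),
        radar=unmapped(radar),
        product=unmapped("ppi"),
    )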
Functions
def download_files(client, files, radar) -> List[str]
@task(max_retries=3, retry_delay=timedelta(seconds=30))
def download_files(client, files, radar) -> List[str]:
    """
    Download files from FTP
    """
    save_path = Path(radar.upper())
    save_path.mkdir(parents=True, exist_ok=True)
    client.connect()
    files_downloaded = []
    for file in files:
        log(f"Downloading file: {file}")
        # file_path = save_path / file
        file_path = file
        client.download(remote_path=file, local_path=file_path)
        files_downloaded.append(file_path)
    log(f"Downloaded: {files_downloaded}")
    file = Path(files_downloaded[0])
    log(f"DEBUGGGG: {file.name.split('-')[2]}")
    return files_downloaded
Download files from FTP
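FTPClient is a project wrapper whose internals are not shown on this page; assuming it wraps Python's standard ftplib, the download loop above is roughly equivalent to the following sketch (host, credentials and file list are placeholders):

from ftplib import FTP
from pathlib import Path

# Placeholder host and credentials; the real values come from Vault (see get_ftp_client).
ftp = FTP("ftp.example.com")
ftp.login(user="username", passwd="password")

files_downloaded = []
for remote_path in ["./GUA/9921GUA-PPIVol-20220930-121010-0004.hdf"]:  # placeholder listing
    local_path = Path(remote_path)
    local_path.parent.mkdir(parents=True, exist_ok=True)
    with open(local_path, "wb") as handle:
        ftp.retrbinary(f"RETR {remote_path}", handle.write)
    files_downloaded.append(str(local_path))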
def get_files_datalake(bucket_name: str,
prefix: str,
radar: str,
product: str,
date: str = None,
greater_than: str = None,
check_datalake_files: bool = True,
mode: str = 'prod',
wait=None) -> List[str]
@task(nout=2, max_retries=2, retry_delay=timedelta(seconds=10))
# pylint: disable=too-many-arguments,too-many-locals,too-many-branches
def get_files_datalake(
    bucket_name: str,
    prefix: str,
    radar: str,
    product: str,
    date: str = None,
    greater_than: str = None,
    check_datalake_files: bool = True,
    mode: str = "prod",
    wait=None,  # pylint: disable=unused-argument
) -> List[str]:
    """
    List files from INEA saved on datalake

    Args:
        product (str): "ppi"
        date (str): Date of the files to be fetched (e.g. 2022-01-25)
        greater_than (str): Fetch files with a date greater than this one (e.g. 2022-01-25)
        radar (str): Radar name. Must be `gua` or `mac`
        get_only_last_file (bool): Treat only the last file available

    How to use:
        to get real time data: leave `greater_than` and `date` as None and
            `get_only_last_file` as True. This prevents the flow from getting stuck
            treating all files when something happened and stopped the flow; otherwise
            the flow would take a long time to treat all files and come back to real time.
        to fill missing files up to two days ago: leave `greater_than` and `date` as None
            and `get_only_last_file` as False.
        for backfill, or to fill missing files for dates greater than two days ago: add a
            `greater_than` date, leave `date` as None and `get_only_last_file` as False.
        to get all files for one day: leave `greater_than` as None, set
            `get_only_last_file` to False and fill `date`.
    """
    if check_datalake_files:
        search_prefix = f"{prefix}/radar={radar}/produto={product}"

        # Get the reference date (normalized to a `date` so the comparison below is consistent)
        if date:
            current_date = datetime.strptime(date, "%Y-%m-%d").date()
        else:
            current_date = datetime.now().date()

        if greater_than is None:
            past_date = current_date - timedelta(days=1)
        else:
            past_date = datetime.strptime(greater_than, "%Y-%m-%d").date()

        blobs = []
        # Fetch blobs for each partition from `past_date` up to `current_date`
        while past_date <= current_date:
            past_date_str = past_date.strftime("%Y-%m-%d")
            past_blobs = list_blobs_with_prefix(
                bucket_name=bucket_name,
                prefix=f"{search_prefix}/data_particao={past_date_str}",
                mode=mode,
            )
            log(f"Searched for blobs with prefix {search_prefix}/data_particao={past_date_str}")
            # Merge into the accumulated list
            blobs += past_blobs
            past_date += timedelta(days=1)

        # Sort by `blob.name`
        blobs.sort(key=lambda blob: blob.name)

        # Get only the filenames
        datalake_files = [blob.name.split("/")[-1] for blob in blobs]

        # Format of the name is 9921GUA-PPIVol-20220930-121010-0004.hdf
        # We need to remove the last segment to stay with 9921GUA-PPIVol-20220930-121010
        datalake_files = ["-".join(fname.split("-")[:-1]) for fname in datalake_files]
        log(f"Last 10 datalake files: {datalake_files[-10:]}")
    else:
        datalake_files = []
        log("This run is not considering datalake files")
    return datalake_files
List files from INEA saved on datalake
Args
product : str
    "ppi"
date : str
    Date of the files to be fetched (e.g. 2022-01-25)
greater_than : str
    Fetch files with a date greater than this one (e.g. 2022-01-25)
radar : str
    Radar name. Must be `gua` or `mac`
get_only_last_file : bool
    Treat only the last file available
How to use
    to get real time data: leave `greater_than` and `date` as None and `get_only_last_file` as True.
        This prevents the flow from getting stuck treating all files when something happened and
        stopped the flow; otherwise the flow would take a long time to treat all files and come
        back to real time.
    to fill missing files up to two days ago: leave `greater_than` and `date` as None and
        `get_only_last_file` as False.
    for backfill, or to fill missing files for dates greater than two days ago: add a `greater_than`
        date, leave `date` as None and `get_only_last_file` as False.
    to get all files for one day: leave `greater_than` as None, set `get_only_last_file` to False
        and fill `date`.
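Downstream deduplication compares filenames without their last dash-separated segment (e.g. 9921GUA-PPIVol-20220930-121010-0004.hdf is reduced to 9921GUA-PPIVol-20220930-121010). A self-contained illustration of the normalization this task applies to datalake blob names (the blob names below are made up, following the documented pattern):

blob_names = [
    "raw/radar=gua/produto=ppi/data_particao=2022-09-30/9921GUA-PPIVol-20220930-121010-0004.hdf",
    "raw/radar=gua/produto=ppi/data_particao=2022-09-30/9921GUA-PPIVol-20220930-122010-0004.hdf",
]
# Keep only the filename, then drop the last dash-separated segment
datalake_files = ["-".join(name.split("/")[-1].split("-")[:-1]) for name in blob_names]
print(datalake_files)
# ['9921GUA-PPIVol-20220930-121010', '9921GUA-PPIVol-20220930-122010']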
def get_files_from_ftp(client, radar: str) -> List[str]
@task(max_retries=3, retry_delay=timedelta(seconds=30))
# pylint: disable=too-many-arguments
def get_files_from_ftp(
    client,
    radar: str,
) -> List[str]:
    """
    List and get files to download from FTP
    """
    client.connect()
    files = client.list_files(path=f"./{radar.upper()}/")
    # Skip task if there is no new file on FTP
    if len(files) == 0:
        log("No new available files on FTP")
        skip = Skipped("No new available files on FTP")
        raise ENDRUN(state=skip)
    log(f"Last 10 files on FTP: {files[-10:]} {len(files)}")
    log(f"files on FTP: {files}")
    return files
List and get files to download from FTP
def get_ftp_client(wait=None)
@task
def get_ftp_client(wait=None):
    """
    Get FTP client
    """
    inea_secret = get_vault_secret("ftp_inea_radar")
    hostname = inea_secret["data"]["hostname"]
    username = inea_secret["data"]["username"]
    password = inea_secret["data"]["password"]
    return FTPClient(
        hostname=hostname,
        username=username,
        password=password,
    )
Get FTP client
def select_files_to_download(files: list,
redis_files: list,
datalake_files: list,
date: str = None,
greater_than: str = None,
get_only_last_file: bool = True) -> List[str]
@task(max_retries=3, retry_delay=timedelta(seconds=30))
# pylint: disable=too-many-arguments
def select_files_to_download(
    files: list,
    redis_files: list,
    datalake_files: list,
    date: str = None,
    greater_than: str = None,
    get_only_last_file: bool = True,
) -> List[str]:
    """
    Select files to download

    Args:
        radar (str): Radar name. Must be `gua` or `mac`
        redis_files (list): List with last files saved on GCP and redis
        datalake_files (list): List with filenames saved on GCP
        date (str): Date of the files to be fetched (e.g. 2022-01-25)
        greater_than (str): Fetch files with a date greater than this one (e.g. 2022-01-25)
        get_only_last_file (bool): Treat only the last file available

    How to use:
        to get real time data: leave `greater_than` and `date` as None and
            `get_only_last_file` as True. This prevents the flow from getting stuck
            treating all files when something happened and stopped the flow; otherwise
            the flow would take a long time to treat all files and come back to real time.
        to fill missing files up to two days ago: leave `greater_than` and `date` as None
            and `get_only_last_file` as False.
        for backfill, or to fill missing files for dates greater than two days ago: add a
            `greater_than` date, leave `date` as None and `get_only_last_file` as False.
        to get all files for one day: leave `greater_than` as None, set
            `get_only_last_file` to False and fill `date`.
    """
    # log(f"\n\nAvailable files on FTP: {files}")
    # log(f"\nFiles already saved on redis_files: {redis_files}")

    # Files obtained directly from INEA end with 0000, as in "9915MAC-PPIVol-20230921-123000-0000.hdf"
    # Files from FTP end with an alphanumeric string, as in "9915MAC-PPIVol-20230921-142000-54d4.hdf"
    # We need to be careful when switching from one pipeline to the other

    # Get specific files based on date and greater_than parameters
    if date:
        files = [file for file in files if file.split("-")[2] == date.replace("-", "")]
        log(f"Last 10 files on FTP for date {date}: {files[-10:]}")
    if greater_than:
        files = [
            file for file in files if file.split("-")[2] >= greater_than.replace("-", "")
        ]
        log(f"Last 10 files on FTP for date greater than {greater_than}: {files[-10:]}")

    # Check if files are already on redis
    files = [file for file in files if file not in redis_files]
    log(f"Last 10 files on FTP that are not on redis: {files[-10:]}")

    # Check if files are already on datalake
    # Some datalake files use the pattern "9915MAC-PPIVol-20230921-123000-0000.hdf"
    # Files from FTP use the pattern "./MAC/9915MAC-PPIVol-20230921-123000-3f28.hdf"
    # We are going to compare "9915MAC-PPIVol-20230921-123000" from both places
    if len(datalake_files) > 0:
        log("Removing files that are already on datalake")
        files = [
            file
            for file in files
            if "-".join(file.split("/")[-1].split("-")[:-1]) not in datalake_files
        ]

    # Skip task if there is no new file
    if len(files) == 0:
        log("No new available files")
        skip = Skipped("No new available files")
        raise ENDRUN(state=skip)

    files.sort()
    if get_only_last_file:
        files = [files[-1]]
    log(f"\nFiles to be downloaded: {files}")
    return files
Select files to download
Args
radar : str
    Radar name. Must be `gua` or `mac`
redis_files : list
    List with last files saved on GCP and redis
datalake_files : list
    List with filenames saved on GCP
date : str
    Date of the files to be fetched (e.g. 2022-01-25)
greater_than : str
    Fetch files with a date greater than this one (e.g. 2022-01-25)
get_only_last_file : bool
    Treat only the last file available
How to use
    to get real time data: leave `greater_than` and `date` as None and `get_only_last_file` as True.
        This prevents the flow from getting stuck treating all files when something happened and
        stopped the flow; otherwise the flow would take a long time to treat all files and come
        back to real time.
    to fill missing files up to two days ago: leave `greater_than` and `date` as None and
        `get_only_last_file` as False.
    for backfill, or to fill missing files for dates greater than two days ago: add a `greater_than`
        date, leave `date` as None and `get_only_last_file` as False.
    to get all files for one day: leave `greater_than` as None, set `get_only_last_file` to False
        and fill `date`.
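A self-contained illustration of the filters described above, using a made-up FTP listing, one file already registered in Redis and one already present in the datalake:

ftp_files = [
    "./MAC/9915MAC-PPIVol-20230920-235000-1a2b.hdf",
    "./MAC/9915MAC-PPIVol-20230921-123000-3f28.hdf",
    "./MAC/9915MAC-PPIVol-20230921-142000-54d4.hdf",
]
redis_files = ["./MAC/9915MAC-PPIVol-20230921-123000-3f28.hdf"]
datalake_files = ["9915MAC-PPIVol-20230920-235000"]

date = "2023-09-21"
# Keep only files whose embedded date matches `date`
files = [f for f in ftp_files if f.split("-")[2] == date.replace("-", "")]
# Drop files already registered in Redis
files = [f for f in files if f not in redis_files]
# Drop files already on the datalake, comparing names without the last segment
files = [f for f in files if "-".join(f.split("/")[-1].split("-")[:-1]) not in datalake_files]
print(files)  # ['./MAC/9915MAC-PPIVol-20230921-142000-54d4.hdf']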
def upload_file_to_gcs(file_to_upload: str,
bucket_name: str,
prefix: str,
radar: str,
product: str,
mode='prod',
task_mode='partitioned',
unlink: bool = True)
@task(max_retries=3, retry_delay=timedelta(seconds=30))
# pylint: disable=too-many-arguments, too-many-locals
def upload_file_to_gcs(
    file_to_upload: str,
    bucket_name: str,
    prefix: str,
    radar: str,
    product: str,
    mode="prod",
    task_mode="partitioned",
    unlink: bool = True,
):
    """
    Upload files to GCS
    """
    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(bucket_name)

    file = Path(file_to_upload)
    if task_mode == "partitioned":
        log(f"DEBUG: {file} and {file.name}")
        date_str = file.name.split("-")[2]
        date = datetime.strptime(date_str, "%Y%m%d").strftime("%Y-%m-%d")
        blob_name = (
            f"{prefix}/radar={radar}/produto={product}/data_particao={date}/{file.name}"
        )
        blob_name = blob_name.replace("//", "/")
    elif task_mode == "raw":
        blob_name = f"{prefix}/{file.name}"

    log(f"Uploading file {file} to GCS...")
    log(f"Blob name will be {blob_name}")
    blob = bucket.blob(blob_name)
    blob.upload_from_filename(file)
    log(f"File {file} uploaded to GCS.")
    if unlink:
        file.unlink()
Upload files to GCS
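For reference, the partitioned blob name is derived from the date field embedded in the filename. A self-contained illustration of that construction (prefix, radar and product values are placeholders):

from datetime import datetime
from pathlib import Path

file = Path("GUA/9921GUA-PPIVol-20220930-121010-0004.hdf")  # placeholder local file
prefix, radar, product = "raw/inea", "gua", "ppi"            # placeholder values

date_str = file.name.split("-")[2]                                  # "20220930"
date = datetime.strptime(date_str, "%Y%m%d").strftime("%Y-%m-%d")   # "2022-09-30"
blob_name = f"{prefix}/radar={radar}/produto={product}/data_particao={date}/{file.name}"
print(blob_name)
# raw/inea/radar=gua/produto=ppi/data_particao=2022-09-30/9921GUA-PPIVol-20220930-121010-0004.hdf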