Module pipelines.rj_escritorio.dump_ftp_inea.tasks

Tasks to dump data from an INEA FTP server to BigQuery

Functions

def download_files(client, files, radar) ‑> List[str]

Download files from the FTP server
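
The generated docs omit the function body. A minimal sketch of what such a task might look like, assuming `client` is an `ftplib.FTP` connection and using a hypothetical `/tmp/<radar>` staging directory (the real pipeline's download path is not documented here):

```python
from ftplib import FTP
from pathlib import Path
from typing import List


def download_files(client: FTP, files: List[str], radar: str) -> List[str]:
    """Download each remote file to a local staging directory."""
    base_path = Path("/tmp") / radar  # hypothetical staging directory
    base_path.mkdir(parents=True, exist_ok=True)
    downloaded: List[str] = []
    for remote_file in files:
        local_path = base_path / Path(remote_file).name
        with open(local_path, "wb") as buffer:
            # RETR streams the remote file contents in binary mode
            client.retrbinary(f"RETR {remote_file}", buffer.write)
        downloaded.append(str(local_path))
    return downloaded
```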

def get_files_datalake(bucket_name: str, prefix: str, radar: str, product: str, date: str = None, greater_than: str = None, check_datalake_files: bool = True, mode: str = 'prod', wait=None) ‑> List[str]

List INEA files already saved on the datalake

Args

radar : str
Radar name; must be "gua" or "mac"
product : str
Product to fetch (e.g. "ppi")
date : str
Date of the files to be fetched (e.g. 2022-01-25)
greater_than : str
Fetch files with a date later than this one (e.g. 2022-01-25)
get_only_last_file : bool
Process only the most recent file available

How to use:

- Real-time data: leave greater_than and date as None and set get_only_last_file to True. This keeps the flow from getting stuck processing every pending file after an outage; otherwise it would take a long time to work through the backlog before catching up to real time.
- Fill missing files from up to two days ago: leave greater_than and date as None and set get_only_last_file to False.
- Backfill, or fill missing files for dates more than two days ago: set greater_than to a date, leave date as None, and set get_only_last_file to False.
- Get all files for a single day: fill date, leave greater_than as None, and set get_only_last_file to False.
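
For illustration, a hypothetical call fetching all datalake files for a single day; the bucket and prefix values are placeholders, not the pipeline's real configuration:

```python
datalake_files = get_files_datalake(
    bucket_name="my-datalake-bucket",  # placeholder bucket name
    prefix="raw/inea_radar",           # placeholder prefix
    radar="gua",
    product="ppi",
    date="2022-01-25",                 # all files for this day
    mode="prod",
)
```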

def get_files_from_ftp(client, radar: str) ‑> List[str]

List the files available for download on the FTP server
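
A minimal sketch under the assumption (not confirmed by these docs) that the server keeps one directory per radar:

```python
from ftplib import FTP
from typing import List


def get_files_from_ftp(client: FTP, radar: str) -> List[str]:
    """List the remote files available for the given radar."""
    client.cwd(f"/{radar}")  # assumes one remote directory per radar
    # nlst() returns a flat list of names in the current directory;
    # sorting keeps "the last file" well defined downstream.
    return sorted(client.nlst())
```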

def get_ftp_client(wait=None)

Get FTP client
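
The `wait` parameter carries no data; presumably it only lets the orchestrator order this task after another one. A sketch using `ftplib`, with hypothetical environment variable names (the real pipeline likely reads credentials from a secret store):

```python
import os
from ftplib import FTP


def get_ftp_client(wait=None) -> FTP:
    """Open an authenticated connection to the INEA FTP server."""
    # INEA_FTP_* are hypothetical variable names used for illustration.
    client = FTP(os.environ["INEA_FTP_HOST"])
    client.login(
        user=os.environ["INEA_FTP_USER"],
        passwd=os.environ["INEA_FTP_PASSWORD"],
    )
    return client
```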

def select_files_to_download(files: list, redis_files: list, datalake_files: list, date: str = None, greater_than: str = None, get_only_last_file: bool = True) ‑> List[str]

Select files to download

Args

files : list
List of file names available on the FTP server
redis_files : list
List of the last file names saved on GCP and Redis
datalake_files : list
List of file names already saved on GCP
date : str
Date of the files to be fetched (e.g. 2022-01-25)
greater_than : str
Fetch files with a date later than this one (e.g. 2022-01-25)
get_only_last_file : bool
Process only the most recent file available

How to use:

- Real-time data: leave greater_than and date as None and set get_only_last_file to True. This keeps the flow from getting stuck processing every pending file after an outage; otherwise it would take a long time to work through the backlog before catching up to real time.
- Fill missing files from up to two days ago: leave greater_than and date as None and set get_only_last_file to False.
- Backfill, or fill missing files for dates more than two days ago: set greater_than to a date, leave date as None, and set get_only_last_file to False.
- Get all files for a single day: fill date, leave greater_than as None, and set get_only_last_file to False.
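
Mapping the four modes above onto concrete calls; the `files`, `redis_files`, and `datalake_files` lists would presumably come from get_files_from_ftp, Redis, and get_files_datalake, respectively:

```python
# Real time: take only the newest file.
to_download = select_files_to_download(
    files, redis_files, datalake_files,
    date=None, greater_than=None, get_only_last_file=True,
)

# Fill gaps from up to two days ago.
to_download = select_files_to_download(
    files, redis_files, datalake_files,
    date=None, greater_than=None, get_only_last_file=False,
)

# Backfill everything newer than a given date.
to_download = select_files_to_download(
    files, redis_files, datalake_files,
    date=None, greater_than="2022-01-25", get_only_last_file=False,
)

# Every file for a single day.
to_download = select_files_to_download(
    files, redis_files, datalake_files,
    date="2022-01-25", greater_than=None, get_only_last_file=False,
)
```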

def upload_file_to_gcs(file_to_upload: str, bucket_name: str, prefix: str, radar: str, product: str, mode='prod', task_mode='partitioned', unlink: bool = True)

Upload a file to GCS
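
A minimal sketch using google-cloud-storage. The blob layout below is hypothetical; the real partitioning scheme behind task_mode="partitioned" is not documented here:

```python
from pathlib import Path

from google.cloud import storage


def upload_file_to_gcs(
    file_to_upload: str,
    bucket_name: str,
    prefix: str,
    radar: str,
    product: str,
    mode="prod",
    task_mode="partitioned",
    unlink: bool = True,
):
    """Upload one local file to GCS, then optionally remove the local copy."""
    bucket = storage.Client().bucket(bucket_name)
    # Hypothetical blob path for illustration only.
    blob_name = (
        f"{mode}/{prefix}/radar={radar}/produto={product}/"
        f"{Path(file_to_upload).name}"
    )
    bucket.blob(blob_name).upload_from_filename(file_to_upload)
    if unlink:
        Path(file_to_upload).unlink()  # free local disk space
```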