Module pipelines.rj_escritorio.dump_ftp_inea.tasks
Tasks to dump data from an INEA FTP to BigQuery
Functions
def download_files(client, files, radar) -> List[str]
-
Download files from FTP
def get_files_datalake(bucket_name: str, prefix: str, radar: str, product: str, date: str = None, greater_than: str = None, check_datalake_files: bool = True, mode: str = 'prod', wait=None) -> List[str]
-
List INEA files saved on the datalake
Args
product :str
- "ppi"
date :str
- Date of the files to be fetched (e.g. 2022-01-25)
greater_than :str
- Fetch files with a date greater than this one (e.g. 2022-01-25)
radar :str
- Radar name. Must be "gua" or "mac"
get_only_last_file :bool
- Treat only the last file available
How to use:
- To get real-time data: leave greater_than and date as None and set get_only_last_file to True. This prevents the flow from getting stuck working through every backlog file after something stopped it; otherwise the flow would take a long time to treat all files and catch back up to real time.
- To fill missing files up to two days ago: leave greater_than and date as None and set get_only_last_file to False.
- For backfill, or to fill missing files for dates more than two days ago: set a greater_than date, leave date as None and set get_only_last_file to False.
- To get all files for one day: leave greater_than as None, set get_only_last_file to False and fill date.
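A hypothetical call, for illustration only; the bucket name and prefix below are placeholders, not the pipeline's real values (and, if this is a Prefect task, the call would normally live inside a flow):

    datalake_files = get_files_datalake(
        bucket_name="my-bucket",        # placeholder
        prefix="raw/inea_radar",        # placeholder
        radar="gua",
        product="ppi",
        date=None,
        greater_than="2022-01-25",      # fetch files newer than this date
    )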
def get_files_from_ftp(client, radar: str) -> List[str]
-
List files available on the FTP for download
def get_ftp_client(wait=None)
-
Get FTP client
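The three FTP tasks above (get_ftp_client, get_files_from_ftp, download_files) compose naturally. A minimal sketch, assuming Prefect 1.x task composition (suggested by the wait= parameters); the flow name and radar value are illustrative:

    from prefect import Flow

    # Sketch only: open a client, list the files, download them.
    with Flow("inea_dump_ftp_example") as flow:
        client = get_ftp_client()
        files = get_files_from_ftp(client, radar="gua")
        downloaded = download_files(client, files, radar="gua")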
def select_files_to_download(files: list, redis_files: list, datalake_files: list, date: str = None, greater_than: str = None, get_only_last_file: bool = True) -> List[str]
-
Select files to download
Args
files :list
- List with filenames available on the FTP
redis_files :list
- List with the last files saved on GCP and Redis
datalake_files :list
- List with filenames saved on GCP
date :str
- Date of the files to be fetched (e.g. 2022-01-25)
greater_than :str
- Fetch files with a date greater than this one (e.g. 2022-01-25)
get_only_last_file :bool
- Treat only the last file available
How to use:
- To get real-time data: leave greater_than and date as None and set get_only_last_file to True. This prevents the flow from getting stuck working through every backlog file after something stopped it; otherwise the flow would take a long time to treat all files and catch back up to real time.
- To fill missing files up to two days ago: leave greater_than and date as None and set get_only_last_file to False.
- For backfill, or to fill missing files for dates more than two days ago: set a greater_than date, leave date as None and set get_only_last_file to False.
- To get all files for one day: leave greater_than as None, set get_only_last_file to False and fill date.
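The four modes above map onto argument combinations like these (a sketch; files, redis_files and datalake_files are assumed to come from the listing tasks in this module):

    # Real time: treat only the newest file.
    to_download = select_files_to_download(
        files, redis_files, datalake_files,
        date=None, greater_than=None, get_only_last_file=True,
    )

    # Fill missing files up to two days ago.
    to_download = select_files_to_download(
        files, redis_files, datalake_files,
        date=None, greater_than=None, get_only_last_file=False,
    )

    # Backfill: everything after a given date.
    to_download = select_files_to_download(
        files, redis_files, datalake_files,
        date=None, greater_than="2022-01-25", get_only_last_file=False,
    )

    # All files for a single day.
    to_download = select_files_to_download(
        files, redis_files, datalake_files,
        date="2022-01-25", greater_than=None, get_only_last_file=False,
    )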
def upload_file_to_gcs(file_to_upload: str, bucket_name: str, prefix: str, radar: str, product: str, mode='prod', task_mode='partitioned', unlink: bool = True)
-
Upload a file to GCS
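A hypothetical call; the local path, bucket name and prefix are placeholders:

    upload_file_to_gcs(
        file_to_upload="/tmp/radar-file.hdf",  # placeholder local path
        bucket_name="my-bucket",               # placeholder
        prefix="raw/inea_radar",               # placeholder
        radar="gua",
        product="ppi",
        mode="prod",
        task_mode="partitioned",
        unlink=True,  # presumably deletes the local file after upload
    )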