Module pipelines.rj_smtr.br_rj_riodejaneiro_rdo.tasks
Tasks for br_rj_riodejaneiro_rdo
Functions
def check_files_for_download(files: list, dataset_id: str, table_id: str)
Check Redis for files already downloaded from the FTP

Args:
    files (list): file information gathered from the FTP
    dataset_id (str): dataset_id on BigQuery
    table_id (str): table_id on BigQuery

Returns:
    list: info on the files to download
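A minimal usage sketch, shown as a plain function call for clarity (in the actual pipeline these tasks are presumably orchestrated by the flow framework); the file-info keys and table_id below are hypothetical:

    from pipelines.rj_smtr.br_rj_riodejaneiro_rdo.tasks import check_files_for_download

    # Hypothetical listing produced by the FTP search step
    files = [
        {"filename": "rdo_2023_01_01.txt", "partitions": "ano=2023/mes=01/dia=01"},
    ]
    to_download = check_files_for_download(
        files=files,
        dataset_id="br_rj_riodejaneiro_rdo",
        table_id="rdo",  # hypothetical table_id
    )
    # Files already registered in Redis are filtered out of the result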
def download_and_save_local_from_ftp(file_info: dict)
Downloads a file from the FTP and saves it to data/raw/
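Illustrative only; the file_info keys below are assumptions, not documented fields:

    from pipelines.rj_smtr.br_rj_riodejaneiro_rdo.tasks import download_and_save_local_from_ftp

    # file_info as selected by check_files_for_download (keys are assumed)
    file_info = {
        "ftp_path": "SPPO/rdo_2023_01_01.txt",
        "filename": "rdo_2023_01_01.txt",
    }
    download_and_save_local_from_ftp(file_info=file_info)
    # The file is saved locally under data/raw/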
def get_file_paths_from_ftp(transport_mode: str, report_type: str, wait=None, dump=False)

Searches the FTP client for files within the previous interval (in days) from the current date, extracting the filename and partitions (derived from the filename).
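A sketch of a typical call; the transport_mode and report_type values are illustrative guesses, not documented options:

    from pipelines.rj_smtr.br_rj_riodejaneiro_rdo.tasks import get_file_paths_from_ftp

    # Returns file info (filename and partitions) for files found on the FTP
    files = get_file_paths_from_ftp(
        transport_mode="SPPO",  # assumed value
        report_type="RDO",      # assumed value
    )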
def get_rdo_date_range(dataset_id: str, table_id: str, mode: str = 'prod')
Get date range for RDO/RHO materialization

Args:
    dataset_id (str): dataset_id on BigQuery
    table_id (str): table_id on BigQuery
    mode (str, optional): mode to materialize to.
        Accepted options are 'dev' and 'prod'. Defaults to "prod".

Returns:
    dict: containing 'date_range_start' and 'date_range_end' keys
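For example (the table_id is a hypothetical placeholder):

    from pipelines.rj_smtr.br_rj_riodejaneiro_rdo.tasks import get_rdo_date_range

    date_range = get_rdo_date_range(
        dataset_id="br_rj_riodejaneiro_rdo",
        table_id="rdo",  # hypothetical table_id
        mode="dev",
    )
    # date_range is expected to hold 'date_range_start' and 'date_range_end'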
def pre_treatment_br_rj_riodejaneiro_rdo(files: list, divide_columns_by: int = 100) -> tuple
Adds header and capture_time, and standardizes columns

Args:
    file_info (dict): information for the files found in the current run
    divide_columns_by (int, optional): value by which to divide numeric columns.
        Defaults to 100.

Returns:
    dict: updated file_info with the treated filepath
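A sketch under the assumption that the task receives the file-info entries produced by the download step; note the signature above declares a tuple return, so the result's exact contents are not assumed here:

    from pipelines.rj_smtr.br_rj_riodejaneiro_rdo.tasks import pre_treatment_br_rj_riodejaneiro_rdo

    result = pre_treatment_br_rj_riodejaneiro_rdo(
        files=downloaded_files,   # file-info entries from the download step (assumed shape)
        divide_columns_by=100,    # numeric columns are divided by 100 (the default)
    )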
def update_rdo_redis(download_files: list, table_id: str, dataset_id: str = 'br_rj_riodejaneiro_rdo', errors=None, wait=None)
Updates Redis with the downloaded files, if they were uploaded correctly.

Args:
    download_files (list): information on the downloaded files
    table_id (str): table_id on BigQuery
    dataset_id (str, optional): dataset_id on BigQuery.
        Defaults to constants.RDO_DATASET_ID.value.
    errors (list, optional): list of errors. Defaults to None.
    wait (Any, optional): wait for a task before running. Defaults to None.

Returns:
    bool: whether the Redis key was set
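A closing sketch of how the task might be called after upload; all values are placeholders:

    from pipelines.rj_smtr.br_rj_riodejaneiro_rdo.tasks import update_rdo_redis

    ok = update_rdo_redis(
        download_files=downloaded_files,      # info on the files that were uploaded
        table_id="rdo",                       # hypothetical table_id
        dataset_id="br_rj_riodejaneiro_rdo",  # the documented default
        errors=None,
    )
    # ok is True if the Redis key was set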