Module pipelines.rj_smtr.br_rj_riodejaneiro_rdo.tasks

Tasks for br_rj_riodejaneiro_rdo

Functions

def check_files_for_download(files: list, dataset_id: str, table_id: str)

Check Redis for files already downloaded from the FTP and return only those that still need to be downloaded.

Args

files : list
information on the files gathered from the FTP
dataset_id : str
dataset_id on BigQuery
table_id : str
table_id on BigQuery

Returns

list
information on the files to download

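The filtering step can be pictured as a set-membership check. This is a minimal sketch, not the task's implementation: the helper name `filter_new_files`, the "filename" key and the use of a plain Python set in place of the Redis entry are all assumptions.

```python
from typing import Dict, List, Set


def filter_new_files(files: List[Dict[str, str]], downloaded: Set[str]) -> List[Dict[str, str]]:
    """Return the file-info dicts whose filename has not been downloaded yet.

    Hypothetical shapes: each item in `files` carries a "filename" key, and
    `downloaded` stands in for the filenames stored in Redis for the table.
    """
    return [info for info in files if info["filename"] not in downloaded]


listing = [{"filename": "a.txt"}, {"filename": "b.txt"}]
print(filter_new_files(listing, {"a.txt"}))  # [{'filename': 'b.txt'}]
```

Keeping the already-downloaded names in a set makes each membership check constant time, so the cost grows with the FTP listing rather than with the download history.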
def download_and_save_local_from_ftp(file_info: dict)

Downloads a file from the FTP and saves it to data/raw//.
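A download of this kind can be sketched with the standard library's `ftplib`, streaming the file straight to disk. This is an illustration only: the helper `download_to_local` is hypothetical, and the local path is taken as a parameter rather than reproducing the task's own directory layout.

```python
import ftplib  # used for the type hint; any object with retrbinary() works
from pathlib import Path


def download_to_local(ftp: ftplib.FTP, remote_path: str, local_path: str) -> str:
    """Stream a remote file into local_path, creating parent dirs as needed."""
    target = Path(local_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("wb") as fh:
        # RETR sends the file in binary chunks; each chunk is passed to fh.write.
        ftp.retrbinary(f"RETR {remote_path}", fh.write)
    return str(target)


# Usage (host and credentials are placeholders):
# with ftplib.FTP("ftp.example.com") as ftp:
#     ftp.login("user", "password")
#     download_to_local(ftp, "remote/file.txt", "local/file.txt")
```

Writing each chunk as it arrives avoids holding the whole file in memory.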

def get_file_paths_from_ftp(transport_mode: str, report_type: str, wait=None, dump=False)

Search the FTP for files dated within the previous interval (in days) from the current date, and get each file's name and partitions (parsed from the filename).
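The date-window selection can be sketched on a plain list of filenames. This assumes a hypothetical naming convention where the file date appears as YYYYMMDD and a hypothetical Hive-style partition string `data=YYYY-MM-DD`. Neither is confirmed by this page, and the FTP listing itself is left out.

```python
import re
from datetime import date, timedelta
from typing import Dict, List, Optional

# Hypothetical naming convention: the file date appears in the name as YYYYMMDD.
DATE_IN_NAME = re.compile(r"(\d{4})(\d{2})(\d{2})")


def files_in_window(
    filenames: List[str], interval_days: int, today: Optional[date] = None
) -> List[Dict[str, str]]:
    """Keep files dated within the last `interval_days` days and derive a partition."""
    today = today or date.today()
    start = today - timedelta(days=interval_days)
    selected = []
    for name in filenames:
        match = DATE_IN_NAME.search(name)
        if match is None:
            continue  # no date in the name, so no partition can be derived
        try:
            file_date = date(*map(int, match.groups()))
        except ValueError:
            continue  # eight digits that do not form a valid calendar date
        if start <= file_date <= today:
            # Hive-style partition string; the "data=" prefix is an assumption.
            selected.append({"filename": name, "partitions": f"data={file_date.isoformat()}"})
    return selected


print(files_in_window(["RDO_20220108.txt", "RDO_20220101.txt", "readme.txt"], 5, today=date(2022, 1, 10)))
# [{'filename': 'RDO_20220108.txt', 'partitions': 'data=2022-01-08'}]
```

Passing `today` explicitly keeps the window deterministic, which makes the selection easy to test and to rerun for a past date.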

def get_rdo_date_range(dataset_id: str, table_id: str, mode: str = 'prod')

Get date range for RDO/RHO materialization

Args

dataset_id : str
dataset_id on BigQuery
table_id : str
table_id on BigQuery
mode : str, optional
mode to materialize to. Accepted options are 'dev' and 'prod'. Defaults to 'prod'.

Returns

dict
containing 'date_range_start' and 'date_range_end' keys

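The shape of the returned dict can be illustrated with a sketch. The window rule used here is an assumption, not taken from this page: materialization runs from the day after a hypothetical `last_materialized` date up to today. The sketch also rejects modes other than 'dev' and 'prod'.

```python
from datetime import date, timedelta
from typing import Dict, Optional

ACCEPTED_MODES = {"dev", "prod"}


def rdo_date_range(
    last_materialized: date, mode: str = "prod", today: Optional[date] = None
) -> Dict[str, str]:
    """Build a date-range dict from the day after the last run up to today.

    `last_materialized` and the window rule are hypothetical.
    """
    if mode not in ACCEPTED_MODES:
        raise ValueError(f"mode must be one of {sorted(ACCEPTED_MODES)}, got {mode!r}")
    today = today or date.today()
    start = last_materialized + timedelta(days=1)
    return {
        "date_range_start": start.isoformat(),
        "date_range_end": today.isoformat(),
    }


print(rdo_date_range(date(2022, 1, 5), today=date(2022, 1, 10)))
# {'date_range_start': '2022-01-06', 'date_range_end': '2022-01-10'}
```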
def pre_treatment_br_rj_riodejaneiro_rdo(files: list, divide_columns_by: int = 100) -> tuple

Adds a header and a capture_time column, and standardizes the columns.

Args

files : list
information on the files found in the current run
divide_columns_by : int, optional
value by which to divide numeric columns. Defaults to 100.

Returns

dict
updated file_info with treated filepath

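The row-level treatment can be sketched with the standard `csv` module. The following are all assumptions made for illustration: the helper `pre_treat`, the `;` delimiter, the column names, and integer-scaled numeric values. The sketch attaches a header to headerless rows, adds a capture_time field, and divides the numeric columns by `divide_columns_by`.

```python
import csv
import io
from datetime import datetime
from typing import Dict, List


def pre_treat(
    raw_text: str,
    header: List[str],
    numeric_columns: List[str],
    capture_time: datetime,
    divide_columns_by: int = 100,
) -> List[Dict[str, object]]:
    """Attach a header to headerless rows, add capture_time and rescale numeric columns."""
    records = []
    for values in csv.reader(io.StringIO(raw_text), delimiter=";"):  # ";" is assumed
        raw = dict(zip(header, values))
        record: Dict[str, object] = dict(raw)
        for column in numeric_columns:
            # Assumes numbers are stored as integers scaled by divide_columns_by.
            record[column] = int(raw[column]) / divide_columns_by
        record["capture_time"] = capture_time.isoformat()
        records.append(record)
    return records


rows = pre_treat("1;1050\n2;300\n", ["id", "amount"], ["amount"], datetime(2022, 1, 1, 12, 0))
print(rows[0])  # {'id': '1', 'amount': 10.5, 'capture_time': '2022-01-01T12:00:00'}
```

Stamping every row with the same capture_time records when the batch was captured, which is independent of the business date inside the file.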
def update_rdo_redis(download_files: list, table_id: str, dataset_id: str = 'br_rj_riodejaneiro_rdo', errors=None, wait=None)

Register the downloaded files in Redis, if they were uploaded correctly.

Args

download_files : list
information on the downloaded files
table_id : str
table_id on BigQuery
dataset_id : str, optional
dataset_id on BigQuery. Defaults to constants.RDO_DATASET_ID.value.
errors : list, optional
list of errors. Defaults to None.
wait : Any, optional
task to wait for before running. Defaults to None.

Returns

bool
whether the Redis key was set
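The "update only if uploaded correctly" rule can be sketched with an in-memory dict standing in for Redis. The helper name, the key and the "filename" field are hypothetical. If any errors are passed, the stored set is left untouched and the function returns False. Otherwise the new filenames are merged into the set and the function returns True.

```python
from typing import Dict, List, Optional, Set


def update_downloaded(
    store: Dict[str, Set[str]],
    key: str,
    download_files: List[Dict[str, str]],
    errors: Optional[list] = None,
) -> bool:
    """Record downloaded filenames under `key` only when the upload had no errors.

    `store` is an in-memory stand-in for Redis; the key naming and the
    "filename" field are hypothetical.
    """
    if errors:
        return False  # failed upload: key left unchanged, so these files stay pending
    store.setdefault(key, set()).update(info["filename"] for info in download_files)
    return True


store: Dict[str, Set[str]] = {}
print(update_downloaded(store, "rdo_files", [{"filename": "a.txt"}]))  # True
print(update_downloaded(store, "rdo_files", [{"filename": "b.txt"}], errors=["upload failed"]))  # False
```

Leaving the key unchanged on error means a later run would still treat those files as not yet downloaded.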