Module pipelines.rj_smtr.registros_ocr_rir.tasks
Tasks for registros_ocr_rir
Functions
def download_and_save_local(file_info: list)
@task
def download_and_save_local(file_info: list):
    """Download files from FTP.

    Args:
        file_info (list): list of dicts representing each file found.

    Returns:
        list: updated file info with the local path for each downloaded file.
    """
    dataset_id = constants.RIR_DATASET_ID.value
    table_id = constants.RIR_TABLE_ID.value
    ftp_client = connect_ftp(secret_path=constants.RIR_SECRET_PATH.value, secure=False)
    for info in file_info:
        filepath = f"{dataset_id}/{table_id}/{info['partitions']}/{info['filename']}"
        Path(filepath).parent.mkdir(exist_ok=True, parents=True)
        # write file to <filepath>
        with open(filepath, "wb") as file:
            ftp_client.retrbinary(
                "RETR " + info["filename"],
                file.write,
            )
        info["filepath"] = filepath
        log(f"Updated file info:\n{info}")
    ftp_client.quit()
    return file_info
Download files from FTP.

Args
    file_info (list): list of dicts representing each file found.

Returns
    list: updated file info with the local path for each downloaded file.
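The local path for each download mirrors the Hive-style partition layout built by get_files_from_ftp. A minimal sketch of that layout, using hypothetical dataset and table names in place of the constants module values:

import pathlib

# Hypothetical values; in the pipeline these come from
# constants.RIR_DATASET_ID and constants.RIR_TABLE_ID.
dataset_id = "rir_dataset"
table_id = "registros_ocr"

info = {
    "filename": "ocr_20220902183000.txt",  # assumed filename pattern
    "partitions": "data=2022-09-02/hora=18",
}

filepath = f"{dataset_id}/{table_id}/{info['partitions']}/{info['filename']}"
pathlib.Path(filepath).parent.mkdir(exist_ok=True, parents=True)
print(filepath)
# rir_dataset/registros_ocr/data=2022-09-02/hora=18/ocr_20220902183000.txt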
def get_files_from_ftp(dump: bool = False, execution_time: str = None, wait=None)
@task
def get_files_from_ftp(
    dump: bool = False, execution_time: str = None, wait=None
):  # pylint: disable=W0613
    """Search FTP for files created in the same minute as the capture time.

    Args:
        dump (bool, optional): if True, dumps all files found on the FTP.
            Defaults to False.
        execution_time (str, optional): search for a file created at a given
            minute. Defaults to None.
        wait (optional): used to create an upstream dependency with a
            previous task.

    Returns:
        dict: 'capture' is a flag for skipping tasks if no files were found;
            'file_info' is the info for processing the captured files.
    """
    if execution_time:
        execution_time = datetime.fromisoformat(execution_time) - timedelta(minutes=1)
    else:
        execution_time = pendulum.now(constants.TIMEZONE.value).replace(
            tzinfo=None, second=0, microsecond=0
        ) - timedelta(minutes=1)
    start_date = datetime.fromisoformat(constants.RIR_START_DATE.value)
    file_info = []
    try:
        ftp_client = connect_ftp(
            secret_path=constants.RIR_SECRET_PATH.value, secure=False
        )
        files = {
            file: datetime.strptime(
                file.split(".")[0].split("_")[1], "%Y%m%d%H%M%S"
            ).replace(second=0, microsecond=0)
            for file, info in ftp_client.mlsd()
            if file.startswith("ocr")
        }
        if not dump:
            for file, created_time in files.items():
                if execution_time == created_time:
                    file_info.append(
                        {
                            "filename": file,
                            "created_time": created_time,
                            "partitions": f"data={created_time.date()}/hora={created_time.hour}",
                        }
                    )
        else:
            file_info = [
                {
                    "filename": file,
                    "created_time": created_time,
                    "partitions": f"data={created_time.date()}/hora={created_time.hour}",
                }
                for file, created_time in files.items()
                if created_time >= start_date
            ]
        log(f"Found {len(file_info)} files:\n{file_info}")
    except Exception as error:  # pylint: disable=W0703
        message = f"""
        @here
        Rock In Rio 2022
        Captura falhou com erro: {error}
        """
        log_critical(message)
    # flag so downstream tasks can be skipped if no file was found
    return {"capture": bool(file_info), "file_info": file_info}
Search FTP for files created in the same minute as the capture time.

Args
    dump (bool, optional): if True, dumps all files found on the FTP. Defaults to False.
    execution_time (str, optional): search for a file created at a given minute. Defaults to None.
    wait (optional): used to create an upstream dependency with a previous task.

Returns
    dict: 'capture' is a flag for skipping tasks if no files were found; 'file_info' is the info for processing the captured files.
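The match between execution time and file creation time hinges on the timestamp embedded in the filename. A standalone sketch of that logic, assuming filenames follow the ocr_<YYYYMMDDHHMMSS>.<ext> pattern implied by the parsing code above:

from datetime import datetime, timedelta

filename = "ocr_20220902183542.txt"  # hypothetical example filename

# Parse the timestamp between the first "_" and the extension,
# then truncate to the minute, exactly as the task does.
created_time = datetime.strptime(
    filename.split(".")[0].split("_")[1], "%Y%m%d%H%M%S"
).replace(second=0, microsecond=0)

# The capture looks one minute behind the current (or given) time.
execution_time = datetime(2022, 9, 2, 18, 36) - timedelta(minutes=1)

if execution_time == created_time:
    print(f"data={created_time.date()}/hora={created_time.hour}")
# data=2022-09-02/hora=18

Truncating both sides to the minute is what lets a flow scheduled every minute pick up exactly the file produced in the previous minute, regardless of the file's seconds component.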
def pre_treatment_ocr(file_info: list)
@task
def pre_treatment_ocr(file_info: list):
    """Standardize columns.

    Args:
        file_info (list): list of dicts representing each file found.

    Returns:
        dict: 'skip_upload' flags whether there is any treated file to
            upload; 'table_dir' is the path to the table folder containing
            partitioned files.
    """
    primary_cols = constants.RIR_OCR_PRIMARY_COLUMNS.value
    secondary_cols = constants.RIR_OCR_SECONDARY_COLUMNS.value
    standard_cols = dict(primary_cols, **secondary_cols)
    log(f"Standard columns are: {standard_cols}")
    # Initialize variable for skipping next task
    skip_upload = True
    for info in file_info:
        log(f'Open file {info["filepath"]}')
        data = pd.read_csv(info["filepath"], sep=";")
        log(
            f"""
            Received data:
            {data[:50]}
            with columns:
            {data.columns.to_list()}
            """
        )
        # if data is empty, we don't upload an empty file
        if data.empty:
            # delete the empty file
            log("Data was empty, deleting it...")
            Path(info["filepath"]).unlink(missing_ok=True)
            continue
        data["datahora"] = pd.to_datetime(data["DATA"] + " " + data["HORA"])
        log(f"Created column datahora as:\n{data['datahora']}")
        for col, new_col in secondary_cols.items():
            if col not in data.columns:
                log(f"Add empty column {new_col} to data")
                data[new_col] = ""
        data = data.rename(columns=standard_cols)[list(standard_cols.values())]
        data.to_csv(info["filepath"], index=False)
        # if non-empty data was written to a file, then upload it
        skip_upload = False
    table_dir = f"{constants.RIR_DATASET_ID.value}/{constants.RIR_TABLE_ID.value}"
    return {"skip_upload": skip_upload, "table_dir": table_dir}
Standardize columns.

Args
    file_info (list): list of dicts representing each file found.

Returns
    dict: 'skip_upload' flags whether there is any treated file to upload; 'table_dir' is the path to the table folder containing partitioned files.
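The standardization step boils down to a pandas rename plus a reorder against the merged column mapping. A minimal sketch with hypothetical mappings, since the actual RIR_OCR_PRIMARY_COLUMNS and RIR_OCR_SECONDARY_COLUMNS values live in the constants module:

import pandas as pd

# Hypothetical column mappings standing in for the constants values.
primary_cols = {"PLACA": "placa", "DATA": "data", "HORA": "hora"}
secondary_cols = {"VELOCIDADE": "velocidade"}
standard_cols = dict(primary_cols, **secondary_cols)

data = pd.DataFrame(
    {"PLACA": ["ABC1234"], "DATA": ["02/09/2022"], "HORA": ["18:35:00"]}
)

# Add any missing secondary column as empty, already under its new name,
# so the subsequent rename is a no-op for it.
for col, new_col in secondary_cols.items():
    if col not in data.columns:
        data[new_col] = ""

# Rename and reorder to the standardized schema.
data = data.rename(columns=standard_cols)[list(standard_cols.values())]
print(data.columns.to_list())
# ['placa', 'data', 'hora', 'velocidade']

Selecting list(standard_cols.values()) after the rename both drops any unexpected extra columns and fixes the column order, so every treated file shares the same schema before upload.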