Module pipelines.rj_segovi.dump_ftp_adm_processorio_sicop.tasks
Tasks to dump data from a SICOP FTP to BigQuery
Functions
def download_files(client, files, save_path)
-
Expand source code
@task
def download_files(client, files, save_path):
    """
    Fetch the given files from the FTP server into a local directory.

    Files that already exist locally are not downloaded again.

    Args:
        client: FTP client exposing ``connect()`` and
            ``download(remote_path=..., local_path=...)``.
        files: Iterable of remote file names.
        save_path: Local directory that receives the files; created
            (including parents) when missing.

    Returns:
        list: Local ``Path`` of every requested file, in input order,
        whether it was just downloaded or was already present.
    """
    target_dir = Path(save_path)
    target_dir.mkdir(parents=True, exist_ok=True)
    client.connect()

    local_paths = []
    for remote_name in files:
        local_path = target_dir / remote_name
        if local_path.exists():
            # Only existence is checked; the local copy is reused as is.
            log(f"already exists: {local_path}")
        else:
            client.download(remote_path=remote_name, local_path=local_path)
            log(f"downloaded: {local_path}")
        local_paths.append(local_path)

    log(f"files_to_parse: {local_paths}")
    return local_paths
Download files from FTP
def get_files_to_download(client, pattern, dataset_id, table_id, date_format)
-
Expand source code
@task(nout=2)
def get_files_to_download(client, pattern, dataset_id, table_id, date_format):
    """
    List the FTP files whose date is not yet present as a storage partition.

    The second ``_``-separated token of each remote file name is compared
    with the ``data_particao`` values found in storage (dashes removed).
    Files whose token matches a stored value are skipped.

    Args:
        client: FTP client exposing ``connect()`` and
            ``list_files(path=..., pattern=...)``.
        pattern: File name pattern forwarded to ``client.list_files``.
        dataset_id: Dataset identifier forwarded to ``get_storage_blobs``.
        table_id: Table identifier forwarded to ``get_storage_blobs``.
        date_format: Not used by this task; kept so existing callers
            keep working.

    Returns:
        tuple: ``(files_to_download, download_data)`` where
        ``files_to_download`` is the list of remote file names still to be
        fetched and ``download_data`` is ``True`` when that list is not
        empty.

    Raises:
        IndexError: If a listed file name contains no ``_`` separator.
    """
    client.connect()
    files = client.list_files(path=".", pattern=pattern)
    log(f"files: {files}")

    blobs = get_storage_blobs(dataset_id, table_id)
    storage_partitions_dict = parser_blobs_to_partition_dict(blobs)
    # "data_particao" values are compared without dashes; when the key is
    # missing or empty a single None placeholder is used, which never
    # matches a file name token.
    storage_pattern_files = [
        date.replace("-", "") if date is not None else None
        for date in storage_partitions_dict.get("data_particao") or [None]
    ]
    log(f"storage_pattern_files: {storage_pattern_files}")

    # Filter against a set instead of calling list.remove() once per
    # matching stored date: with repeated dates in storage the second
    # remove() raised ValueError because the file was already gone.
    stored_dates = set(storage_pattern_files)
    files_to_download = [
        file for file in files if file.split("_")[1] not in stored_dates
    ]
    log(f"files to download: {files_to_download}")

    download_data = bool(files_to_download)
    return files_to_download, download_data
Get the files to download from the FTP, skipping those whose date is already stored in GCS
def get_ftp_client(wait=None)
-
Expand source code
@task
def get_ftp_client(wait=None):
    """
    Build an FTP client from the credentials stored in the "sicop" secret.

    Args:
        wait: Not used by the body. Presumably lets a flow pass an
            upstream result to order task execution; confirm with callers.

    Returns:
        FTPClient: Client created with the secret's ``hostname``,
        ``username`` and ``password`` entries.
    """
    # The credentials live under the "data" key of the secret payload.
    credentials = get_vault_secret("sicop")["data"]
    return FTPClient(
        hostname=credentials["hostname"],
        username=credentials["username"],
        password=credentials["password"],
    )
Get FTP client
def parse_save_dataframe(files, save_path, pattern)
-
Expand source code
@task
def parse_save_dataframe(files, save_path, pattern):
    """
    Parse fixed-width files and save them as partitioned CSV data.

    Each non-empty file is read with the column layout that belongs to
    ``pattern``, every ``;`` is removed from the values, a ``data_arquivo``
    column is added from the date and time tokens of the file name, and
    the result is handed to ``to_partitions``.

    Args:
        files: Iterable of path objects (``.stat()`` and ``.name`` are
            used) named like ``<prefix>_<date>_<time>.TXT``.
        save_path: Directory that receives the partitioned output; created
            (including parents) when missing.
        pattern: Layout selector, either ``"ARQ2001"`` or ``"ARQ2296"``.

    Returns:
        Path: ``save_path`` as a ``Path``.

    Raises:
        ValueError: If ``pattern`` is not one of the supported layouts.
    """
    save_path = Path(save_path)
    save_path.mkdir(parents=True, exist_ok=True)

    # Column name -> field width in characters, in file order.
    if pattern == "ARQ2001":
        columns = {
            "orgao_transcritor": 9,
            "codigo_sici": 7,
            "numero_processo": 15,
            "tipo_processo": 12,
            "numero_documento_identidade": 15,
            "tipo_documento_identidade": 3,
            "descricao_tipo_documento_identidade": 31,
            "requerente": 51,
            "data_processo": 11,
            "codigo_assunto": 6,
            "descricao_assunto": 51,
            "data_cadastro": 11,
            "assunto_complementar": 225,
            "matricula_digitador": 9,
            "prazo_cadastro": 26,
        }
    elif pattern == "ARQ2296":
        columns = {
            "codigo_orgao": 8,
            "codigo_sici": 6,
            "tipo_documento": 2,
            "data_cadastro": 10,
            "numero_documento": 14,
            "requerente": 60,
            "codigo_assunto": 5,
            "descricao_assunto": 50,
            "data_tramitacao": 10,
            "orgao_destino": 8,
            "codigo_despacho": 5,
            "data_inicio": 10,
            "data_fim": 10,
            "orgao_inicio": 8,
            "orgao_fim": 8,
            "dias_parados": 6,
            "matricula_digitador": 8,
            "opcao": 1,
            "descricao_relatorio": 16,
            "filler": 3,
            "orgao_responsavel": 8,
            "informacao_complementar": 256,
        }
    else:
        # Raising a bare string is itself a TypeError in Python 3 and hid
        # the intended message; raise a real exception instead.
        raise ValueError(f"Pattern not found: {pattern!r}")

    table_columns = list(columns.keys())
    widths_columns = list(columns.values())

    for file in files:
        if file.stat().st_size == 0:
            log(f"File has no data : {file}")
            continue

        # NOTE(review): cp1251 is the Cyrillic code page; confirm it is the
        # intended encoding for these files.
        dataframe = pd.read_fwf(
            file,
            encoding="cp1251",
            widths=widths_columns,
            header=None,
        )
        dataframe.columns = table_columns

        # Every value becomes a string and loses any ";" it contains.
        for col in dataframe.columns:
            dataframe[col] = dataframe[col].astype(str).str.replace(";", "")

        # Use the path's own name component rather than splitting on "/",
        # so the result does not depend on the OS path separator.
        name_parts = file.name.split("_")
        data_hora = name_parts[1] + name_parts[2].replace(".TXT", "")

        dataframe.insert(0, "data_arquivo", data_hora)
        dataframe["data_arquivo"] = pd.to_datetime(dataframe["data_arquivo"])
        dataframe, date_partition_columns = parse_date_columns(
            dataframe=dataframe, partition_date_column="data_arquivo"
        )
        log(f"dataframe: {dataframe.head()}")

        to_partitions(
            data=dataframe,
            partition_columns=date_partition_columns,
            savepath=save_path,
            data_type="csv",
        )
        log(f"parsed and saved: {file}")

    return save_path
Parse and save files from FTP