Module pipelines.rj_segovi.dump_ftp_adm_processorio_sicop.tasks

Tasks to dump data from a SICOP FTP server to BigQuery.

Functions

def download_files(client, files, save_path)
Expand source code
@task
def download_files(client, files, save_path):
    """
    Fetch each remote file into a local directory, reusing existing copies.

    Args:
        client: FTP client exposing ``connect()`` and
            ``download(remote_path=..., local_path=...)``.
        files: Remote file names; each is stored locally under the same name.
        save_path: Local target directory, created if it does not exist.

    Returns:
        list[Path]: Local path of every requested file, in input order,
        whether it was just downloaded or was already present.
    """
    target_dir = Path(save_path)
    target_dir.mkdir(parents=True, exist_ok=True)

    client.connect()
    local_paths = []
    for remote_name in files:
        local_path = target_dir / remote_name
        if local_path.exists():
            # Skip the transfer: a file with this name is already on disk.
            log(f"already exists: {local_path}")
        else:
            client.download(remote_path=remote_name, local_path=local_path)
            log(f"downloaded: {local_path}")
        local_paths.append(local_path)

    log(f"files_to_parse: {local_paths}")
    return local_paths

Download files from FTP

def get_files_to_download(client, pattern, dataset_id, table_id, date_format)
Expand source code
@task(nout=2)
def get_files_to_download(client, pattern, dataset_id, table_id, date_format):
    """
    List FTP files matching ``pattern`` that are not yet present in GCS.

    A remote file counts as already stored when the second ``_``-separated
    segment of its name equals one of the table's ``data_particao`` partition
    values with the dashes removed. That segment is presumably a date such as
    ``20230131``; confirm against the FTP naming scheme.

    Args:
        client: FTP client exposing ``connect()`` and
            ``list_files(path=..., pattern=...)``.
        pattern: Pattern forwarded to ``client.list_files``.
        dataset_id: Dataset whose storage blobs are inspected.
        table_id: Table whose storage blobs are inspected.
        date_format: Not used by this function; kept for interface
            compatibility with existing callers.

    Returns:
        tuple[list, bool]: The files still to download, and whether that
        list is non-empty.
    """
    client.connect()
    files = client.list_files(path=".", pattern=pattern)
    log(f"files: {files}")

    blobs = get_storage_blobs(dataset_id, table_id)
    storage_partitions_dict = parser_blobs_to_partition_dict(blobs)
    storage_pattern_files = [
        date.replace("-", "") if date is not None else None
        for date in storage_partitions_dict.get("data_particao") or [None]
    ]
    log(f"storage_pattern_files: {storage_pattern_files}")

    # Filter against a set rather than calling list.remove() inside a nested
    # loop. The old approach raised ValueError when the partition list held
    # the same date more than once (e.g. several blobs in one partition).
    # It also cost O(len(files) * len(partitions)).
    stored_dates = set(storage_pattern_files)
    files_to_download = [
        file for file in files if file.split("_")[1] not in stored_dates
    ]

    log(f"files to download: {files_to_download}")

    download_data = bool(files_to_download)

    return files_to_download, download_data

Get the FTP files that still need downloading, by comparing the FTP listing with the partitions already stored in GCS.

def get_ftp_client(wait=None)
Expand source code
@task
def get_ftp_client(wait=None):
    """
    Build an FTP client for the SICOP server from Vault credentials.

    Args:
        wait: Not read by the body. Presumably accepted only so a flow can
            make this task depend on an upstream one; confirm against the flow.

    Returns:
        FTPClient: Client configured with the ``hostname``, ``username`` and
        ``password`` stored under the ``"sicop"`` Vault secret.
        ``connect()`` is not called here.
    """
    credentials = get_vault_secret("sicop")["data"]
    return FTPClient(
        hostname=credentials["hostname"],
        username=credentials["username"],
        password=credentials["password"],
    )

Get FTP client

def parse_save_dataframe(files, save_path, pattern)
Expand source code
@task
def parse_save_dataframe(files, save_path, pattern):
    """
    Parse fixed-width SICOP files and save them as date-partitioned CSVs.

    Each non-empty file goes through these steps:

    - It is read with the column layout selected by ``pattern``.
    - ``;`` is stripped from every value.
    - A leading ``data_arquivo`` timestamp column is built from the file name.
    - The result is written under ``save_path``, partitioned on that column.

    Args:
        files: Iterable of local file paths (``str`` or ``Path``). Empty files
            are skipped with a log message.
        save_path: Output directory, created if it does not exist.
        pattern: Layout identifier, ``"ARQ2001"`` or ``"ARQ2296"``.

    Returns:
        Path: ``save_path`` converted to a ``Path``.

    Raises:
        ValueError: If ``pattern`` is not a known layout.
    """
    save_path = Path(save_path)
    save_path.mkdir(parents=True, exist_ok=True)

    # Column name -> fixed width (in characters) for each known file layout.
    layouts = {
        "ARQ2001": {
            "orgao_transcritor": 9,
            "codigo_sici": 7,
            "numero_processo": 15,
            "tipo_processo": 12,
            "numero_documento_identidade": 15,
            "tipo_documento_identidade": 3,
            "descricao_tipo_documento_identidade": 31,
            "requerente": 51,
            "data_processo": 11,
            "codigo_assunto": 6,
            "descricao_assunto": 51,
            "data_cadastro": 11,
            "assunto_complementar": 225,
            "matricula_digitador": 9,
            "prazo_cadastro": 26,
        },
        "ARQ2296": {
            "codigo_orgao": 8,
            "codigo_sici": 6,
            "tipo_documento": 2,
            "data_cadastro": 10,
            "numero_documento": 14,
            "requerente": 60,
            "codigo_assunto": 5,
            "descricao_assunto": 50,
            "data_tramitacao": 10,
            "orgao_destino": 8,
            "codigo_despacho": 5,
            "data_inicio": 10,
            "data_fim": 10,
            "orgao_inicio": 8,
            "orgao_fim": 8,
            "dias_parados": 6,
            "matricula_digitador": 8,
            "opcao": 1,
            "descricao_relatorio": 16,
            "filler": 3,
            "orgao_responsavel": 8,
            "informacao_complementar": 256,
        },
    }
    try:
        columns = layouts[pattern]
    except KeyError:
        # The previous `raise "Pattern not found"` could not raise this
        # message at all. A str is not an exception, so Python raised
        # TypeError instead.
        raise ValueError(f"Pattern not found: {pattern}") from None

    table_columns = list(columns.keys())
    widths_columns = list(columns.values())

    for file in files:
        file_path = Path(file)
        if file_path.stat().st_size == 0:
            log(f"File has no data : {file_path}")
            continue

        # NOTE(review): cp1251 is the Windows *Cyrillic* code page. If these
        # exports contain Portuguese text, cp1252/latin-1 is probably what was
        # intended. Confirm against a sample file before changing, because the
        # change alters every accented character already loaded.
        dataframe = pd.read_fwf(
            file_path,
            encoding="cp1251",
            widths=widths_columns,
            header=None,
        )
        dataframe.columns = table_columns

        # Turn every value into a string and drop literal ";" characters.
        # Note that astype(str) turns missing cells into the string "nan".
        for col in dataframe.columns:
            dataframe[col] = (
                dataframe[col].astype(str).str.replace(";", "", regex=False)
            )

        # Build the file timestamp from the 2nd and 3rd "_"-separated parts of
        # the file stem. These are presumably date and time, e.g.
        # ARQ2001_YYYYMMDD_HHMM.TXT. Using the stem avoids depending on the
        # exact ".TXT" extension spelling and on "/" as path separator.
        name_parts = file_path.stem.split("_")
        data_hora = name_parts[1] + name_parts[2]
        dataframe.insert(0, "data_arquivo", data_hora)
        dataframe["data_arquivo"] = pd.to_datetime(dataframe["data_arquivo"])

        dataframe, date_partition_columns = parse_date_columns(
            dataframe=dataframe, partition_date_column="data_arquivo"
        )
        log(f"dataframe: {dataframe.head()}")
        to_partitions(
            data=dataframe,
            partition_columns=date_partition_columns,
            savepath=save_path,
            data_type="csv",
        )
        log(f"parsed and saved: {file_path}")

    return save_path

Parse and save files from FTP