Module pipelines.rj_smtr.br_rj_riodejaneiro_stu.tasks

Tasks for br_rj_riodejaneiro_stu

Functions

def create_final_stu_dataframe(dfs: list[pandas.core.frame.DataFrame]) -> tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]
@task(checkpoint=False, nout=2)
def create_final_stu_dataframe(
    dfs: list[pd.DataFrame],
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Join all dataframes according to the document type

    Args:
        dfs (list[pd.DataFrame]): The list of dfs from all stu files

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: Dataframe for regular persons, dataframe for companies
    """
    dfs_pf = []
    dfs_pj = []

    for df in dfs:
        # a CPF column marks a natural-person file, a CNPJ column a company file
        if "CPF" in df.columns:
            dfs_pf.append(df)
        elif "CNPJ" in df.columns:
            dfs_pj.append(df)
        else:
            raise ValueError("Document column not found")

    return pd.concat(dfs_pf), pd.concat(dfs_pj)

Split the dataframes by document type (CPF or CNPJ) and concatenate each group

Args

dfs : list[pd.DataFrame]
The list of dataframes read from all STU files

Returns

tuple[pd.DataFrame, pd.DataFrame]
Dataframe for natural persons (CPF), dataframe for companies (CNPJ)
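
A minimal usage sketch (the toy dataframes below are invented; only the CPF/CNPJ dispatch comes from the source). In Prefect 1.x, which the @task(checkpoint=False, nout=2) signature suggests, a task can be exercised outside a flow via .run():

import pandas as pd

# hypothetical inputs: one natural-person file (CPF column),
# one company file (CNPJ column)
df_a = pd.DataFrame({"CPF": ["11111111111"], "Nome": ["Ana"]})
df_b = pd.DataFrame({"CNPJ": ["00000000000191"], "Razao_Social": ["ACME"]})

df_pf, df_pj = create_final_stu_dataframe.run([df_a, df_b])
assert list(df_pf.columns) == ["CPF", "Nome"]
assert list(df_pj.columns) == ["CNPJ", "Razao_Social"]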
def get_stu_raw_blobs(data_versao_stu: str) -> list[google.cloud.storage.blob.Blob]
@task(checkpoint=False)
def get_stu_raw_blobs(data_versao_stu: str) -> list[Blob]:
    """
    Get STU extraction files

    Args:
        data_versao_stu (str): The STU version date in the format YYYY-MM-DD

    Returns:
        list[Blob]: The blobs matching the version date
    """
    bd_storage = bd.Storage(
        dataset_id=constants.STU_GENERAL_CAPTURE_PARAMS.value["dataset_id"],
        table_id="",
        bucket_name=constants.STU_GENERAL_CAPTURE_PARAMS.value["save_bucket_name"],
    )

    blob_list = (
        bd_storage.client["storage_staging"]
        .bucket(bd_storage.bucket_name)
        .list_blobs(prefix=f"upload/{bd_storage.dataset_id}/Tptran_")
    )

    # keep only files whose name ends with the version date (YYYYMMDD) + ".txt"
    blob_list = [
        b
        for b in blob_list
        if b.name.endswith(f"{data_versao_stu.replace('-', '')}.txt")
    ]

    log(f"Files found: {', '.join([b.name for b in blob_list])}")

    return blob_list

Get STU extraction files

Args

data_versao_stu : str
The STU version date in the format YYYY-MM-DD

Returns

list[Blob]
The blobs matching the version date
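
The name filter in isolation (the blob names below are invented; only the Tptran_ prefix and the YYYYMMDD + ".txt" suffix convention come from the source):

data_versao_stu = "2024-05-01"  # hypothetical version date
suffix = f"{data_versao_stu.replace('-', '')}.txt"  # "20240501.txt"

names = [
    "upload/br_rj_riodejaneiro_stu/Tptran_22_A_1_20240501.txt",
    "upload/br_rj_riodejaneiro_stu/Tptran_22_A_1_20240401.txt",
]
kept = [n for n in names if n.endswith(suffix)]
assert kept == ["upload/br_rj_riodejaneiro_stu/Tptran_22_A_1_20240501.txt"]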
def read_stu_raw_file(blob: google.cloud.storage.blob.Blob) -> pandas.core.frame.DataFrame
@task(checkpoint=False)
def read_stu_raw_file(blob: Blob) -> pd.DataFrame:
    """
    Read an extracted file from STU

    Args:
        blob (Blob): The GCS blob

    Returns:
        pd.DataFrame: The parsed file contents
    """

    log(f"Downloading blob: {blob.name}")
    data = blob.download_as_bytes().decode("latin-1")
    # filename layout (split on "_"): index 1 holds the mode id,
    # index 3 the 1-based permission type
    name_parts = blob.name.split("/")[-1].split("_")
    mode_id = name_parts[1]
    mode = constants.STU_MODE_MAPPING.value[mode_id]
    perm_type = constants.STU_TYPE_MAPPING.value[int(name_parts[3]) - 1]

    df = pd.read_csv(
        StringIO(data),
        sep=";",
        decimal=",",
        encoding="latin-1",
        dtype="object",
    )

    df["modo"] = mode
    df["id_modo"] = mode_id
    df["tipo_permissao"] = perm_type
    # normalize column names: "/" and spaces become underscores
    df.columns = [c.replace("/", " ").replace(" ", "_") for c in df.columns]

    return df

Read an extracted file from STU

Args

blob : Blob
The GCS blob

Returns

pd.DataFrame
The parsed file contents
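
The filename parsing and column normalization in isolation (the concrete blob name is invented; the "_"-separated index positions mirror the source):

blob_name = "upload/br_rj_riodejaneiro_stu/Tptran_22_A_2_20240501.txt"
name_parts = blob_name.split("/")[-1].split("_")
mode_id = name_parts[1]              # "22", key into STU_MODE_MAPPING
type_index = int(name_parts[3]) - 1  # "2" -> index 1 of STU_TYPE_MAPPING

# column normalization mirrors the task: "/" -> space, then space -> "_"
columns = ["Nome/Razão Social", "Data Inclusão"]
print([c.replace("/", " ").replace(" ", "_") for c in columns])
# ['Nome_Razão_Social', 'Data_Inclusão']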
def save_stu_dataframes(df_pf: pandas.core.frame.DataFrame, df_pj: pandas.core.frame.DataFrame)
@task
def save_stu_dataframes(df_pf: pd.DataFrame, df_pj: pd.DataFrame):
    """
    Save STU concatenated dataframes into the upload folder

    Args:
        df_pf (pd.DataFrame): Dataframe for natural persons (CPF)
        df_pj (pd.DataFrame): Dataframe for companies (CNPJ)
    """

    df_mapping = {"operadora_pessoa_fisica": df_pf, "operadora_empresa": df_pj}
    bd_storage = bd.Storage(
        table_id="",
        dataset_id=constants.STU_GENERAL_CAPTURE_PARAMS.value["dataset_id"],
        bucket_name=constants.STU_GENERAL_CAPTURE_PARAMS.value["save_bucket_name"],
    )

    bucket = bd_storage.client["storage_staging"].bucket(bd_storage.bucket_name)

    # upload each concatenated dataframe as <table_id>.csv under the upload prefix
    for table in constants.STU_TABLE_CAPTURE_PARAMS.value:
        table_id = table["table_id"]
        df = df_mapping[table_id]
        bucket.blob(
            f"upload/{bd_storage.dataset_id}/{table_id}.csv"
        ).upload_from_string(df.to_csv(index=False), "text/csv")

Save STU concatenated dataframes into the upload folder

Args

df_pf : pd.DataFrame
Dataframe for natural persons (CPF)
df_pj : pd.DataFrame
Dataframe for companies (CNPJ)
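
The resulting upload paths, sketched with a hypothetical dataset_id (the real value is read from constants.STU_GENERAL_CAPTURE_PARAMS inside the task; the table ids come from the df_mapping in the source):

dataset_id = "br_rj_riodejaneiro_stu"  # hypothetical
for table_id in ("operadora_pessoa_fisica", "operadora_empresa"):
    print(f"upload/{dataset_id}/{table_id}.csv")
# upload/br_rj_riodejaneiro_stu/operadora_pessoa_fisica.csv
# upload/br_rj_riodejaneiro_stu/operadora_empresa.csv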