Module pipelines.rj_cor.comando.eventos.tasks

Tasks for comando

Functions

def download_data_atividades(first_date, last_date, wait=None) ‑> pandas.core.frame.DataFrame
@task(
    nout=1,
    max_retries=3,
    retry_delay=timedelta(seconds=60),
)
def download_data_atividades(first_date, last_date, wait=None) -> pd.DataFrame:
    """
    Download data from the API.
    """

    url_secret = get_vault_secret("comando")["data"]
    url_atividades_evento = url_secret["endpoint_atividades_evento"]

    dfr = pd.read_json(
        f"{url_atividades_evento}/?data_i={first_date}&data_f={last_date}"
    )

    return dfr

Download data from the API.
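For reference, the task boils down to a single pandas.read_json call against the endpoint stored in the "comando" Vault secret. A standalone equivalent, with a placeholder URL and dates assumed to already be in the DD/MM/YYYY format described by get_date_interval:

import pandas as pd

# Placeholder endpoint; the real URL is read from the "comando" Vault secret.
url_atividades_evento = "https://api.example.com/atividades_evento"
first_date, last_date = "01/03/2024", "04/03/2024"
dfr = pd.read_json(f"{url_atividades_evento}/?data_i={first_date}&data_f={last_date}")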

def download_data_ocorrencias(first_date, last_date, wait=None) ‑> pandas.core.frame.DataFrame
@task(
    nout=1,
    max_retries=3,
    retry_delay=timedelta(seconds=60),
)
def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame:
    """
    Download data from the API.
    """
    # auth_token = get_token()

    url_secret = get_vault_secret("comando")["data"]
    url_eventos = url_secret["endpoint_eventos"]

    log(f"\n\nDownloading data from {first_date} to {last_date} (not included)")
    dfr = pd.read_json(f"{url_eventos}/?data_i={first_date}&data_f={last_date}")

    return dfr

Download data from the API.

def get_date_interval(first_date, last_date) ‑> Tuple[dict, str]
@task
def get_date_interval(first_date, last_date) -> Tuple[dict, str]:
    """
    If `first_date` and `last_date` are provided, format them to DD/MM/YYYY.
    Otherwise, get data from the last 3 days.
    first_date: str YYYY-MM-DD
    last_date: str YYYY-MM-DD
    """
    if first_date and last_date:
        first_date, last_date = format_date(first_date, last_date)
    else:
        last_date = pendulum.today(tz="America/Sao_Paulo").date()
        first_date = last_date.subtract(days=3)
        first_date, last_date = format_date(
            first_date.strftime("%Y-%m-%d"), last_date.strftime("%Y-%m-%d")
        )
    return first_date, last_date

If first_date and last_date are provided, format them to DD/MM/YYYY. Otherwise, get data from the last 3 days. Both dates are strings in YYYY-MM-DD format.
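format_date is imported from the pipeline's utilities and is not shown here; based on the docstring it is expected to turn YYYY-MM-DD strings into DD/MM/YYYY. A minimal, hypothetical stand-in for illustration only:

from datetime import datetime

def format_date_sketch(first_date: str, last_date: str):
    # Hypothetical stand-in for the real format_date helper: "YYYY-MM-DD" -> "DD/MM/YYYY".
    def to_br(date_str: str) -> str:
        return datetime.strptime(date_str, "%Y-%m-%d").strftime("%d/%m/%Y")

    return to_br(first_date), to_br(last_date)

print(format_date_sketch("2024-03-02", "2024-03-05"))  # ('02/03/2024', '05/03/2024')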

def get_redis_df(dataset_id: str, table_id: str, name: str, mode: str = 'prod') ‑> pandas.core.frame.DataFrame
@task
def get_redis_df(
    dataset_id: str,
    table_id: str,
    name: str,
    mode: str = "prod",
) -> pd.DataFrame:
    """
    Access Redis to get the last saved dataframe and compare it to the
    current one, returning only the rows that are not already saved.
    """
    redis_key = build_redis_key(dataset_id, table_id, name, mode)
    log(f"Acessing redis_key: {redis_key}")

    dfr_redis = get_redis_output(redis_key)
    # dfr_redis = get_redis_output(redis_key, is_df=True)
    log(f"Redis output: {dfr_redis}")

    # if len(dfr_redis) == 0:
    #     dfr_redis = pd.DataFrame()
    #     dict_redis = {k: None for k in columns}
    #     print(f"\nCreating Redis fake values for key: {redis_key}\n")
    #     print(dict_redis)
    #     dfr_redis = pd.DataFrame(
    #         dict_redis, index=[0]
    #     )
    # else:
    #     dfr_redis = pd.DataFrame(
    #         dict_redis.items(), columns=columns
    #         )
    # print(f"\nConverting redis dict to df: \n{dfr_redis.head()}")

    return dfr_redis

Access Redis to get the last saved dataframe and compare it to the current one, returning only the rows that are not already saved.
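The actual comparison is delegated to compare_actual_df_with_redis_df (used in treat_data_atividades). A self-contained illustration of the underlying idea, using plain pandas rather than the real helper: keep only the rows of the current dataframe that are absent from the stored one.

import pandas as pd

stored = pd.DataFrame({"id_evento": ["1", "2"], "status": ["Aberto", "Fechado"]})
current = pd.DataFrame({"id_evento": ["2", "3"], "status": ["Fechado", "Aberto"]})

# Left merge with an indicator column, then keep rows that exist only in `current`.
new_rows = current.merge(stored, how="left", indicator=True).query("_merge == 'left_only'")
print(new_rows.drop(columns="_merge"))  # only the row with id_evento "3" remains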

def get_redis_max_date(dataset_id: str, table_id: str, name: str = None, mode: str = 'prod') ‑> str
@task
def get_redis_max_date(
    dataset_id: str,
    table_id: str,
    name: str = None,
    mode: str = "prod",
) -> str:
    """
    Access Redis to get the last saved date and compare it to the current dataframe.
    """
    redis_key = build_redis_key(dataset_id, table_id, name, mode)
    log(f"Acessing redis_key: {redis_key}")

    redis_max_date = get_redis_output(redis_key)

    try:
        redis_max_date = redis_max_date["max_date"]
    except KeyError:
        redis_max_date = "1990-01-01"
        log("Creating a fake date because this key doesn't exist.")

    log(f"Redis output: {redis_max_date}")
    return redis_max_date

Access Redis to get the last saved date and compare it to the current dataframe.
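The "1990-01-01" fallback works because the dates are kept as "YYYY-MM-DD[ HH:MM:SS]" strings, which sort lexicographically in chronological order; treat_data_ocorrencias relies on exactly that when it compares max_date <= redis_max_date. For example:

# ISO-style date strings compare chronologically as plain strings,
# so the "1990-01-01" fallback is always "older" than any real date.
assert "1990-01-01" < "2024-03-05 12:00:00"
assert max("2024-03-04 23:59:59", "2024-03-05 00:00:00") == "2024-03-05 00:00:00"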

def not_none(something: Any) ‑> bool
@task
def not_none(something: Any) -> bool:
    """
    Returns True if something is not None.
    """
    return something is not None

Returns True if something is not None.

def save_data(dataframe: pandas.core.frame.DataFrame) ‑> str | pathlib.Path
@task
def save_data(dataframe: pd.DataFrame) -> Union[str, Path]:
    """
    Save data to partitioned CSV files to be uploaded to GCP.
    """

    prepath = Path("/tmp/data/")
    prepath.mkdir(parents=True, exist_ok=True)

    partition_column = "data_inicio"
    dataframe, partitions = parse_date_columns(dataframe, partition_column)

    to_partitions(
        data=dataframe,
        partition_columns=partitions,
        savepath=prepath,
        data_type="csv",
    )
    log(f"[DEBUG] Files saved on {prepath}")
    return prepath

Save data to partitioned CSV files to be uploaded to GCP.
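parse_date_columns and to_partitions come from the pipeline's utilities and are not shown here. As an illustration of the general idea only (not of those helpers' exact output), hive-style partitioning writes one CSV per partition value into directories named after the partition column:

from pathlib import Path
import pandas as pd

df = pd.DataFrame({"data_inicio": ["2024-03-01", "2024-03-02"], "descricao": ["a", "b"]})
base = Path("/tmp/data")
for value, chunk in df.groupby("data_inicio"):
    outdir = base / f"data_inicio={value}"   # e.g. /tmp/data/data_inicio=2024-03-01
    outdir.mkdir(parents=True, exist_ok=True)
    chunk.to_csv(outdir / "data.csv", index=False)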

def save_no_partition(dataframe: pandas.core.frame.DataFrame, append: bool = False) ‑> str
@task
def save_no_partition(dataframe: pd.DataFrame, append: bool = False) -> str:
    """
    Saves a dataframe to a temporary directory and returns the path to the directory.
    """

    if "sigla" in dataframe.columns:
        dataframe = dataframe.sort_values(["id_pop", "sigla", "acao"])
    else:
        dataframe = dataframe.sort_values("id_pop")

    path_to_directory = "/tmp/" + str(uuid4().hex) + "/"
    os.makedirs(path_to_directory, exist_ok=True)
    if append:
        current_time = pendulum.now("America/Sao_Paulo").strftime("%Y-%m-%d %H:%M:%S")
        dataframe.to_csv(path_to_directory + f"dados_{current_time}.csv", index=False)
    else:
        dataframe.to_csv(path_to_directory + "dados.csv", index=False)
    return path_to_directory

Saves a dataframe to a temporary directory and returns the path to the directory.

def save_redis_max_date(dataset_id: str, table_id: str, name: str = None, mode: str = 'prod', redis_max_date: str = None, wait=None)
@task
def save_redis_max_date(  # pylint: disable=too-many-arguments
    dataset_id: str,
    table_id: str,
    name: str = None,
    mode: str = "prod",
    redis_max_date: str = None,
    wait=None,  # pylint: disable=unused-argument
):
    """
    Access Redis to save the last date.
    """
    redis_key = build_redis_key(dataset_id, table_id, name, mode)
    log(f"Acessing redis_key: {redis_key}")

    save_str_on_redis(redis_key, "max_date", redis_max_date)

Access Redis to save the last date.

def treat_data_atividades(dfr: pandas.core.frame.DataFrame, dfr_redis: pandas.core.frame.DataFrame, columns: list) ‑> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]
@task(nout=2)
def treat_data_atividades(
    dfr: pd.DataFrame,
    dfr_redis: pd.DataFrame,
    columns: list,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Normalize data to be similar to the old API.
    """

    print("Start treating data")
    dfr.orgao = dfr.orgao.replace(["\r", "\n"], ["", ""], regex=True)

    print(f"Dataframe before comparing with last data saved on redis {dfr.head()}")

    dfr, dfr_redis = compare_actual_df_with_redis_df(
        dfr,
        dfr_redis,
        columns,
    )
    print(f"Dataframe after comparing with last data saved on redis {dfr.head()}")

    # If df is empty stop flow
    if dfr.shape[0] == 0:
        skip_text = "No new data available on API"
        print(skip_text)
        raise ENDRUN(state=Skipped(skip_text))

    mandatory_cols = [
        "id_evento",
        "sigla",
        "orgao",  # esse não tem na tabela antiga
        "data_chegada",
        "data_inicio",
        "data_fim",
        "descricao",
        "status",
    ]

    # Create columns if they don't exist in the new API
    for col in mandatory_cols:
        if col not in dfr.columns:
            dfr[col] = None

    categorical_cols = [
        "sigla",
        "orgao",
        "descricao",
        "status",
    ]

    print("\n\nDEBUG", dfr[categorical_cols])
    for i in categorical_cols:
        dfr[i] = dfr[i].str.capitalize()
        # dfr[i] = dfr[i].apply(unidecode)

    for col in ["data_inicio", "data_fim", "data_chegada"]:
        dfr[col] = pd.to_datetime(dfr[col], errors="coerce")

    # TODO: This conversion is temporary
    for col in ["data_inicio", "data_fim", "data_chegada"]:
        dfr[col] = dfr[col].dt.tz_convert("America/Sao_Paulo")

    for col in ["data_inicio", "data_fim", "data_chegada"]:
        dfr[col] = dfr[col].dt.strftime("%Y-%m-%d %H:%M:%S")

    # Set the order to match the original table
    dfr = dfr[mandatory_cols]

    # Create a column with the row creation time so dbt can keep the latest event
    dfr["created_at"] = pendulum.now(tz="America/Sao_Paulo").strftime(
        "%Y-%m-%d %H:%M:%S"
    )

    return dfr.drop_duplicates(), dfr_redis

Normalize data to be similar to the old API.
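The date handling above assumes the API returns timezone-aware timestamps (otherwise tz_convert would raise). A self-contained illustration of the parse, convert, format sequence, using utc=True to keep the example unambiguous:

import pandas as pd

s = pd.Series(["2024-03-01T12:00:00Z", "not a date"])
s = pd.to_datetime(s, errors="coerce", utc=True)   # bad values become NaT
s = s.dt.tz_convert("America/Sao_Paulo")
print(s.dt.strftime("%Y-%m-%d %H:%M:%S"))          # NaT renders as NaN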

def treat_data_ocorrencias(dfr: pandas.core.frame.DataFrame, redis_max_date: str) ‑> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]
@task(nout=2)
def treat_data_ocorrencias(
    dfr: pd.DataFrame,
    redis_max_date: str,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Rename columns and normalize data.
    """

    log("Start treating data")
    dfr = dfr.rename(
        columns={
            "id": "id_evento",
            "pop_id": "id_pop",
            "inicio": "data_inicio",
            "fim": "data_fim",
            "pop": "pop_titulo",
            "titulo": "pop_especificacao",
        }
    )

    log(f"First row: \n{dfr.iloc[0]}")

    dfr["id_evento"] = dfr["id_evento"].astype(float).astype(int).astype(str)

    for col in ["data_inicio", "data_fim"]:
        dfr[col] = pd.to_datetime(dfr[col], errors="coerce")
    max_date = dfr[["data_inicio", "data_fim"]].max().max()
    max_date = max_date.strftime("%Y-%m-%d %H:%M:%S")
    log(f"Last API data was {max_date} and last redis uptade was {redis_max_date}")

    if max_date <= redis_max_date:
        skip_text = "No new data available on API"
        print(skip_text)
        raise ENDRUN(state=Skipped(skip_text))

    # Get new max_date to save on redis
    redis_max_date = max_date

    dfr["tipo"] = dfr["tipo"].replace(
        {
            "Primária": "Primario",
            "Secundária": "Secundario",
        }
    )
    dfr["descricao"] = dfr["descricao"].apply(unidecode)

    mandatory_cols = [
        "id_pop",
        "id_evento",
        "bairro",
        "data_inicio",
        "data_fim",
        "prazo",
        "descricao",
        "gravidade",
        "latitude",
        "longitude",
        "status",
        "tipo",
    ]
    # Create columns if they don't exist in the new API
    for col in mandatory_cols:
        if col not in dfr.columns:
            dfr[col] = None

    categorical_cols = [
        "bairro",
        "descricao",
        "gravidade",
        "status",
        "tipo",
        "pop_titulo",
    ]
    for i in categorical_cols:
        dfr[i] = dfr[i].str.capitalize()

    # This treatment is temporary: the id_pop from the API is currently coming with the same value as id_evento
    dfr = treat_wrong_id_pop(dfr)
    log(f"This id_pop are missing {dfr[dfr.id_pop.isna()]} they were replaced by 99")
    dfr["id_pop"] = dfr["id_pop"].fillna(99)

    # Treat id_pop col
    dfr["id_pop"] = dfr["id_pop"].astype(float).astype(int)

    for col in ["data_inicio", "data_fim"]:
        dfr[col] = dfr[col].dt.strftime("%Y-%m-%d %H:%M:%S")

    # Set the order to match the original table
    dfr = dfr[mandatory_cols]

    # Create a column with the row creation time so dbt can keep the latest event
    dfr["created_at"] = pendulum.now(tz="America/Sao_Paulo").strftime(
        "%Y-%m-%d %H:%M:%S"
    )

    return dfr.drop_duplicates(), redis_max_date

Rename columns and normalize data.
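A hedged sketch of how these tasks could be wired together in a Prefect 1.x flow (matching the @task decorators above). The flow name and the "rj_cor" / "ocorrencias" dataset and table ids are placeholders, not the actual production flow:

from prefect import Flow, Parameter

with Flow("comando__ocorrencias_sketch") as flow:
    first_date = Parameter("first_date", default=None)
    last_date = Parameter("last_date", default=None)

    dates = get_date_interval(first_date, last_date)
    # Indexing a task result relies on Prefect 1.x's GetItem behavior.
    dfr = download_data_ocorrencias(first_date=dates[0], last_date=dates[1])

    redis_max_date = get_redis_max_date("rj_cor", "ocorrencias")
    dfr_treated, new_max_date = treat_data_ocorrencias(dfr, redis_max_date)

    path = save_data(dfr_treated)
    save_redis_max_date(
        "rj_cor", "ocorrencias", redis_max_date=new_max_date, wait=path
    )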