Module pipelines.rj_cor.meteorologia.meteorologia_redemet.tasks

Tasks for meteorologia_redemet

Functions

def check_for_new_stations(dataframe: pandas.core.frame.DataFrame, wait=None)
Expand source code
@task
def check_for_new_stations(
    dataframe: pd.DataFrame,
    wait=None,  # pylint: disable=unused-argument
):
    """
    Fail the flow when the dataframe contains station ids that are not in
    the known station list.

    A new station means the hard-coded treatment in ``treat_data`` is out
    of date, so the run is aborted loudly instead of silently processing
    unknown stations.
    """

    known_stations = [
        "SBAF",
        "SBGL",
        "SBJR",
        "SBRJ",
        "SBSC",
    ]
    # Preserve the order in which unknown ids appear in the data
    new_stations = [
        station
        for station in dataframe.id_estacao.unique()
        if station not in known_stations
    ]
    if new_stations:
        message = f"New station identified. You need to update REDEMET\
              flow and add station(s) {new_stations}"
        log(message)
        raise ENDRUN(state=Failed(message))

Check if the updated stations are the same as before. If not, consider flow as failed and call attention to change treat_data task.

def download_data(first_date: str, last_date: str) ‑> pandas.core.frame.DataFrame
Expand source code
@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def download_data(first_date: str, last_date: str) -> pd.DataFrame:
    """
    Request REDEMET aerodrome data for every Rio de Janeiro station and
    every hour in the specified date range (inclusive).

    Args:
        first_date: start date in ``YYYY-MM-DD`` format.
        last_date: end date in ``YYYY-MM-DD`` format.

    Returns:
        Dataframe with one row per successful API response (the ``data``
        payload of each response).
    """
    # Local import: the file-level imports only expose `timedelta`
    from datetime import date

    # Stations inside Rio de Janeiro city
    rj_stations = [
        "SBAF",
        "SBGL",
        "SBJR",
        "SBRJ",
        "SBSC",
    ]

    redemet_token = get_vault_secret("redemet-token")
    redemet_token = redemet_token["data"]["token"]

    # Iterate real calendar days. The previous implementation ranged over
    # integer YYYYMMDD values, which generated impossible dates (e.g.
    # 20230132..20230199) whenever the range crossed a month or year
    # boundary, wasting thousands of failing API calls.
    start_day = date.fromisoformat(first_date)
    end_day = date.fromisoformat(last_date)
    days = [
        (start_day + timedelta(days=offset)).strftime("%Y%m%d")
        for offset in range((end_day - start_day).days + 1)
    ]

    raw = []
    for id_estacao in rj_stations:
        base_url = f"https://api-redemet.decea.mil.br/aerodromos/info?api_key={redemet_token}"  # noqa
        for data in days:
            for hora in range(24):
                url = f"{base_url}&localidade={id_estacao}&datahora={data}{hora:02}"
                # Bounded timeout so a stalled connection cannot hang the task
                res = requests.get(url, timeout=120)
                if res.status_code != 200:
                    log(f"Problema no id: {id_estacao}, {res.status_code}")
                    continue
                res_data = json.loads(res.text)
                if res_data["status"] is not True:
                    log(f"Problema no id: {id_estacao}, {res_data['message']}")
                    continue
                if "data" not in res_data["data"]:
                    # No data for this hour
                    continue
                raw.append(res_data)

    # Extract the payload of each response
    raw = [res_data["data"] for res_data in raw]

    # Convert to dataframe
    dataframe = pd.DataFrame(raw)

    return dataframe

Request data from the specified date range

def download_stations_data() ‑> pandas.core.frame.DataFrame
Expand source code
@task
def download_stations_data() -> pd.DataFrame:
    """
    Download station (aerodrome) metadata for Brazil from the REDEMET API.

    Returns:
        Dataframe with one row per station, exactly as delivered in the
        ``data`` field of the API response.
    """

    redemet_token = get_vault_secret("redemet-token")
    redemet_token = redemet_token["data"]["token"]
    base_url = (
        f"https://api-redemet.decea.mil.br/aerodromos/?api_key={redemet_token}"  # noqa
    )
    url = f"{base_url}&pais=Brasil"
    # Bounded timeout so a stalled connection cannot hang the task forever
    res = requests.get(url, timeout=120)
    if res.status_code != 200:
        # Use the pipeline logger for consistency with the other tasks in
        # this module (was `print`, which bypasses the flow logs)
        log(f"Problem on request: {res.status_code}")

    res_data = json.loads(res.text)
    log(f"API Return: {res_data}")

    dataframe = pd.DataFrame(res_data["data"])
    log(f"Stations dataframe: {dataframe.head()}")

    return dataframe

Download station information

def get_dates(first_date: str, last_date: str) ‑> Tuple[str, str, int]
Expand source code
@task(nout=3)
def get_dates(first_date: str, last_date: str) -> Tuple[str, str, int]:
    """
    Resolve the date range for the run.

    If ``first_date`` is provided the run is treated as a backfill and the
    given dates are returned unchanged. Otherwise ``first_date`` defaults
    to yesterday and ``last_date`` to today (UTC — the API always returns
    data in UTC).

    Returns:
        Tuple ``(first_date, last_date, backfill)`` with both dates in
        ``YYYY-MM-DD`` format and ``backfill`` set to 1 for a backfill run,
        0 otherwise. (The annotation previously said ``Tuple[str, str]``,
        but three values are returned, matching ``nout=3``.)
    """
    # The API always returns data in UTC
    if first_date:
        backfill = 1
    else:
        last_date = pendulum.now("UTC").format("YYYY-MM-DD")
        first_date = pendulum.yesterday("UTC").format("YYYY-MM-DD")
        backfill = 0
    log(f"Selected first_date as: {first_date} and last_date as: {last_date}")

    return first_date, last_date, backfill

Task to get the first and last dates. If no date is passed in the parameters (i.e. this is not a backfill), first_date will default to yesterday and last_date to today. Otherwise, this function returns the dates given in the flow's parameters.

def save_data(dataframe: pandas.core.frame.DataFrame,
partition_column: str = 'data_medicao') ‑> str | pathlib.Path
Expand source code
@task
def save_data(
    dataframe: pd.DataFrame, partition_column: str = "data_medicao"
) -> Union[str, Path]:
    """
    Save the dataframe as date-partitioned CSV files.

    Args:
        dataframe: data to persist.
        partition_column: date column used to derive the partitions.

    Returns:
        Base path under which the partitioned files were written.
    """

    base_path = Path("/tmp/meteorologia_redemet/")
    base_path.mkdir(parents=True, exist_ok=True)

    dataframe, partition_cols = parse_date_columns(dataframe, partition_column)

    # Write one CSV file per partition derived from the date column
    to_partitions(
        data=dataframe,
        partition_columns=partition_cols,
        savepath=base_path,
        data_type="csv",
    )
    log(f"[DEBUG] Files saved on {base_path}")
    return base_path

Save the dataframe as a CSV file

def treat_data(dataframe: pandas.core.frame.DataFrame, backfill: bool = 0) ‑> pandas.core.frame.DataFrame
Expand source code
@task
def treat_data(dataframe: pd.DataFrame, backfill: int = 0) -> pd.DataFrame:
    """
    Rename cols, convert timestamp, filter data for the actual date.

    Drops columns that already live in the stations table, renames the
    remaining ones, converts the UTC timestamps to America/Sao_Paulo,
    parses temperature/humidity strings to numbers, deduplicates on
    (id_estacao, data) and — unless this is a backfill run — keeps only
    rows from the current São Paulo date.

    Args:
        dataframe: raw API data as produced by ``download_data``.
        backfill: 1 (truthy) to skip the current-date filter, 0 otherwise.

    Returns:
        Treated dataframe with columns id_estacao, data_medicao,
        temperatura, umidade, condicoes_tempo, ceu, teto, visibilidade.
    """

    drop_cols = ["nome", "cidade", "lon", "lat", "localizacao", "tempoImagem", "metar"]
    # Check if all columns are on the dataframe
    drop_cols = [c for c in drop_cols if c in dataframe.columns]

    # Remove columns that are already in another table
    dataframe = dataframe.drop(drop_cols, axis=1)

    rename_cols = {
        "localidade": "id_estacao",
        "ur": "umidade",
    }

    dataframe = dataframe.rename(columns=rename_cols)

    # Convert UTC time to America/Sao Paulo
    # The API serves timestamps like "31/12/2023 23:00(UTC)"
    formato = "DD/MM/YYYY HH:mm(z)"
    dataframe["data"] = dataframe["data"].apply(
        lambda x: pendulum.from_format(x, formato)
        .in_tz("America/Sao_Paulo")
        .format(formato)
    )

    # Order variables
    primary_keys = ["id_estacao", "data"]
    other_cols = [c for c in dataframe.columns if c not in primary_keys]

    dataframe = dataframe[primary_keys + other_cols]

    # Clean data
    # NOTE(review): strips a 2-char unit suffix (presumably "ºC") before the
    # int conversion. If the API ever returns the bare literal "NIL"
    # (3 chars), x[:-2] would be "N" and int("N") would raise — this code
    # assumes missing values arrive as "NIL" + unit suffix; confirm with the
    # API payloads.
    dataframe["temperatura"] = dataframe["temperatura"].apply(
        lambda x: None if x[:-2] == "NIL" else int(x[:-2])
    )
    # Humidity arrives as e.g. "80%"; anything without "%" is treated as missing
    dataframe["umidade"] = dataframe["umidade"].apply(
        lambda x: None if "%" not in x else int(x[:-1])
    )

    dataframe["data"] = pd.to_datetime(dataframe.data, format="%d/%m/%Y %H:%M(%Z)")

    # Columns that will be saved
    dataframe = dataframe[
        [
            "id_estacao",
            "data",
            "temperatura",
            "umidade",
            "condicoes_tempo",
            "ceu",
            "teto",
            "visibilidade",
        ]
    ]

    dataframe = dataframe.drop_duplicates(subset=["id_estacao", "data"])

    log(f"Dados antes do filtro dia:\n{dataframe[['id_estacao', 'data']]}")

    if not backfill:
        # Select our date
        br_timezone = pendulum.now("America/Sao_Paulo").format("YYYY-MM-DD")

        # Select only data from that date
        dataframe = dataframe[dataframe["data"].dt.date.astype(str) == br_timezone]

    log(f">>>> min hora {dataframe[~dataframe.temperatura.isna()].data.min()}")
    log(f">>>> max hora {dataframe[~dataframe.temperatura.isna()].data.max()}")

    dataframe["data"] = dataframe["data"].dt.strftime("%Y-%m-%d %H:%M:%S")
    dataframe.rename(columns={"data": "data_medicao"}, inplace=True)

    dataframe["ceu"] = dataframe["ceu"].str.capitalize()

    return dataframe

Rename cols, convert timestamp, filter data for the actual date

def treat_stations_data(dataframe: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame
Expand source code
@task
def treat_stations_data(dataframe: pd.DataFrame) -> pd.DataFrame:
    """
    Standardize REDEMET station metadata.

    Renames the API columns, keeps only stations whose city mentions
    Rio de Janeiro, strips accents from station names and stamps the
    update date.
    """
    column_map = {
        "lat_dec": "latitude",
        "lon_dec": "longitude",
        "nome": "estacao",
        "altitude_metros": "altitude",
        "cod": "id_estacao",
    }
    dataframe = dataframe.rename(column_map, axis=1)

    # Restrict to stations located in Rio de Janeiro
    rio_mask = dataframe.cidade.str.contains("Rio de Janeiro")
    dataframe = dataframe[rio_mask]

    # Remove accents so names are plain ASCII
    dataframe["estacao"] = dataframe["estacao"].apply(unidecode)

    today = pendulum.now(tz="America/Sao_Paulo").format("YYYY-MM-DD")
    dataframe["data_atualizacao"] = today

    return dataframe[
        [
            "id_estacao",
            "estacao",
            "latitude",
            "longitude",
            "altitude",
            "data_atualizacao",
        ]
    ]

Treat station data