Module pipelines.rj_cor.meteorologia.meteorologia_inmet.tasks

Tasks for meteorologia_inmet

Functions

def download(data_inicio: str, data_fim: str) ‑> pandas.core.frame.DataFrame
Expand source code
@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def download(data_inicio: str, data_fim: str) -> pd.DataFrame:
    """
    Faz o request na data especificada e retorna dados
    """

    # Lista com as estações da cidade do Rio de Janeiro
    estacoes_unicas = [
        "A602",
        "A621",
        "A636",
        "A651",
        "A652",
        "A653",
        "A654",
        "A655",
        "A656",
    ]

    # Faz o request do dia atual e anterior e salva na variável raw
    # Trazer desde o dia anterior evita problemas quando já é outro dia
    # no UTC, visto que ele só traria dados do novo dia e substituiria
    # no arquivo da partição do dia atual no nosso timezone

    dicionario = get_vault_secret("inmet_api")
    token = dicionario["data"]["token"]

    raw = []
    for id_estacao in estacoes_unicas:
        base_url = "https://apitempo.inmet.gov.br/token/estacao"
        url = f"{base_url}/{data_inicio}/{data_fim}/{id_estacao}/{token}"
        res = requests.get(url)
        if res.status_code != 200:
            log(f"Problema no id: {id_estacao}, {res.status_code}, {url}")
            continue
        raw.append(json.loads(res.text))

    # Faz um flat da lista de listas
    flat_list = [item for sublist in raw for item in sublist]
    raw = flat_list.copy()

    # converte para dados
    dados = pd.DataFrame(raw)

    return dados

Faz o request na data especificada e retorna dados

def get_dates(data_inicio: str, data_fim: str) ‑> Tuple[str, str]
Expand source code
@task(nout=3)
def get_dates(data_inicio: str, data_fim: str) -> Tuple[str, str]:
    """
    Task para obter o dia de início e o de fim.
    Se nenhuma data foi passada a data_inicio corresponde a ontem
    e data_fim a hoje e não estamos fazendo backfill.
    Caso contrário, retorna as datas inputadas mos parâmetros do flow.
    """
    # segundo o manual do inmet o dado vem em UTC
    log(f"data de inicio e fim antes do if {data_inicio} {data_fim}")
    if data_inicio == "":
        data_fim = pendulum.now("UTC").format("YYYY-MM-DD")
        data_inicio = pendulum.yesterday("UTC").format("YYYY-MM-DD")
        backfill = 0
    else:
        backfill = 1
    log(f"data de inicio e fim dps do if {data_inicio} {data_fim}")

    return data_inicio, data_fim, backfill

Task para obter o dia de início e o de fim. Se nenhuma data foi passada a data_inicio corresponde a ontem e data_fim a hoje e não estamos fazendo backfill. Caso contrário, retorna as datas inputadas mos parâmetros do flow.

def salvar_dados(dados: pandas.core.frame.DataFrame) ‑> str | pathlib.Path
Expand source code
@task
def salvar_dados(dados: pd.DataFrame) -> Union[str, Path]:
    """
    Salvar dados em csv
    """

    prepath = Path("/tmp/precipitacao_alertario/")
    prepath.mkdir(parents=True, exist_ok=True)

    partition_column = "data"
    dataframe, partitions = parse_date_columns(dados, partition_column)

    # Cria partições a partir da data
    to_partitions(
        data=dataframe,
        partition_columns=partitions,
        savepath=prepath,
        data_type="csv",
    )
    log(f"[DEBUG] Files saved on {prepath}")
    return prepath

Salvar dados em csv

def slice_data(current_time: str) ‑> str
Expand source code
@task()
def slice_data(current_time: str) -> str:
    """
    Retorna a data e hora do timestamp de execução
    """
    if not isinstance(current_time, str):
        current_time = current_time.to_datetime_string()

    data = current_time[:10]
    return data

Retorna a data e hora do timestamp de execução

def tratar_dados(dados: pandas.core.frame.DataFrame, backfill: bool = 0) ‑> pandas.core.frame.DataFrame
Expand source code
@task
def tratar_dados(dados: pd.DataFrame, backfill: bool = 0) -> pd.DataFrame:
    """
    Renomeia colunas e filtra dados com a hora do timestamp de execução
    """

    def converte_timezone(data: str, horario: str) -> str:
        """
        Recebe o formato de data em YYYY-MM-DD e hora em HH:mm:SS no UTC
        e retorna no mesmo formato no horário São Paulo
        """
        datahora = pendulum.from_format(data + " " + horario, "YYYY-MM-DD HH:mm:SS")
        datahora = datahora.in_tz("America/Sao_Paulo")

        data = datahora.format("YYYY-MM-DD")
        horario = datahora.format("HH:mm:SS")
        return data, horario

    drop_cols = [
        "DC_NOME",
        "VL_LATITUDE",
        "VL_LONGITUDE",
        "TEM_SEN",
        "UF",
        "TEN_BAT",
        "TEM_CPU",
    ]
    # Checa se todas estão no df
    drop_cols = [c for c in drop_cols if c in dados.columns]

    # Remove colunas que já temos os dados em outras tabelas
    dados = dados.drop(drop_cols, axis=1)

    # Adequando nome das variáveis
    rename_cols = {
        "DC_NOME": "estacao",
        "UF": "sigla_uf",
        "VL_LATITUDE": "latitude",
        "VL_LONGITUDE": "longitude",
        "CD_ESTACAO": "id_estacao",
        "VEN_DIR": "direcao_vento",
        "DT_MEDICAO": "data",
        "HR_MEDICAO": "horario",
        "VEN_RAJ": "rajada_vento_max",
        "CHUVA": "acumulado_chuva_1_h",
        "PRE_INS": "pressao",
        "PRE_MIN": "pressao_minima",
        "PRE_MAX": "pressao_maxima",
        "UMD_INS": "umidade",
        "UMD_MIN": "umidade_minima",
        "UMD_MAX": "umidade_maxima",
        "VEN_VEL": "velocidade_vento",
        "TEM_INS": "temperatura",
        "TEM_MIN": "temperatura_minima",
        "TEM_MAX": "temperatura_maxima",
        "RAD_GLO": "radiacao_global",
        "PTO_INS": "temperatura_orvalho",
        "PTO_MIN": "temperatura_orvalho_minimo",
        "PTO_MAX": "temperatura_orvalho_maximo",
    }

    dados = dados.rename(columns=rename_cols)

    # Converte coluna de horas de 2300 para 23:00:00
    dados["horario"] = pd.to_datetime(dados.horario, format="%H%M")
    dados["horario"] = dados.horario.apply(lambda x: datetime.strftime(x, "%H:%M:%S"))

    # Converte horário de UTC para America/Sao Paulo
    dados[["data", "horario"]] = dados[["data", "horario"]].apply(
        lambda x: converte_timezone(x.data, x.horario), axis=1, result_type="expand"
    )

    # Ordenamento de variáveis
    chaves_primarias = ["id_estacao", "data", "horario"]
    demais_cols = [c for c in dados.columns if c not in chaves_primarias]

    dados = dados[chaves_primarias + demais_cols]

    # Converte variáveis que deveriam ser float para float
    float_cols = [
        "pressao",
        "pressao_maxima",
        "radiacao_global",
        "temperatura_orvalho",
        "temperatura_minima",
        "umidade_minima",
        "temperatura_orvalho_maximo",
        "direcao_vento",
        "acumulado_chuva_1_h",
        "pressao_minima",
        "umidade_maxima",
        "velocidade_vento",
        "temperatura_orvalho_minimo",
        "temperatura_maxima",
        "rajada_vento_max",
        "temperatura",
        "umidade",
    ]
    dados[float_cols] = dados[float_cols].astype(float)

    dados["horario"] = pd.to_datetime(dados.horario, format="%H:%M:%S").dt.time
    dados["data"] = pd.to_datetime(dados.data, format="%Y-%m-%d")

    # Pegar o dia no nosso timezone como partição
    br_timezone = pendulum.now("America/Sao_Paulo").format("YYYY-MM-DD")

    # Define colunas que serão salvas
    dados = dados[
        [
            "id_estacao",
            "data",
            "horario",
            "pressao",
            "pressao_maxima",
            "radiacao_global",
            "temperatura_orvalho",
            "temperatura_minima",
            "umidade_minima",
            "temperatura_orvalho_maximo",
            "direcao_vento",
            "acumulado_chuva_1_h",
            "pressao_minima",
            "umidade_maxima",
            "velocidade_vento",
            "temperatura_orvalho_minimo",
            "temperatura_maxima",
            "rajada_vento_max",
            "temperatura",
            "umidade",
        ]
    ]

    if not backfill:
        # Seleciona apenas dados daquele dia (devido à UTC)
        dados = dados[dados["data"] == br_timezone]

    # Remove linhas com todos os dados nan
    dados = dados.dropna(subset=float_cols, how="all")

    print(">>>> max hora ", dados[~dados.temperatura.isna()].horario.max())
    return dados

Renomeia colunas e filtra dados com a hora do timestamp de execução