Module pipelines.rj_smtr.controle_financeiro.tasks
tasks for rj-smtr.controle_financeiro
Functions
def cct_arquivo_retorno_save_redis(redis_key: str, raw_filepath: str)
-
Expand source code
@task def cct_arquivo_retorno_save_redis(redis_key: str, raw_filepath: str): """ Set control info on Redis Args: redis_key (str): Key on Redis raw_filepath (str): Filepath to raw data """ df = pd.read_json(raw_filepath) df["dataOrdem"] = pd.to_datetime(df["dataOrdem"]).dt.strftime("%Y-%m-%d") all_returned_dates = df["dataOrdem"].unique().tolist() df = ( df.groupby( # pylint: disable=E1101 [ "idConsorcio", "idOperadora", "dataOrdem", ] )["isPago"] .max() .reset_index() ) pending_dates = df.loc[~df["isPago"]]["dataOrdem"].unique().tolist() log(f"The API returned the following dates: {sorted(all_returned_dates)}") log(f"the following dates are not paid: {sorted(pending_dates)}") redis_client = get_redis_client() redis_return = redis_client.get(redis_key) if redis_return is None: redis_return = {} redis_return["last_date"] = max( [df["dataOrdem"].max(), redis_return.get("last_date", "2024-05-09")] ) redis_return["pending_dates"] = pending_dates + [ d for d in redis_return.get("pending_dates", []) if d not in all_returned_dates ] log( f""" Saving values on redis last_date: {redis_return["last_date"]} pending_dates: {sorted(redis_return["pending_dates"])} """ ) redis_client.set(redis_key, redis_return)
Set control info on Redis
Args
redis_key
:str
- Key on Redis
raw_filepath
:str
- Filepath to raw data
def create_cct_arquivo_retorno_params(redis_key: str, start_date: str | None, end_date: str | None) ‑> tuple[dict, list[dict]]
-
Expand source code
@task(nout=2) def create_cct_arquivo_retorno_params( redis_key: str, start_date: Union[str, None], end_date: Union[str, None] ) -> tuple[dict, list[dict]]: """ Create parameters to get data from cct api's arquivoPublicacao Args: redis_key (str): Redis key to get pending dates start_date (str): Initial data_ordem to filter end_date (str): Final data_ordem to filter Returns: dict: headers list[dict]: parameters """ auth_resp = requests.post( f"{constants.CCT_API_BASE_URL.value}/auth/admin/email/login", data=get_vault_secret(constants.CCT_API_SECRET_PATH.value)["data"], ) auth_resp.raise_for_status() headers = {"Authorization": f"Bearer {auth_resp.json()['token']}"} if start_date is not None and end_date is not None: return headers, [ { "dt_inicio": start_date, "dt_fim": end_date, } ] redis_client = get_redis_client() log(f"Getting pending dates on Redis. key = {redis_key}") redis_return = redis_client.get(redis_key) log(f"Got value from Redis: {redis_return}") if redis_return is None: params = [ { "dt_inicio": "2024-05-09", "dt_fim": date.today().isoformat(), } ] else: pending_dates = redis_return["pending_dates"] params = get_date_ranges( last_date=redis_return["last_date"], pending_dates=pending_dates, ) return headers, params
Create parameters to get data from cct api's arquivoPublicacao
Args
redis_key
:str
- Redis key to get pending dates
start_date
:str
- Initial data_ordem to filter
end_date
:str
- Final data_ordem to filter
Returns
dict
- headers
list[dict]
- parameters
def get_cct_arquivo_retorno_redis_key(mode: str) ‑> str
-
Expand source code
@task def get_cct_arquivo_retorno_redis_key(mode: str) -> str: """ Gets the key to search and store pending dates on Redis Args: mode (str): dev or prod Returns: str: Redis key """ return ( mode + "." + smtr_constants.CONTROLE_FINANCEIRO_DATASET_ID.value + "." + constants.ARQUIVO_RETORNO_TABLE_ID.value )
Gets the key to search and store pending dates on Redis
Args
mode
:str
- dev or prod
Returns
str
- Redis key
def get_raw_cct_arquivo_retorno(headers: dict, params: list[dict], local_filepath: str) ‑> str
-
Expand source code
@task def get_raw_cct_arquivo_retorno( headers: dict, params: list[dict], local_filepath: str ) -> str: """ Get data from cct api arquivoPublicacao Args: headers (dict): Request headers params (list[dict]): List of request query params local_filepath (str): Path to save the data Returns: str: filepath to raw data """ data = [] url = f"{constants.CCT_API_BASE_URL.value}/cnab/arquivoPublicacao" for param in params: log( f"""Getting raw data: url: {url}, params: {param}""" ) resp = requests.get( url, headers=headers, params=param, ) resp.raise_for_status() new_data = resp.json() data += new_data log(f"returned {len(new_data)} rows") return save_raw_local_func(data=data, filepath=local_filepath)
Get data from cct api arquivoPublicacao
Args
headers
:dict
- Request headers
params
:list[dict]
- List of request query params
local_filepath
:str
- Path to save the data
Returns
str
- filepath to raw data