Module pipelines.rj_cor.comando.eventos.utils
General purpose functions for the comando project
Functions
def build_redis_key(dataset_id: str, table_id: str, name: str, mode: str = 'prod')
-
Expand source code
def build_redis_key(dataset_id: str, table_id: str, name: str, mode: str = "prod"):
    """
    Build the Redis key under which the last updated time is stored.

    The key has the form ``<dataset_id>.<table_id>.<name>``. When ``mode`` is
    ``"dev"`` the key is prefixed with ``dev.``; any other mode leaves it as is.
    """
    base_key = ".".join((dataset_id, table_id, name))
    return f"{mode}.{base_key}" if mode == "dev" else base_key
Helper function for building the key under which the last updated time is stored
def compare_actual_df_with_redis_df(dfr: pandas.core.frame.DataFrame,
dfr_redis: pandas.core.frame.DataFrame,
columns: list) ‑> pandas.core.frame.DataFrame-
Expand source code
def compare_actual_df_with_redis_df(
    dfr: pd.DataFrame,
    dfr_redis: pd.DataFrame,
    columns: list,
) -> "tuple[pd.DataFrame, pd.DataFrame]":
    """
    Compare df from redis to actual df and return the rows from actual df
    that are not already saved on redis, plus the updated redis df.

    Args:
        dfr: current dataframe; every name in ``columns`` is read from it.
        dfr_redis: dataframe previously saved on redis. It is not modified;
            the function works on a copy.
        columns: column names used as the merge key.

    Returns:
        A tuple ``(dfr_diff, updated_dfr_redis)``. ``dfr_diff`` holds the rows
        of ``dfr`` that have no match in ``dfr_redis`` on ``columns``.
        ``updated_dfr_redis`` is the type-aligned redis dataframe with
        ``dfr_diff[columns]`` appended.
    """
    # Work on a copy so the column creation and dtype casts below do not
    # leak into the caller's dataframe.
    dfr_redis = dfr_redis.copy()

    for col in columns:
        if col not in dfr_redis.columns:
            dfr_redis[col] = None
        # Align the dtype with dfr so the merge keys are comparable.
        dfr_redis[col] = dfr_redis[col].astype(dfr[col].dtypes)
    log(f"\nEnded conversion types from dfr to dfr_redis: \n{dfr_redis.dtypes}")

    # A left merge with indicator marks rows found only in dfr as "left_only".
    dfr_diff = (
        pd.merge(dfr, dfr_redis, how="left", on=columns, indicator=True)
        .query('_merge == "left_only"')
        .drop("_merge", axis=1)
    )
    log(
        f"\nDf resulted from the difference between dft_redis and dfr: \n{dfr_diff.head()}"
    )

    updated_dfr_redis = pd.concat([dfr_redis, dfr_diff[columns]])
    return dfr_diff, updated_dfr_redis
Compare df from redis to actual df and return the rows from actual df that are not already saved on redis, together with the updated redis df.
def format_date(first_date, last_date)
-
Expand source code
def format_date(first_date, last_date):
    """
    Convert both dates from "YYYY-MM-DD" to "dd/mm/yyyy".

    One day is added to the last date because the API has an open interval
    at the end: [first_date, last_date).
    """
    input_format = "YYYY-MM-DD"
    output_format = "%d/%m/%Y"

    start = pendulum.from_format(first_date, input_format)
    end_exclusive = pendulum.from_format(last_date, input_format).add(days=1)

    return start.strftime(output_format), end_exclusive.strftime(output_format)
Format date to "dd/mm/yyyy" and add one day to last date because the API has open interval at the end: [first_date, last_date).
def get_redis_output(redis_key, is_df: bool = False)
-
Expand source code
def get_redis_output(redis_key, is_df: bool = False):
    """
    Read the value stored under ``redis_key`` on Redis.

    When ``is_df`` is true, the value is fetched with ``get``, parsed as JSON
    and returned as a DataFrame (an empty DataFrame if nothing is stored).
    Otherwise the key is read with ``hgetall`` and, when the result is not
    empty, passed through ``treat_redis_output`` before being returned.
    """
    client = get_redis_client()  # (host="127.0.0.1")

    if not is_df:
        # Key/value pairs stored as a Redis hash.
        hash_output = client.hgetall(redis_key)
        if len(hash_output) > 0:
            hash_output = treat_redis_output(hash_output)
        return hash_output

    raw_json = client.get(redis_key)
    log(f"[DEGUB] json_data {raw_json}")
    if not raw_json:
        return pd.DataFrame()

    # The stored JSON string is parsed into a dictionary and then turned
    # back into a DataFrame.
    return pd.DataFrame(json.loads(raw_json))
Get Redis output. Uses get to obtain a df from redis, or hgetall if the key holds key-value pairs.
def get_token()
-
Expand source code
def get_token():
    """
    Get token to access comando's API.

    Reads ``host``, ``username`` and ``password`` from the "comando" vault
    secret and POSTs the credentials as JSON to the host.

    Returns:
        The body of the response, as text.

    Raises:
        requests.exceptions.Timeout: if the host does not answer within the
            timeout.
    """
    # Fetch host, username and password from the vault
    secret = get_vault_secret("comando")
    host = secret["data"]["host"]
    username = secret["data"]["username"]
    password = secret["data"]["password"]
    payload = {"username": username, "password": password}

    # requests has no default timeout: without one an unresponsive host
    # would block the caller forever.
    response = requests.post(host, json=payload, timeout=60)
    return response.text
Get token to access comando's API
def get_url(url, parameters: dict = None, token: str = None)
-
Expand source code
def get_url(url, parameters: dict = None, token: str = None):
    """
    Make request to comando's API.

    Args:
        url: endpoint to request.
        parameters: sent as the JSON body of the GET request; defaults to an
            empty dict.
        token: value for the ``Authorization`` header; when missing, a new
            one is obtained with ``get_token``.

    Returns:
        The decoded JSON response, or ``{"response": None}`` if the request
        or the JSON decoding raised any exception (the error is logged).
    """
    if not parameters:
        parameters = {}
    if not token:
        token = get_token()

    retries = Retry(total=5, backoff_factor=1.5)
    adapter = HTTPAdapter(max_retries=retries)
    headers = {"Authorization": token}

    # The context manager closes the session (and its connection pool) even
    # when the request fails.
    with requests.Session() as sess:
        # Mount the retry policy for both schemes; with only "http://"
        # mounted, https URLs would be requested without retries.
        sess.mount("http://", adapter)
        sess.mount("https://", adapter)
        # NOTE(review): no timeout is set on this request; consider adding
        # one that suits the slowest endpoint.
        try:
            response = sess.get(url, json=parameters, headers=headers)
            response = response.json()
        except Exception as exc:
            # Best effort: callers receive a placeholder instead of an error.
            log(f"This resulted in the following error: {exc}")
            response = {"response": None}
    return response
Make request to comando's API
def treat_wrong_id_pop(dfr)
-
Expand source code
def treat_wrong_id_pop(dfr):
    """
    Fill the ``id_pop`` column from the ``pop_titulo`` column.

    Each title is looked up in a fixed title -> id table; titles that are not
    in the table get a missing value. The column is written on the received
    dataframe, which is also returned.
    """
    title_to_id = {
        "Ajuste de banco": 0,
        "Acidente/enguiço sem vítima": 1,
        "Acidente com vítima(s)": 2,
        "Acidente com vítima(s) fatal(is)": 3,
        "Incêndio em veículo(s)": 4,
        "Bolsão d'água em via": 5,
        "Alagamentos e enchentes": 6,
        "Manifestação em local público": 7,
        "Incêndio em imóvel": 8,
        "Sinais de trânsito com mau funcionamento": 9,
        "Reintegração de posse": 10,
        "Queda de árvore": 11,
        "Queda de poste": 12,
        "Acidente com queda de carga": 13,
        "Incêndio no entorno de vias públicas": 14,
        "Incêndio dentro de túneis": 15,
        "Vazamento de água / esgoto": 16,
        "Falta de luz / apagão": 17,
        "Implosão": 18,
        "Queda de estrutura de alvenaria": 19,
        "Vazamento de gás": 20,
        "Evento em local público ou particular": 21,
        "Atropelamento": 22,
        "Afundamento de pista / buraco na via": 23,
        "Abalroamento": 24,
        "Obra em local público": 25,
        "Operação policial": 26,
        "Bloco de rua": 27,
        "Deslizamento": 28,
        "Animal em local público": 29,
        "Acionamento de sirenes": 30,
        "Alagamento": 31,
        "Enchente": 32,
        "Lâmina d'água": 33,
        "Acidente ambiental": 34,
        # Both titles below share the same id.
        "Bueiro": 35,
        "Incidente com bueiro": 35,
        "Resgate ou remoção de animais terrestres e aéreos": 36,
        "Remoção de animais mortos na areia": 37,
        "Resgate de animal marinho preso em rede / encalhado": 38,
        "Incendio em vegetacao": 39,
        "Queda de árvore sobre fiação": 40,
        "Residuos na via": 41,
        "Evento não programado": 99,
    }
    mapped_ids = dfr["pop_titulo"].map(title_to_id)
    dfr["id_pop"] = mapped_ids
    return dfr
Create id_pop based on pop_titulo column