Module pipelines.rj_smtr.br_rj_riodejaneiro_stpl_gps.tasks
Tasks for br_rj_riodejaneiro_stpl_gps
Functions
def pre_treatment_br_rj_riodejaneiro_stpl_gps(status_dict, timestamp)
-
Expand source code
def _to_timezone(series, timezone):
    """Return ``series`` with every timestamp expressed in ``timezone``.

    Each value is first converted with ``tz_convert``. If that raises
    ``TypeError`` (pandas does so for naive timestamps), every value is
    localized with ``tz_localize`` instead.
    """
    try:
        return series.apply(lambda x: x.tz_convert(timezone))
    except TypeError:
        return series.apply(lambda x: x.tz_localize(timezone))


@task
def pre_treatment_br_rj_riodejaneiro_stpl_gps(status_dict, timestamp):
    """Parse data from status_dict['data'] to DataFrame as partially nested table.

    Each entry of ``status_dict["data"]["veiculos"]`` becomes one row. The
    whole entry is kept in the ``content`` column. The key field
    (``codigo``) and ``dataHora`` are also extracted into their own columns.
    Rows are kept only when ``timestamp_captura - dataHora`` is between
    0 and 1 minute (inclusive).

    Args:
        status_dict (dict): Contains "data" (the captured payload) and
            "error" (any error from a previous step, or None).
        timestamp: Capture time. It is parsed with ``pd.to_datetime`` and
            stored in the ``timestamp_captura`` column.

    Returns:
        dict: "data" contains the transformed DataFrame (an empty DataFrame
        when a previous error was passed in), "error" contains any caught
        error during execution, or None.
    """
    # Check the upstream error before touching the payload: when a previous
    # step failed, status_dict["data"] may be None or lack "veiculos".
    if status_dict["error"] is not None:
        return {"data": pd.DataFrame(), "error": status_dict["error"]}

    key_column = "codigo"
    datahora_col = "dataHora"
    columns = [key_column, datahora_col, "timestamp_captura", "content"]
    data = status_dict["data"]["veiculos"]
    error = None
    # get tz info from constants
    timezone = constants.TIMEZONE.value
    timestamp_captura = pd.to_datetime(timestamp)

    # initialize df for nested columns
    df = pd.DataFrame(columns=columns)
    # separate each nested piece in data into a row
    df["content"] = list(data)
    # retrieve key fields from each nested piece in data
    df[key_column] = [piece[key_column] for piece in data]
    df[datahora_col] = [piece[datahora_col] for piece in data]
    df["timestamp_captura"] = timestamp_captura
    # dataHora is divided by 1000 before pendulum.from_timestamp, i.e. it is
    # treated as an epoch value in milliseconds.
    df[datahora_col] = df[datahora_col].apply(
        lambda ms: pd.to_datetime(
            pendulum.from_timestamp(ms / 1000.0, timezone).isoformat()
        )
    )

    # Filter data for 0 <= time diff <= 1min
    try:
        # Bring both columns to the same timezone so they can be subtracted.
        df[datahora_col] = _to_timezone(df[datahora_col], timezone)
        df["timestamp_captura"] = _to_timezone(df["timestamp_captura"], timezone)
        mask = (df["timestamp_captura"] - df[datahora_col]).apply(
            lambda x: timedelta(seconds=0) <= x <= timedelta(minutes=1)
        )
        df_treated = df[mask]
        log(f"Shape antes da filtragem: {df.shape}")
        log(f"Shape após a filtragem: {df_treated.shape}")
        if df_treated.shape[0] == 0:
            error = ValueError("After filtering, the dataframe is empty!")
        df = df_treated
    except Exception:
        # Best-effort: report the failure to the caller instead of raising.
        error = traceback.format_exc()
        log_critical(f"Failed to filter STPL data: \n{error}")

    return {"data": df, "error": error}
Parse data from status_dict['data'] to DataFrame as partially nested table.
Args
status_dict
:dict
- Contains data, run time timestamp and any previous error
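timestamp
:datetime or str
- Capture time, parsed with pd.to_datetime and stored in the timestamp_captura column. (The unique key field of each vehicle dict is fixed internally as "codigo"; it is not a parameter.)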
Returns
dict
- "data" contains the transformed DataFrame from data, "error" contains any error caught
during execution (or None).