Module pipelines.utils.utils
General utilities for all pipelines.
Functions
def batch_to_dataframe(batch: Tuple[Tuple], columns: List[str]) ‑> pandas.core.frame.DataFrame
def batch_to_dataframe(batch: Tuple[Tuple], columns: List[str]) -> pd.DataFrame:
    """
    Converts a batch of rows to a dataframe.
    """
    return pd.DataFrame(batch, columns=columns)
Converts a batch of rows to a dataframe.
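A minimal usage sketch (the column names and row values below are illustrative only):

from pipelines.utils.utils import batch_to_dataframe

batch = ((1, "alpha"), (2, "beta"))
df = batch_to_dataframe(batch, columns=["id", "nome"])
# df is a two-row DataFrame with columns "id" and "nome"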
def build_redis_key(dataset_id: str, table_id: str, name: str = None, mode: str = 'prod')
def build_redis_key(
    dataset_id: str, table_id: str, name: str = None, mode: str = "prod"
):
    """
    Helper function for building a key to redis
    """
    key = dataset_id + "." + table_id
    if name:
        key = key + "." + name
    if mode == "dev":
        key = f"{mode}.{key}"
    return key
Helper function for building a key to redis
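For illustration, a sketch of the keys this helper produces (the dataset and table ids are made up):

from pipelines.utils.utils import build_redis_key

build_redis_key("meu_dataset", "minha_tabela")                # 'meu_dataset.minha_tabela'
build_redis_key("meu_dataset", "minha_tabela", name="extra")  # 'meu_dataset.minha_tabela.extra'
build_redis_key("meu_dataset", "minha_tabela", mode="dev")    # 'dev.meu_dataset.minha_tabela'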
def clean_dataframe(dataframe: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame
def clean_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame:
    """
    Cleans a dataframe.
    """
    for col in dataframe.columns.tolist():
        if dataframe[col].dtype == object:
            try:
                dataframe[col] = (
                    dataframe[col]
                    .astype(str)
                    .str.replace("\x00", "")
                    .replace("None", np.nan)
                )
            except Exception as exc:
                print(
                    "Column: ",
                    col,
                    "\nData: ",
                    dataframe[col].tolist(),
                    "\n",
                    exc,
                )
                raise
    return dataframe
Cleans a dataframe.
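A small sketch of the cleaning behavior on object columns (the sample data is invented):

import pandas as pd
from pipelines.utils.utils import clean_dataframe

df = pd.DataFrame({"nome": ["foo\x00bar", "None"], "valor": [1, 2]})
df = clean_dataframe(df)
# "nome" becomes ["foobar", NaN]: null bytes are stripped and the literal
# string "None" is replaced by NaN; non-object columns are left untouched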
def compare_dates_between_tables_redis(key_table_1: str, format_date_table_1: str, key_table_2: str, format_date_table_2: str)
def compare_dates_between_tables_redis(
    key_table_1: str,
    format_date_table_1: str,
    key_table_2: str,
    format_date_table_2: str,
):
    """
    Function that checks if the date saved on the second table is later than the first one.
    """
    # Get saved dates from Redis
    date_1 = get_redis_output(key_table_1)
    date_2 = get_redis_output(key_table_2)
    # Return True if there is no date_1 or date_2 saved on Redis
    if (len(date_1) == 0) | (len(date_2) == 0):
        return True
    # Convert dates to pendulum
    date_1 = pendulum.from_format(date_1["date"], format_date_table_1)
    date_2 = pendulum.from_format(date_2["date"], format_date_table_2)
    comparison = date_1 < date_2
    log(f"Is {date_2} bigger than {date_1}? {comparison}")
    return comparison
Function that checks if the date saved on the second table is later than the one saved on the first table.
def dataframe_to_csv(dataframe: pandas.core.frame.DataFrame, path: str | pathlib.Path, build_json_dataframe: bool = False, dataframe_key_column: str = None) ‑> None
def dataframe_to_csv(
    dataframe: pd.DataFrame,
    path: Union[str, Path],
    build_json_dataframe: bool = False,
    dataframe_key_column: str = None,
) -> None:
    """
    Writes a dataframe to CSV file.
    """
    if build_json_dataframe:
        dataframe = to_json_dataframe(dataframe, key_column=dataframe_key_column)

    # Convert to a Path object
    path = Path(path)

    # Create directory if it doesn't exist
    path.parent.mkdir(parents=True, exist_ok=True)

    # Write dataframe to CSV
    dataframe.to_csv(path, index=False, encoding="utf-8")
Writes a dataframe to CSV file.
def dataframe_to_parquet(dataframe: pandas.core.frame.DataFrame, path: str | pathlib.Path, build_json_dataframe: bool = False, dataframe_key_column: str = None)
def dataframe_to_parquet(
    dataframe: pd.DataFrame,
    path: Union[str, Path],
    build_json_dataframe: bool = False,
    dataframe_key_column: str = None,
):
    """
    Writes a dataframe to Parquet file with Schema as STRING.
    """
    # Code adapted from
    # https://stackoverflow.com/a/70817689/9944075

    if build_json_dataframe:
        dataframe = to_json_dataframe(dataframe, key_column=dataframe_key_column)

    # If the file already exists, we:
    # - Load it
    # - Merge the new dataframe with the existing one
    if Path(path).exists():
        # Load it
        original_df = pd.read_parquet(path)
        # Merge the new dataframe with the existing one
        dataframe = pd.concat([original_df, dataframe], sort=False)

    # Write dataframe to Parquet
    dataframe.to_parquet(path, engine="pyarrow")
Writes a dataframe to Parquet file with Schema as STRING.
def delete_blobs_list(bucket_name: str, blobs: List[google.cloud.storage.blob.Blob], mode: str = 'prod') ‑> None
def delete_blobs_list(bucket_name: str, blobs: List[Blob], mode: str = "prod") -> None:
    """
    Deletes all blobs in the bucket that are in the blobs list.
    Mode needs to be "prod" or "staging"
    """
    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(bucket_name)
    bucket.delete_blobs(blobs)
Deletes all blobs in the bucket that are in the blobs list. Mode needs to be "prod" or "staging"
def determine_whether_to_execute_or_not(cron_expression: str, datetime_now: datetime.datetime, datetime_last_execution: datetime.datetime) ‑> bool
def determine_whether_to_execute_or_not(
    cron_expression: str, datetime_now: datetime, datetime_last_execution: datetime
) -> bool:
    """
    Determines whether the cron expression is currently valid.

    Args:
        cron_expression: The cron expression to check.
        datetime_now: The current datetime.
        datetime_last_execution: The last datetime the cron expression was executed.

    Returns:
        True if the cron expression should trigger, False otherwise.
    """
    cron_expression_iterator = croniter.croniter(
        cron_expression, datetime_last_execution
    )
    next_cron_expression_time = cron_expression_iterator.get_next(datetime)
    return next_cron_expression_time <= datetime_now
Determines whether the cron expression is currently valid.
Args
    cron_expression: The cron expression to check.
    datetime_now: The current datetime.
    datetime_last_execution: The last datetime the cron expression was executed.
Returns
    True if the cron expression should trigger, False otherwise.
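A usage sketch with croniter-style expressions (the timestamps are arbitrary):

from datetime import datetime
from pipelines.utils.utils import determine_whether_to_execute_or_not

last_run = datetime(2023, 1, 1, 0, 0)
now = datetime(2023, 1, 1, 0, 30)

# "*/15 * * * *" fires every 15 minutes; the next trigger after last_run (00:15) is <= now
determine_whether_to_execute_or_not("*/15 * * * *", now, last_run)  # True

# "0 12 * * *" fires daily at noon; the next trigger (12:00) is still in the future
determine_whether_to_execute_or_not("0 12 * * *", now, last_run)    # False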
def dump_header_to_file(data_path: str | pathlib.Path, data_type: str = 'csv')
def dump_header_to_file(data_path: Union[str, Path], data_type: str = "csv"):
    """
    Writes a header file containing only the first row of the first CSV or Parquet
    file found under data_path.
    """
    try:
        assert data_type in ["csv", "parquet"]
    except AssertionError as exc:
        raise ValueError(f"Invalid data type: {data_type}") from exc

    # Remove filename from path
    path = Path(data_path)
    if not path.is_dir():
        path = path.parent

    # Grab first `data_type` file found
    found: bool = False
    file: str = None
    for subdir, _, filenames in walk(str(path)):
        for fname in filenames:
            if fname.endswith(f".{data_type}"):
                file = join(subdir, fname)
                log(f"Found {data_type.upper()} file: {file}")
                found = True
                break
        if found:
            break

    save_header_path = f"data/{uuid4()}"
    # Discover whether it's a partitioned table
    if partition_folders := [folder for folder in file.split("/") if "=" in folder]:
        partition_path = "/".join(partition_folders)
        save_header_file_path = Path(
            f"{save_header_path}/{partition_path}/header.{data_type}"
        )
        log(f"Found partition path: {save_header_file_path}")
    else:
        save_header_file_path = Path(f"{save_header_path}/header.{data_type}")
        log(f"Did not find partition path: {save_header_file_path}")

    # Create directory if it doesn't exist
    save_header_file_path.parent.mkdir(parents=True, exist_ok=True)

    # Read just the first row and write the dataframe to file
    if data_type == "csv":
        dataframe = pd.read_csv(file, nrows=1)
        dataframe.to_csv(save_header_file_path, index=False, encoding="utf-8")
    elif data_type == "parquet":
        dataframe = pd.read_parquet(file)[:1]
        dataframe_to_parquet(dataframe=dataframe, path=save_header_file_path)

    log(f"Wrote {data_type.upper()} header at {save_header_file_path}")

    return save_header_path
Writes a header file containing only the first row of the first CSV or Parquet file found under data_path.
def final_column_treatment(column: str) ‑> str
def final_column_treatment(column: str) -> str:
    """
    Prepends an underscore to the column name if it contains only digits;
    otherwise removes all non-alphanumeric characters except underscores ("_").
    """
    try:
        int(column)
        return f"_{column}"
    except ValueError:  # pylint: disable=bare-except
        non_alpha_removed = re.sub(r"[\W]+", "", column)
        return non_alpha_removed
Prepends an underscore to the column name if it contains only digits; otherwise removes all non-alphanumeric characters except underscores ("_").
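A few illustrative inputs and outputs:

from pipelines.utils.utils import final_column_treatment

final_column_treatment("2022")        # '_2022'   (digits only, underscore prepended)
final_column_treatment("id_estacao")  # 'id_estacao'
final_column_treatment("preco(r$)")   # 'precor'  (non-alphanumeric characters removed)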
def get_credentials_from_env(mode: str = 'prod', scopes: List[str] = None) ‑> google.oauth2.service_account.Credentials
def get_credentials_from_env(
    mode: str = "prod", scopes: List[str] = None
) -> service_account.Credentials:
    """
    Gets credentials from env vars
    """
    if mode not in ["prod", "staging"]:
        raise ValueError("Mode must be 'prod' or 'staging'")
    env: str = getenv(f"BASEDOSDADOS_CREDENTIALS_{mode.upper()}", "")
    if env == "":
        raise ValueError(f"BASEDOSDADOS_CREDENTIALS_{mode.upper()} env var not set!")
    info: dict = json.loads(base64.b64decode(env))
    cred: service_account.Credentials = (
        service_account.Credentials.from_service_account_info(info)
    )
    if scopes:
        cred = cred.with_scopes(scopes)
    return cred
Gets credentials from env vars
def get_redis_client(host: str = 'redis.redis.svc.cluster.local', port: int = 6379, db: int = 0, password: str = None) ‑> redis_pal.RedisPal.RedisPal
def get_redis_client(
    host: str = "redis.redis.svc.cluster.local",
    port: int = 6379,
    db: int = 0,  # pylint: disable=C0103
    password: str = None,
) -> RedisPal:
    """
    Returns a Redis client.
    """
    return RedisPal(
        host=host,
        port=port,
        db=db,
        password=password,
    )
Returns a Redis client.
def get_redis_output(redis_key)
def get_redis_output(redis_key):
    """
    Get Redis output
    Example: {b'date': b'2023-02-27 07:29:04'}
    """
    redis_client = get_redis_client()
    output = redis_client.hgetall(redis_key)
    if len(output) > 0:
        output = treat_redis_output(output)
    return output
Get Redis output. Example: {b'date': b'2023-02-27 07:29:04'}
def get_storage_blobs(dataset_id: str, table_id: str, mode: str = 'staging') ‑> list
def get_storage_blobs(dataset_id: str, table_id: str, mode: str = "staging") -> list:
    """
    Get all blobs from a table in a dataset.

    Args:
        dataset_id (str): dataset id
        table_id (str): table id
        mode (str, optional): mode to use. Defaults to "staging".

    Returns:
        list: list of blobs
    """
    bd_storage = bd.Storage(dataset_id=dataset_id, table_id=table_id)
    return list(
        bd_storage.client["storage_staging"]
        .bucket(bd_storage.bucket_name)
        .list_blobs(prefix=f"{mode}/{bd_storage.dataset_id}/{bd_storage.table_id}/")
    )
Get all blobs from a table in a dataset.
Args
    dataset_id (str): dataset id
    table_id (str): table id
    mode (str, optional): mode to use. Defaults to "staging".
Returns
    list: list of blobs
def get_username_and_password_from_secret(secret_path: str, client: hvac.v1.Client = None) ‑> Tuple[str, str]
def get_username_and_password_from_secret(
    secret_path: str,
    client: hvac.Client = None,
) -> Tuple[str, str]:
    """
    Returns a username and password from a secret in Vault.
    """
    secret = get_vault_secret(secret_path, client)
    return (
        secret["data"]["username"],
        secret["data"]["password"],
    )
Returns a username and password from a secret in Vault.
def get_vault_client() ‑> hvac.v1.Client
def get_vault_client() -> hvac.Client:
    """
    Returns a Vault client.
    """
    return hvac.Client(
        url=getenv("VAULT_ADDRESS").strip(),
        token=getenv("VAULT_TOKEN").strip(),
    )
Returns a Vault client.
def get_vault_secret(secret_path: str, client: hvac.v1.Client = None) ‑> dict
def get_vault_secret(secret_path: str, client: hvac.Client = None) -> dict:
    """
    Returns a secret from Vault.
    """
    vault_client = client or get_vault_client()
    return vault_client.secrets.kv.read_secret_version(secret_path)["data"]
Returns a secret from Vault.
def human_readable(value: int | float, unit: str = '', unit_prefixes: List[str] = None, unit_divider: int = 1000, decimal_places: int = 2)
def human_readable(
    value: Union[int, float],
    unit: str = "",
    unit_prefixes: List[str] = None,
    unit_divider: int = 1000,
    decimal_places: int = 2,
):
    """
    Formats a value in a human readable way.
    """
    if unit_prefixes is None:
        unit_prefixes = ["", "k", "M", "G", "T", "P", "E", "Z", "Y"]
    if value == 0:
        return f"{value}{unit}"
    unit_prefix = unit_prefixes[0]
    for prefix in unit_prefixes[1:]:
        if value < unit_divider:
            break
        unit_prefix = prefix
        value /= unit_divider
    return f"{value:.{decimal_places}f}{unit_prefix}{unit}"
Formats a value in a human readable way.
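A few illustrative calls (the byte-style formatting is just an example of the unit argument):

from pipelines.utils.utils import human_readable

human_readable(0, unit="B")                        # '0B'
human_readable(1234, unit="B")                     # '1.23kB'
human_readable(15_000_000)                         # '15.00M'
human_readable(2048, unit="B", unit_divider=1024)  # '2.00kB'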
def is_date(date_string: str, date_format: str = '%Y-%m-%d') ‑> datetime.datetime | bool
def is_date(date_string: str, date_format: str = "%Y-%m-%d") -> Union[datetime, bool]:
    """
    Checks whether a string is a valid date.
    """
    try:
        return datetime.strptime(date_string, date_format).strftime(date_format)
    except ValueError:
        return False
Checks whether a string is a valid date.
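Note that on success the function returns the re-formatted date string rather than a datetime object; a short sketch:

from pipelines.utils.utils import is_date

is_date("2023-02-27")                          # '2023-02-27'
is_date("27/02/2023")                          # False (does not match the default format)
is_date("27/02/2023", date_format="%d/%m/%Y")  # '27/02/2023'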
def list_blobs_with_prefix(bucket_name: str, prefix: str, mode: str = 'prod') ‑> List[google.cloud.storage.blob.Blob]
def list_blobs_with_prefix(
    bucket_name: str, prefix: str, mode: str = "prod"
) -> List[Blob]:
    """
    Lists all the blobs in the bucket that begin with the prefix.
    This can be used to list all blobs in a "folder", e.g. "public/".
    Mode needs to be "prod" or "staging"
    """
    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)
    # Note: Client.list_blobs requires at least package version 1.17.0.
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix)
    return list(blobs)
Lists all the blobs in the bucket that begin with the prefix. This can be used to list all blobs in a "folder", e.g. "public/". Mode needs to be "prod" or "staging"
def log(msg: Any, level: str = 'info') ‑> None
def log(msg: Any, level: str = "info") -> None:
    """
    Logs a message to prefect's logger.
    """
    levels = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }

    blank_spaces = 8 * " "
    msg = blank_spaces + "----\n" + str(msg)
    msg = "\n".join([blank_spaces + line for line in msg.split("\n")]) + "\n\n"

    if level not in levels:
        raise ValueError(f"Invalid log level: {level}")
    prefect.context.logger.log(levels[level], msg)  # pylint: disable=E1101
Logs a message to prefect's logger.
def log_mod(msg: Any, level: str = 'info', index: int = 0, mod: int = 1)
def log_mod(msg: Any, level: str = "info", index: int = 0, mod: int = 1):
    """
    Only logs a message if the index is a multiple of mod.
    """
    if index % mod == 0 or index == 0:
        log(msg=f"iteration {index}:\n {msg}", level=level)
Only logs a message if the index is a multiple of mod.
def notify_discord_on_failure(flow: prefect.core.flow.Flow, state: prefect.engine.state.State, secret_path: str, code_owners: List[str] | None = None)
def notify_discord_on_failure(
    flow: prefect.Flow,
    state: State,
    secret_path: str,
    code_owners: Optional[List[str]] = None,
):
    """
    Notifies a Discord channel when a flow fails.
    """
    url = get_vault_secret(secret_path)["data"]["url"]
    flow_run_id = prefect.context.get("flow_run_id")
    code_owners = code_owners or constants.DEFAULT_CODE_OWNERS.value
    code_owner_dict = constants.OWNERS_DISCORD_MENTIONS.value
    at_code_owners = []
    for code_owner in code_owners:
        code_owner_id = code_owner_dict[code_owner]["user_id"]
        code_owner_type = code_owner_dict[code_owner]["type"]
        if code_owner_type == "user":
            at_code_owners.append(f" - <@{code_owner_id}>\n")
        elif code_owner_type == "user_nickname":
            at_code_owners.append(f" - <@!{code_owner_id}>\n")
        elif code_owner_type == "channel":
            at_code_owners.append(f" - <#{code_owner_id}>\n")
        elif code_owner_type == "role":
            at_code_owners.append(f" - <@&{code_owner_id}>\n")

    message = (
        f":man_facepalming: Flow **{flow.name}** has failed."
        + f'\n - State message: *"{state.message}"*'
        + "\n - Link to the failed flow: "
        + f"https://prefect.dados.rio/flow-run/{flow_run_id}"
        + "\n - Extra attention:\n"
        + "".join(at_code_owners)
    )
    send_discord_message(
        message=message,
        webhook_url=url,
    )
Notifies a Discord channel when a flow fails.
def parse_date_columns(dataframe: pandas.core.frame.DataFrame, partition_date_column: str) ‑> Tuple[pandas.core.frame.DataFrame, List[str]]
def parse_date_columns(
    dataframe: pd.DataFrame, partition_date_column: str
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Parses the date columns to the partition format.
    """
    ano_col = "ano_particao"
    mes_col = "mes_particao"
    data_col = "data_particao"
    cols = [ano_col, mes_col, data_col]
    for col in cols:
        if col in dataframe.columns:
            raise ValueError(f"Column {col} already exists, please review your model.")

    dataframe[partition_date_column] = dataframe[partition_date_column].astype(str)
    dataframe[data_col] = pd.to_datetime(
        dataframe[partition_date_column], errors="coerce"
    )

    dataframe[ano_col] = (
        dataframe[data_col]
        .dt.year.fillna(-1)
        .astype(int)
        .astype(str)
        .replace("-1", np.nan)
    )
    dataframe[mes_col] = (
        dataframe[data_col]
        .dt.month.fillna(-1)
        .astype(int)
        .astype(str)
        .replace("-1", np.nan)
    )
    dataframe[data_col] = dataframe[data_col].dt.date

    return dataframe, [ano_col, mes_col, data_col]
Parses the date columns to the partition format.
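A sketch of the partition columns it adds (the sample data is invented):

import pandas as pd
from pipelines.utils.utils import parse_date_columns

df = pd.DataFrame({"data_medicao": ["2023-02-27 07:00:00"], "valor": [1.5]})
df, partitions = parse_date_columns(df, partition_date_column="data_medicao")
# partitions == ["ano_particao", "mes_particao", "data_particao"]
# df gains ano_particao == "2023" and mes_particao == "2" (as strings),
# and data_particao == datetime.date(2023, 2, 27)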
def parser_blobs_to_partition_dict(blobs: list) ‑> dict
def parser_blobs_to_partition_dict(blobs: list) -> dict:
    """
    Extracts the partition information from the blobs.
    """
    partitions_dict = {}
    for blob in blobs:
        for folder in blob.name.split("/"):
            if "=" in folder:
                key = folder.split("=")[0]
                value = folder.split("=")[1]
                try:
                    partitions_dict[key].append(value)
                except KeyError:
                    partitions_dict[key] = [value]
    return partitions_dict
Extracts the partition information from the blobs.
def query_to_line(query: str) ‑> str
def query_to_line(query: str) -> str:
    """
    Converts a query to a line.
    """
    query = textwrap.dedent(query)
    return " ".join([line.strip() for line in query.split("\n")])
Converts a query to a line.
def remove_columns_accents(dataframe: pandas.core.frame.DataFrame) ‑> list
def remove_columns_accents(dataframe: pd.DataFrame) -> list:
    """
    Remove accents from dataframe columns.
    """
    columns = [str(column) for column in dataframe.columns]
    dataframe.columns = columns
    return list(
        dataframe.columns.str.normalize("NFKD")
        .str.encode("ascii", errors="ignore")
        .str.decode("utf-8")
        .map(lambda x: x.strip())
        .str.replace(" ", "_")
        .str.replace("/", "_")
        .str.replace("-", "_")
        .str.replace("\a", "_")
        .str.replace("\b", "_")
        .str.replace("\n", "_")
        .str.replace("\t", "_")
        .str.replace("\v", "_")
        .str.replace("\f", "_")
        .str.replace("\r", "_")
        .str.lower()
        .map(final_column_treatment)
    )
Remove accents from dataframe columns.
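An illustrative call (note that the function returns the treated names; it does not rename the columns in place):

import pandas as pd
from pipelines.utils.utils import remove_columns_accents

df = pd.DataFrame(columns=["Estação", "Data Medição", "Preço/R$"])
remove_columns_accents(df)
# ['estacao', 'data_medicao', 'preco_r']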
def remove_tabs_from_query(query: str) ‑> str
def remove_tabs_from_query(query: str) -> str:
    """
    Removes tabs from a query.
    """
    query = query_to_line(query)
    return re.sub(r"\s+", " ", query).strip()
Removes tabs from a query.
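A sketch of cleaning a multi-line query (the query text is illustrative):

from pipelines.utils.utils import remove_tabs_from_query

query = """
    SELECT id_estacao,
           data_medicao
    FROM meu_dataset.minha_tabela
"""
remove_tabs_from_query(query)
# 'SELECT id_estacao, data_medicao FROM meu_dataset.minha_tabela'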
def run_cloud(flow: prefect.core.flow.Flow, labels: List[str], parameters: Dict[str, Any] = None, run_description: str = '')
def run_cloud(
    flow: prefect.Flow,
    labels: List[str],
    parameters: Dict[str, Any] = None,
    run_description: str = "",
):
    """
    Runs a flow on Prefect Server (must have VPN configured).
    """
    # Setup no schedule
    flow.schedule = None

    # Change flow name for development and register
    flow.name = f"{flow.name} (development)"
    flow.run_config = KubernetesRun(image="ghcr.io/prefeitura-rio/prefect-flows:latest")
    flow_id = flow.register(project_name="main", labels=[])

    # Get Prefect Client and submit flow run
    client = Client()
    flow_run_id = client.create_flow_run(
        flow_id=flow_id,
        run_name=f"TEST RUN - {run_description} - {flow.name}",
        labels=labels,
        parameters=parameters,
    )

    # Print flow run link so user can check it
    print(f"Run submitted: TEST RUN - {run_description} - {flow.name}")
    print(f"Please check at: https://prefect.dados.rio/flow-run/{flow_run_id}")
Runs a flow on Prefect Server (must have VPN configured).
def run_local(flow: prefect.core.flow.Flow, parameters: Dict[str, Any] = None)
def run_local(flow: prefect.Flow, parameters: Dict[str, Any] = None):
    """
    Runs a flow locally.
    """
    # Setup for local run
    flow.storage = None
    flow.run_config = None
    flow.schedule = None

    # Run flow
    return flow.run(parameters=parameters) if parameters else flow.run()
Runs a flow locally.
def run_registered(flow_name: str, flow_project: str, labels: List[str], parameters: Dict[str, Any] = None, run_description: str = '') ‑> str
def run_registered(
    flow_name: str,
    flow_project: str,
    labels: List[str],
    parameters: Dict[str, Any] = None,
    run_description: str = "",
) -> str:
    """
    Runs an already registered flow on Prefect Server (must have credentials configured).
    """
    # Get Prefect Client and submit flow run
    client = Client()
    flow_result = client.graphql(
        {
            "query": {
                with_args(
                    "flow",
                    {
                        "where": {
                            "_and": [
                                {"name": {"_eq": flow_name}},
                                {"archived": {"_eq": False}},
                                {"project": {"name": {"_eq": flow_project}}},
                            ]
                        }
                    },
                ): {
                    "id",
                }
            }
        }
    )
    flows_found = flow_result.data.flow
    if len(flows_found) == 0:
        raise ValueError(f"Flow {flow_name} not found.")
    if len(flows_found) > 1:
        raise ValueError(f"More than one flow found for {flow_name}.")
    flow_id = flow_result["data"]["flow"][0]["id"]
    flow_run_id = client.create_flow_run(
        flow_id=flow_id,
        run_name=f"SUBMITTED REMOTELY - {run_description}",
        labels=labels,
        parameters=parameters,
    )

    # Print flow run link so user can check it
    print(f"Run submitted: SUBMITTED REMOTELY - {run_description}")
    print(f"Please check at: https://prefect.dados.rio/flow-run/{flow_run_id}")

    return flow_run_id
Runs an already registered flow on Prefect Server (must have credentials configured).
def save_str_on_redis(redis_key: str, key: str, value: str)
def save_str_on_redis(
    redis_key: str,
    key: str,
    value: str,
):
    """
    Function to save a string on redis
    """
    redis_client = get_redis_client()
    redis_client.hset(redis_key, key, value)
Function to save a string on redis
def save_updated_rows_on_redis(dataframe: pandas.core.frame.DataFrame, dataset_id: str, table_id: str, unique_id: str = 'id_estacao', date_column: str = 'data_medicao', date_format: str = '%Y-%m-%d %H:%M:%S', mode: str = 'prod') ‑> pandas.core.frame.DataFrame
def save_updated_rows_on_redis(  # pylint: disable=R0914
    dataframe: pd.DataFrame,
    dataset_id: str,
    table_id: str,
    unique_id: str = "id_estacao",
    date_column: str = "data_medicao",
    date_format: str = "%Y-%m-%d %H:%M:%S",
    mode: str = "prod",
) -> pd.DataFrame:
    """
    Accesses Redis to get the last time each unique_id was updated, returns
    the updated rows as a DataFrame and saves the new dates on Redis.
    """

    redis_client = get_redis_client()

    key = dataset_id + "." + table_id
    if mode == "dev":
        key = f"{mode}.{key}"

    # Access all data saved on Redis with this key
    last_updates = redis_client.hgetall(key)

    if len(last_updates) == 0:
        last_updates = pd.DataFrame(dataframe[unique_id].unique(), columns=[unique_id])
        last_updates["last_update"] = "1900-01-01 00:00:00"
        log(f"Redis key: {key}\nCreating Redis fake values:\n {last_updates}")
    else:
        # Convert data to a dictionary with unique_id as key and last updated time as value
        # Example > {"12": "2022-06-06 14:45:00"}
        last_updates = {
            k.decode("utf-8"): v.decode("utf-8") for k, v in last_updates.items()
        }
        # Convert dictionary to dataframe
        last_updates = pd.DataFrame(
            last_updates.items(), columns=[unique_id, "last_update"]
        )
        log(f"Redis key: {key}\nRedis actual values:\n {last_updates}")

    # Ensure that both columns are strings
    dataframe[unique_id] = dataframe[unique_id].astype(str)
    last_updates[unique_id] = last_updates[unique_id].astype(str)

    # dataframe and last_updates need to have the same index, in our case unique_id
    missing_in_dfr = [
        i
        for i in last_updates[unique_id].unique()
        if i not in dataframe[unique_id].unique()
    ]
    missing_in_updates = [
        i
        for i in dataframe[unique_id].unique()
        if i not in last_updates[unique_id].unique()
    ]

    # If unique_id doesn't exist on updates, create a fake date for this station on updates
    if len(missing_in_updates) > 0:
        for i, _id in enumerate(missing_in_updates):
            last_updates.loc[-i] = [_id, "1900-01-01 00:00:00"]

    # If unique_id doesn't exist on dataframe, remove this station from last_updates
    if len(missing_in_dfr) > 0:
        last_updates = last_updates[~last_updates[unique_id].isin(missing_in_dfr)]

    # Merge dataframes using unique_id
    dataframe = dataframe.merge(last_updates, how="left", on=unique_id)
    log(f"Comparing times: {dataframe.sort_values(unique_id)}")

    # Keep on dataframe only the stations that have a time after the one saved on Redis
    dataframe[date_column] = dataframe[date_column].apply(
        pd.to_datetime, format=date_format
    ) + pd.DateOffset(hours=0)

    dataframe["last_update"] = dataframe["last_update"].apply(
        pd.to_datetime, format="%Y-%m-%d %H:%M:%S"
    ) + pd.DateOffset(hours=0)

    dataframe = dataframe[dataframe[date_column] > dataframe["last_update"]].dropna(
        subset=[unique_id]
    )
    log(f"Dataframe after comparison: {dataframe.sort_values(unique_id)}")

    # Keep only the last date for each unique_id
    keep_cols = [unique_id, date_column]
    new_updates = dataframe[keep_cols].sort_values(keep_cols)
    new_updates = new_updates.groupby(unique_id, as_index=False).tail(1)
    new_updates[date_column] = new_updates[date_column].dt.strftime("%Y-%m-%d %H:%M:%S")
    log(f">>> Updated df: {new_updates.head(10)}")

    # Convert the stations with new update dates to a dictionary
    new_updates = dict(zip(new_updates[unique_id], new_updates[date_column]))
    log(f">>> data to save in redis as a dict: {new_updates}")

    # Save this new information on Redis
    [redis_client.hset(key, k, v) for k, v in new_updates.items()]

    return dataframe.reset_index()
Accesses Redis to get the last time each unique_id was updated, returns the updated rows as a DataFrame, and saves the new dates on Redis.
def send_discord_message(message: str, webhook_url: str) ‑> None
def send_discord_message(
    message: str,
    webhook_url: str,
) -> None:
    """
    Sends a message to a Discord channel.
    """
    requests.post(
        webhook_url,
        data={"content": message},
    )
Sends a message to a Discord channel.
def send_telegram_message(message: str, token: str, chat_id: int, parse_mode: str = 'HTML')
def send_telegram_message(
    message: str,
    token: str,
    chat_id: int,
    parse_mode: str = telegram.ParseMode.HTML,
):
    """
    Sends a message to a Telegram chat.
    """
    bot = telegram.Bot(token=token)
    bot.send_message(
        chat_id=chat_id,
        text=message,
        parse_mode=parse_mode,
    )
Sends a message to a Telegram chat.
def set_default_parameters(flow: prefect.core.flow.Flow, default_parameters: dict) ‑> prefect.core.flow.Flow
def set_default_parameters(
    flow: prefect.Flow, default_parameters: dict
) -> prefect.Flow:
    """
    Sets default parameters for a flow.
    """
    for parameter in flow.parameters():
        if parameter.name in default_parameters:
            parameter.default = default_parameters[parameter.name]
    return flow
Sets default parameters for a flow.
def skip_if_running_handler(obj, old_state: prefect.engine.state.State, new_state: prefect.engine.state.State) ‑> prefect.engine.state.State
def skip_if_running_handler(obj, old_state: State, new_state: State) -> State:
    """
    State handler that will skip a flow run if another instance of the flow is
    already running.

    Adapted from Prefect Discourse:
    https://tinyurl.com/4hn5uz2w
    """
    if new_state.is_running():
        client = Client()
        query = """
            query($flow_id: uuid) {
                flow_run(
                    where: {
                        _and: [
                            {state: {_eq: "Running"}},
                            {flow_id: {_eq: $flow_id}}
                        ]
                    }
                ) {
                    id
                }
            }
        """
        # pylint: disable=no-member
        response = client.graphql(
            query=query,
            variables=dict(flow_id=prefect.context.flow_id),
        )
        active_flow_runs = response["data"]["flow_run"]
        if active_flow_runs:
            logger = prefect.context.get("logger")
            message = "Skipping this flow run since there are already some flow runs in progress"
            logger.info(message)
            return Skipped(message)
    return new_state
State handler that will skip a flow run if another instance of the flow is already running.
Adapted from Prefect Discourse: https://tinyurl.com/4hn5uz2w
def smart_split(text: str, max_length: int, separator: str = ' ') ‑> List[str]
def smart_split(
    text: str,
    max_length: int,
    separator: str = " ",
) -> List[str]:
    """
    Splits a string into a list of strings.
    """
    if len(text) <= max_length:
        return [text]

    separator_index = text.rfind(separator, 0, max_length)
    if (separator_index >= max_length) or (separator_index == -1):
        raise ValueError(
            f'Cannot split text "{text}" into {max_length} '
            f'characters using separator "{separator}"'
        )

    return [
        text[:separator_index],
        *smart_split(
            text[separator_index + len(separator) :],
            max_length,
            separator,
        ),
    ]
Splits a string into a list of strings.
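An illustrative split of a message that is too long for a single chunk:

from pipelines.utils.utils import smart_split

smart_split("uma mensagem longa para o discord", max_length=15)
# ['uma mensagem', 'longa para o', 'discord']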
def to_json_dataframe(dataframe: pandas.core.frame.DataFrame = None, csv_path: str | pathlib.Path = None, key_column: str = None, read_csv_kwargs: dict = None, save_to: str | pathlib.Path = None) ‑> pandas.core.frame.DataFrame
def to_json_dataframe(
    dataframe: pd.DataFrame = None,
    csv_path: Union[str, Path] = None,
    key_column: str = None,
    read_csv_kwargs: dict = None,
    save_to: Union[str, Path] = None,
) -> pd.DataFrame:
    """
    Manipulates a dataframe by keeping key_column and moving every other column
    data to a "content" column in JSON format. Example:
        - Input dataframe: pd.DataFrame({"key": ["a", "b", "c"], "col1": [1, 2, 3], "col2": [4, 5, 6]})
        - Output dataframe: pd.DataFrame({
            "key": ["a", "b", "c"],
            "content": [{"col1": 1, "col2": 4}, {"col1": 2, "col2": 5}, {"col1": 3, "col2": 6}]
        })
    """
    if dataframe is None and not csv_path:
        raise ValueError("dataframe or csv_path is required")
    if csv_path:
        dataframe = pd.read_csv(csv_path, **(read_csv_kwargs or {}))
    if key_column:
        dataframe["content"] = dataframe.drop(columns=[key_column]).to_dict(
            orient="records"
        )
        dataframe = dataframe[[key_column, "content"]]
    else:
        dataframe["content"] = dataframe.to_dict(orient="records")
        dataframe = dataframe[["content"]]
    if save_to:
        dataframe.to_csv(save_to, index=False)
    return dataframe
Manipulates a dataframe by keeping key_column and moving every other column data to a "content" column in JSON format. Example:
- Input dataframe: pd.DataFrame({"key": ["a", "b", "c"], "col1": [1, 2, 3], "col2": [4, 5, 6]})
- Output dataframe: pd.DataFrame({ "key": ["a", "b", "c"], "content": [{"col1": 1, "col2": 4}, {"col1": 2, "col2": 5}, {"col1": 3, "col2": 6}] })
def to_partitions(data: pandas.core.frame.DataFrame, partition_columns: List[str], savepath: str, data_type: str = 'csv', suffix: str = None, build_json_dataframe: bool = False, dataframe_key_column: str = None) ‑> List[pathlib.Path]
def to_partitions(
    data: pd.DataFrame,
    partition_columns: List[str],
    savepath: str,
    data_type: str = "csv",
    suffix: str = None,
    build_json_dataframe: bool = False,
    dataframe_key_column: str = None,
) -> List[Path]:  # sourcery skip: raise-specific-error
    """
    Save data into a Hive partitions schema, given a dataframe and a list of partition columns.

    Args:
        data (pandas.core.frame.DataFrame): Dataframe to be partitioned.
        partition_columns (list): List of columns to be used as partitions.
        savepath (str, pathlib.PosixPath): folder path to save the partitions

    Example:
        data = {
            "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021, 2025],
            "mes": [1, 2, 3, 4, 5, 6, 6, 9],
            "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR", "PR"],
            "dado": ["a", "b", "c", "d", "e", "f", "g", "h"],
        }
        to_partitions(
            data=pd.DataFrame(data),
            partition_columns=["ano", "mes", "sigla_uf"],
            savepath="partitions/",
        )
    """
    saved_files = []
    if isinstance(data, (pd.core.frame.DataFrame)):
        savepath = Path(savepath)

        # Create unique combinations between partition columns
        unique_combinations = (
            data[partition_columns]
            .drop_duplicates(subset=partition_columns)
            .to_dict(orient="records")
        )

        for filter_combination in unique_combinations:
            partitions_values = [
                f"{partition}={value}"
                for partition, value in filter_combination.items()
            ]

            # Get filtered data
            df_filter = data.loc[
                data[filter_combination.keys()]
                .isin(filter_combination.values())
                .all(axis=1),
                :,
            ]
            df_filter = df_filter.drop(columns=partition_columns).reset_index(drop=True)

            # Create folder tree
            filter_save_path = Path(savepath / "/".join(partitions_values))
            filter_save_path.mkdir(parents=True, exist_ok=True)

            if suffix is not None:
                file_filter_save_path = (
                    Path(filter_save_path) / f"data_{suffix}.{data_type}"
                )
            else:
                file_filter_save_path = Path(filter_save_path) / f"data.{data_type}"

            if build_json_dataframe:
                df_filter = to_json_dataframe(
                    df_filter, key_column=dataframe_key_column
                )

            if data_type == "csv":
                # Append data to CSV
                df_filter.to_csv(
                    file_filter_save_path,
                    index=False,
                    mode="a",
                    header=not file_filter_save_path.exists(),
                )
                saved_files.append(file_filter_save_path)
            elif data_type == "parquet":
                dataframe_to_parquet(dataframe=df_filter, path=file_filter_save_path)
                saved_files.append(file_filter_save_path)
            else:
                raise ValueError(f"Invalid data type: {data_type}")
    else:
        raise BaseException("Data need to be a pandas DataFrame")

    return saved_files
Save data into a Hive partitions schema, given a dataframe and a list of partition columns.
Args
    data (pandas.core.frame.DataFrame): Dataframe to be partitioned.
    partition_columns (list): List of columns to be used as partitions.
    savepath (str, pathlib.PosixPath): folder path to save the partitions
Example
    data = {
        "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021, 2025],
        "mes": [1, 2, 3, 4, 5, 6, 6, 9],
        "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR", "PR"],
        "dado": ["a", "b", "c", "d", "e", "f", "g", "h"],
    }
    to_partitions(
        data=pd.DataFrame(data),
        partition_columns=["ano", "mes", "sigla_uf"],
        savepath="partitions/",
    )
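For reference, this is the Hive-style layout the example above would produce (assuming the default data_type="csv" and no suffix); each data.csv keeps only the non-partition column "dado":

partitions/ano=2020/mes=1/sigla_uf=SP/data.csv
partitions/ano=2021/mes=2/sigla_uf=SP/data.csv
partitions/ano=2020/mes=3/sigla_uf=RJ/data.csv
partitions/ano=2021/mes=4/sigla_uf=RJ/data.csv
partitions/ano=2020/mes=5/sigla_uf=PR/data.csv
partitions/ano=2021/mes=6/sigla_uf=PR/data.csv
partitions/ano=2025/mes=9/sigla_uf=PR/data.csv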
def treat_redis_output(text)
def treat_redis_output(text):
    """
    Redis returns a dict where both key and value are byte strings.
    Example: {b'date': b'2023-02-27 07:29:04'}
    """
    return {k.decode("utf-8"): v.decode("utf-8") for k, v in text.items()}
Redis returns a dict where both key and value are byte strings. Example: {b'date': b'2023-02-27 07:29:04'}
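A minimal example of the decoding:

from pipelines.utils.utils import treat_redis_output

treat_redis_output({b"date": b"2023-02-27 07:29:04"})
# {'date': '2023-02-27 07:29:04'}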
def untuple_clocks(clocks)
def untuple_clocks(clocks):
    """
    Converts a list of tuples to a list of clocks.
    """
    return [clock[0] if isinstance(clock, tuple) else clock for clock in clocks]
Converts a list of tuples to a list of clocks.
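A sketch of the behavior (plain strings stand in for Prefect clock objects here):

from pipelines.utils.utils import untuple_clocks

untuple_clocks([("clock_a", "extra_metadata"), "clock_b"])
# ['clock_a', 'clock_b']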
def upload_files_to_storage(bucket_name: str, prefix: str, local_path: str | pathlib.Path = None, files_list: List[str] = None, mode: str = 'prod')
def upload_files_to_storage(
    bucket_name: str,
    prefix: str,
    local_path: Union[str, Path] = None,
    files_list: List[str] = None,
    mode: str = "prod",
):
    """
    Uploads all files from `local_path` to `bucket_name` with `prefix`.
    Mode needs to be "prod" or "staging"
    """
    # Either local_path or files_list must be provided
    if local_path is None and files_list is None:
        raise ValueError("Either local_path or files_list must be provided")

    # If local_path is provided, get all files from it
    if local_path is not None:
        files_list: List[Path] = list(Path(local_path).glob("**/*"))

    # Assert all items in files_list are Path objects
    files_list: List[Path] = [Path(f) for f in files_list]

    credentials = get_credentials_from_env(mode=mode)
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(bucket_name)
    for file in files_list:
        if file.is_file():
            blob = bucket.blob(f"{prefix}/{file.name}")
            blob.upload_from_filename(file)
Uploads all files from local_path to bucket_name with prefix. Mode needs to be "prod" or "staging"