Module pipelines.rj_escritorio.rain_dashboard.tasks

Tasks for setting rain data in Redis.

Functions

def dataframe_to_dict(dataframe: pandas.core.frame.DataFrame) ‑> List[Dict[str, str | float]]
Expand source code
@task(checkpoint=False)
def dataframe_to_dict(dataframe: pd.DataFrame) -> List[Dict[str, Union[str, float]]]:
    """
    Convert dataframe to dictionary
    """
    log("Converting dataframe to dictionary...")
    return dataframe.to_dict(orient="records")

Convert dataframe to dictionary

def get_data(query: str, mode: str = 'prod') ‑> pandas.core.frame.DataFrame
Expand source code
@task(checkpoint=False)
def get_data(query: str, mode: str = "prod") -> pd.DataFrame:
    """
    Load rain data from BigQuery
    """

    # Get billing project ID
    log("Inferring billing project ID from environment.")
    billing_project_id: str = None
    try:
        bd_base = Base()
        billing_project_id = bd_base.config["gcloud-projects"][mode]["name"]
    except KeyError:
        pass
    if not billing_project_id:
        raise ValueError(
            "billing_project_id must be either provided or inferred from environment variables"
        )
    log(f"Billing project ID: {billing_project_id}")

    # Load data
    log("Loading data from BigQuery...")
    dataframe = bd.read_sql(
        query=query, billing_project_id=billing_project_id, from_file=True
    )

    # Type assertions
    if "chuva_15min" in dataframe.columns:
        dataframe["chuva_15min"] = dataframe["chuva_15min"].astype("float64")
    else:
        log(
            'Column "chuva_15min" not found in dataframe, skipping type assertion.',
            "warning",
        )

    log("Data loaded successfully.")

    return dataframe

Load rain data from BigQuery

def set_redis_key(key: str,
value: List[Dict[str, str | float]],
host: str = 'redis.redis.svc.cluster.local',
port: int = 6379,
db: int = 0) ‑> None
Expand source code
@task(checkpoint=False)
def set_redis_key(
    key: str,
    value: List[Dict[str, Union[str, float]]],
    host: str = "redis.redis.svc.cluster.local",
    port: int = 6379,
    db: int = 0,  # pylint: disable=C0103
) -> None:
    """
    Set Redis key
    """
    log("Setting Redis key...")
    redis_client = get_redis_client(host=host, port=port, db=db)
    redis_client.set(key, value)
    log("Redis key set successfully.")
    log(f"key: {key} and value: {value}")

Set Redis key