Module pipelines.rj_escritorio.waze.tasks
Tasks for emd
Functions
def fecth_waze(areas: pandas.core.frame.DataFrame, wait=None) ‑> list
-
Expand source code
@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def fecth_waze(areas: pd.DataFrame, wait=None) -> list:
    """
    Fetch the Waze alerts feed once per area and return the decoded payloads.

    Args:
        areas: DataFrame with a ``coords`` column. Each value must be a
            mapping with the keys ``left``, ``bottom``, ``right`` and ``top``,
            which are substituted into the request URL.
        wait: Not read by this function. NOTE(review): presumably only used
            to declare an upstream dependency in the flow -- confirm.

    Returns:
        A list with the decoded JSON body of each response, in the same order
        as the rows of ``areas``.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
        requests.exceptions.HTTPError: If a response has a 4xx/5xx status.
    """
    base_url = "https://www.waze.com/row-rtserver/web/TGeoRSS?bottom={bottom}&left={left}&ma=200&mj=200&mu=20&right={right}&top={top}&types=alerts"
    # Without a timeout a stalled connection would block the task forever and
    # the retry policy declared in the decorator would never get a chance to run.
    timeout_seconds = 30

    res = []
    for coord in areas["coords"].to_list():
        response = requests.get(base_url.format(**coord), timeout=timeout_seconds)
        # Fail loudly on HTTP errors instead of trying to decode an error page
        # as if it were alert data.
        response.raise_for_status()
        res.append(response.json())
    return res
Fetch data from Waze.
def load_geometries(wait=None) ‑> pandas.core.frame.DataFrame
-
Expand source code
@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def load_geometries(wait=None) -> pd.DataFrame:
    """
    Build the DataFrame of areas used to query Waze.

    The data is hard-coded in this function; nothing is read from a database.

    Args:
        wait: Not read by this function. NOTE(review): presumably only used
            to declare an upstream dependency in the flow -- confirm.

    Returns:
        A DataFrame indexed 0..18 with the columns, in this order:

        - ``geometry``: WKT polygon string.
        - ``bounds``: result of ``loads`` applied to the WKT polygon string.
          NOTE(review): ``loads`` looks like ``shapely.wkt.loads`` -- confirm.
        - ``ts``: timestamp string, identical for every row.
        - ``coords``: dict with the keys ``left``, ``bottom``, ``right`` and
          ``top``, taken in that order from the ``.bounds`` attribute of the
          parsed ``bounds`` value.
    """
    geometry_wkt = {
        0: "POLYGON ((-43.62216601277018 -23.08289269734031, -43.79653853090349 -23.08289269734031, -43.79653853090349 -22.91445704293636, -43.62216601277018 -22.91445704293636, -43.62216601277018 -23.08289269734031))",
        1: "POLYGON ((-43.62216601277018 -22.74602138853241, -43.62216601277018 -22.91445704293636, -43.79653853090349 -22.91445704293636, -43.79653853090349 -22.74602138853241, -43.62216601277018 -22.74602138853241))",
        2: "POLYGON ((-43.44779349463688 -23.08289269734031, -43.62216601277018 -23.08289269734031, -43.62216601277018 -22.91445704293636, -43.44779349463688 -22.91445704293636, -43.44779349463688 -23.08289269734031))",
        3: "POLYGON ((-43.44779349463688 -22.91445704293636, -43.62216601277018 -22.91445704293636, -43.62216601277018 -22.74602138853241, -43.44779349463688 -22.74602138853241, -43.44779349463688 -22.91445704293636))",
        4: "POLYGON ((-43.44779349463688 -23.08289269734031, -43.44779349463688 -22.91445704293636, -43.36060723557023 -22.91445704293636, -43.36060723557023 -23.08289269734031, -43.44779349463688 -23.08289269734031))",
        5: "POLYGON ((-43.36060723557023 -23.08289269734031, -43.36060723557023 -22.91445704293636, -43.27342097650357 -22.91445704293636, -43.27342097650357 -23.08289269734031, -43.36060723557023 -23.08289269734031))",
        6: "POLYGON ((-43.44779349463688 -22.83023921573439, -43.36060723557023 -22.83023921573439, -43.36060723557023 -22.91445704293636, -43.44779349463688 -22.91445704293636, -43.44779349463688 -22.83023921573439))",
        7: "POLYGON ((-43.44779349463688 -22.74602138853241, -43.36060723557023 -22.74602138853241, -43.36060723557023 -22.83023921573439, -43.44779349463688 -22.83023921573439, -43.44779349463688 -22.74602138853241))",
        8: "POLYGON ((-43.36060723557023 -22.91445704293636, -43.36060723557023 -22.83023921573439, -43.27342097650357 -22.83023921573439, -43.27342097650357 -22.91445704293636, -43.36060723557023 -22.91445704293636))",
        9: "POLYGON ((-43.3170141060369 -22.74602138853241, -43.3170141060369 -22.83023921573439, -43.36060723557023 -22.83023921573439, -43.36060723557023 -22.74602138853241, -43.3170141060369 -22.74602138853241))",
        10: "POLYGON ((-43.27342097650357 -22.74602138853241, -43.27342097650357 -22.83023921573439, -43.3170141060369 -22.83023921573439, -43.3170141060369 -22.74602138853241, -43.27342097650357 -22.74602138853241))",
        11: "POLYGON ((-43.27342097650357 -22.99867487013834, -43.18623471743692 -22.99867487013834, -43.18623471743692 -23.08289269734031, -43.27342097650357 -23.08289269734031, -43.27342097650357 -22.99867487013834))",
        12: "POLYGON ((-43.22982784697025 -22.91445704293636, -43.22982784697025 -22.99867487013834, -43.27342097650357 -22.99867487013834, -43.27342097650357 -22.91445704293636, -43.22982784697025 -22.91445704293636))",
        13: "POLYGON ((-43.18623471743692 -22.91445704293636, -43.18623471743692 -22.99867487013834, -43.22982784697025 -22.99867487013834, -43.22982784697025 -22.91445704293636, -43.18623471743692 -22.91445704293636))",
        14: "POLYGON ((-43.18623471743692 -23.08289269734031, -43.18623471743692 -22.91445704293636, -43.09904845837026 -22.91445704293636, -43.09904845837026 -23.08289269734031, -43.18623471743692 -23.08289269734031))",
        15: "POLYGON ((-43.22982784697025 -22.83023921573439, -43.22982784697025 -22.91445704293636, -43.27342097650357 -22.91445704293636, -43.27342097650357 -22.83023921573439, -43.22982784697025 -22.83023921573439))",
        16: "POLYGON ((-43.18623471743692 -22.83023921573439, -43.18623471743692 -22.91445704293636, -43.22982784697025 -22.91445704293636, -43.22982784697025 -22.83023921573439, -43.18623471743692 -22.83023921573439))",
        17: "POLYGON ((-43.27342097650357 -22.74602138853241, -43.18623471743692 -22.74602138853241, -43.18623471743692 -22.83023921573439, -43.27342097650357 -22.83023921573439, -43.27342097650357 -22.74602138853241))",
        18: "POLYGON ((-43.18623471743692 -22.91445704293636, -43.18623471743692 -22.74602138853241, -43.09904845837026 -22.74602138853241, -43.09904845837026 -22.91445704293636, -43.18623471743692 -22.91445704293636))",
    }
    bounds_wkt = {
        0: "POLYGON ((-43.79653853090349 -23.08289269734031, -43.62216601277018 -23.08289269734031, -43.62216601277018 -22.91445704293636, -43.79653853090349 -22.91445704293636, -43.79653853090349 -23.08289269734031))",
        1: "POLYGON ((-43.79653853090349 -22.91445704293636, -43.62216601277018 -22.91445704293636, -43.62216601277018 -22.74602138853241, -43.79653853090349 -22.74602138853241, -43.79653853090349 -22.91445704293636))",
        2: "POLYGON ((-43.62216601277018 -23.08289269734031, -43.44779349463688 -23.08289269734031, -43.44779349463688 -22.91445704293636, -43.62216601277018 -22.91445704293636, -43.62216601277018 -23.08289269734031))",
        3: "POLYGON ((-43.62216601277018 -22.91445704293636, -43.44779349463688 -22.91445704293636, -43.44779349463688 -22.74602138853241, -43.62216601277018 -22.74602138853241, -43.62216601277018 -22.91445704293636))",
        4: "POLYGON ((-43.44779349463688 -23.08289269734031, -43.36060723557023 -23.08289269734031, -43.36060723557023 -22.91445704293636, -43.44779349463688 -22.91445704293636, -43.44779349463688 -23.08289269734031))",
        5: "POLYGON ((-43.36060723557023 -23.08289269734031, -43.27342097650357 -23.08289269734031, -43.27342097650357 -22.91445704293636, -43.36060723557023 -22.91445704293636, -43.36060723557023 -23.08289269734031))",
        6: "POLYGON ((-43.44779349463688 -22.91445704293636, -43.36060723557023 -22.91445704293636, -43.36060723557023 -22.83023921573439, -43.44779349463688 -22.83023921573439, -43.44779349463688 -22.91445704293636))",
        7: "POLYGON ((-43.44779349463688 -22.83023921573439, -43.36060723557023 -22.83023921573439, -43.36060723557023 -22.74602138853241, -43.44779349463688 -22.74602138853241, -43.44779349463688 -22.83023921573439))",
        8: "POLYGON ((-43.36060723557023 -22.91445704293636, -43.27342097650357 -22.91445704293636, -43.27342097650357 -22.83023921573439, -43.36060723557023 -22.83023921573439, -43.36060723557023 -22.91445704293636))",
        9: "POLYGON ((-43.36060723557023 -22.83023921573439, -43.3170141060369 -22.83023921573439, -43.3170141060369 -22.74602138853241, -43.36060723557023 -22.74602138853241, -43.36060723557023 -22.83023921573439))",
        10: "POLYGON ((-43.3170141060369 -22.83023921573439, -43.27342097650357 -22.83023921573439, -43.27342097650357 -22.74602138853241, -43.3170141060369 -22.74602138853241, -43.3170141060369 -22.83023921573439))",
        11: "POLYGON ((-43.27342097650357 -23.08289269734031, -43.18623471743692 -23.08289269734031, -43.18623471743692 -22.99867487013834, -43.27342097650357 -22.99867487013834, -43.27342097650357 -23.08289269734031))",
        12: "POLYGON ((-43.27342097650357 -22.99867487013834, -43.22982784697025 -22.99867487013834, -43.22982784697025 -22.91445704293636, -43.27342097650357 -22.91445704293636, -43.27342097650357 -22.99867487013834))",
        13: "POLYGON ((-43.22982784697025 -22.99867487013834, -43.18623471743692 -22.99867487013834, -43.18623471743692 -22.91445704293636, -43.22982784697025 -22.91445704293636, -43.22982784697025 -22.99867487013834))",
        14: "POLYGON ((-43.18623471743692 -23.08289269734031, -43.09904845837026 -23.08289269734031, -43.09904845837026 -22.91445704293636, -43.18623471743692 -22.91445704293636, -43.18623471743692 -23.08289269734031))",
        15: "POLYGON ((-43.27342097650357 -22.91445704293636, -43.22982784697025 -22.91445704293636, -43.22982784697025 -22.83023921573439, -43.27342097650357 -22.83023921573439, -43.27342097650357 -22.91445704293636))",
        16: "POLYGON ((-43.22982784697025 -22.91445704293636, -43.18623471743692 -22.91445704293636, -43.18623471743692 -22.83023921573439, -43.22982784697025 -22.83023921573439, -43.22982784697025 -22.91445704293636))",
        17: "POLYGON ((-43.27342097650357 -22.83023921573439, -43.18623471743692 -22.83023921573439, -43.18623471743692 -22.74602138853241, -43.27342097650357 -22.74602138853241, -43.27342097650357 -22.83023921573439))",
        18: "POLYGON ((-43.18623471743692 -22.91445704293636, -43.09904845837026 -22.91445704293636, -43.09904845837026 -22.74602138853241, -43.18623471743692 -22.74602138853241, -43.18623471743692 -22.91445704293636))",
    }
    # Every area carries the same timestamp string, keyed like the geometries.
    snapshot_ts = "2022-02-08T11:21:53.205525"

    areas = pd.DataFrame.from_dict(
        {
            "geometry": geometry_wkt,
            "bounds": bounds_wkt,
            "ts": dict.fromkeys(geometry_wkt, snapshot_ts),
        }
    )
    areas["bounds"] = areas["bounds"].apply(loads)
    # "coords" used to be seeded with hard-coded dict-like strings that were
    # immediately overwritten by this assignment; it is now derived from the
    # parsed bounds only, which yields the same column in the same position.
    areas["coords"] = areas["bounds"].apply(
        lambda x: dict(zip(["left", "bottom", "right", "top"], x.bounds))
    )
    return areas
Builds the DataFrame of area geometries from values hard-coded in the task.
def normalize_data(responses: list, wait=None) ‑> pandas.core.frame.DataFrame
-
Expand source code
@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def normalize_data(responses: list, wait=None) -> pd.DataFrame:
    """
    Flatten the Waze responses into a DataFrame with one row per alert.

    Args:
        responses: List of decoded response dicts. Each may have an
            ``alerts`` list; responses that have alerts must also have a
            ``startTime`` string accepted by ``datetime.fromisoformat``.
        wait: Not read by this function. NOTE(review): presumably only used
            to declare an upstream dependency in the flow -- confirm.

    Returns:
        A DataFrame with one row per alert. When no response contains any
        alert, an empty DataFrame with the same columns is returned instead
        of raising.

    Raises:
        KeyError: If a response with alerts has no ``startTime``.
        ValueError: If ``startTime`` or ``pubMillis`` cannot be parsed.
        TypeError: If an alert has no ``location`` mapping.
    """
    columns = [
        "date",
        "ts",
        "ts_alert_creation",
        "uuid",
        "country",
        "city",
        "street",
        "type",
        "subtype",
        "road_type",
        "reliability",
        "confidence",
        "number_thumbs_up",
        "number_comments",
        "report_mood",
        "magvar",
        "report_rating",
        "geometry",
    ]

    normalized = []
    for data in responses:
        alerts = data.get("alerts", [])
        if not alerts:
            # Nothing to emit for this response; "startTime" is only needed
            # when there is at least one alert.
            continue

        # "startTime" belongs to the response, so parse it once rather than
        # once per alert.
        start_time = data["startTime"]
        response_date = datetime.fromisoformat(start_time[:10])
        response_ts = datetime.fromisoformat(start_time)

        for dictionary in alerts:
            normalized.append(
                {
                    "date": response_date,
                    "ts": response_ts,
                    # Keeps the first 10 characters of the value as epoch
                    # seconds. NOTE(review): presumably pubMillis is a
                    # 13-digit epoch-milliseconds integer -- confirm.
                    # fromtimestamp() without tz yields naive local time.
                    "ts_alert_creation": datetime.fromtimestamp(
                        int(str(dictionary.get("pubMillis"))[:10])
                    ),
                    "uuid": dictionary.get("uuid"),
                    "country": dictionary.get("country"),
                    "city": dictionary.get("city"),
                    "street": dictionary.get("street"),
                    "type": dictionary.get("type"),
                    "subtype": dictionary.get("subtype"),
                    "road_type": dictionary.get("roadType"),
                    "reliability": dictionary.get("reliability"),
                    "confidence": dictionary.get("confidence"),
                    "number_thumbs_up": dictionary.get("nThumbsUp"),
                    "number_comments": dictionary.get("nComments"),
                    "report_mood": dictionary.get("reportMood"),
                    "magvar": dictionary.get("magvar"),
                    "report_rating": dictionary.get("reportRating"),
                    "geometry": "POINT ({x} {y})".format(
                        **dictionary.get("location")
                    ),  # pylint: disable=consider-using-f-string
                }
            )

    if not normalized:
        # pd.concat raises ValueError on an empty list; return an empty frame
        # with the expected columns so callers still get a DataFrame.
        return pd.DataFrame(columns=columns)

    # NOTE(review): the row-by-row concat is kept on purpose. Building the
    # frame directly from the list of dicts would produce a different index
    # and may infer different dtypes for columns containing None.
    return pd.concat([pd.DataFrame([r]) for r in normalized])
Normalize data.
def upload_to_native_table(dataset_id: str, table_id: str, dataframe: pandas.core.frame.DataFrame, wait=None) ‑> None
-
Expand source code
@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def upload_to_native_table(
    dataset_id: str, table_id: str, dataframe: pd.DataFrame, wait=None
) -> None:
    """
    Append a DataFrame to the production BigQuery table, partitioned by day.

    Args:
        dataset_id: Dataset identifier passed to ``bd.Table``.
        table_id: Table identifier passed to ``bd.Table``.
        dataframe: Rows to load; its columns are expected to match the
            schema declared below.
        wait: Not read by this function. NOTE(review): presumably only used
            to declare an upstream dependency in the flow -- confirm.

    Returns:
        None. The function calls ``result()`` on the load job before
        returning.
    """
    target = bd.Table(dataset_id=dataset_id, table_id=table_id)

    # (column name, BigQuery type) pairs, in table order.
    column_types = (
        ("date", "DATE"),
        ("ts", "TIMESTAMP"),
        ("ts_alert_creation", "TIMESTAMP"),
        ("uuid", "STRING"),
        ("country", "STRING"),
        ("city", "STRING"),
        ("street", "STRING"),
        ("type", "STRING"),
        ("subtype", "STRING"),
        ("road_type", "INT64"),
        ("reliability", "INT64"),
        ("confidence", "INT64"),
        ("number_thumbs_up", "INT64"),
        ("number_comments", "INT64"),
        ("report_mood", "INT64"),
        ("magvar", "INT64"),
        ("report_rating", "INT64"),
        ("geometry", "GEOGRAPHY"),
    )
    fields = [
        bigquery.SchemaField(column, bq_type) for column, bq_type in column_types
    ]

    # The table is partitioned by day on the "date" column.
    daily_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="date",
    )
    load_config = bigquery.LoadJobConfig(
        schema=fields,
        # Rows are appended to whatever the table already holds;
        # WRITE_TRUNCATE would replace the table contents instead.
        write_disposition="WRITE_APPEND",
        time_partitioning=daily_partitioning,
    )

    prod_client = target.client["bigquery_prod"]
    load_job = prod_client.load_table_from_dataframe(
        dataframe, target.table_full_name["prod"], job_config=load_config
    )
    load_job.result()
Upload data to native table.