Module pipelines.rj_smtr.br_rj_riodejaneiro_sigmob.tasks
Tasks for br_rj_riodejaneiro_sigmob
Functions
def request_data(endpoints: dict)
-
Expand source code
@task(max_retries=3, retry_delay=timedelta(seconds=30))
def request_data(endpoints: dict) -> dict:
    """Request data from multiple APIs, paging through each endpoint and
    flushing accumulated results to partitioned CSV files.

    Args:
        endpoints (dict): dict containing, per key, the API URL ("url") and
            unique-key column ("key_column") for that endpoint.

    Raises:
        Exception: Any exception during the request. Usually timeouts.
            The error is logged (and reported via log_critical) before
            being re-raised.

    Returns:
        dict: maps each endpoint key to the path of the FIRST CSV file
            saved for that endpoint (subsequent chunk files share the
            same directory).
    """
    # Get resources: run date (in the pipeline's timezone) used to version
    # the output partitions.
    run_date = pendulum.now(constants.TIMEZONE.value).date()
    # Initialize empty dict for storing file paths
    paths_dict = {}
    # Iterate over endpoints
    for key in endpoints.keys():
        log("#" * 80)
        log(f"KEY = {key}")
        # Start with empty contents, page count = 0 and file_id = 0.
        # `contents` accumulates rows across pages until flushed to a CSV.
        contents = None
        file_id = 0
        page_count = 0
        # Setup a template for every CSV file (Hive-style data_versao partition)
        path_template = jinja2.Template(
            "{{run_date}}/{{key}}/data_versao={{run_date}}/{{key}}_version-{{run_date}}-{{id}}.csv"
        )
        # The first next_page is the initial URL
        next_page = endpoints[key]["url"]
        # Iterate over pages
        while next_page:
            page_count += 1
            try:
                # Get data
                log(f"URL = {next_page}")
                data = requests.get(
                    next_page, timeout=constants.SIGMOB_GET_REQUESTS_TIMEOUT.value
                )
                # Raise exception if not 200
                data.raise_for_status()
                data = data.json()
                # Store contents. The first page may expose rows under
                # "result" instead of "data"; later pages use "data".
                if contents is None:
                    contents = {
                        "data": data["result"] if "result" in data else data["data"],
                        "key_column": endpoints[key]["key_column"],
                    }
                else:
                    contents["data"].extend(data["data"])  # pylint: disable=E1136
                # Get next page. "EOF" or "" signals the last page.
                if "next" in data and data["next"] != "EOF" and data["next"] != "":
                    next_page = data["next"]
                else:
                    next_page = None
            except Exception as unknown_error:
                # Log full traceback, alert, then re-raise so the task retries.
                err = traceback.format_exc()
                log(err)
                log_critical(f"Failed to request data from SIGMOB: \n{err}")
                raise unknown_error
            # Create a new file for every (constants.SIGMOB_PAGES_FOR_CSV_FILE.value) pages
            if page_count % constants.SIGMOB_PAGES_FOR_CSV_FILE.value == 0:
                # Increment file ID
                file_id += 1
                # "{{run_date}}/{{key}}/data_versao={{run_date}}/{{key}}_version-{{run_date}}-{{file_id}}.csv"
                path = Path(
                    path_template.render(
                        run_date=run_date, key=key, id="{:03}".format(file_id)
                    )
                )
                log(f"Reached page count {page_count}, saving file at {path}")
                # If it's the first file, create directories and save path
                if file_id == 1:
                    paths_dict[key] = path
                    path.parent.mkdir(parents=True, exist_ok=True)
                # Save CSV file
                generate_df_and_save(contents, path)
                # Reset contents so the next chunk starts fresh
                contents = None
        # Save last file (pages accumulated since the last flush, if any)
        if contents is not None:
            file_id += 1
            path = Path(
                path_template.render(
                    run_date=run_date, key=key, id="{:03}".format(file_id)
                )
            )
            # file_id == 1 here means no chunk was flushed during paging,
            # so this is the endpoint's first (and only) file.
            if file_id == 1:
                paths_dict[key] = path
                path.parent.mkdir(parents=True, exist_ok=True)
            generate_df_and_save(contents, path)
            log(f"Saved last file with page count {page_count} at {path}")
        # Move on to the next endpoint.
    # Return paths
    return paths_dict
Request data from multiple APIs
Args
endpoints
:dict
- dict containing the API id, URL and unique_key column
Raises
e
- Any exception during the request. Usually timeouts
Returns
dict
- containing the paths for the data saved during each request