Module pipelines.utils.dump_url.tasks
General-purpose tasks for dumping data from URLs.
Functions
def download_url(url: str,
                 fname: str,
                 url_type: str = 'direct',
                 gsheets_sheet_order: int = 0,
                 gsheets_sheet_name: str = None,
                 gsheets_sheet_range: str = None) -> None
@task(
    checkpoint=False,
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
# pylint: disable=R0912,R0914,R0915
def download_url(  # pylint: disable=too-many-arguments
    url: str,
    fname: str,
    url_type: str = "direct",
    gsheets_sheet_order: int = 0,
    gsheets_sheet_name: str = None,
    gsheets_sheet_range: str = None,
) -> None:
    """
    Downloads a file from a URL and saves it to a local file,
    trying to keep RAM usage low. It is not optimized for
    Google Sheets downloads.

    Args:
        url: URL to download from.
        fname: Name of the file to save to.
        url_type: Type of URL that is being passed.
            `direct` -> common URL to download directly;
            `google_drive` -> Google Drive URL;
            `google_sheet` -> Google Sheet URL.
        gsheets_sheet_order: Worksheet index, in case you want to select
            the worksheet by index. Worksheet indexes start from zero.
        gsheets_sheet_name: Worksheet name, in case you want to select
            the worksheet by name.
        gsheets_sheet_range: Range in the selected worksheet to get data
            from. Defaults to the entire worksheet.

    Returns:
        None.
    """
    filepath = Path(fname)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    if url_type == "google_sheet":
        url_prefix = "https://docs.google.com/spreadsheets/d/"
        if not url.startswith(url_prefix):
            raise ValueError(
                "URL must start with https://docs.google.com/spreadsheets/d/. "
                f"Invalid URL: {url}"
            )
        log(">>>>> URL is a Google Sheets URL, downloading directly")
        credentials = get_credentials_from_env(
            scopes=[
                "https://www.googleapis.com/auth/spreadsheets",
                "https://www.googleapis.com/auth/drive",
            ]
        )
        gspread_client = gspread.authorize(credentials)
        sheet = gspread_client.open_by_url(url)
        if gsheets_sheet_name:
            worksheet = sheet.worksheet(gsheets_sheet_name)
        else:
            worksheet = sheet.get_worksheet(gsheets_sheet_order)
        if gsheets_sheet_range:
            # If a range is informed, fetch only that range from the worksheet
            dataframe = pd.DataFrame(worksheet.batch_get((gsheets_sheet_range,))[0])
        else:
            dataframe = pd.DataFrame(worksheet.get_values())
        new_header = dataframe.iloc[0]  # grab the first row for the header
        dataframe = dataframe[1:]  # take the data minus the header row
        dataframe.columns = new_header  # set the header row as the df header
        log(f">>>>> Dataframe shape: {dataframe.shape}")
        log(f">>>>> Dataframe columns: {dataframe.columns}")
        dataframe.columns = remove_columns_accents(dataframe)
        log(f">>>>> Dataframe columns after treatment: {dataframe.columns}")
        dataframe.to_csv(filepath, index=False)

    elif url_type == "direct":
        log(">>>>> URL is not a Google Drive URL, downloading directly")
        # Stream the response in 1 KiB chunks so the whole file never sits in RAM
        req = requests.get(url, stream=True)
        with open(fname, "wb") as file:
            for chunk in req.iter_content(chunk_size=1024):
                if chunk:
                    file.write(chunk)
                    file.flush()

    elif url_type == "google_drive":
        log(">>>>> URL is a Google Drive URL, downloading from Google Drive")
        # URL is in the format
        # https://drive.google.com/file/d/<FILE_ID>/...
        # We want to extract the FILE_ID
        log(">>>>> Extracting FILE_ID from URL")
        url_prefix = "https://drive.google.com/file/d/"
        if not url.startswith(url_prefix):
            raise ValueError(
                "URL must start with https://drive.google.com/file/d/. "
                f"Invalid URL: {url}"
            )
        file_id = url.removeprefix(url_prefix).split("/")[0]
        log(f">>>>> FILE_ID: {file_id}")
        creds = get_credentials_from_env(scopes=["https://www.googleapis.com/auth/drive"])
        try:
            service = build("drive", "v3", credentials=creds)
            request = service.files().get_media(fileId=file_id)  # pylint: disable=E1101
            fh = io.FileIO(fname, mode="wb")  # pylint: disable=C0103
            downloader = MediaIoBaseDownload(fh, request)
            done = False
            while done is False:
                status, done = downloader.next_chunk()
                log(f"Downloading file... {int(status.progress() * 100)}%.")
        except HttpError as error:
            log(f"HTTPError: {error}", "error")
            raise error

    else:
        raise ValueError("Invalid URL type. Please set a valid value for the `url_type` parameter.")
Downloads a file from a URL and saves it to a local file, trying to keep RAM usage low. It is not optimized for Google Sheets downloads.
Args
url
- URL to download from.
fname
- Name of the file to save to.
url_type
- Type of URL that is being passed. `direct` -> common URL to download directly; `google_drive` -> Google Drive URL; `google_sheet` -> Google Sheet URL.
gsheets_sheet_order
- Worksheet index, in case you want to select the worksheet by index. Worksheet indexes start from zero.
gsheets_sheet_name
- Worksheet name, in case you want to select the worksheet by name.
gsheets_sheet_range
- Range in the selected worksheet to get data from. Defaults to the entire worksheet.
Returns
None.
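For context, a minimal usage sketch (not part of the module). It assumes the surrounding project runs on Prefect 1.x, which the `@task(max_retries=..., retry_delay=...)` decorator above suggests; the flow name, output path, and sheet URL are placeholders:

from prefect import Flow

from pipelines.utils.dump_url.tasks import download_url

with Flow("example_dump_url") as flow:  # hypothetical flow name
    download_url(
        url="https://docs.google.com/spreadsheets/d/<SPREADSHEET_ID>/edit",  # placeholder
        fname="/tmp/dump/sheet.csv",     # illustrative output path
        url_type="google_sheet",
        gsheets_sheet_name="Sheet1",     # select the worksheet by name
        gsheets_sheet_range="A1:D100",   # read only this range
    )

flow.run()  # runs locally; retries come from the task decorator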
def dump_files(file_path: str,
               partition_columns: List[str],
               save_path: str = '.',
               chunksize: int = 1000000,
               build_json_dataframe: bool = False,
               dataframe_key_column: str = None,
               encoding: str = 'utf-8',
               on_bad_lines: str = 'error',
               separator: str = ',') -> None
@task(
    checkpoint=False,
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
# pylint: disable=R0913
def dump_files(
    file_path: str,
    partition_columns: List[str],
    save_path: str = ".",
    chunksize: int = 10**6,
    build_json_dataframe: bool = False,
    dataframe_key_column: str = None,
    encoding: str = "utf-8",
    on_bad_lines: str = "error",
    separator: str = ",",
) -> None:
    """
    Dump files according to chunk size and read mode.
    """
    # Timestamp identifying this dump run, shared by every chunk
    event_id = datetime.now().strftime("%Y%m%d-%H%M%S")
    # Read the CSV lazily in chunks of `chunksize` rows to keep memory usage bounded
    for idx, chunk in enumerate(
        pd.read_csv(
            Path(file_path),
            chunksize=chunksize,
            encoding=encoding,
            on_bad_lines=on_bad_lines,
            sep=separator,
        )
    ):
        log(f"Dumping batch {idx} with size {chunksize}")
        handle_dataframe_chunk(
            dataframe=chunk,
            save_path=save_path,
            partition_columns=partition_columns,
            event_id=event_id,
            idx=idx,
            build_json_dataframe=build_json_dataframe,
            dataframe_key_column=dataframe_key_column,
        )
Reads the file at `file_path` as a CSV in chunks of `chunksize` rows and dumps each chunk via `handle_dataframe_chunk`, partitioned by `partition_columns` under `save_path`.
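A matching usage sketch under the same Prefect 1.x assumption; the paths, separator, and partition columns below are illustrative:

from prefect import Flow

from pipelines.utils.dump_url.tasks import dump_files

with Flow("example_dump_files") as flow:  # hypothetical flow name
    dump_files(
        file_path="/tmp/dump/input.csv",    # e.g. a file produced by download_url
        partition_columns=["ano", "mes"],   # hypothetical partition columns
        save_path="/tmp/dump/partitioned",
        chunksize=500_000,                  # rows per chunk passed to pandas.read_csv
        separator=";",
        encoding="latin-1",
        on_bad_lines="skip",                # forwarded to pandas.read_csv
    )

flow.run()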