Module pipelines.rj_escritorio.tweets_flamengo.tasks

Tasks for twitter scraping.

Functions

def creat_path_tree(path)
Expand source code
def creat_path_tree(path):
    """
    Creates a path tree.
    """
    current_path = ""
    for folder in path.split("/"):
        current_path += f"{folder}/"
        if folder != ".." and not os.path.isdir(current_path):
            os.mkdir(current_path)

Creates a path tree.

def decode_env(value: str)
Expand source code
def decode_env(value: str):
    """
    Decodes a base64 value.
    """
    return base64.b64decode(value).decode()

Decodes a base64 value.

def fetch_last_id(q)
Expand source code
@task
def fetch_last_id(q):  # pylint: disable=C0103
    """
    Download last_id table from storage.
    """
    q_folder = q.replace(" ", "_").replace("-", "_")
    # pylint: disable=C0103
    st = bd.Storage(
        dataset_id="twitter_flamengo",
        table_id="last_id",
    )
    try:
        st.download(
            filename=f"{q_folder}.csv",
            savepath="data",
            partitions=f"q={q_folder}/",
            mode="staging",
            if_not_exists="raise",
        )

        pre_path = f"data/staging/twitter_flamengo/last_id/q={q_folder}"
        df = pd.read_csv(f"{pre_path}/{q_folder}.csv")  # pylint: disable=C0103
        if "q" in df.columns.tolist():
            df.columns = ["id", "created_at", "q"]
            df.drop("q", 1).to_csv(  # pylint: disable=E1101
                f"{pre_path}/{q_folder}.csv", index=False
            )
    except FileNotFoundError:
        log(f"No table {q_folder} in storage")

    return "data/staging"

Download last_id table from storage.

def fetch_tweets(api, q, last_id, created_at)
Expand source code
@task
def fetch_tweets(api, q, last_id, created_at):  # pylint: disable=C0103
    """
    Scrapy tweets since last_id in batchs of 100 tweets.
    """
    q_folder = q.replace(" ", "_").replace("-", "_")
    dt = datetime.today().strftime("%Y-%m-%d-%H-%M-%S")  # pylint: disable=C0103
    first_page_df = None
    log(f"{q} | last_id: {last_id} | created_at: {created_at} | file: {dt}")

    for i, page in enumerate(
        tweepy.Cursor(api.search_tweets, q=q, since_id=last_id, count=100).pages(100),
        start=1,
    ):
        json_data = [t._json for t in page]  # pylint: disable=W0212
        dd = pd.json_normalize(json_data)  # pylint: disable=C0103
        dd.columns = normalize_cols(dd.columns)

        cols_343 = [
            "created_at",
            "id",
            "id_str",
            "text",
            "truncated",
            "source",
            "in_reply_to_status_id",
            "in_reply_to_status_id_str",
            "in_reply_to_user_id",
            "in_reply_to_user_id_str",
            "in_reply_to_screen_name",
            "geo",
            "coordinates",
            "place",
            "contributors",
            "is_quote_status",
            "retweet_count",
            "favorite_count",
            "favorited",
            "retweeted",
            "lang",
            "entitieshashtags",
            "entitiessymbols",
            "entitiesuser_mentions",
            "entitiesurls",
            "metadataiso_language_code",
            "metadataresult_type",
            "userid",
            "userid_str",
            "username",
            "userscreen_name",
            "userlocation",
            "userdescription",
            "userurl",
            "userentitiesurlurls",
            "userentitiesdescriptionurls",
            "userprotected",
            "userfollowers_count",
            "userfriends_count",
            "userlisted_count",
            "usercreated_at",
            "userfavourites_count",
            "userutc_offset",
            "usertime_zone",
            "usergeo_enabled",
            "userverified",
            "userstatuses_count",
            "userlang",
            "usercontributors_enabled",
            "useris_translator",
            "useris_translation_enabled",
            "userprofile_background_color",
            "userprofile_background_image_url",
            "userprofile_background_image_url_https",
            "userprofile_background_tile",
            "userprofile_image_url",
            "userprofile_image_url_https",
            "userprofile_banner_url",
            "userprofile_link_color",
            "userprofile_sidebar_border_color",
            "userprofile_sidebar_fill_color",
            "userprofile_text_color",
            "userprofile_use_background_image",
            "userhas_extended_profile",
            "userdefault_profile",
            "userdefault_profile_image",
            "userfollowing",
            "userfollow_request_sent",
            "usernotifications",
            "usertranslator_type",
            "userwithheld_in_countries",
            "retweeted_statuscreated_at",
            "retweeted_statusid",
            "retweeted_statusid_str",
            "retweeted_statustext",
            "retweeted_statustruncated",
            "retweeted_statusentitieshashtags",
            "retweeted_statusentitiessymbols",
            "retweeted_statusentitiesuser_mentions",
            "retweeted_statusentitiesurls",
            "retweeted_statusmetadataiso_language_code",
            "retweeted_statusmetadataresult_type",
            "retweeted_statussource",
            "retweeted_statusin_reply_to_status_id",
            "retweeted_statusin_reply_to_status_id_str",
            "retweeted_statusin_reply_to_user_id",
            "retweeted_statusin_reply_to_user_id_str",
            "retweeted_statusin_reply_to_screen_name",
            "retweeted_statususerid",
            "retweeted_statususerid_str",
            "retweeted_statususername",
            "retweeted_statususerscreen_name",
            "retweeted_statususerlocation",
            "retweeted_statususerdescription",
            "retweeted_statususerurl",
            "retweeted_statususerentitiesdescriptionurls",
            "retweeted_statususerprotected",
            "retweeted_statususerfollowers_count",
            "retweeted_statususerfriends_count",
            "retweeted_statususerlisted_count",
            "retweeted_statususercreated_at",
            "retweeted_statususerfavourites_count",
            "retweeted_statususerutc_offset",
            "retweeted_statususertime_zone",
            "retweeted_statususergeo_enabled",
            "retweeted_statususerverified",
            "retweeted_statususerstatuses_count",
            "retweeted_statususerlang",
            "retweeted_statususercontributors_enabled",
            "retweeted_statususeris_translator",
            "retweeted_statususeris_translation_enabled",
            "retweeted_statususerprofile_background_color",
            "retweeted_statususerprofile_background_image_url",
            "retweeted_statususerprofile_background_image_url_https",
            "retweeted_statususerprofile_background_tile",
            "retweeted_statususerprofile_image_url",
            "retweeted_statususerprofile_image_url_https",
            "retweeted_statususerprofile_banner_url",
            "retweeted_statususerprofile_link_color",
            "retweeted_statususerprofile_sidebar_border_color",
            "retweeted_statususerprofile_sidebar_fill_color",
            "retweeted_statususerprofile_text_color",
            "retweeted_statususerprofile_use_background_image",
            "retweeted_statususerhas_extended_profile",
            "retweeted_statususerdefault_profile",
            "retweeted_statususerdefault_profile_image",
            "retweeted_statususerfollowing",
            "retweeted_statususerfollow_request_sent",
            "retweeted_statususernotifications",
            "retweeted_statususertranslator_type",
            "retweeted_statususerwithheld_in_countries",
            "retweeted_statusgeo",
            "retweeted_statuscoordinates",
            "retweeted_statusplace",
            "retweeted_statuscontributors",
            "retweeted_statusis_quote_status",
            "retweeted_statusretweet_count",
            "retweeted_statusfavorite_count",
            "retweeted_statusfavorited",
            "retweeted_statusretweeted",
            "retweeted_statuslang",
            "possibly_sensitive",
            "entitiesmedia",
            "extended_entitiesmedia",
            "retweeted_statusentitiesmedia",
            "retweeted_statusextended_entitiesmedia",
            "retweeted_statususerentitiesurlurls",
            "retweeted_statuspossibly_sensitive",
            "placeid",
            "placeurl",
            "placeplace_type",
            "placename",
            "placefull_name",
            "placecountry_code",
            "placecountry",
            "placecontained_within",
            "placebounding_boxtype",
            "placebounding_boxcoordinates",
            "quoted_status_id",
            "quoted_status_id_str",
            "quoted_statuscreated_at",
            "quoted_statusid",
            "quoted_statusid_str",
            "quoted_statustext",
            "quoted_statustruncated",
            "quoted_statusentitieshashtags",
            "quoted_statusentitiessymbols",
            "quoted_statusentitiesuser_mentions",
            "quoted_statusentitiesurls",
            "quoted_statusentitiesmedia",
            "quoted_statusextended_entitiesmedia",
            "quoted_statusmetadataiso_language_code",
            "quoted_statusmetadataresult_type",
            "quoted_statussource",
            "quoted_statusin_reply_to_status_id",
            "quoted_statusin_reply_to_status_id_str",
            "quoted_statusin_reply_to_user_id",
            "quoted_statusin_reply_to_user_id_str",
            "quoted_statusin_reply_to_screen_name",
            "quoted_statususerid",
            "quoted_statususerid_str",
            "quoted_statususername",
            "quoted_statususerscreen_name",
            "quoted_statususerlocation",
            "quoted_statususerdescription",
            "quoted_statususerurl",
            "quoted_statususerentitiesurlurls",
            "quoted_statususerentitiesdescriptionurls",
            "quoted_statususerprotected",
            "quoted_statususerfollowers_count",
            "quoted_statususerfriends_count",
            "quoted_statususerlisted_count",
            "quoted_statususercreated_at",
            "quoted_statususerfavourites_count",
            "quoted_statususerutc_offset",
            "quoted_statususertime_zone",
            "quoted_statususergeo_enabled",
            "quoted_statususerverified",
            "quoted_statususerstatuses_count",
            "quoted_statususerlang",
            "quoted_statususercontributors_enabled",
            "quoted_statususeris_translator",
            "quoted_statususeris_translation_enabled",
            "quoted_statususerprofile_background_color",
            "quoted_statususerprofile_background_image_url",
            "quoted_statususerprofile_background_image_url_https",
            "quoted_statususerprofile_background_tile",
            "quoted_statususerprofile_image_url",
            "quoted_statususerprofile_image_url_https",
            "quoted_statususerprofile_banner_url",
            "quoted_statususerprofile_link_color",
            "quoted_statususerprofile_sidebar_border_color",
            "quoted_statususerprofile_sidebar_fill_color",
            "quoted_statususerprofile_text_color",
            "quoted_statususerprofile_use_background_image",
            "quoted_statususerhas_extended_profile",
            "quoted_statususerdefault_profile",
            "quoted_statususerdefault_profile_image",
            "quoted_statususerfollowing",
            "quoted_statususerfollow_request_sent",
            "quoted_statususernotifications",
            "quoted_statususertranslator_type",
            "quoted_statususerwithheld_in_countries",
            "quoted_statusgeo",
            "quoted_statuscoordinates",
            "quoted_statusplaceid",
            "quoted_statusplaceurl",
            "quoted_statusplaceplace_type",
            "quoted_statusplacename",
            "quoted_statusplacefull_name",
            "quoted_statusplacecountry_code",
            "quoted_statusplacecountry",
            "quoted_statusplacecontained_within",
            "quoted_statusplacebounding_boxtype",
            "quoted_statusplacebounding_boxcoordinates",
            "quoted_statuscontributors",
            "quoted_statusis_quote_status",
            "quoted_statusretweet_count",
            "quoted_statusfavorite_count",
            "quoted_statusfavorited",
            "quoted_statusretweeted",
            "quoted_statuspossibly_sensitive",
            "quoted_statuslang",
            "retweeted_statusplaceid",
            "retweeted_statusplaceurl",
            "retweeted_statusplaceplace_type",
            "retweeted_statusplacename",
            "retweeted_statusplacefull_name",
            "retweeted_statusplacecountry_code",
            "retweeted_statusplacecountry",
            "retweeted_statusplacecontained_within",
            "retweeted_statusplacebounding_boxtype",
            "retweeted_statusplacebounding_boxcoordinates",
            "retweeted_statusquoted_status_id",
            "retweeted_statusquoted_status_id_str",
            "retweeted_statusquoted_statuscreated_at",
            "retweeted_statusquoted_statusid",
            "retweeted_statusquoted_statusid_str",
            "retweeted_statusquoted_statustext",
            "retweeted_statusquoted_statustruncated",
            "retweeted_statusquoted_statusentitieshashtags",
            "retweeted_statusquoted_statusentitiessymbols",
            "retweeted_statusquoted_statusentitiesuser_mentions",
            "retweeted_statusquoted_statusentitiesurls",
            "retweeted_statusquoted_statusentitiesmedia",
            "retweeted_statusquoted_statusextended_entitiesmedia",
            "retweeted_statusquoted_statusmetadataiso_language_code",
            "retweeted_statusquoted_statusmetadataresult_type",
            "retweeted_statusquoted_statussource",
            "retweeted_statusquoted_statusin_reply_to_status_id",
            "retweeted_statusquoted_statusin_reply_to_status_id_str",
            "retweeted_statusquoted_statusin_reply_to_user_id",
            "retweeted_statusquoted_statusin_reply_to_user_id_str",
            "retweeted_statusquoted_statusin_reply_to_screen_name",
            "retweeted_statusquoted_statususerid",
            "retweeted_statusquoted_statususerid_str",
            "retweeted_statusquoted_statususername",
            "retweeted_statusquoted_statususerscreen_name",
            "retweeted_statusquoted_statususerlocation",
            "retweeted_statusquoted_statususerdescription",
            "retweeted_statusquoted_statususerurl",
            "retweeted_statusquoted_statususerentitiesurlurls",
            "retweeted_statusquoted_statususerentitiesdescriptionurls",
            "retweeted_statusquoted_statususerprotected",
            "retweeted_statusquoted_statususerfollowers_count",
            "retweeted_statusquoted_statususerfriends_count",
            "retweeted_statusquoted_statususerlisted_count",
            "retweeted_statusquoted_statususercreated_at",
            "retweeted_statusquoted_statususerfavourites_count",
            "retweeted_statusquoted_statususerutc_offset",
            "retweeted_statusquoted_statususertime_zone",
            "retweeted_statusquoted_statususergeo_enabled",
            "retweeted_statusquoted_statususerverified",
            "retweeted_statusquoted_statususerstatuses_count",
            "retweeted_statusquoted_statususerlang",
            "retweeted_statusquoted_statususercontributors_enabled",
            "retweeted_statusquoted_statususeris_translator",
            "retweeted_statusquoted_statususeris_translation_enabled",
            "retweeted_statusquoted_statususerprofile_background_color",
            "retweeted_statusquoted_statususerprofile_background_image_url",
            "retweeted_statusquoted_statususerprofile_background_image_url_https",
            "retweeted_statusquoted_statususerprofile_background_tile",
            "retweeted_statusquoted_statususerprofile_image_url",
            "retweeted_statusquoted_statususerprofile_image_url_https",
            "retweeted_statusquoted_statususerprofile_banner_url",
            "retweeted_statusquoted_statususerprofile_link_color",
            "retweeted_statusquoted_statususerprofile_sidebar_border_color",
            "retweeted_statusquoted_statususerprofile_sidebar_fill_color",
            "retweeted_statusquoted_statususerprofile_text_color",
            "retweeted_statusquoted_statususerprofile_use_background_image",
            "retweeted_statusquoted_statususerhas_extended_profile",
            "retweeted_statusquoted_statususerdefault_profile",
            "retweeted_statusquoted_statususerdefault_profile_image",
            "retweeted_statusquoted_statususerfollowing",
            "retweeted_statusquoted_statususerfollow_request_sent",
            "retweeted_statusquoted_statususernotifications",
            "retweeted_statusquoted_statususertranslator_type",
            "retweeted_statusquoted_statususerwithheld_in_countries",
            "retweeted_statusquoted_statusgeo",
            "retweeted_statusquoted_statuscoordinates",
            "retweeted_statusquoted_statusplaceid",
            "retweeted_statusquoted_statusplaceurl",
            "retweeted_statusquoted_statusplaceplace_type",
            "retweeted_statusquoted_statusplacename",
            "retweeted_statusquoted_statusplacefull_name",
            "retweeted_statusquoted_statusplacecountry_code",
            "retweeted_statusquoted_statusplacecountry",
            "retweeted_statusquoted_statusplacecontained_within",
            "retweeted_statusquoted_statusplacebounding_boxtype",
            "retweeted_statusquoted_statusplacebounding_boxcoordinates",
            "retweeted_statusquoted_statuscontributors",
            "retweeted_statusquoted_statusis_quote_status",
            "retweeted_statusquoted_statusretweet_count",
            "retweeted_statusquoted_statusfavorite_count",
            "retweeted_statusquoted_statusfavorited",
            "retweeted_statusquoted_statusretweeted",
            "retweeted_statusquoted_statuspossibly_sensitive",
            "retweeted_statusquoted_statuslang",
            "quoted_statusplace",
            "geotype",
            "geocoordinates",
            "coordinatestype",
            "coordinatescoordinates",
        ]
        col_not_in_dd = [col for col in cols_343 if col not in dd.columns.tolist()]
        for col in col_not_in_dd:
            dd[col] = np.nan
        dd = dd[cols_343]  # pylint: disable=C0103

        creat_path_tree(f"data/tweets/q={q_folder}")
        if os.path.exists(f"data/tweets/q={q_folder}/{dt}.csv"):
            dd.to_csv(
                f"data/tweets/q={q_folder}/{dt}.csv",
                index=False,
                mode="a",
                header=False,
            )
        else:
            dd.to_csv(f"data/tweets/q={q_folder}/{dt}.csv", index=False)
            # twitter fetch data from most recent to most oldest
            # the first tweet from first page is the most recent, how its what we call last_id
            first_page_df = dd.copy()

        log(f"    page: {i} | tweets: {len(dd)} | columns: {len(dd.columns)}")

    return first_page_df

Scrapy tweets since last_id in batchs of 100 tweets.

def get_api()
Expand source code
@task
def get_api():
    """
    Get the Twitter API.
    """

    # pylint: disable=C0103
    CREDENTIALS = json.loads(decode_env(os.getenv("TWITTER_CREDENTIALS")))

    auth = tweepy.OAuthHandler(
        CREDENTIALS["CONSUMER_KEY"], CREDENTIALS["CONSUMER_SECRET"]
    )
    auth.set_access_token(
        CREDENTIALS["ACCESS_TOKEN"], CREDENTIALS["ACCESS_TOKEN_SECRET"]
    )
    return tweepy.API(auth)

Get the Twitter API.

def get_last_id(api, q, data_path: str)
Expand source code
@task(nout=2)
def get_last_id(api, q, data_path: str):  # pylint: disable=C0103
    """
    Get last_id from storage table or twitter api.
    """
    q_folder = q.replace(" ", "_").replace("-", "_")
    pre_path = f"{data_path}/twitter_flamengo/last_id/q={q_folder}"
    if not os.path.exists(f"{pre_path}/{q_folder}.csv"):
        log(f"No last_id table found for {q}, fetch last_id from Twitter API")
        tweet = api.search_tweets(q=q, count=1)[0]
        last_id = tweet.id
        created_at = tweet.created_at
        time.sleep(5)
    else:
        df = pd.read_csv(f"{pre_path}/{q_folder}.csv").copy()  # pylint: disable=C0103
        if len(df) > 0:
            log(f"Getting last_id from storage table {q_folder}")
            last_id = int(df[["id"]].iloc[-1])
            created_at = df[["created_at"]].iloc[-1].values[0]
        else:
            log(f"No last_id saved in table for {q}, fetch last_id from Twitter API")
            tweet = api.search_tweets(q=q, count=1)[0]
            last_id = tweet.id
            created_at = tweet.created_at
            time.sleep(5)
    return last_id, created_at

Get last_id from storage table or twitter api.

def normalize_cols(df)
Expand source code
def normalize_cols(df):  # pylint: disable=C0103
    """
    Normalize columns names.
    """
    return (
        df.str.normalize("NFKD")
        .str.encode("ascii", errors="ignore")
        .str.decode("utf-8")
        .str.replace("$", "")
        .str.replace("(", "")
        .str.replace(")", "")
        .str.replace("-", "")
        .str.replace(" ", "_")
        .str.lower()
        .str.replace(".", "")
        .str.replace("/", "_")
    )

Normalize columns names.

def save_last_id(df, q)
Expand source code
@task
def save_last_id(df, q):  # pylint: disable=C0103
    """
    Save the last tweet ID.
    """
    q_folder = q.replace(" ", "_").replace("-", "_")
    pre_path = f"data/staging/twitter_flamengo/last_id/q={q_folder}"
    creat_path_tree(pre_path)

    if not os.path.exists(f"{pre_path}/{q_folder}.csv"):
        pd.DataFrame({"q": [], "id": [], "created_at": []}).to_csv(
            f"{pre_path}/{q_folder}.csv", index=False
        )

    if df is None:
        log("   No new tweets found")
    else:
        # twitter fetch data from most recent to most oldest
        # last_id must to be the most recent id
        df = df[["id", "created_at"]].iloc[[0]].copy()
        df.to_csv(f"{pre_path}/{q_folder}.csv", index=False, mode="a", header=False)
    return "data/staging"

Save the last tweet ID.

def upload_to_storage(path: str)
Expand source code
@task
def upload_to_storage(path: str):
    """
    upload data to storage
    """
    tb_last_id = bd.Table(dataset_id="twitter_flamengo", table_id="last_id")
    tb_last_id.append(f"{path}/twitter_flamengo/last_id")
    if os.path.isdir(f"{path}/"):
        shutil.rmtree(f"{path}/")

    tb_tweet = bd.Table(dataset_id="twitter_flamengo", table_id="tweets")
    tb_tweet.append(
        "data/tweets",
    )
    if os.path.isdir("data/tweets/"):
        shutil.rmtree("data/tweets/")

upload data to storage