From c1f0e8ac61b58d6f39cf188b4343fe8c61b73a39 Mon Sep 17 00:00:00 2001
From: Timothy Quilling
Date: Thu, 29 Jun 2023 00:20:12 -0400
Subject: [PATCH] feat: lemmy

---
 find_posts.py | 89 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 89 insertions(+)

Review notes (this section is ignored when the patch is applied):
  * The patch had been flattened onto a few lines; its line structure was
    rebuilt from the hunk headers. Indentation of context lines is inferred
    from the Python structure and should be checked against find_posts.py.
  * The regex named groups had lost their names ("(?P[^/]+)" is not a valid
    pattern). They are restored as server / toot_id / username, the names
    the adjacent match.group(...) calls read.
  * get_comment_context() used to fall off the end, returning None, for any
    status code other than 200 or 429. It now logs and returns [] like
    get_comments_urls() does, so every path yields a list.
  * The docstring of get_comment_context() was a copy of get_toot_context()'s;
    it now says what the function does.
  * The "index" blob hash below predates these amendments, and the author
    e-mail address was already missing from the copy under review.

diff --git a/find_posts.py b/find_posts.py
index e442d9c..2ad366b 100644
--- a/find_posts.py
+++ b/find_posts.py
@@ -439,6 +439,10 @@ def parse_user_url(url):
     if match is not None:
         return match
 
+    match = parse_lemmy_profile_url(url)
+    if match is not None:
+        return match
+
     log(f"Error parsing Profile URL {url}")
     return None
 
@@ -459,6 +463,11 @@ def parse_url(url, parsed_urls):
         if match is not None:
             parsed_urls[url] = match
 
+    if url not in parsed_urls:
+        match = parse_lemmy_url(url)
+        if match is not None:
+            parsed_urls[url] = match
+
     if url not in parsed_urls:
         log(f"Error parsing toot URL {url}")
         parsed_urls[url] = None
@@ -522,6 +531,25 @@ def parse_pixelfed_profile_url(url):
         return (match.group("server"), match.group("username"))
     return None
 
+def parse_lemmy_url(url):
+    """parse a Lemmy URL and return the server, and ID"""
+    match = re.match(
+        r"https://(?P<server>[^/]+)/comment/(?P<toot_id>[^/]+)", url
+    )
+    if match is None:
+        match = re.match(
+            r"https://(?P<server>[^/]+)/post/(?P<toot_id>[^/]+)", url
+        )
+    if match is not None:
+        return (match.group("server"), match.group("toot_id"))
+    return None
+
+def parse_lemmy_profile_url(url):
+    """parse a Lemmy Profile URL and return the server and username"""
+    match = re.match(r"https://(?P<server>[^/]+)/u/(?P<username>[^/]+)", url)
+    if match is not None:
+        return (match.group("server"), match.group("username"))
+    return None
 
 def get_redirect_url(url):
     """get the URL given URL redirects to"""
@@ -559,6 +587,10 @@ def get_all_context_urls(server, replied_toot_ids):
 
 def get_toot_context(server, toot_id, toot_url):
     """get the URLs of the context toots of the given toot"""
+    if toot_url.find("/comment/") != -1:
+        return get_comment_context(server, toot_id, toot_url)
+    if toot_url.find("/post/") != -1:
+        return get_comments_urls(server, toot_id, toot_url)
     url = f"https://{server}/api/v1/statuses/{toot_id}/context"
     try:
         resp = get(url)
@@ -585,6 +617,63 @@ def get_toot_context(server, toot_id, toot_url):
     )
     return []
 
+def get_comment_context(server, toot_id, toot_url):
+    """get the URLs of all comments on the post the given comment belongs to"""
+    comment = f"https://{server}/api/v3/comment?id={toot_id}"
+    try:
+        resp = get(comment)
+    except Exception as ex:
+        log(f"Error getting comment {toot_id} from {toot_url}. Exception: {ex}")
+        return []
+
+    if resp.status_code == 200:
+        try:
+            res = resp.json()
+            post_id = res['comment_view']['comment']['post_id']
+            log(f"Got parent post ID {post_id} for comment {toot_url}")
+            return get_comments_urls(server, post_id, toot_url)
+        except Exception as ex:
+            log(f"Error parsing context for comment {toot_url}. Exception: {ex}")
+            return []
+    elif resp.status_code == 429:
+        reset = datetime.strptime(resp.headers['x-ratelimit-reset'], '%Y-%m-%dT%H:%M:%S.%fZ')
+        log(f"Rate Limit hit when getting context for {toot_url}. Waiting to retry at {resp.headers['x-ratelimit-reset']}")
+        time.sleep((reset - datetime.now()).total_seconds() + 1)
+        return get_comment_context(server, toot_id, toot_url)
+
+    log(
+        f"Error getting context for comment {toot_url}. Status code: {resp.status_code}"
+    )
+    return []
+
+def get_comments_urls(server, post_id, toot_url):
+    """get the URLs of the comments of the given post"""
+    url = f"https://{server}/api/v3/comment/list?post_id={post_id}"
+    try:
+        resp = get(url)
+    except Exception as ex:
+        log(f"Error getting comments for post {post_id} from {toot_url}. Exception: {ex}")
+        return []
+
+    if resp.status_code == 200:
+        try:
+            res = resp.json()
+            list_of_urls = [comment_info['comment']['ap_id'] for comment_info in res['comments']]
+            log(f"Got {len(list_of_urls)} comments for post {toot_url}")
+            return list_of_urls
+        except Exception as ex:
+            log(f"Error parsing comments for post {toot_url}. Exception: {ex}")
+            return []
+    elif resp.status_code == 429:
+        reset = datetime.strptime(resp.headers['x-ratelimit-reset'], '%Y-%m-%dT%H:%M:%S.%fZ')
+        log(f"Rate Limit hit when getting comments for {toot_url}. Waiting to retry at {resp.headers['x-ratelimit-reset']}")
+        time.sleep((reset - datetime.now()).total_seconds() + 1)
+        return get_comments_urls(server, post_id, toot_url)
+
+    log(
+        f"Error getting comments for post {toot_url}. Status code: {resp.status_code}"
+    )
+    return []
 
 def add_context_urls(server, access_token, context_urls, seen_urls):
     """add the given toot URLs to the server"""