diff --git a/bot.py b/bot.py index 689a2ab..385ea77 100644 --- a/bot.py +++ b/bot.py @@ -3,6 +3,7 @@ from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, Messa CallbackContext, CallbackQueryHandler import logging +from fed import build_fediverse_inline_results, extract_content from tgbot.config import TOKEN, PROMPTING_USERS from tgbot.shit.handlers import handle_xitter, handle_red_ebalo, handle_hentai, handle_cute_button from tgbot.shit.prompting import gen_image @@ -63,6 +64,12 @@ async def handle_inline(update: Update, context: ContextTypes.DEFAULT_TYPE): if not query: return + if query.lower().startswith("fed"): + post_data = extract_content(query.replace("fed", "").strip()) + results = build_fediverse_inline_results(post_data) + await update.inline_query.answer(results, cache_time=0, is_personal=True) + return + if "https://x.com" in query: await handle_xitter(update, context) return diff --git a/fed.py b/fed.py new file mode 100644 index 0000000..201a769 --- /dev/null +++ b/fed.py @@ -0,0 +1,138 @@ +import re +from urllib.parse import urlparse +from mastodon import Mastodon +from bs4 import BeautifulSoup + +from telegram import InlineQueryResultPhoto, InlineQueryResultVideo, InlineQueryResultDocument, InputTextMessageContent +import mimetypes +import uuid + +def build_fediverse_inline_results(post_data): + """ + Build a list of InlineQueryResult* objects from fediverse post data. + """ + post_text = post_data.get("post_text", "") + media_urls = post_data.get("media", []) + results = [] + + for idx, media_url in enumerate(media_urls): + mime_type, _ = mimetypes.guess_type(media_url) + result_id = str(uuid.uuid4()) # unique ID for each result + + # Optional: include full post text only in the first result + caption = post_text if idx == 0 else None + + if mime_type: + if mime_type.startswith("image/"): + results.append( + InlineQueryResultPhoto( + id=result_id, + photo_url=media_url, + thumbnail_url=media_url, + caption=caption, + input_message_content=InputTextMessageContent(post_text) if idx == 0 else None + ) + ) + elif mime_type.startswith("video/"): + results.append( + InlineQueryResultVideo( + id=result_id, + video_url=media_url, + mime_type=mime_type, + thumbnail_url=media_url, + title="Video from post", + caption=caption, + input_message_content=InputTextMessageContent(post_text) if idx == 0 else None + ) + ) + else: + # Fallback for other types (e.g., audio or unknown) + results.append( + InlineQueryResultDocument( + id=result_id, + title="Media attachment", + document_url=media_url, + mime_type=mime_type, + caption=caption, + input_message_content=InputTextMessageContent(post_text) if idx == 0 else None + ) + ) + else: + # fallback in case mime type is unknown + results.append( + InlineQueryResultDocument( + id=result_id, + title="Unknown file", + document_url=media_url, + mime_type="application/octet-stream", + caption=caption, + input_message_content=InputTextMessageContent(post_text) if idx == 0 else None + ) + ) + + return results + + +def clean_html(raw_html): + """Remove HTML tags from a Mastodon/Pleroma post.""" + return BeautifulSoup(raw_html, "html.parser").get_text() + +def parse_post_url(url): + """ + Parse a Mastodon or Pleroma post URL. + Returns: instance domain, status_id + """ + parsed = urlparse(url) + domain = parsed.netloc + + # Mastodon format: https://instance/@user/id + mastodon_match = re.match(r'^/@[^/]+/(\d+)$', parsed.path) + if mastodon_match: + return domain, mastodon_match.group(1) + + # Pleroma format: https://instance/notice/id + pleroma_match = re.match(r'^/notice/([^/]+)$', parsed.path) + if pleroma_match: + return domain, pleroma_match.group(1) + + raise ValueError("Unsupported or invalid Fediverse post URL format.") + +def fetch_status(instance, status_id): + """Fetch status using Mastodon-compatible API.""" + mastodon = Mastodon(api_base_url=f"https://{instance}") + return mastodon.status(status_id) + +def extract_content(post_url): + try: + instance, status_id = parse_post_url(post_url) + status = fetch_status(instance, status_id) + + result = {} + + post_text = "src -> " + post_url + + post_text += "\n\n" + + post_text += clean_html(status['content']) + + result["post_text"] = post_text + result["media"] = [] + + if status['media_attachments']: + for media in status['media_attachments']: + result["media"].append(media["url"]) + else: + print("🖼️ No media found.") + return result + except Exception as e: + print(f"❌ Error: {e}") + return str(e) + +# === Example usage === +if __name__ == "__main__": + post_url = "https://aipub.social/@usluck@channels.im/114303377854884481" + + content = extract_content(post_url) + inline_query_response = build_fediverse_inline_results(content) + + print(content)