import requests from pybooru import Danbooru from telegram import Update from telegram.ext import ContextTypes from tgbot.config import STICKER_SHITHOLE, BOORU_API_URL, INLINE_QUERY_CACHE_SECONDS, DANBOORU_API_KEY from tgbot.shit.hentai_argparser import parse_args, create_parser from tgbot.shit.render import render_text_on_image import re def escape_markdown(text): """ Escapes special characters for MarkdownV2 formatting. """ escape_chars = r"[_*[\]()~`>#+-=|{}.!]" return re.sub(f"([{escape_chars}])", r"\\\1", text) async def handle_xitter(update: Update, context: ContextTypes.DEFAULT_TYPE): query = update.inline_query.query xcom = "https://x.com" results = [ { "type": "article", "id": "1", "title": "fix xitter links", "description": query.replace(xcom, "https://fxtwitter.com"), "input_message_content": { "message_text": query.replace(xcom, "https://fxtwitter.com") } } ] await context.bot.answer_inline_query(update.inline_query.id, results) async def handle_red_ebalo(update: Update, context: ContextTypes.DEFAULT_TYPE): query = update.inline_query.query file = render_text_on_image("tgbot/assets/red_ebalo.png", query) file_id = (await context.bot.send_sticker(chat_id=STICKER_SHITHOLE, sticker=file)).sticker.file_id results = [ { "type": "sticker", "id": "1", "sticker_file_id": file_id } ] await context.bot.answer_inline_query(update.inline_query.id, results) async def handle_hentai(update: Update, context: ContextTypes.DEFAULT_TYPE): query = update.inline_query.query.replace("hentai", "").replace("—", "--") parser = create_parser() try: args = parse_args(query, parser) except RuntimeError as e: if getattr(parser, 'help_message', None): help_message = parser.help_message escaped_help_message = escape_markdown(help_message) help_response = [ { "type": "article", "id": "1", "title": "Help message", "description": "Print help", "input_message_content": { "message_text": f"```\n{escaped_help_message}\n```", "parse_mode": "MarkdownV2", } } ] return await context.bot.answer_inline_query(update.inline_query.id, help_response, cache_time=INLINE_QUERY_CACHE_SECONDS) else: error_message = str(e) escaped_error_message = escape_markdown(error_message) error_response = [ { "type": "article", "id": "1", "title": "Invalid command!", "description": error_message, "input_message_content": { "message_text": f"```\n{escaped_error_message}\n```", "parse_mode": "MarkdownV2", } } ] return await context.bot.answer_inline_query(update.inline_query.id, error_response, cache_time=INLINE_QUERY_CACHE_SECONDS) print(args) # using danbooru if args["booru"] in ["db", "dan", "danbooru"]: if len(args["tags"]) > 2: error_response = [ { "type": "article", "id": "1", "title": "ERROR!", "description": "None", "input_message_content": { "message_text": "None" } } ] error_response[0]["description"] = "Danbooru won't let you search more than two tags at once cause " \ "they're greedy " error_response[0]["input_message_content"]["message_text"] = error_response[0]["description"] return await context.bot.answer_inline_query(update.inline_query.id, error_response, cache_time=INLINE_QUERY_CACHE_SECONDS) client = Danbooru('danbooru', username='owlrandomshitbot', api_key=DANBOORU_API_KEY) posts = client.post_list(tags=" ".join(args["tags"]), limit=args["limit"], random=args["random"]) results = [] for idx, post in enumerate(posts): booru_post_url = "https://danbooru.donmai.us/posts/" + str(post["id"]) try: results.append( { "type": "photo", "id": "%d" % idx, "photo_url": post["file_url"], "thumb_url": post["preview_file_url"], "caption": "source -> " + booru_post_url } ) except Exception: # this post requires a gold account print(post) return await context.bot.answer_inline_query(update.inline_query.id, results, cache_time=INLINE_QUERY_CACHE_SECONDS) # using node booru service response = requests.post(BOORU_API_URL, json=args) if response.status_code == 200: posts = response.json() results = [] for idx, post in enumerate(posts): booru_post_url = post["booru"]["domain"] + \ post["booru"]["site"]["api"]["postView"] + post["id"] if not args["spoiler"]: results.append( { "type": "photo", "id": "%d" % idx, "photo_url": post["fileUrl"], "thumb_url": post["previewUrl"], "caption": "source -> " + booru_post_url } ) else: results.append({ "type": "photo", "id": "%d" % idx, "photo_url": post["fileUrl"], "thumb_url": post["previewUrl"], "caption": "source -> " + booru_post_url, "input_message_content": { "message_text": "image with tags = " + str(args["tags"]) + " -> " + booru_post_url, "has_spoiler": True } }) return await context.bot.answer_inline_query(update.inline_query.id, results, cache_time=INLINE_QUERY_CACHE_SECONDS) else: error_response = [ { "type": "article", "id": "1", "title": "ERROR!", "description": "None", "input_message_content": { "message_text": "None" } } ] error_response[0]["description"] = "booru error" error_response[0]["input_message_content"]["message_text"] = response.text return await context.bot.answer_inline_query(update.inline_query.id, error_response, cache_time=INLINE_QUERY_CACHE_SECONDS)