add argparser for booru command

This commit is contained in:
Owl 2025-01-05 07:56:09 +07:00
parent f8051314cc
commit c65401abc7
3 changed files with 168 additions and 68 deletions

View File

@ -2,4 +2,6 @@ import os
TOKEN = os.getenv("TOKEN") TOKEN = os.getenv("TOKEN")
BOORU_API_URL = "http://booru-api:3456/booru" BOORU_API_URL = "http://booru-api:3456/booru"
INLINE_QUERY_CACHE_SECONDS = 30
STICKER_SHITHOLE = -1002471390283 STICKER_SHITHOLE = -1002471390283

View File

@ -1,10 +1,19 @@
import requests import requests
from telegram import Update, InlineQueryResultPhoto from telegram import Update
from telegram.ext import ContextTypes from telegram.ext import ContextTypes
from tgbot.config import STICKER_SHITHOLE, BOORU_API_URL from tgbot.config import STICKER_SHITHOLE, BOORU_API_URL, INLINE_QUERY_CACHE_SECONDS
from tgbot.shit.hentai import QueryParams from tgbot.shit.hentai import parse_args, create_parser
from tgbot.shit.render import render_text_on_image from tgbot.shit.render import render_text_on_image
import re
def escape_markdown(text):
"""
Escapes special characters for MarkdownV2 formatting.
"""
escape_chars = r"[_*[\]()~`>#+-=|{}.!]"
return re.sub(f"([{escape_chars}])", r"\\\1", text)
async def handle_xitter(update: Update, context: ContextTypes.DEFAULT_TYPE): async def handle_xitter(update: Update, context: ContextTypes.DEFAULT_TYPE):
@ -44,30 +53,62 @@ async def handle_red_ebalo(update: Update, context: ContextTypes.DEFAULT_TYPE):
async def handle_hentai(update: Update, context: ContextTypes.DEFAULT_TYPE): async def handle_hentai(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.inline_query.query.replace("hentai", "").replace("", "--")
error_response = [ parser = create_parser()
{
"type": "article",
"id": "1",
"title": "ERROR OCCURRED! That means, at least one of us is a retard.",
"description": "None",
"input_message_content": {
"message_text": "None"
}
}
]
query = update.inline_query.query
try: try:
params = QueryParams.from_query_str(query).__dict__
except ValueError as e:
print(e)
error_response[0]["description"] = "error parsing query"
error_response[0]["input_message_content"]["message_text"] = "error parsing query"
return await context.bot.answer_inline_query(update.inline_query.id, error_response)
response = requests.post(BOORU_API_URL, json=params) args = parse_args(query, parser)
except RuntimeError as e:
if getattr(parser, 'help_message', None):
help_message = parser.help_message
escaped_help_message = escape_markdown(help_message)
help_response = [
{
"type": "article",
"id": "1",
"title": "Help message",
"description": "Print help",
"input_message_content": {
"message_text": f"```\n{escaped_help_message}\n```",
"parse_mode": "MarkdownV2",
}
}
]
return await context.bot.answer_inline_query(update.inline_query.id, help_response,
cache_time=INLINE_QUERY_CACHE_SECONDS)
else:
error_message = str(e)
escaped_error_message = escape_markdown(error_message)
error_response = [
{
"type": "article",
"id": "1",
"title": "Invalid command!",
"description": error_message,
"input_message_content": {
"message_text": f"```\n{escaped_error_message}\n```",
"parse_mode": "MarkdownV2",
}
}
]
return await context.bot.answer_inline_query(update.inline_query.id, error_response,
cache_time=INLINE_QUERY_CACHE_SECONDS)
print(args)
response = requests.post(BOORU_API_URL, json=args)
if response.status_code == 200: if response.status_code == 200:
@ -79,24 +120,51 @@ async def handle_hentai(update: Update, context: ContextTypes.DEFAULT_TYPE):
booru_post_url = post["booru"]["domain"] + \ booru_post_url = post["booru"]["domain"] + \
post["booru"]["site"]["api"]["postView"] + post["id"] post["booru"]["site"]["api"]["postView"] + post["id"]
results.append( if not args["spoiler"]:
InlineQueryResultPhoto(
id="%d" % idx,
photo_url=post["fileUrl"],
thumbnail_url=post["previewUrl"],
title="IMAGE #%d" % idx,
description="Image from booru",
caption="source -> " + booru_post_url
)
)
await context.bot.answer_inline_query(update.inline_query.id, results) results.append(
{
"type": "photo",
"id": "%d" % idx,
"photo_url": post["fileUrl"],
"thumb_url": post["previewUrl"],
"caption": "source -> " + booru_post_url
}
)
else:
results.append({
"type": "photo",
"id": "%d" % idx,
"photo_url": post["fileUrl"],
"thumb_url": post["previewUrl"],
"caption": "source -> " + booru_post_url,
"input_message_content": {
"message_text": "image with tags = " + str(args["tags"]) + " -> " + booru_post_url,
"has_spoiler": True
}
})
await context.bot.answer_inline_query(update.inline_query.id, results,
cache_time=INLINE_QUERY_CACHE_SECONDS)
else: else:
print(response.text) error_response = [
{
"type": "article",
"id": "1",
"title": "ERROR!",
"description": "None",
"input_message_content": {
"message_text": "None"
}
}
]
error_response[0]["description"] = "booru error" error_response[0]["description"] = "booru error"
error_response[0]["input_message_content"]["message_text"] = response.text error_response[0]["input_message_content"]["message_text"] = response.text
return await context.bot.answer_inline_query(update.inline_query.id, error_response) return await context.bot.answer_inline_query(update.inline_query.id, error_response,
cache_time=INLINE_QUERY_CACHE_SECONDS)

View File

@ -1,46 +1,76 @@
import re import argparse
import io
import shlex
def combine_whitespaces(s: str) -> str: class CustomArgParser(argparse.ArgumentParser):
return re.compile(r"\s+").sub(" ", s).strip() def __init__(
self,
*args, **kwargs,
):
super().__init__(*args, **kwargs)
self.help_message = None
def print_help(self, file=None):
# Store the help message in a buffer instead of printing
if file is None:
help_buffer = io.StringIO()
super().print_help(file=help_buffer)
self.help_message = help_buffer.getvalue()
help_buffer.close()
else:
super().print_help(file=file)
def parse_args(self, args=None, namespace=None):
# Check for --help manually to avoid stdout output
if '--help' in args:
self.print_help()
raise RuntimeError("Help requested")
return super().parse_args(args, namespace)
def exit(self, status=0, message=None):
raise RuntimeError(message)
def error(self, message):
raise RuntimeError(f"Error: {message}")
class QueryParams: def create_parser():
def __init__(self, booru: str, tags: list[str], random: bool = True, limit: int = 10): parser = CustomArgParser(
self.booru = booru description="A command to search random stuff on various image boorus. Feel free to use it irresponsibly!",
self.tags = tags usage="@owlrandomshitbot hentai BOORU_NAME --tags TAG1,TAG2,TAG3 -l LIMIT --random -s",
self.random = random )
self.limit = limit
@classmethod parser.add_argument("booru", type=str,
def from_query_str(cls, query: str): choices=['e6', 'e621', 'e9', 'e926', 'hh', 'hypno', 'hypnohub', 'db', 'dan', 'danbooru', 'kc',
query = query.replace("hentai ", "") 'konac', 'kcom', 'kn', 'konan', 'knet', 'yd', 'yand', 'yandere', 'gb', 'gel',
'gelbooru',
'r34', 'rule34', 'sb', 'safe', 'safebooru', 'tb', 'tbib', 'big', 'xb', 'xbooru', 'pa',
'paheal', 'dp', 'derp', 'derpi', 'derpibooru', 'rb', 'realbooru'],
help="Booru to search. For context, this command uses this library"
" -> https://github.com/AtoraSuunva/booru"
)
query = combine_whitespaces(query) parser.add_argument("--tags", "-t",
dest="tags", type=str, help="Tags, comma separated. If query contains tags with spaces, "
"e.g. 'downvote bait', put it in quotes.")
list_ = query.split(" ") parser.add_argument("--random", "-r",
dest="random", action='store_true', default=False,
help="Randomize the output, default=False")
booru = list_[0] parser.add_argument("--limit", "-l",
dest="limit", type=int, default=20, help="Limit the number of output posts, default=20")
list_.remove(booru) parser.add_argument("--spoiler", "-s", dest="spoiler",
action="store_true", help="Send the image with a spoiler.")
tags = [] return parser
random = False
limit = 10
for e in list_:
if "random=" in e: # this is 'random' argument
random = e.replace("random=", "").lower() == "true"
continue
if "limit=" in e: # this is 'limit' argument def parse_args(args: str, parser: CustomArgParser):
limit = int(e.replace("limit=", "")) args = parser.parse_args(shlex.split(args)).__dict__
continue
tags.append(e) args["tags"] = args["tags"].split(",")
return cls(booru, tags, random, limit) return args
def __str__(self):
return "QueryParams(booru=" + self.booru + ", tags=" + str(self.tags) + ", random=" + str(self.random) + \
", limit=" + str(self.limit) + ")"