owlrandomshitbot/bot.py
2025-05-13 03:28:09 +07:00

221 lines
7 KiB
Python

import contextlib
import io
from telegram import Update, InputFile
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, filters, InlineQueryHandler, \
CallbackContext, CallbackQueryHandler
import logging
from tgbot.shit.fed import build_fediverse_inline_results, extract_content
from tgbot.config import TOKEN, PROMPTING_USERS, EXEC_UIDS
from tgbot.shit.handlers import handle_xitter, handle_red_ebalo, handle_hentai, handle_cute_button
from tgbot.shit.shitty_encoder import encode_balls, decode_balls, encode_shevkan, guess_coding, CodingType, \
decode_shevkan
from tgbot.shit.prompting import gen_image
from tgbot.shit.render import render_text_on_image, image_to_string
from io import BytesIO
# Root logging configuration: INFO threshold, each record formatted as
# "<time> - <logger name> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Logger named after this module.
logger = logging.getLogger(__name__)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send the bot's greeting line to the chat the update came from."""
    greeting = (
        'WE ARE A PARTY OF UTTER MADNESS, '
        'AND I AM A PERSON OF COMPLETE AND TOTAL DELUSION!'
    )
    target_chat = update.effective_chat.id
    await context.bot.send_message(chat_id=target_chat, text=greeting)
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Route a plain-text message to the matching feature.

    Dispatch rules, checked in this order:

    * text starting with ``encode``  -> ``handle_encode``
    * text starting with ``decode``  -> ``handle_decode``
    * text starting with ``prompting`` sent by a user whose username is in
      ``PROMPTING_USERS``            -> ``gen_image``
    * anything else is rendered onto ``tgbot/assets/red_ebalo.png`` and sent
      back as a sticker.

    Updates that carry no ``update.message`` or no text (for example an
    edited message) are ignored instead of raising ``AttributeError``.
    """
    message = update.message
    if message is None or message.text is None:
        return
    text = message.text

    if text.startswith("encode"):
        await handle_encode(update, context)
        return
    if text.startswith("decode"):
        await handle_decode(update, context)
        return

    # The sender is looked up defensively; a missing sender or username simply
    # fails the PROMPTING_USERS membership test.
    sender = message.from_user
    username = sender.username if sender is not None else None
    if text.startswith("prompting") and username in PROMPTING_USERS:
        await gen_image(update, context)
        return

    file = render_text_on_image("tgbot/assets/red_ebalo.png", text)
    await context.bot.send_sticker(chat_id=update.effective_chat.id, sticker=file)
async def handle_encode(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Encode the text that follows the ``encode`` keyword and send it back.

    After the first ``encode`` is removed and the remainder stripped, a
    remainder starting with ``shevkan`` has that keyword removed and is passed
    to ``encode_shevkan``; any other text goes through ``encode_balls``.

    Results shorter than 4096 characters are sent as a normal chat message;
    longer results are attached to a reply as a UTF-8 file named
    ``output.txt``.
    """
    # The count is passed positionally: ``str.replace`` only accepts ``count``
    # as a keyword argument on Python 3.13+ and raises TypeError before that.
    text = update.message.text.replace("encode", "", 1).strip()
    if text.startswith("shevkan"):
        text = text.replace("shevkan", "", 1)
        encoded_text = encode_shevkan(text)
    else:
        encoded_text = encode_balls(text)

    if len(encoded_text) < 4096:
        await context.bot.send_message(chat_id=update.effective_chat.id, text=encoded_text)
        return

    # Too long for a single message: send it as a document instead.
    bio = BytesIO(encoded_text.encode("utf-8"))
    bio.name = "output.txt"  # must set filename
    await update.message.reply_document(document=InputFile(bio))
async def handle_decode(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Decode the text that follows the ``decode`` keyword and send the result.

    The first ``decode`` is removed from the message text, ``guess_coding``
    picks the scheme, and the payload is decoded with ``decode_balls`` when the
    guess is ``CodingType.balls`` or with ``decode_shevkan`` otherwise.
    """
    # The count is passed positionally: ``str.replace`` only accepts ``count``
    # as a keyword argument on Python 3.13+ and raises TypeError before that.
    text = update.message.text.replace("decode", "", 1)

    if guess_coding(text) == CodingType.balls:
        decoded_text = decode_balls(text)
    else:
        decoded_text = decode_shevkan(text)

    await context.bot.send_message(chat_id=update.effective_chat.id, text=decoded_text)
# man balls man-balls and shevkan
async def handle_txt_upload(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Decode an uploaded ``.txt`` file and reply with the decoded text.

    The file is downloaded into memory, read as UTF-8, classified with
    ``guess_coding`` and decoded with ``decode_balls`` (for
    ``CodingType.balls``) or ``decode_shevkan`` (anything else).

    Updates without a message, without a document, or whose document name does
    not end in ``.txt`` (compared case-insensitively) are ignored.
    """
    message = update.message
    if message is None:
        return

    document = message.document
    # ``file_name`` may be absent; treat that the same as a non-.txt name.
    # The comparison is lowercased so ``NOTES.TXT`` is accepted as well.
    file_name = (document.file_name or "") if document else ""
    if not file_name.lower().endswith(".txt"):
        return

    file = await document.get_file()
    file_bytes = await file.download_as_bytearray()
    text_content = file_bytes.decode("utf-8")

    if guess_coding(text_content) == CodingType.balls:
        decoded_text = decode_balls(text_content)
    else:
        decoded_text = decode_shevkan(text_content)

    await message.reply_text(decoded_text)
async def image2string(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Download the received photo and reply with its ``image_to_string`` output."""
    # Use the last entry of the photo list (presumably the highest
    # resolution variant - confirm against the Bot API docs).
    chosen_size = update.message.photo[-1]
    tg_file = await chosen_size.get_file()

    buffer = BytesIO()
    await tg_file.download_to_memory(out=buffer)
    buffer.seek(0)  # rewind so the reader starts at the first byte

    await update.message.reply_text(image_to_string(buffer))
async def callback_query_handler(update: Update, context: CallbackContext):
    """Answer inline-button presses.

    A press whose callback data is ``show_cute_popup`` is answered with an
    alert popup; every other press gets an empty acknowledgement.
    """
    pressed = update.callback_query

    if pressed.data != "show_cute_popup":
        await pressed.answer()
        return

    await context.bot.answer_callback_query(
        callback_query_id=pressed.id,
        text="Hey! This button is for you! Thank you for clicking it, you are awesome!",
        show_alert=True,
    )
async def handle_inline(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Dispatch an inline query to the matching inline feature.

    Rules, checked in this order:

    * empty query                         -> ignored
    * query starting with ``fed`` (any case) -> the rest of the query is passed
      to ``extract_content`` and answered with
      ``build_fediverse_inline_results``
    * query containing ``https://x.com``  -> ``handle_xitter``
    * query starting with ``hentai``      -> ``handle_hentai``
    * query starting with ``button!``     -> ``handle_cute_button``
    * anything else                       -> ``handle_red_ebalo``
    """
    query = update.inline_query.query
    if not query:
        return

    prefix = "fed"
    if query.lower().startswith(prefix):
        # Slice off only the leading keyword. A blanket ``replace`` would also
        # delete every later "fed" (e.g. inside "fediverse" in a URL) and
        # would miss an upper-case prefix that the check above accepts.
        post_data = extract_content(query[len(prefix):].strip())
        results = build_fediverse_inline_results(post_data)
        await update.inline_query.answer(results, cache_time=0, is_personal=True)
        return

    if "https://x.com" in query:
        await handle_xitter(update, context)
        return
    if query.startswith("hentai"):
        await handle_hentai(update, context)
        return
    if query.startswith("button!"):
        await handle_cute_button(update, context)
        return

    await handle_red_ebalo(update, context)
async def handle_eval(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Run Python code sent with ``/eval`` and reply with what it produced.

    Only users whose id is listed in ``EXEC_UIDS`` may use the command; anyone
    else is answered with ``"Eh"``. The submitted code is wrapped in an
    ``async def`` so it may use ``await`` and ``return``. Text printed while
    it runs is captured and sent back, followed by the return value (or the
    exception type and message) when there is one. Output longer than 4000
    characters is truncated.

    SECURITY: this executes arbitrary code inside the bot process. The
    ``EXEC_UIDS`` allow-list is the only safeguard, so keep it minimal.
    """
    import html  # local import: only this handler needs it

    message = update.message
    if message is None or not message.text:
        # e.g. an edited command: there is no message to read or reply to.
        return

    user = update.effective_user
    if user is None or user.id not in EXEC_UIDS:
        await message.reply_text("Eh")
        return

    # Drop the first token (``/eval`` or ``/eval@botname``); the rest is code.
    parts = message.text.split(maxsplit=1)
    code = parts[1].strip() if len(parts) > 1 else ""
    if not code:
        await message.reply_text("✍️ Send some Python code to evaluate.")
        return

    # Indent every submitted line so it becomes the body of a coroutine.
    body = "".join(f"    {line}\n" for line in code.splitlines())
    wrapped = "async def __eval_func():\n" + body

    captured = io.StringIO()
    namespace = {}
    try:
        # Empty globals: the snippet sees builtins only, not this module.
        exec(wrapped, {}, namespace)
        # NOTE: redirect_stdout swaps the process-wide sys.stdout, so prints
        # from other tasks running during the await are captured as well.
        with contextlib.redirect_stdout(captured):
            result = await namespace["__eval_func"]()
    except Exception as e:
        # Report the failure to the caller instead of crashing the handler.
        # The type name is included because many exceptions have no message.
        result = f"{type(e).__name__}: {e}"

    output = captured.getvalue()
    if result is not None:
        output += f"\n➡️ {result}"
    if not output.strip():
        output = "✅ Done."

    # Send output (truncate if needed)
    max_len = 4000
    if len(output) > max_len:
        output = output[:max_len] + "\n... (truncated)"

    # Escape <, > and & so arbitrary output cannot break the HTML parse mode.
    safe_output = html.escape(output.strip(), quote=False)
    await message.reply_text(f"<pre>{safe_output}</pre>", parse_mode="HTML")
def main():
    """Build the Telegram application, register all handlers and start polling."""
    app = ApplicationBuilder().token(TOKEN).build()

    # Handlers are added in exactly the order listed here.
    handlers = (
        CommandHandler('start', start),
        MessageHandler(filters.PHOTO, image2string),
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text),
        InlineQueryHandler(handle_inline),
        CallbackQueryHandler(callback_query_handler),
        MessageHandler(filters.Document.FileExtension("txt"), handle_txt_upload),
        CommandHandler("eval", handle_eval),
    )
    for handler in handlers:
        app.add_handler(handler)

    app.run_polling()
# Start the bot only when this file is executed as a script.
if __name__ == "__main__":
    main()