Added test calendar abstraction, thanks lina@mastodon.ml

https://mastodon.ml/@lina/112768409279788952
This commit is contained in:
localhost_frssoft 2024-07-12 12:17:05 +03:00
parent 1ecaddfaff
commit 973e25adfa
2 changed files with 52 additions and 12 deletions

View File

@ -1,5 +1,6 @@
from src.fedi_api import get_notifications, mark_as_read_notification, post_status, upload_attachment
from src.fmn_states_db import write_states, states_stor
from src.sheduler import check_stop_thread_scan
from config import admins_bot, limit_movies_per_user, limit_all_movies_poll, hour_poll_posting, fmn_next_watching_hour
import threading
@ -49,20 +50,32 @@ def get_control_mention():
st_date = i['status']['created_at']
thread_created_at = dateutilparse(st_date)
delta = relativedelta(hour=hour_poll_posting, minute=0, second=0, weekday=TU(1))
delta = relativedelta(
hour=hour_poll_posting, minute=0, second=0, weekday=TU(1))
stop_thread_scan = thread_created_at + delta
movies_accept_time = stop_thread_scan.strftime('%H:%M %d.%m.%Y по Москве')
stop_thread_scan = time.mktime(time.struct_time(stop_thread_scan.timetuple()))
if check_stop_thread_scan(stop_thread_scan) == stop_thread_scan:
logger.success("проверка на корректность даты пройдена")
else:
logger.warning("проверка на корректность даты не пройдена")
# stop_thread_scan = check_stop_thread_scan(stop_thread_scan)
movies_accept_time = stop_thread_scan.strftime(
'%H:%M %d.%m.%Y по Москве')
stop_thread_scan = time.mktime(
time.struct_time(stop_thread_scan.timetuple()))
if now_week == 6: # Фикс стыков двух недель. Если вс, то расчитываем на следующую неделю
next_week = 2
else:
next_week = 1
next_movie_watching_delta = relativedelta(hour=fmn_next_watching_hour, minute=0, second=0, weekday=SU(next_week))
next_movie_watching_delta = relativedelta(
hour=fmn_next_watching_hour, minute=0, second=0, weekday=SU(next_week))
next_movie_watching = time_now + next_movie_watching_delta
max_mute_time = time.mktime(time.struct_time(next_movie_watching.timetuple())) # Глушение до следующего сеанса FMN.
# Глушение до следующего сеанса FMN.
max_mute_time = time.mktime(
time.struct_time(next_movie_watching.timetuple()))
next_movie_watching = next_movie_watching.strftime('%d.%m.%Y')
post_status(start_collect_movies_text(movies_accept_time, next_movie_watching), st_id, attachments=[upload_attachment('src/FMN.webp')])
post_status(start_collect_movies_text(movies_accept_time, next_movie_watching),
st_id, attachments=[upload_attachment('src/FMN.webp')])
time.sleep(0.2)
mark_as_read_notification(i['id'])
@ -76,21 +89,33 @@ def get_control_mention():
logger.warning('Автоматический триггер в полночи сработал')
thread_created_at = time_now
delta = relativedelta(hour=hour_poll_posting, minute=0, second=0, weekday=TU(1))
delta = relativedelta(
hour=hour_poll_posting, minute=0, second=0, weekday=TU(1))
stop_thread_scan = thread_created_at + delta
movies_accept_time = stop_thread_scan.strftime('%H:%M %d.%m.%Y по Москве')
stop_thread_scan = time.mktime(time.struct_time(stop_thread_scan.timetuple()))
if check_stop_thread_scan(stop_thread_scan) == stop_thread_scan:
logger.success("проверка на корректность даты пройдена")
else:
logger.warning("проверка на корректность даты не пройдена")
# stop_thread_scan = check_stop_thread_scan(stop_thread_scan)
movies_accept_time = stop_thread_scan.strftime(
'%H:%M %d.%m.%Y по Москве')
stop_thread_scan = time.mktime(
time.struct_time(stop_thread_scan.timetuple()))
if now_week == 6: # Фикс стыков двух недель. Если вс, то расчитываем на следующую неделю
next_week = 2
else:
next_week = 1
next_movie_watching_delta = relativedelta(hour=fmn_next_watching_hour, minute=0, second=0, weekday=SU(next_week))
next_movie_watching_delta = relativedelta(
hour=fmn_next_watching_hour, minute=0, second=0, weekday=SU(next_week))
next_movie_watching = time_now + next_movie_watching_delta
max_mute_time = time.mktime(time.struct_time(next_movie_watching.timetuple())) # Глушение до следующего сеанса FMN.
# Глушение до следующего сеанса FMN.
max_mute_time = time.mktime(
time.struct_time(next_movie_watching.timetuple()))
next_movie_watching = next_movie_watching.strftime('%d.%m.%Y')
watched_movie_name = get_peertube_stream_name()
st_id = post_status("Спасибо что посмотрели " + watched_movie_name + "\n\n" + start_collect_movies_text(movies_accept_time, next_movie_watching) + '\n\n@rf@mastodon.ml', reply_to_status_id=None, attachments=[upload_attachment('src/FMN.webp')], visibility="public")['id']
st_id = post_status("Спасибо что посмотрели " + watched_movie_name + "\n\n" + start_collect_movies_text(movies_accept_time, next_movie_watching) +
'\n\n@rf@mastodon.ml', reply_to_status_id=None, attachments=[upload_attachment('src/FMN.webp')], visibility="public")['id']
time.sleep(0.2)

15
src/sheduler.py Normal file
View File

@ -0,0 +1,15 @@
import calendar
import datetime
from config import hour_poll_posting
def check_stop_thread_scan(suggested_date):
suggested_month = suggested_date.month
suggested_year = suggested_date.year
tuesdays = []
for i in calendar.Calendar().itermonthdays4(suggested_year, suggested_month):
print(i)
if i[3] == 1 and datetime.datetime(year=i[0], month=i[1], day=i[2]) > suggested_date:
tuesdays.append(datetime.datetime(
year=i[0], month=i[1], day=i[2], hour=hour_poll_posting))
return tuesdays[0] # near tuesday