FMN_bot/src/fmn_database.py

98 lines
3.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
import logging
from config import limit_movies_per_user
logger = logging.getLogger('fmn_db')
conn = sqlite3.connect("fmn.sqlite")
c = conn.cursor()
c.execute(
'''CREATE TABLE IF NOT EXISTS watched_movies(original_name VARCHAR (500) DEFAULT NULL, ru_name VARCHAR (500) DEFAULT NULL, year INTEGER DEFAULT NULL)''')
c.execute(
'''CREATE TABLE IF NOT EXISTS poll(user_suggested VARCHAR (500) DEFAULT NULL, original_name VARCHAR (500) DEFAULT NULL, ru_name VARCHAR (500) DEFAULT NULL, year INTEGER DEFAULT NULL, votes INTEGER)''')
conn.commit()
def mark_as_watched_movie(original_name=None, ru_name=None, year=None):
try:
year = int(year)
except:
year = None
c.execute("INSERT OR IGNORE INTO watched_movies(original_name, ru_name, year) VALUES (?, ?, ?)",
(original_name, ru_name, year))
conn.commit()
logger.info(f'Watched movie added to db: ' + str((original_name, ru_name, year)))
def get_already_watched(original_name=None, ru_name=None, year=None):
already_watched = c.execute(f"SELECT * FROM watched_movies WHERE original_name == (?) AND ru_name == (?) AND year == (?) COLLATE NOCASE",
(original_name,ru_name,year)).fetchone()
if already_watched:
return True
else:
return False
def get_suggested_movies_count():
poll_count = c.execute(f"SELECT * FROM poll").fetchall()
return len(poll_count)
def per_user_accepted_movie_count(acct=str):
user_suggested_count = c.execute(f"SELECT user_suggested FROM poll WHERE user_suggested == (?)", (acct,)).fetchall()
return len(user_suggested_count)
def add_movie_to_poll(acct=str, original_name=None, ru_name=None, year=None):
if per_user_accepted_movie_count(acct) >= limit_movies_per_user:
return False
else:
c.execute("INSERT OR IGNORE INTO poll(user_suggested, original_name, ru_name, year, votes) VALUES (?, ?, ?, ?, ?)", (acct, original_name, ru_name, year, 0))
conn.commit()
logger.info('Add option to poll: ' + str((acct, original_name, ru_name, year)))
return True
def get_movies_for_poll():
list_poll = c.execute(f"SELECT * FROM poll ORDER BY year").fetchall()
return list_poll
def write_votes(votes_list_tuples=list):
'''Запись результатов голосования'''
original = get_movies_for_poll()
print(original)
new = []
indexator = 0
for i in original:
try:
new.append((i[0], i[1], i[2], i[3], votes_list_tuples[indexator]))
except:
break
indexator += 1
if len(original) == len(new):
c.execute("DELETE FROM poll")
c.executemany("INSERT OR IGNORE INTO poll(user_suggested, original_name, ru_name, year, votes) VALUES (?, ?, ?, ?, ?)", (new))
def read_votes():
list_poll = c.execute(f"SELECT * FROM poll ORDER BY votes DESC").fetchall()
return list_poll
def rewrite_db(winned_movies=list):
'''Переписываем базу победившими фильмами, на случай tie breaker'''
c.execute("DELETE FROM poll")
c.executemany("INSERT OR IGNORE INTO poll(user_suggested, original_name, ru_name, year, votes) VALUES (?, ?, ?, ?, ?)",
(winned_movies))
conn.commit()
def reset_poll():
'''Сброс содержимого предложки-опроса'''
c.execute("DELETE FROM poll")
conn.commit()