"""SQLite storage for the watched-movies list and the current movie poll.

Importing this module opens ``fmn.sqlite`` in the current working directory
and creates the ``watched_movies`` and ``poll`` tables if they are missing.
All functions share the module-level connection ``conn`` and cursor ``c``.
"""
import logging
import sqlite3

from config import limit_movies_per_user

logger = logging.getLogger('fmn_db')

# One connection and one cursor are shared by every function in this module.
conn = sqlite3.connect("fmn.sqlite")
c = conn.cursor()

c.execute(
    "CREATE TABLE IF NOT EXISTS watched_movies("
    "original_name VARCHAR (500) DEFAULT NULL, "
    "ru_name VARCHAR (500) DEFAULT NULL, "
    "year INTEGER DEFAULT NULL)"
)
c.execute(
    "CREATE TABLE IF NOT EXISTS poll("
    "user_suggested VARCHAR (500) DEFAULT NULL, "
    "original_name VARCHAR (500) DEFAULT NULL, "
    "ru_name VARCHAR (500) DEFAULT NULL, "
    "year INTEGER DEFAULT NULL, "
    "votes INTEGER)"
)
conn.commit()

# Shared by every function that (re)populates the poll table.
_INSERT_POLL_ROW = (
    "INSERT OR IGNORE INTO poll(user_suggested, original_name, ru_name, year, votes) "
    "VALUES (?, ?, ?, ?, ?)"
)


def mark_as_watched_movie(original_name=None, ru_name=None, year=None):
    """Insert a movie into ``watched_movies`` and commit.

    ``year`` is converted with ``int()``; if the conversion fails the movie
    is stored with a NULL year instead of raising.
    """
    try:
        year = int(year)
    except (TypeError, ValueError, OverflowError):
        # Best effort: an unusable year is stored as NULL.
        year = None
    movie = (original_name, ru_name, year)
    # NOTE: the table declares no UNIQUE constraint, so "OR IGNORE" has no
    # conflict to ignore here and repeated calls insert repeated rows.
    c.execute(
        "INSERT OR IGNORE INTO watched_movies(original_name, ru_name, year) VALUES (?, ?, ?)",
        movie,
    )
    conn.commit()
    logger.info('Watched movie added to db: %s', str(movie))


def get_already_watched(original_name=None, ru_name=None, year=None) -> bool:
    """Return True if a row with exactly these three values is in ``watched_movies``.

    The comparison uses SQLite's ``IS`` operator so that a ``None`` argument
    matches a NULL column. With ``==`` a NULL never compares equal, so a movie
    stored with any NULL field could never be found again.
    """
    row = c.execute(
        "SELECT 1 FROM watched_movies "
        "WHERE original_name IS ? AND ru_name IS ? AND year IS ? LIMIT 1",
        (original_name, ru_name, year),
    ).fetchone()
    return row is not None


# NOTE(review): the ``acct=str`` default looks like a mis-written type hint;
# it is kept so the signature stays unchanged, but callers are expected to
# always pass a real account value.
def per_user_accepted_movie_count(acct=str) -> int:
    """Return how many rows in ``poll`` have ``user_suggested`` equal to ``acct``."""
    (count,) = c.execute(
        "SELECT COUNT(*) FROM poll WHERE user_suggested == (?)", (acct,)
    ).fetchone()
    return count


# NOTE(review): ``acct=str`` is kept for signature compatibility only (see
# per_user_accepted_movie_count); always pass the account explicitly.
def add_movie_to_poll(acct=str, original_name=None, ru_name=None, year=None) -> bool:
    """Add a movie suggested by ``acct`` to the poll with zero votes.

    Returns False without touching the database when ``acct`` already has
    ``limit_movies_per_user`` or more rows in the poll; otherwise inserts the
    row, commits and returns True.
    """
    if per_user_accepted_movie_count(acct) >= limit_movies_per_user:
        return False
    c.execute(_INSERT_POLL_ROW, (acct, original_name, ru_name, year, 0))
    conn.commit()
    logger.info('Add option to poll: %s', str((acct, original_name, ru_name, year)))
    return True


def get_movies_for_poll() -> list:
    """Return every ``poll`` row as a tuple, ordered by year.

    ``rowid`` is the tie-breaker so that movies sharing a year come back in a
    guaranteed order: write_votes() matches votes to rows by position and
    therefore needs the order to be the same on every call.
    """
    return c.execute("SELECT * FROM poll ORDER BY year, rowid").fetchall()


# NOTE(review): ``votes_list_tuples=list`` looks like a mis-written type hint;
# the default is kept for signature compatibility, pass the votes explicitly.
def write_votes(votes_list_tuples=list):
    """Record the voting results.

    ``votes_list_tuples[i]`` is stored as the vote count of the i-th row
    returned by get_movies_for_poll(). If a vote is missing for any row the
    poll is left untouched; surplus votes are ignored. The table is replaced
    in a single transaction that is committed on success and rolled back if
    the rewrite fails.
    """
    rows = get_movies_for_poll()
    logger.debug('Current poll rows: %s', rows)

    updated = []
    for index, row in enumerate(rows):
        try:
            vote = votes_list_tuples[index]
        except (IndexError, KeyError, TypeError):
            # Not every movie has a vote: change nothing rather than write
            # a partial result.
            return
        updated.append((*row[:4], vote))

    # "with conn" commits when the block succeeds and rolls back on an
    # exception, so the poll is never left deleted-but-not-refilled.
    with conn:
        c.execute("DELETE FROM poll")
        c.executemany(_INSERT_POLL_ROW, updated)


def read_votes() -> list:
    """Return every ``poll`` row as a tuple, highest vote count first."""
    return c.execute("SELECT * FROM poll ORDER BY votes DESC").fetchall()


# NOTE(review): ``winned_movies=list`` looks like a mis-written type hint;
# the default is kept for signature compatibility, pass the rows explicitly.
def rewrite_db(winned_movies=list):
    """Replace the poll with the winning movies, for a tie-breaker round.

    ``winned_movies`` is an iterable of 5-item rows
    ``(user_suggested, original_name, ru_name, year, votes)``. The delete and
    the inserts run in one transaction: committed on success, rolled back if
    the insert fails, so a bad argument cannot wipe the poll.
    """
    with conn:
        c.execute("DELETE FROM poll")
        c.executemany(_INSERT_POLL_ROW, winned_movies)