|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field, asdict
|
|
|
|
import typing
|
|
|
|
import random
|
|
|
|
from datetime import date, timedelta
|
|
|
|
from telebot.types import ChatMember, Chat
|
|
|
|
import time
|
|
|
|
from threading import Thread
|
|
|
|
|
|
|
|
if typing.TYPE_CHECKING:
|
|
|
|
from bot import Bot
|
|
|
|
|
|
|
|
|
|
|
|
def convert_to_date(x: typing.Union[int, date]) -> date:
|
|
|
|
if isinstance(x, int):
|
|
|
|
return date.fromordinal(x)
|
|
|
|
return x
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
class Community:
|
|
|
|
chat_id: int # Telegram chat id
|
|
|
|
name: str = ''
|
|
|
|
members: typing.List[int] = field(default_factory=list)
|
|
|
|
pool: typing.List[int] = field(default_factory=list) # [user_id]
|
|
|
|
polled: typing.Dict[int, date] = field(default_factory=dict) # Days since the user was asked
|
|
|
|
scheduled_meetings: typing.List[typing.Tuple[int, int, date]] = field(
|
|
|
|
default_factory=list) # (user_id_1, user_id_2, meeting date)
|
|
|
|
archived_meetings: typing.List[typing.Tuple[int, int, date]] = field(default_factory=list)
|
|
|
|
default_answers: typing.Dict[int, bool] = field(default_factory=dict) # Last answers
|
|
|
|
start_timestamp: int = field(default_factory=lambda: int(time.time()))
|
|
|
|
|
|
|
|
def dict(self) -> dict:
|
|
|
|
data = asdict(self)
|
|
|
|
data['polled'] = {str(k): self.polled[k].toordinal() for k in self.polled}
|
|
|
|
data['default_answers'] = {str(k): self.default_answers[k] for k in self.default_answers}
|
|
|
|
data['scheduled_meetings'] = [(l, m, r.toordinal()) for l, m, r in self.scheduled_meetings]
|
|
|
|
data['archived_meetings'] = [(l, m, r.toordinal()) for l, m, r in self.archived_meetings]
|
|
|
|
return data
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def from_dict(cls, data: dict) -> Community:
|
|
|
|
data = getattr(data, '__dict__', data)
|
|
|
|
data_ = {key: data[key] for key in data.keys() if
|
|
|
|
key in ['chat_id', 'name', 'members', 'pool', 'polled', 'scheduled_meetings',
|
|
|
|
'archived_meetings', 'start_timestamp', 'default_answers']}
|
|
|
|
self = cls(**data_)
|
|
|
|
self.polled = {int(k): convert_to_date(self.polled[k]) for k in self.polled}
|
|
|
|
self.default_answers = {int(k): self.default_answers[k] for k in self.default_answers}
|
|
|
|
self.scheduled_meetings = [(int(l), int(m), convert_to_date(r)) for l, m, r in self.scheduled_meetings]
|
|
|
|
self.archived_meetings = [(int(l), int(m), convert_to_date(r)) for l, m, r in self.archived_meetings]
|
|
|
|
return self
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def by_id(cls, chat_id: int, bot: Bot, create_if_not_exists=True) -> typing.Optional[Community]:
|
|
|
|
data = bot.get_community_from_db({'chat_id': chat_id}, create_if_not_exists)
|
|
|
|
if data is None:
|
|
|
|
return None
|
|
|
|
|
|
|
|
def upd_data():
|
|
|
|
chat: Chat = bot.get_chat(chat_id)
|
|
|
|
data.name = chat.title
|
|
|
|
bot.save_community(data)
|
|
|
|
|
|
|
|
Thread(target=upd_data).start()
|
|
|
|
return data
|
|
|
|
|
|
|
|
def add_member(self, user_id: int, bot: Bot) -> bool:
|
|
|
|
chat_member_info: ChatMember = bot.get_chat_member(self.chat_id, user_id)
|
|
|
|
if chat_member_info.is_member or chat_member_info.status in ['creator', 'administrator', 'admin']:
|
|
|
|
self.members.append(user_id)
|
|
|
|
bot.save_community(self)
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
|
|
def users_to_poll(self) -> typing.List[int]:
|
|
|
|
return [
|
|
|
|
user_id for user_id in self.members
|
|
|
|
if user_id not in self.polled or self.polled[user_id] <= date.today() - timedelta(days=7)
|
|
|
|
]
|
|
|
|
|
|
|
|
def add_answer(self, user_id: int, answer: bool, bot: Bot):
|
|
|
|
answer = bool(answer)
|
|
|
|
if user_id not in self.users_to_poll():
|
|
|
|
return
|
|
|
|
if user_id in self.pool:
|
|
|
|
self.pool.remove(user_id)
|
|
|
|
self.pool += [user_id] * answer
|
|
|
|
self.default_answers[user_id] = bool(answer)
|
|
|
|
bot.save_community(self)
|
|
|
|
|
|
|
|
def schedule_meetings(self):
|
|
|
|
random.shuffle(self.pool) # It's called RandomTea for a reason
|
|
|
|
today = date.today()
|
|
|
|
while len(self.pool) > 1:
|
|
|
|
user_id_1 = self.pool.pop()
|
|
|
|
while (user_id_2 := self.pool.pop()) == user_id_1:
|
|
|
|
continue
|
|
|
|
self.scheduled_meetings.append((user_id_1, user_id_2, today))
|