"""Community model: per-chat membership, polling and meeting-scheduling state."""
from __future__ import annotations

from dataclasses import dataclass, field, asdict
import typing
import random
from datetime import date, timedelta
import time
from threading import Thread

from telebot.types import ChatMember, Chat

if typing.TYPE_CHECKING:
    from bot import Bot


def convert_to_date(x: typing.Union[int, date]) -> date:
    """Return *x* as a ``date``.

    Integers are treated as proleptic Gregorian ordinals (the inverse of
    ``date.toordinal``, which is what ``Community.dict`` stores); any other
    value is returned unchanged.
    """
    if isinstance(x, int):
        return date.fromordinal(x)
    return x


@dataclass
class Community:
    """State of one chat: its members, the matching pool and its meetings.

    Dates are kept as ``datetime.date`` in memory and written as ordinals by
    ``dict``; ``from_dict`` performs the reverse conversion.
    """

    chat_id: int  # Telegram chat id
    name: str = ''
    members: typing.List[int] = field(default_factory=list)  # [user_id]
    pool: typing.List[int] = field(default_factory=list)  # [user_id] waiting to be paired
    # user_id -> date; users_to_poll re-polls once this is at least 7 days old
    # (presumably the date the user was last asked -- it is not written in this class).
    polled: typing.Dict[int, date] = field(default_factory=dict)
    scheduled_meetings: typing.List[typing.Tuple[int, int, date]] = field(
        default_factory=list)  # (user_id_1, user_id_2, meeting date)
    archived_meetings: typing.List[typing.Tuple[int, int, date]] = field(
        default_factory=list)  # same layout as scheduled_meetings
    default_answers: typing.Dict[int, bool] = field(default_factory=dict)  # Last answers
    task_last_timestamps: typing.Dict[str, date] = field(
        default_factory=dict)  # {'poll': ..., 'scheduling': ...}
    start_timestamp: int = field(default_factory=lambda: int(time.time()))

    # NOTE: the method name shadows the builtin ``dict`` inside the class body;
    # it is kept because callers use it.
    def dict(self) -> dict:
        """Return a plain-data copy of the community.

        Integer dictionary keys become strings and every ``date`` becomes its
        ordinal, so the result contains only str/int/bool/list/tuple/dict.
        """
        data = asdict(self)
        data['polled'] = {
            str(user_id): day.toordinal() for user_id, day in self.polled.items()
        }
        data['default_answers'] = {
            str(user_id): answer for user_id, answer in self.default_answers.items()
        }
        data['scheduled_meetings'] = [
            (l, m, r.toordinal()) for l, m, r in self.scheduled_meetings
        ]
        data['archived_meetings'] = [
            (l, m, r.toordinal()) for l, m, r in self.archived_meetings
        ]
        data['task_last_timestamps'] = {
            str(task): day.toordinal()
            for task, day in self.task_last_timestamps.items()
        }
        return data

    @classmethod
    def from_dict(cls, data: dict) -> Community:
        """Build a community from the output of ``dict`` (or a similar mapping).

        *data* may also be an object, in which case its ``__dict__`` is used.
        Unknown keys are ignored. ``chat_id`` is required; ``cls(**...)``
        raises ``TypeError`` when it is missing.
        """
        data = getattr(data, '__dict__', data)
        # 'task_last_timestamps' must be listed here: it is written by dict()
        # and converted below, and would otherwise be lost on every reload.
        known_fields = (
            'chat_id', 'name', 'members', 'pool', 'polled',
            'scheduled_meetings', 'archived_meetings', 'start_timestamp',
            'default_answers', 'task_last_timestamps',
        )
        self = cls(**{key: value for key, value in data.items() if key in known_fields})
        # Undo the key stringification and ordinal encoding done by dict().
        self.polled = {int(k): convert_to_date(v) for k, v in self.polled.items()}
        self.default_answers = {int(k): v for k, v in self.default_answers.items()}
        self.task_last_timestamps = {
            k: convert_to_date(v) for k, v in self.task_last_timestamps.items()
        }
        self.scheduled_meetings = [
            (int(l), int(m), convert_to_date(r)) for l, m, r in self.scheduled_meetings
        ]
        self.archived_meetings = [
            (int(l), int(m), convert_to_date(r)) for l, m, r in self.archived_meetings
        ]
        return self

    @classmethod
    def by_id(cls, chat_id: int, bot: Bot, create_if_not_exists=True) -> typing.Optional[Community]:
        """Fetch the community for *chat_id* through *bot*.

        Returns whatever ``bot.get_community_from_db`` yields (presumably a
        ``Community`` -- confirm against ``Bot``), or ``None`` when it yields
        ``None``. The result is returned immediately; its ``name`` is refreshed
        from the chat title and saved by a background thread.
        """
        data = bot.get_community_from_db({'chat_id': chat_id}, create_if_not_exists)
        if data is None:
            return None

        def upd_data():
            # Runs off the calling thread: one get_chat call, then a save.
            chat: Chat = bot.get_chat(chat_id)
            data.name = chat.title
            bot.save_community(data)

        Thread(target=upd_data).start()
        return data

    def can_user_request_a_meeting(self, user_id: int) -> bool:
        """Return False if *user_id* has a meeting dated today or yesterday.

        Both ``scheduled_meetings`` and ``archived_meetings`` are checked.
        """
        today = date.today()
        recent_days = (today, today - timedelta(days=1))
        return not any(
            r in recent_days and user_id in (l, m)
            for l, m, r in self.scheduled_meetings + self.archived_meetings)

    def add_member(self, user_id: int, bot: Bot) -> bool:
        """Register *user_id* if the bot reports them as part of the chat.

        Returns True (and saves the community) when the user is a chat member,
        False otherwise. A user who is already registered is not added twice.
        """
        chat_member_info: ChatMember = bot.get_chat_member(self.chat_id, user_id)
        if chat_member_info.is_member or chat_member_info.status in ['creator', 'administrator', 'admin', 'member']:
            # Guard against duplicates: a repeated id would make users_to_poll
            # return the same user more than once.
            if user_id not in self.members:
                self.members.append(user_id)
            bot.save_community(self)
            return True
        return False

    def users_to_poll(self) -> typing.List[int]:
        """Return the members that may be polled now.

        A member qualifies when they have no ``polled`` entry or it is at
        least 7 days old, and ``can_user_request_a_meeting`` allows them.
        """
        cutoff = date.today() - timedelta(days=7)
        return [
            user_id for user_id in self.members
            if (user_id not in self.polled or self.polled[user_id] <= cutoff)
            and self.can_user_request_a_meeting(user_id)
        ]

    def add_answer(self, user_id: typing.Union[int, bool], answer: bool, bot: Bot):
        """Record a poll answer and save the community.

        Ignored when *user_id* is not currently in ``users_to_poll()``.
        A truthy answer puts the user into the pool (exactly once); a falsy
        answer removes them. The answer is stored in ``default_answers``.
        """
        answer = bool(answer)
        if user_id not in self.users_to_poll():
            return
        # Drop one existing entry first so a repeated "yes" does not duplicate the user.
        if user_id in self.pool:
            self.pool.remove(user_id)
        if answer:
            self.pool.append(user_id)
        self.default_answers[user_id] = answer
        bot.save_community(self)

    def have_already_met(self, user_id_1: int, user_id_2: int) -> bool:
        """
        return True if there is a meeting with these users not earlier than 6 weeks ago

        Scheduled and archived meetings are both consulted, so a meeting that
        has been archived within the window still counts.
        """
        pair = {user_id_1, user_id_2}
        threshold = date.today() - timedelta(days=42)
        return any(
            {l, m} == pair and r > threshold
            for l, m, r in self.scheduled_meetings + self.archived_meetings)

    def schedule_meetings(self):
        """Pair up users from the pool and append the meetings, dated today.

        Users are taken from the end of the shuffled pool. Each is paired with
        the nearest remaining user who is a different person and whom they
        have not met recently (``have_already_met``). Users for whom no such
        partner exists stay in the pool instead of being dropped, and an
        exhausted pool no longer raises ``IndexError``.
        """
        random.shuffle(self.pool)  # It's called RandomTea for a reason
        today = date.today()
        unmatched: typing.List[int] = []
        while len(self.pool) > 1:
            user_id_1 = self.pool.pop()
            # Scan from the end so that, when nobody is rejected, the partner is
            # exactly the element a plain pop() would have produced.
            partner_index = None
            for index in range(len(self.pool) - 1, -1, -1):
                candidate = self.pool[index]
                if candidate != user_id_1 and not self.have_already_met(user_id_1, candidate):
                    partner_index = index
                    break
            if partner_index is None:
                unmatched.append(user_id_1)
                continue
            user_id_2 = self.pool.pop(partner_index)
            self.scheduled_meetings.append((user_id_1, user_id_2, today))
        # Whoever could not be paired this round waits for the next one.
        self.pool.extend(unmatched)