|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field, asdict
|
|
|
|
import typing
|
|
|
|
import random
|
|
|
|
from datetime import date, timedelta
|
|
|
|
from telebot.types import ChatMember, Chat
|
|
|
|
import time
|
|
|
|
from threading import Thread
|
|
|
|
|
|
|
|
if typing.TYPE_CHECKING:
|
|
|
|
from bot import Bot
|
|
|
|
|
|
|
|
|
|
|
|
def convert_to_date(x: typing.Union[int, date]) -> date:
|
|
|
|
if isinstance(x, int):
|
|
|
|
return date.fromordinal(x)
|
|
|
|
return x
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
class Community:
|
|
|
|
chat_id: int # Telegram chat id
|
|
|
|
name: str = ''
|
|
|
|
members: typing.List[int] = field(default_factory=list)
|
|
|
|
pool: typing.List[int] = field(default_factory=list) # [user_id]
|
|
|
|
polled: typing.Dict[int, date] = field(default_factory=dict) # Days since the user was asked
|
|
|
|
scheduled_meetings: typing.List[typing.Tuple[int, int, date]] = field(
|
|
|
|
default_factory=list) # (user_id_1, user_id_2, meeting date)
|
|
|
|
archived_meetings: typing.List[typing.Tuple[int, int, date]] = field(default_factory=list)
|
|
|
|
default_answers: typing.Dict[int, bool] = field(default_factory=dict) # Last answers
|
|
|
|
task_last_timestamps: typing.Dict[str, date] = field(default_factory=dict) # {'poll': ..., 'scheduling': ...}
|
|
|
|
start_timestamp: int = field(default_factory=lambda: int(time.time()))
|
|
|
|
|
|
|
|
def dict(self) -> dict:
|
|
|
|
data = asdict(self)
|
|
|
|
data['polled'] = {str(k): self.polled[k].toordinal() for k in self.polled}
|
|
|
|
data['default_answers'] = {str(k): self.default_answers[k] for k in self.default_answers}
|
|
|
|
data['scheduled_meetings'] = [(l, m, r.toordinal()) for l, m, r in self.scheduled_meetings]
|
|
|
|
data['archived_meetings'] = [(l, m, r.toordinal()) for l, m, r in self.archived_meetings]
|
|
|
|
data['task_last_timestamps'] = {str(k): self.task_last_timestamps[k].toordinal() for k in
|
|
|
|
self.task_last_timestamps}
|
|
|
|
return data
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def from_dict(cls, data: dict) -> Community:
|
|
|
|
data = getattr(data, '__dict__', data)
|
|
|
|
data_ = {key: data[key] for key in data.keys() if
|
|
|
|
key in ['chat_id', 'name', 'members', 'pool', 'polled', 'scheduled_meetings',
|
|
|
|
'archived_meetings', 'start_timestamp', 'default_answers']}
|
|
|
|
self = cls(**data_)
|
|
|
|
self.polled = {int(k): convert_to_date(self.polled[k]) for k in self.polled}
|
|
|
|
self.default_answers = {int(k): self.default_answers[k] for k in self.default_answers}
|
|
|
|
self.task_last_timestamps = {k: convert_to_date(self.task_last_timestamps[k]) for k in
|
|
|
|
self.task_last_timestamps}
|
|
|
|
self.scheduled_meetings = [(int(l), int(m), convert_to_date(r)) for l, m, r in self.scheduled_meetings]
|
|
|
|
self.archived_meetings = [(int(l), int(m), convert_to_date(r)) for l, m, r in self.archived_meetings]
|
|
|
|
return self
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def by_id(cls, chat_id: int, bot: Bot, create_if_not_exists=True) -> typing.Optional[Community]:
|
|
|
|
data = bot.get_community_from_db({'chat_id': chat_id}, create_if_not_exists)
|
|
|
|
if data is None:
|
|
|
|
return None
|
|
|
|
|
|
|
|
def upd_data():
|
|
|
|
chat: Chat = bot.get_chat(chat_id)
|
|
|
|
data.name = chat.title
|
|
|
|
bot.save_community(data)
|
|
|
|
|
|
|
|
Thread(target=upd_data).start()
|
|
|
|
return data
|
|
|
|
|
|
|
|
def can_user_request_a_meeting(self, user_id: int) -> bool:
|
|
|
|
# Check if there isn't a meeting in archived_meetings with its scheduling date today or yesterday
|
|
|
|
return not any(
|
|
|
|
r in [date.today(), date.today() - timedelta(days=1)] and user_id in [l, m]
|
|
|
|
for l, m, r in self.scheduled_meetings + self.archived_meetings) and user_id not in self.pool
|
|
|
|
|
|
|
|
def add_member(self, user_id: int, bot: Bot) -> bool:
|
|
|
|
chat_member_info: ChatMember = bot.get_chat_member(self.chat_id, user_id)
|
|
|
|
if chat_member_info.is_member or chat_member_info.status in ['creator', 'administrator', 'admin', 'member']:
|
|
|
|
self.members.append(user_id)
|
|
|
|
bot.save_community(self)
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
|
|
def users_to_poll(self) -> typing.List[int]:
|
|
|
|
return [
|
|
|
|
user_id for user_id in self.members
|
|
|
|
if (user_id not in self.polled or self.polled[user_id] <= date.today() - timedelta(days=7))
|
|
|
|
and self.can_user_request_a_meeting(user_id)
|
|
|
|
]
|
|
|
|
|
|
|
|
def add_answer(self, user_id: typing.Union[int, bool], answer: bool, bot: Bot):
|
|
|
|
answer = bool(answer)
|
|
|
|
while user_id in self.pool:
|
|
|
|
self.pool.remove(user_id)
|
|
|
|
self.pool += [user_id] * answer
|
|
|
|
self.default_answers[user_id] = bool(answer)
|
|
|
|
bot.save_community(self)
|
|
|
|
|
|
|
|
def have_already_met(self, user_id_1: int, user_id_2: int) -> bool:
|
|
|
|
"""
|
|
|
|
return True if there is a meeting with these users not earlier than 6 weeks ago
|
|
|
|
"""
|
|
|
|
return any(
|
|
|
|
{l, m} == {user_id_1, user_id_2} and r > date.today() - timedelta(days=42)
|
|
|
|
for l, m, r in self.scheduled_meetings)
|
|
|
|
|
|
|
|
def schedule_meetings(self):
|
|
|
|
random.shuffle(self.pool) # It's called RandomTea for a reason
|
|
|
|
today = date.today()
|
|
|
|
while len(self.pool) > 1:
|
|
|
|
user_id_1 = self.pool.pop()
|
|
|
|
while (user_id_2 := self.pool.pop()) == user_id_1 or self.have_already_met(user_id_1, user_id_2):
|
|
|
|
continue
|
|
|
|
self.scheduled_meetings.append((user_id_1, user_id_2, today))
|