# Random Coffee alternative - random meetings for Telegram chats: https://t.me/ranteabot
# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# 120 lines
# 5.4 KiB

from __future__ import annotations
from dataclasses import dataclass, field, asdict
import typing
import random
from datetime import date, timedelta
from telebot.types import ChatMember, Chat
import time
from threading import Thread
if typing.TYPE_CHECKING:
from bot import Bot
def convert_to_date(x: typing.Union[int, date]) -> date:
    """Return *x* as a ``date``.

    Integers are interpreted as proleptic Gregorian ordinals (the inverse of
    ``date.toordinal()``); any other value is returned unchanged.
    """
    return date.fromordinal(x) if isinstance(x, int) else x
@dataclass
class Community:
    """State of the random-meetings game for a single Telegram chat.

    The object is a plain data holder plus the pairing logic.  Methods that
    receive a ``bot`` persist their changes through ``bot.save_community``;
    methods without a ``bot`` parameter only mutate the instance in memory.
    """

    chat_id: int  # Telegram chat id
    name: str = ''
    members: typing.List[int] = field(default_factory=list)  # [user_id]
    pool: typing.List[int] = field(default_factory=list)  # [user_id] waiting to be paired
    # {user_id: date}; users_to_poll() re-polls a user once this date is at
    # least 7 days old (presumably the day the user was last asked).
    polled: typing.Dict[int, date] = field(default_factory=dict)
    # (user_id_1, user_id_2, date); schedule_meetings() stores the scheduling day
    scheduled_meetings: typing.List[typing.Tuple[int, int, date]] = field(default_factory=list)
    archived_meetings: typing.List[typing.Tuple[int, int, date]] = field(default_factory=list)
    default_answers: typing.Dict[int, bool] = field(default_factory=dict)  # Last answers
    task_last_timestamps: typing.Dict[str, date] = field(default_factory=dict)  # {'poll': ..., 'scheduling': ...}
    start_timestamp: int = field(default_factory=lambda: int(time.time()))

    def dict(self) -> dict:
        """Return a storage-friendly dict of this community.

        Dates are stored as ordinals (``date.toordinal()``) and integer
        dictionary keys are stored as strings.
        """
        data = asdict(self)
        data['polled'] = {str(uid): day.toordinal() for uid, day in self.polled.items()}
        data['default_answers'] = {str(uid): answer for uid, answer in self.default_answers.items()}
        data['scheduled_meetings'] = [
            (first, second, day.toordinal()) for first, second, day in self.scheduled_meetings]
        data['archived_meetings'] = [
            (first, second, day.toordinal()) for first, second, day in self.archived_meetings]
        data['task_last_timestamps'] = {
            str(task): day.toordinal() for task, day in self.task_last_timestamps.items()}
        return data

    @classmethod
    def from_dict(cls, data: dict) -> Community:
        """Build a community from the structure produced by :meth:`dict`.

        ``data`` may be a mapping or any object exposing its fields through
        ``__dict__``.  Keys that are not community fields are ignored.
        """
        data = getattr(data, '__dict__', data)
        # 'task_last_timestamps' must be listed here: dict() serialises it,
        # and without it the stored task timestamps were dropped on load.
        known_keys = ('chat_id', 'name', 'members', 'pool', 'polled', 'scheduled_meetings',
                      'archived_meetings', 'start_timestamp', 'default_answers',
                      'task_last_timestamps')
        self = cls(**{key: data[key] for key in data.keys() if key in known_keys})
        # Undo the storage encoding: string keys -> int ids, ordinals -> dates.
        self.polled = {int(uid): convert_to_date(day) for uid, day in self.polled.items()}
        self.default_answers = {int(uid): answer for uid, answer in self.default_answers.items()}
        self.task_last_timestamps = {
            task: convert_to_date(day) for task, day in self.task_last_timestamps.items()}
        self.scheduled_meetings = [
            (int(first), int(second), convert_to_date(day))
            for first, second, day in self.scheduled_meetings]
        self.archived_meetings = [
            (int(first), int(second), convert_to_date(day))
            for first, second, day in self.archived_meetings]
        return self

    @classmethod
    def by_id(cls, chat_id: int, bot: Bot, create_if_not_exists=True) -> typing.Optional[Community]:
        """Fetch the community of ``chat_id`` through ``bot``.

        Returns ``None`` when ``bot.get_community_from_db`` returns ``None``;
        otherwise returns the fetched object.  Its ``name`` is refreshed from
        the chat title and saved in a background thread, so the returned
        object may still carry the old name when this method returns.
        """
        data = bot.get_community_from_db({'chat_id': chat_id}, create_if_not_exists)
        if data is None:
            return None

        def upd_data():
            chat: Chat = bot.get_chat(chat_id)
            data.name = chat.title
            bot.save_community(data)

        # Run the chat lookup in a separate thread so the caller does not wait for it.
        Thread(target=upd_data).start()
        return data

    def can_user_request_a_meeting(self, user_id: int) -> bool:
        """Return True if the user may be offered a new meeting.

        The user must not be in the pool already and must not take part in
        any scheduled or archived meeting dated today or yesterday.
        """
        if user_id in self.pool:
            return False
        today = date.today()
        recent_days = (today, today - timedelta(days=1))
        return not any(
            day in recent_days and user_id in (first, second)
            for first, second, day in self.scheduled_meetings + self.archived_meetings)

    def add_member(self, user_id: int, bot: Bot) -> bool:
        """Register ``user_id`` as a participant if they belong to the chat.

        Returns True (after saving the community) when the bot reports the
        user as a chat member, False otherwise.  A user who is already
        registered is not added a second time.
        """
        chat_member_info: ChatMember = bot.get_chat_member(self.chat_id, user_id)
        if chat_member_info.is_member or chat_member_info.status in ['creator', 'administrator', 'admin', 'member']:
            # Guard against duplicates: a repeated id would be returned
            # repeatedly by users_to_poll().
            if user_id not in self.members:
                self.members.append(user_id)
            bot.save_community(self)
            return True
        return False

    def users_to_poll(self) -> typing.List[int]:
        """Return the members that should be polled now.

        A member qualifies when they have no ``polled`` entry or the entry is
        at least 7 days old, and :meth:`can_user_request_a_meeting` allows it.
        """
        poll_cutoff = date.today() - timedelta(days=7)
        return [
            user_id for user_id in self.members
            if (user_id not in self.polled or self.polled[user_id] <= poll_cutoff)
            and self.can_user_request_a_meeting(user_id)
        ]

    def add_answer(self, user_id: int, answer: typing.Union[bool, int], bot: Bot):
        """Record a poll answer and save the community.

        Answers from users that are not currently in :meth:`users_to_poll`
        are ignored.  A truthy answer puts the user into the pool; every
        accepted answer is remembered in ``default_answers``.
        """
        answer = bool(answer)
        if user_id not in self.users_to_poll():
            return
        # Defensive only: users_to_poll() already excludes pooled users.
        if user_id in self.pool:
            self.pool.remove(user_id)
        if answer:
            self.pool.append(user_id)
        self.default_answers[user_id] = answer
        bot.save_community(self)

    def have_already_met(self, user_id_1: int, user_id_2: int) -> bool:
        """
        return True if there is a scheduled meeting of these two users dated
        within the last 6 weeks (42 days)
        """
        # NOTE(review): only scheduled_meetings is consulted, archived_meetings
        # is not - confirm that meetings are never archived within 6 weeks.
        cutoff = date.today() - timedelta(days=42)
        pair = {user_id_1, user_id_2}
        return any(
            {first, second} == pair and day > cutoff
            for first, second, day in self.scheduled_meetings)

    def schedule_meetings(self):
        """Pair up the users in the pool and record the meetings for today.

        Every paired user leaves the pool.  Users for whom no eligible
        partner exists (everyone left has met them recently, or the pool has
        an odd size) stay in the pool.  The community is not saved here.
        """
        random.shuffle(self.pool)  # It's called RandomTea for a reason
        today = date.today()
        unmatched: typing.List[int] = []
        while len(self.pool) > 1:
            user_id_1 = self.pool.pop()
            # Look for a partner without discarding rejected candidates;
            # scanning from the end keeps the "pop the next one" order when
            # nobody has to be skipped.
            partner_index = next(
                (index for index in range(len(self.pool) - 1, -1, -1)
                 if self.pool[index] != user_id_1
                 and not self.have_already_met(user_id_1, self.pool[index])),
                None)
            if partner_index is None:
                unmatched.append(user_id_1)
                continue
            user_id_2 = self.pool.pop(partner_index)
            self.scheduled_meetings.append((user_id_1, user_id_2, today))
        # Users without a partner wait in the pool for the next round.
        self.pool.extend(unmatched)