Random Coffee alternative - random meetings for Telegram chats
https://t.me/ranteabot
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
119 lines
5.4 KiB
119 lines
5.4 KiB
from __future__ import annotations |
|
from dataclasses import dataclass, field, asdict |
|
import typing |
|
import random |
|
from datetime import date, timedelta |
|
from telebot.types import ChatMember, Chat |
|
import time |
|
from threading import Thread |
|
|
|
if typing.TYPE_CHECKING: |
|
from bot import Bot |
|
|
|
|
|
def convert_to_date(x: typing.Union[int, date]) -> date: |
|
if isinstance(x, int): |
|
return date.fromordinal(x) |
|
return x |
|
|
|
|
|
@dataclass |
|
class Community: |
|
chat_id: int # Telegram chat id |
|
name: str = '' |
|
members: typing.List[int] = field(default_factory=list) |
|
pool: typing.List[int] = field(default_factory=list) # [user_id] |
|
polled: typing.Dict[int, date] = field(default_factory=dict) # Days since the user was asked |
|
scheduled_meetings: typing.List[typing.Tuple[int, int, date]] = field( |
|
default_factory=list) # (user_id_1, user_id_2, meeting date) |
|
archived_meetings: typing.List[typing.Tuple[int, int, date]] = field(default_factory=list) |
|
default_answers: typing.Dict[int, bool] = field(default_factory=dict) # Last answers |
|
task_last_timestamps: typing.Dict[str, date] = field(default_factory=dict) # {'poll': ..., 'scheduling': ...} |
|
start_timestamp: int = field(default_factory=lambda: int(time.time())) |
|
|
|
def dict(self) -> dict: |
|
data = asdict(self) |
|
data['polled'] = {str(k): self.polled[k].toordinal() for k in self.polled} |
|
data['default_answers'] = {str(k): self.default_answers[k] for k in self.default_answers} |
|
data['scheduled_meetings'] = [(l, m, r.toordinal()) for l, m, r in self.scheduled_meetings] |
|
data['archived_meetings'] = [(l, m, r.toordinal()) for l, m, r in self.archived_meetings] |
|
data['task_last_timestamps'] = {str(k): self.task_last_timestamps[k].toordinal() for k in |
|
self.task_last_timestamps} |
|
return data |
|
|
|
@classmethod |
|
def from_dict(cls, data: dict) -> Community: |
|
data = getattr(data, '__dict__', data) |
|
data_ = {key: data[key] for key in data.keys() if |
|
key in ['chat_id', 'name', 'members', 'pool', 'polled', 'scheduled_meetings', |
|
'archived_meetings', 'start_timestamp', 'default_answers']} |
|
self = cls(**data_) |
|
self.polled = {int(k): convert_to_date(self.polled[k]) for k in self.polled} |
|
self.default_answers = {int(k): self.default_answers[k] for k in self.default_answers} |
|
self.task_last_timestamps = {k: convert_to_date(self.task_last_timestamps[k]) for k in |
|
self.task_last_timestamps} |
|
self.scheduled_meetings = [(int(l), int(m), convert_to_date(r)) for l, m, r in self.scheduled_meetings] |
|
self.archived_meetings = [(int(l), int(m), convert_to_date(r)) for l, m, r in self.archived_meetings] |
|
return self |
|
|
|
@classmethod |
|
def by_id(cls, chat_id: int, bot: Bot, create_if_not_exists=True) -> typing.Optional[Community]: |
|
data = bot.get_community_from_db({'chat_id': chat_id}, create_if_not_exists) |
|
if data is None: |
|
return None |
|
|
|
def upd_data(): |
|
chat: Chat = bot.get_chat(chat_id) |
|
data.name = chat.title |
|
bot.save_community(data) |
|
|
|
Thread(target=upd_data).start() |
|
return data |
|
|
|
def can_user_request_a_meeting(self, user_id: int) -> bool: |
|
# Check if there isn't a meeting in archived_meetings with its scheduling date today or yesterday |
|
return not any( |
|
r in [date.today(), date.today() - timedelta(days=1)] and user_id in [l, m] |
|
for l, m, r in self.scheduled_meetings + self.archived_meetings) and user_id not in self.pool |
|
|
|
def add_member(self, user_id: int, bot: Bot) -> bool: |
|
chat_member_info: ChatMember = bot.get_chat_member(self.chat_id, user_id) |
|
if chat_member_info.is_member or chat_member_info.status in ['creator', 'administrator', 'admin', 'member']: |
|
self.members.append(user_id) |
|
bot.save_community(self) |
|
return True |
|
return False |
|
|
|
def users_to_poll(self) -> typing.List[int]: |
|
return [ |
|
user_id for user_id in self.members |
|
if (user_id not in self.polled or self.polled[user_id] <= date.today() - timedelta(days=7)) |
|
and self.can_user_request_a_meeting(user_id) |
|
] |
|
|
|
def add_answer(self, user_id: typing.Union[int, bool], answer: bool, bot: Bot): |
|
answer = bool(answer) |
|
if user_id not in self.users_to_poll(): |
|
return |
|
if user_id in self.pool: |
|
self.pool.remove(user_id) |
|
self.pool += [user_id] * answer |
|
self.default_answers[user_id] = bool(answer) |
|
bot.save_community(self) |
|
|
|
def have_already_met(self, user_id_1: int, user_id_2: int) -> bool: |
|
""" |
|
return True if there is a meeting with these users not earlier than 6 weeks ago |
|
""" |
|
return any( |
|
{l, m} == {user_id_1, user_id_2} and r > date.today() - timedelta(days=42) |
|
for l, m, r in self.scheduled_meetings) |
|
|
|
def schedule_meetings(self): |
|
random.shuffle(self.pool) # It's called RandomTea for a reason |
|
today = date.today() |
|
while len(self.pool) > 1: |
|
user_id_1 = self.pool.pop() |
|
while (user_id_2 := self.pool.pop()) == user_id_1 or self.have_already_met(user_id_1, user_id_2): |
|
continue |
|
self.scheduled_meetings.append((user_id_1, user_id_2, today))
|
|
|