Random Coffee alternative - random meetings for Telegram chats https://t.me/ranteabot
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

100 lines
4.1 KiB

from __future__ import annotations
from dataclasses import dataclass, field, asdict
import typing
import random
from datetime import date, timedelta
from telebot.types import ChatMember, Chat
import time
from threading import Thread
if typing.TYPE_CHECKING:
from bot import Bot
def convert_to_date(x: typing.Union[int, date]) -> date:
if isinstance(x, int):
return date.fromordinal(x)
return x
@dataclass
class Community:
chat_id: int # Telegram chat id
name: str = ''
members: typing.List[int] = field(default_factory=list)
pool: typing.List[int] = field(default_factory=list) # [user_id]
polled: typing.Dict[int, date] = field(default_factory=dict) # Days since the user was asked
scheduled_meetings: typing.List[typing.Tuple[int, int, date]] = field(
default_factory=list) # (user_id_1, user_id_2, meeting date)
archived_meetings: typing.List[typing.Tuple[int, int, date]] = field(default_factory=list)
default_answers: typing.Dict[int, bool] = field(default_factory=dict) # Last answers
start_timestamp: int = field(default_factory=lambda: int(time.time()))
def dict(self) -> dict:
data = asdict(self)
data['polled'] = {str(k): self.polled[k].toordinal() for k in self.polled}
data['default_answers'] = {str(k): self.default_answers[k] for k in self.default_answers}
data['scheduled_meetings'] = [(l, m, r.toordinal()) for l, m, r in self.scheduled_meetings]
data['archived_meetings'] = [(l, m, r.toordinal()) for l, m, r in self.archived_meetings]
return data
@classmethod
def from_dict(cls, data: dict) -> Community:
data = getattr(data, '__dict__', data)
data_ = {key: data[key] for key in data.keys() if
key in ['chat_id', 'name', 'members', 'pool', 'polled', 'scheduled_meetings',
'archived_meetings', 'start_timestamp', 'default_answers']}
self = cls(**data_)
self.polled = {int(k): convert_to_date(self.polled[k]) for k in self.polled}
self.default_answers = {int(k): self.default_answers[k] for k in self.default_answers}
self.scheduled_meetings = [(int(l), int(m), convert_to_date(r)) for l, m, r in self.scheduled_meetings]
self.archived_meetings = [(int(l), int(m), convert_to_date(r)) for l, m, r in self.archived_meetings]
return self
@classmethod
def by_id(cls, chat_id: int, bot: Bot, create_if_not_exists=True) -> typing.Optional[Community]:
data = bot.get_community_from_db({'chat_id': chat_id}, create_if_not_exists)
if data is None:
return None
def upd_data():
chat: Chat = bot.get_chat(chat_id)
data.name = chat.title
bot.save_community(data)
Thread(target=upd_data).start()
return data
def add_member(self, user_id: int, bot: Bot) -> bool:
chat_member_info: ChatMember = bot.get_chat_member(self.chat_id, user_id)
if chat_member_info.is_member or chat_member_info.status in ['creator', 'administrator', 'admin', 'member']:
self.members.append(user_id)
bot.save_community(self)
return True
return False
def users_to_poll(self) -> typing.List[int]:
return [
user_id for user_id in self.members
if user_id not in self.polled or self.polled[user_id] <= date.today() - timedelta(days=7)
]
def add_answer(self, user_id: typing.Union[int, bool], answer: bool, bot: Bot):
answer = bool(answer)
if user_id not in self.users_to_poll():
return
if user_id in self.pool:
self.pool.remove(user_id)
self.pool += [user_id] * answer
self.default_answers[user_id] = bool(answer)
bot.save_community(self)
def schedule_meetings(self):
random.shuffle(self.pool) # It's called RandomTea for a reason
today = date.today()
while len(self.pool) > 1:
user_id_1 = self.pool.pop()
while (user_id_2 := self.pool.pop()) == user_id_1:
continue
self.scheduled_meetings.append((user_id_1, user_id_2, today))