Random Coffee alternative - random meetings for Telegram chats https://t.me/ranteabot
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

326 lines
14 KiB

import datetime
import telebot
import pymongo
from functools import wraps
from community import Community
from user import User
from templates import TemplateProvider
import emoji_data_python as edp
from threading import Thread
import utils
import typing
import traceback
import time
# Fixed UTC+3 zone; the hour thresholds below are compared against the
# current hour in this zone.
tz = datetime.timezone(offset=datetime.timedelta(hours=3))
# Earliest hour of the day (in ``tz``) at which the daily poll may run.
POLL_HOUR = 9
# Earliest hour of the day (in ``tz``) at which meetings may be scheduled.
SCHEDULING_HOUR = 21
def locale_from_ietf(_ietf_locale: str) -> str:
    """Return the template locale for an IETF language tag.

    Locale detection is currently switched off: the argument is ignored and
    ``'ru'`` is returned for every input. The disabled logic mapped the
    primary subtags ``ru``/``be``/``uk`` to ``'ru'`` and everything else
    (including an empty tag) to ``'en'``.
    """
    russian_locale = 'ru'
    return russian_locale
class Bot(telebot.TeleBot):
def __init__(self, config: dict):
    """Set up the Telegram client, the MongoDB handle and the templates.

    ``config`` must provide the keys ``'name'``, ``'token'`` and
    ``'mongo'``; ``'template_files'`` is optional and defaults to no files.
    """
    self.name = config['name']
    super().__init__(config['token'])
    # Keep the bot's own account, as returned by ``get_me()``.
    self.me: telebot.types.User = self.get_me()

    # The database is looked up on the client under the bot's name.
    mongo_client = pymongo.MongoClient(config['mongo'])
    self.db = getattr(mongo_client, self.name)
    user_id_index = [('user_id', pymongo.ASCENDING)]
    self.db.users.create_index(user_id_index, unique=True)

    self.config = config
    self.templates = TemplateProvider()
    for template_path in config.get('template_files', []):
        self.templates.load_file(template_path)

    # Nothing in this method provides a default keyboard.
    self.main_keyboard = None
def render_template(self, template_name: str, locale: str = 'ru', **kwargs) -> \
        typing.Tuple[str, typing.Union[telebot.types.ReplyKeyboardMarkup, telebot.types.InlineKeyboardMarkup]]:
    """Render a template and return ``(text, keyboard)``.

    The template is rendered with ``bot=self`` plus ``kwargs``. If the
    rendered output carries a keyboard description it is turned into a
    markup via ``utils.create_keyboard``; otherwise the keyboard is taken
    from ``self.main_keyboard`` for ``locale``, falling back to ``'ru'``.
    """
    rendered: str = self.templates.render_template(template_name, locale, bot=self, **kwargs)
    text, keyboard_spec = self.templates.separate_text_and_keyboards(rendered)
    if keyboard_spec is not None:
        return text, utils.create_keyboard(keyboard_spec)

    # NOTE(review): this path needs ``self.main_keyboard`` to be dict-like;
    # ``__init__`` leaves it as ``None`` - confirm it is assigned elsewhere.
    russian_default = self.main_keyboard.get('ru')
    return text, self.main_keyboard.get(locale, russian_default)
def render_message_for_user(self, user: User, template_name: str, **kwargs) -> \
        typing.Tuple[str, typing.Union[telebot.types.ReplyKeyboardMarkup, telebot.types.InlineKeyboardMarkup]]:
    """Render a template for ``user`` and return ``(text, keyboard)``.

    The locale is ``kwargs['locale_overwrite']`` when that keyword is
    present and truthy, otherwise ``user.locale``. Previously the overwrite
    passed by the callers was never used to select the locale. All
    ``kwargs`` (including ``locale_overwrite``) are still forwarded to the
    template together with ``user=user``.

    The keyboard description extracted from the rendered text is passed to
    ``utils.create_keyboard`` as-is.
    """
    locale = kwargs.get('locale_overwrite') or user.locale
    rendered: str = self.templates.render_template(template_name, locale, user=user, **kwargs)
    text, keyboard_spec = self.templates.separate_text_and_keyboards(rendered)
    return text, utils.create_keyboard(keyboard_spec)
def edit_message_with_template(self, query: telebot.types.CallbackQuery, template: str, **kwargs):
    """Replace the message behind a callback query with a rendered template.

    The template is rendered for the user resolved from ``query.message``,
    the message text and keyboard are edited in place (HTML parse mode), and
    the callback query is then acknowledged.
    """
    msg = query.message
    user = self.get_user_by_msg(msg)
    text, keyboard = self.render_message_for_user(user, template,
                                                  tg_user=msg.from_user,
                                                  **kwargs)
    self.edit_message_text(text, msg.chat.id, msg.id, reply_markup=keyboard, parse_mode='HTML')
    # ``answer_callback_query`` expects the query's identifier, not the
    # ``CallbackQuery`` object itself.
    self.answer_callback_query(query.id)
def reply_with_template(self, msg: typing.Union[telebot.types.Message, telebot.types.CallbackQuery], template: str,
                        as_reply: bool = False, locale_overwrite: typing.Optional[str] = None, **kwargs):
    """Render ``template`` for the sender of ``msg`` and send it to the chat.

    A ``CallbackQuery`` is unwrapped to its underlying message first. With
    ``as_reply=True`` the text is sent as a reply to ``msg``; otherwise it
    is sent as a plain message to ``msg.chat.id``. Both paths use HTML
    parse mode (the reply path previously omitted it, so the same template
    rendered differently depending on ``as_reply``).
    """
    if isinstance(msg, telebot.types.CallbackQuery):
        msg = msg.message
    user = self.get_user_by_msg(msg)
    text, keyboard = self.render_message_for_user(user, template,
                                                  tg_user=msg.from_user, locale_overwrite=locale_overwrite,
                                                  **kwargs)
    if as_reply:
        self.reply_to(msg, text, reply_markup=keyboard, parse_mode='HTML')
    else:
        self.send_message(msg.chat.id, text, reply_markup=keyboard, parse_mode='HTML')
def send_template(self, user_id: int, template: str, locale_overwrite: typing.Optional[str] = None, **kwargs):
    """Render ``template`` for the user with ``user_id`` and send it to them.

    The stored user comes from ``User.by_id`` and the Telegram-side chat
    object from ``get_chat``; the latter is exposed to the template as
    ``tg_user``. The message is sent in HTML parse mode.
    """
    recipient = User.by_id(user_id, self)
    recipient_chat = self.get_chat(user_id)
    rendered = self.render_message_for_user(
        recipient, template,
        tg_user=recipient_chat, locale_overwrite=locale_overwrite,
        **kwargs,
    )
    text, keyboard = rendered
    self.send_message(user_id, text, reply_markup=keyboard, parse_mode='HTML')
def get_user_from_db(self, request: dict) -> typing.Optional[User]:
    """Return the first user document matching ``request`` as a ``User``.

    Returns ``None`` when no document matches.
    """
    document = self.db.users.find_one(request)
    return None if document is None else User.from_dict(document)
def get_community_from_db(self, request: dict, create_if_not_found: bool = False) -> typing.Optional[Community]:
    """Return the community matching ``request``, optionally creating it.

    When nothing matches and ``create_if_not_found`` is true, ``request``
    itself is inserted as a new document and a ``Community`` built from it
    is returned; otherwise ``None`` is returned.
    """
    document = self.db.communities.find_one(request)
    if document is not None:
        return Community.from_dict(document)
    if not create_if_not_found:
        return None
    # The query dict doubles as the initial document.
    self.db.communities.insert_one(request)
    return Community.from_dict(request)
def get_user_by_msg(self, msg: typing.Union[telebot.types.Message, telebot.types.InlineQuery]) -> User:
    """Resolve (or create) the stored ``User`` behind an incoming update.

    When the update was sent by the bot itself in a chat with a positive
    id, the user is looked up by chat id first and by sender id second;
    otherwise only by sender id. An existing user gets its ``locale``
    refreshed when the sender has a language code (the change is not saved
    here). A missing user is created, inserted into the ``users``
    collection and returned.
    """
    sender: telebot.types.User = msg.from_user
    sent_by_bot_in_positive_chat = sender.id == self.me.id and msg.chat.id > 0

    if sent_by_bot_in_positive_chat:
        user = User.by_id(msg.chat.id, self) or User.by_id(msg.from_user.id, self)
    else:
        user = User.by_id(msg.from_user.id, self)

    if user is None:
        # First contact: create and persist a fresh record.
        user = User(msg.from_user.id, msg.from_user.id)
        user.locale = locale_from_ietf(sender.language_code)
        self.db.users.insert_one(user.dict())
        return user

    if sender.language_code:
        user.locale = locale_from_ietf(sender.language_code)
    return user
def save_user(self, user: User):
    """Replace the stored document whose ``user_id`` matches ``user``."""
    selector = {'user_id': user.user_id}
    self.db.users.replace_one(selector, user.dict())
def save_community(self, community: Community):
    """Replace the stored document whose ``chat_id`` matches ``community``."""
    selector = {'chat_id': community.chat_id}
    self.db.communities.replace_one(selector, community.dict())
def handle_commands(self, cmds: typing.List[str]):
    """Decorator factory registering a handler for one or more commands.

    ``cmds`` are passed through ``edp.replace_colons`` first. If the first
    command starts with ``'/'`` the handler is registered as a bot command
    handler (leading slash stripped); otherwise it is registered for any
    message whose text starts with one of the commands.

    The decorated function is called as ``func(msg, user, args)`` where
    ``args`` is the message text with the first matching command prefix
    removed and surrounding whitespace stripped (the full text if no
    command matches). Exceptions raised by ``func`` are printed and
    swallowed; ``KeyboardInterrupt`` is re-raised.

    The decorator returns the wrapped handler (it used to return ``None``,
    which rebound the decorated name to ``None``).
    """
    cmds = [edp.replace_colons(cmd) for cmd in cmds]
    prefixes = tuple(cmds)

    def wrapper(func):
        @wraps(func)
        def func_wrapped(msg):
            user = self.get_user_by_msg(msg)
            args: str = msg.text
            for cmd in cmds:
                if args.startswith(cmd):
                    # Everything after the command prefix is the argument.
                    args = args[len(cmd):].strip()
                    break
            try:
                func(msg, user, args)
            except KeyboardInterrupt:
                raise
            except Exception:
                print(traceback.format_exc())

        if cmds[0].startswith('/'):
            self.message_handler(commands=[cmd[1:] for cmd in cmds])(func_wrapped)
        else:
            self.message_handler(
                func=lambda msg: msg.text is not None and msg.text.startswith(prefixes)
            )(func_wrapped)
        return func_wrapped

    return wrapper
def handle_callback(self, name: str):
    """Decorator factory registering a callback-query handler for ``name``.

    The handler fires for queries whose stripped ``data`` equals ``name``
    or starts with ``name + '_'``. The decorated function is called as
    ``func(query, user, args)`` where ``args`` is the stripped data with
    the first ``len(name) + 1`` characters removed, stripped again. Its
    return value is passed through; any ``Exception`` is printed and
    swallowed (``KeyboardInterrupt`` is not an ``Exception`` and therefore
    propagates).
    """
    prefix = name + '_'
    payload_offset = len(name) + 1

    def matches(query):
        data = query.data.strip()
        return data.startswith(prefix) or data == name

    def wrapper(func):
        @wraps(func)
        def func_wrapped(query):
            user = self.get_user_by_msg(query)
            payload = query.data.strip()[payload_offset:]
            try:
                return func(query, user, payload.strip())
            except Exception:
                print(f'An exception occured: {traceback.format_exc()}')

        self.callback_query_handler(matches)(func_wrapped)
        return func_wrapped

    return wrapper
def inline_handler_(self, filter_func, **kwargs):
    """Decorator factory registering a threaded inline-query handler.

    The decorated function is wrapped so that each invocation runs in its
    own ``Thread`` and any ``Exception`` it raises is printed instead of
    propagating. The wrapper is what gets registered via
    ``self.inline_handler(filter_func, **kwargs)`` and is also what the
    decorator returns. Previously the raw ``func`` was registered, so the
    threaded, exception-safe wrapper was never used by the bot.
    """
    def wrapper(func):
        @wraps(func)
        def handler(*args, **kwargs_2):
            def run_guarded():
                try:
                    func(*args, **kwargs_2)
                except Exception as e:
                    print('An exception occured:', e)

            Thread(target=run_guarded).start()

        # Register the wrapper, not ``func``, so the threading and error
        # handling actually apply to incoming inline queries.
        self.inline_handler(filter_func, **kwargs)(handler)
        return handler

    return wrapper
def iter_users(self) -> typing.Iterable[User]:
    """Lazily yield every document of the ``users`` collection as a ``User``."""
    yield from (User.from_dict(document) for document in self.db.users.find())
def iter_communities(self) -> typing.Iterable[Community]:
    """Lazily yield every document of ``communities`` as a ``Community``."""
    yield from (Community.from_dict(document) for document in self.db.communities.find())
def poll_user_for_community(self, user: User, community: Community):
    """Send the ``poll_user`` template to ``user`` for ``community``.

    The user is recorded in ``community.polled`` with today's date before
    the message is sent. Telegram API errors are ignored, so a failed send
    leaves that record in place. The community is not saved here.
    """
    # The per-user default answer (``community.default_answers``) is
    # currently disabled; everyone starts out as "not participating".
    default_answer = False
    try:
        community.polled[user.user_id] = datetime.date.today()
        self.send_template(
            user.user_id, 'poll_user',
            community=community, answer=default_answer,
        )
        # Multiplying by the boolean adds the user once or not at all.
        community.pool += [user.user_id] * default_answer
    except telebot.apihelper.ApiTelegramException:
        return
def poll_users_in_community(self, community: Community):
    """Poll every user ``community.users_to_poll()`` returns, then save.

    Ids without a stored ``User`` are skipped.
    """
    known_users = (User.by_id(user_id, self) for user_id in community.users_to_poll())
    for user in known_users:
        if user is not None:
            self.poll_user_for_community(user, community)
    self.save_community(community)
def run_suggestions(self):
    """Suggest communities to users who are in the chat but not members.

    For every user, each community is checked with ``user.check_chat``;
    when that passes and the user id is not in ``community.members``, the
    ``community_suggest`` template is sent. Any ``Exception`` while
    processing one user is printed and the remaining communities for that
    user are skipped. A user is saved if at least one suggestion was sent.
    """
    for user in self.iter_users():
        suggested_any = False
        try:
            for community in self.iter_communities():
                if not user.check_chat(community.chat_id, self):
                    continue
                if user.user_id in community.members:
                    continue
                self.send_template(user.user_id, 'community_suggest', community=community)
                suggested_any = True
        except Exception:
            # KeyboardInterrupt is not an Exception, so it still propagates.
            print(f'An exception occured in suggestions: {traceback.format_exc()}')
        if suggested_any:
            self.save_user(user)
def send_meeting_info_in_community(self, community: Community):
    """Notify both participants of every scheduled meeting, then archive.

    Each participant receives the ``meeting_info`` template with the other
    participant's chat object as ``person``. Telegram API errors for one
    meeting are ignored. Afterwards all scheduled meetings are appended to
    ``archived_meetings``, the schedule is cleared and the community saved.
    """
    for meeting in community.scheduled_meetings:
        try:
            first_chat = self.get_chat(meeting[0])
            second_chat = self.get_chat(meeting[1])
            self.send_template(meeting[0], 'meeting_info',
                               community=community, meeting=meeting, person=second_chat)
            self.send_template(meeting[1], 'meeting_info',
                               community=community, meeting=meeting, person=first_chat)
        except telebot.apihelper.ApiTelegramException:
            continue

    community.archived_meetings += community.scheduled_meetings
    community.scheduled_meetings = []
    self.save_community(community)
def run_necessary_actions_for_community(self, community: Community):
    """Run the daily poll and scheduling for ``community`` when they are due.

    A task is due when its entry in ``community.task_last_timestamps`` is
    before today (a missing entry counts as yesterday) and the current hour
    in ``tz`` has reached the task's threshold (``POLL_HOUR`` /
    ``SCHEDULING_HOUR``). The community is saved at the end in every case.
    """
    one_day = datetime.timedelta(days=1)

    last_poll = community.task_last_timestamps.get('poll', datetime.date.today() - one_day)
    poll_is_due = last_poll < datetime.date.today() and datetime.datetime.now(tz).hour >= POLL_HOUR
    if poll_is_due:
        self.poll_users_in_community(community)
        community.task_last_timestamps['poll'] = datetime.date.today()

    last_scheduling = community.task_last_timestamps.get('scheduling', datetime.date.today() - one_day)
    scheduling_is_due = (last_scheduling < datetime.date.today()
                         and datetime.datetime.now(tz).hour >= SCHEDULING_HOUR)
    if scheduling_is_due:
        community.schedule_meetings()
        self.send_meeting_info_in_community(community)
        community.task_last_timestamps['scheduling'] = datetime.date.today()

    self.save_community(community)
def actions_loop(self):
    """Run the periodic community tasks forever, once every 60 seconds.

    Each round processes every community, printing (not raising) any
    ``Exception`` from a single community or from the iteration itself,
    then starts ``run_suggestions`` in a new thread and sleeps.
    ``KeyboardInterrupt`` is not an ``Exception`` and so ends the loop.
    """
    while True:
        try:
            for community in self.iter_communities():
                try:
                    self.run_necessary_actions_for_community(community)
                except Exception:
                    print(f'An exception occurred: {traceback.format_exc()}')
        except Exception:
            print(f'An exception occurred while iterating through communities: {traceback.format_exc()}')

        suggestions_worker = Thread(target=self.run_suggestions)
        suggestions_worker.start()
        time.sleep(60)
def register_next_step_handler(self, message, callback, *args, **kwargs):
    """Register ``callback`` as the next-step handler, with cancel support.

    A ``CallbackQuery`` is unwrapped to its underlying message. The
    callback is wrapped so that:

    * if the next message's text is one of the cancel button labels, the
      ``canceled`` template is sent and ``callback`` is NOT invoked
      (previously the callback still ran with the cancel text as input);
    * any ``Exception`` raised is printed and swallowed.
    """
    if isinstance(message, telebot.types.CallbackQuery):
        message = message.message

    def new_callback(new_message, *new_args, **new_kwargs):
        try:
            if new_message.text in ('🛑 Cancel', '🛑 Отменить'):
                self.reply_with_template(new_message, 'canceled')
                return None
            return callback(new_message, *new_args, **new_kwargs)
        except Exception:
            print(traceback.format_exc())

    super().register_next_step_handler(message, new_callback, *args, **kwargs)
def send_all_copy(self, message: telebot.types.Message):
    """Copy ``message`` to every stored user and print a summary.

    A failure for one user is printed and counted without stopping the
    broadcast. Only ``Exception`` is caught, so ``KeyboardInterrupt`` and
    ``SystemExit`` can abort a running broadcast (a bare ``except`` used to
    swallow them).
    """
    num = 0
    failures = 0
    for user in self.iter_users():
        try:
            self.copy_message(user.user_id, message.chat.id, message.id)
        except Exception:
            print(traceback.format_exc())
            failures += 1
        else:
            num += 1
    print(f'Sent to {num} users. Failures: {failures}')
def user_growth(self) -> typing.List[int]:
    """Return cumulative user counts in 24-hour steps.

    Starting at the earliest non-null ``start_timestamp`` in the ``users``
    collection and stepping by one day until the current time, each entry
    is the number of users whose ``start_timestamp`` is strictly below
    that step. Returns an empty list when no user has a ``start_timestamp``.

    The previous implementation compared a whole user document with a
    float and called ``count_documents()`` on a cursor, so it always raised.
    """
    earliest = self.db.users.find_one(
        {'start_timestamp': {'$ne': None}},
        sort=[('start_timestamp', 1)],  # 1 == ascending
    )
    if earliest is None:
        return []

    one_day = 24 * 3600
    now = time.time()
    timestamp = earliest['start_timestamp']
    data = []
    while timestamp < now:
        data.append(self.db.users.count_documents({'start_timestamp': {'$lt': timestamp}}))
        timestamp += one_day
    return data
def stats(self) -> typing.Dict[str, int]:
    """Return headline user counts keyed by a human-readable label.

    "New" counts compare ``start_timestamp`` and "active" counts compare
    ``last_action_timestamp`` against the current time minus 24 hours or
    30 days (truncated to whole seconds).
    """
    day = 24 * 3600
    month = day * 30

    def count_since(field: str, window: int) -> int:
        threshold = int(time.time() - window)
        return self.db.users.count_documents({field: {'$gt': threshold}})

    return {
        'Total users': self.db.users.count_documents({}),
        'New users in the last 24 hours': count_since('start_timestamp', day),
        'New users last 30 days': count_since('start_timestamp', month),
        'Active users in the last 24 hours': count_since('last_action_timestamp', day),
        'Active users in the last 30 days': count_since('last_action_timestamp', month)
    }