|
|
|
import datetime
|
|
|
|
|
|
|
|
import telebot
|
|
|
|
import pymongo
|
|
|
|
from functools import wraps
|
|
|
|
|
|
|
|
from community import Community
|
|
|
|
from user import User
|
|
|
|
from templates import TemplateProvider
|
|
|
|
import emoji_data_python as edp
|
|
|
|
from threading import Thread
|
|
|
|
import utils
|
|
|
|
import typing
|
|
|
|
import traceback
|
|
|
|
import time
|
|
|
|
|
|
|
|
|
|
|
|
tz = datetime.timezone(datetime.timedelta(hours=3))
|
|
|
|
POLL_HOUR = 10
|
|
|
|
SCHEDULING_HOUR = 20
|
|
|
|
|
|
|
|
|
|
|
|
def locale_from_ietf(_ietf_locale: str) -> str:
|
|
|
|
# if not ietf_locale:
|
|
|
|
# return 'en'
|
|
|
|
# ietf_locale = ietf_locale.split('-')[0]
|
|
|
|
# return 'ru' if ietf_locale in ['ru', 'be', 'uk'] else 'en'
|
|
|
|
return 'ru'
|
|
|
|
|
|
|
|
|
|
|
|
class Bot(telebot.TeleBot):
|
|
|
|
def __init__(self, config: dict):
|
|
|
|
self.name = config['name']
|
|
|
|
super().__init__(config['token'])
|
|
|
|
self.me: telebot.types.User = self.get_me()
|
|
|
|
self.db = getattr(pymongo.MongoClient(config['mongo']), self.name)
|
|
|
|
self.db.users.create_index([('user_id', pymongo.ASCENDING)], unique=True)
|
|
|
|
self.config = config
|
|
|
|
self.templates = TemplateProvider()
|
|
|
|
for template_file in config.get('template_files', []):
|
|
|
|
self.templates.load_file(template_file)
|
|
|
|
self.main_keyboard = None
|
|
|
|
|
|
|
|
def render_template(self, template_name: str, locale: str = 'ru', **kwargs) -> \
|
|
|
|
typing.Tuple[str, typing.Union[telebot.types.ReplyKeyboardMarkup, telebot.types.InlineKeyboardMarkup]]:
|
|
|
|
text: str = self.templates.render_template(template_name, locale, bot=self, **kwargs)
|
|
|
|
text, keyboard = self.templates.separate_text_and_keyboards(text)
|
|
|
|
keyboard = utils.create_keyboard(keyboard) \
|
|
|
|
if keyboard is not None \
|
|
|
|
else self.main_keyboard.get(locale, self.main_keyboard.get('ru'))
|
|
|
|
return text, keyboard
|
|
|
|
|
|
|
|
def render_message_for_user(self, user: User, template_name: str, **kwargs) -> \
|
|
|
|
typing.Tuple[str, typing.Union[telebot.types.ReplyKeyboardMarkup, telebot.types.InlineKeyboardMarkup]]:
|
|
|
|
text: str = self.templates.render_template(template_name, user.locale, user=user, **kwargs)
|
|
|
|
text, keyboard = self.templates.separate_text_and_keyboards(text)
|
|
|
|
keyboard = utils.create_keyboard(keyboard)
|
|
|
|
return text, keyboard
|
|
|
|
|
|
|
|
def edit_message_with_template(self, query: telebot.types.CallbackQuery, template: str, **kwargs):
|
|
|
|
msg = query.message
|
|
|
|
text, keyboard = self.render_message_for_user(self.get_user_by_msg(msg), template,
|
|
|
|
tg_user=msg.from_user,
|
|
|
|
**kwargs)
|
|
|
|
self.edit_message_text(text, msg.chat.id, msg.id, reply_markup=keyboard, parse_mode='HTML')
|
|
|
|
self.answer_callback_query(query)
|
|
|
|
|
|
|
|
def reply_with_template(self, msg: typing.Union[telebot.types.Message, telebot.types.CallbackQuery], template: str,
|
|
|
|
as_reply: bool = False, locale_overwrite: typing.Optional[str] = None, **kwargs):
|
|
|
|
if isinstance(msg, telebot.types.CallbackQuery):
|
|
|
|
msg = msg.message
|
|
|
|
user = self.get_user_by_msg(msg)
|
|
|
|
text, keyboard = self.render_message_for_user(user, template,
|
|
|
|
tg_user=msg.from_user, locale_overwrite=locale_overwrite,
|
|
|
|
**kwargs)
|
|
|
|
if as_reply:
|
|
|
|
self.reply_to(msg, text, reply_markup=keyboard)
|
|
|
|
else:
|
|
|
|
self.send_message(msg.chat.id, text, reply_markup=keyboard, parse_mode='HTML')
|
|
|
|
|
|
|
|
def send_template(self, user_id: int, template: str, locale_overwrite: typing.Optional[str] = None, **kwargs):
|
|
|
|
user = User.by_id(user_id, self)
|
|
|
|
tg_user = self.get_chat(user_id)
|
|
|
|
text, keyboard = self.render_message_for_user(user, template,
|
|
|
|
tg_user=tg_user, locale_overwrite=locale_overwrite,
|
|
|
|
**kwargs)
|
|
|
|
self.send_message(user_id, text, reply_markup=keyboard, parse_mode='HTML')
|
|
|
|
|
|
|
|
def get_user_from_db(self, request: dict) -> typing.Optional[User]:
|
|
|
|
data = self.db.users.find_one(request)
|
|
|
|
if data is None:
|
|
|
|
return None
|
|
|
|
return User.from_dict(data)
|
|
|
|
|
|
|
|
def get_community_from_db(self, request: dict, create_if_not_found: bool = False) -> typing.Optional[Community]:
|
|
|
|
data = self.db.communities.find_one(request)
|
|
|
|
if data is None:
|
|
|
|
if create_if_not_found:
|
|
|
|
self.db.communities.insert_one(request)
|
|
|
|
return Community.from_dict(request)
|
|
|
|
return None
|
|
|
|
return Community.from_dict(data)
|
|
|
|
|
|
|
|
def get_user_by_msg(self, msg: typing.Union[telebot.types.Message, telebot.types.InlineQuery]) -> User:
|
|
|
|
tg_user: telebot.types.User = msg.from_user
|
|
|
|
if tg_user.id == self.me.id and msg.chat.id > 0:
|
|
|
|
user = User.by_id(msg.chat.id, self) or User.by_id(msg.from_user.id, self)
|
|
|
|
else:
|
|
|
|
user = User.by_id(msg.from_user.id, self)
|
|
|
|
if user is not None:
|
|
|
|
if tg_user.language_code:
|
|
|
|
user.locale = locale_from_ietf(tg_user.language_code)
|
|
|
|
return user
|
|
|
|
else:
|
|
|
|
user = User(msg.from_user.id, msg.from_user.id)
|
|
|
|
user.locale = locale_from_ietf(tg_user.language_code)
|
|
|
|
self.db.users.insert_one(user.dict())
|
|
|
|
return user
|
|
|
|
|
|
|
|
def save_user(self, user: User):
|
|
|
|
self.db.users.replace_one({'user_id': user.user_id}, user.dict())
|
|
|
|
|
|
|
|
def save_community(self, community: Community):
|
|
|
|
self.db.communities.replace_one({'chat_id': community.chat_id}, community.dict())
|
|
|
|
|
|
|
|
def handle_commands(self, cmds: typing.List[str]):
|
|
|
|
cmds = [edp.replace_colons(cmd) for cmd in cmds]
|
|
|
|
|
|
|
|
def wrapper(func):
|
|
|
|
@wraps(func)
|
|
|
|
def func_wrapped(msg):
|
|
|
|
user = self.get_user_by_msg(msg)
|
|
|
|
args: str = msg.text
|
|
|
|
for cmd in cmds:
|
|
|
|
if args.startswith(cmd):
|
|
|
|
if len(args.split(cmd)) <= 1:
|
|
|
|
args = ''
|
|
|
|
break
|
|
|
|
args = cmd.join(args.split(cmd)[1:]).strip()
|
|
|
|
break
|
|
|
|
try:
|
|
|
|
func(msg, user, args)
|
|
|
|
except:
|
|
|
|
print(traceback.format_exc())
|
|
|
|
|
|
|
|
if cmds[0].startswith('/'):
|
|
|
|
self.message_handler(commands=[cmd[1:] for cmd in cmds])(func_wrapped)
|
|
|
|
else:
|
|
|
|
self.message_handler(func=lambda msg: any(msg.text.startswith(cmd) for cmd in cmds))(func_wrapped)
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
def handle_callback(self, name: str):
|
|
|
|
def wrapper(func):
|
|
|
|
@wraps(func)
|
|
|
|
def func_wrapped(query):
|
|
|
|
user = self.get_user_by_msg(query)
|
|
|
|
args = query.data.strip()[len(name) + 1:]
|
|
|
|
try:
|
|
|
|
return func(query, user, args.strip())
|
|
|
|
except KeyboardInterrupt as e:
|
|
|
|
raise e
|
|
|
|
except Exception:
|
|
|
|
print(f'An exception occured: {traceback.format_exc()}')
|
|
|
|
|
|
|
|
self.callback_query_handler(
|
|
|
|
lambda query: query.data.strip().startswith(name + '_') or query.data.strip() == name)(func_wrapped)
|
|
|
|
return func_wrapped
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
def inline_handler_(self, filter_func, **kwargs):
|
|
|
|
def wrapper(func):
|
|
|
|
@wraps(func)
|
|
|
|
def handler(*args, **kwargs_2):
|
|
|
|
def func_2():
|
|
|
|
try:
|
|
|
|
func(*args, **kwargs_2)
|
|
|
|
except Exception as e:
|
|
|
|
print('An exception occured:', e)
|
|
|
|
|
|
|
|
Thread(target=func_2).start()
|
|
|
|
|
|
|
|
self.inline_handler(filter_func, **kwargs)(func)
|
|
|
|
return handler
|
|
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
def iter_users(self) -> typing.Iterable[User]:
|
|
|
|
for user_data in self.db.users.find():
|
|
|
|
yield User.from_dict(user_data)
|
|
|
|
|
|
|
|
def iter_communities(self) -> typing.Iterable[Community]:
|
|
|
|
for community_data in self.db.communities.find():
|
|
|
|
yield Community.from_dict(community_data)
|
|
|
|
|
|
|
|
def poll_user_for_community(self, user: User, community: Community):
|
|
|
|
def_answer = community.default_answers.get(user.user_id, True)
|
|
|
|
self.send_template(user.user_id, 'poll_user', community=community, answer=def_answer)
|
|
|
|
community.pool += [user.user_id] * def_answer
|
|
|
|
community.polled[user.user_id] = datetime.date.today()
|
|
|
|
|
|
|
|
def poll_users_in_community(self, community: Community):
|
|
|
|
for user_id in community.users_to_poll():
|
|
|
|
user = User.by_id(user_id, self)
|
|
|
|
if user is None:
|
|
|
|
continue
|
|
|
|
self.poll_user_for_community(user, community)
|
|
|
|
self.save_community(community)
|
|
|
|
|
|
|
|
def run_suggestions(self):
|
|
|
|
for user in self.iter_users():
|
|
|
|
found_communities = False
|
|
|
|
try:
|
|
|
|
for community in self.iter_communities():
|
|
|
|
if user.check_chat(community.chat_id, self) and user.user_id not in community.members:
|
|
|
|
self.send_template(user.user_id, 'community_suggest', community=community)
|
|
|
|
found_communities = True
|
|
|
|
except KeyboardInterrupt as e:
|
|
|
|
raise e
|
|
|
|
except Exception:
|
|
|
|
print(f'An exception occured in suggestions: {traceback.format_exc()}')
|
|
|
|
if found_communities:
|
|
|
|
self.save_user(user)
|
|
|
|
|
|
|
|
def send_meeting_info_in_community(self, community: Community):
|
|
|
|
for meeting in community.scheduled_meetings:
|
|
|
|
person_1, person_2 = self.get_chat(meeting[0]), self.get_chat(meeting[1])
|
|
|
|
self.send_template(meeting[0], 'meeting_info', community=community, meeting=meeting, person=person_2)
|
|
|
|
self.send_template(meeting[1], 'meeting_info', community=community, meeting=meeting, person=person_1)
|
|
|
|
community.archived_meetings += community.scheduled_meetings
|
|
|
|
community.scheduled_meetings = []
|
|
|
|
self.save_community(community)
|
|
|
|
|
|
|
|
def run_necessary_actions_for_community(self, community: Community):
|
|
|
|
if community.task_last_timestamps.get('poll', datetime.date.today() - datetime.timedelta(days=1)) \
|
|
|
|
< datetime.date.today() and datetime.datetime.now(tz).hour >= POLL_HOUR:
|
|
|
|
self.poll_users_in_community(community)
|
|
|
|
community.task_last_timestamps['poll'] = datetime.date.today()
|
|
|
|
if community.task_last_timestamps.get('scheduling', datetime.date.today() - datetime.timedelta(days=1)) \
|
|
|
|
< datetime.date.today() and datetime.datetime.now(tz).hour >= SCHEDULING_HOUR:
|
|
|
|
community.schedule_meetings()
|
|
|
|
self.send_meeting_info_in_community(community)
|
|
|
|
community.task_last_timestamps['scheduling'] = datetime.date.today()
|
|
|
|
self.save_community(community)
|
|
|
|
|
|
|
|
def actions_loop(self):
|
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
for community in self.iter_communities():
|
|
|
|
try:
|
|
|
|
self.run_necessary_actions_for_community(community)
|
|
|
|
except KeyboardInterrupt as e:
|
|
|
|
raise e
|
|
|
|
except Exception:
|
|
|
|
print(f'An exception occurred: {traceback.format_exc()}')
|
|
|
|
except KeyboardInterrupt as e:
|
|
|
|
raise e
|
|
|
|
except Exception:
|
|
|
|
print(f'An exception occurred while iterating through communities: {traceback.format_exc()}')
|
|
|
|
Thread(target=self.run_suggestions).start()
|
|
|
|
time.sleep(60)
|
|
|
|
|
|
|
|
def register_next_step_handler(self, message, callback, *args, **kwargs):
|
|
|
|
if isinstance(message, telebot.types.CallbackQuery):
|
|
|
|
message = message.message
|
|
|
|
|
|
|
|
def new_callback(new_message, *new_args, **new_kwargs):
|
|
|
|
try:
|
|
|
|
if new_message.text in ['🛑 Cancel', '🛑 Отменить']:
|
|
|
|
# self.clear_step_handler_by_chat_id(new_message.chat.id)
|
|
|
|
self.reply_with_template(new_message, 'canceled')
|
|
|
|
return callback(new_message, *new_args, **new_kwargs)
|
|
|
|
except:
|
|
|
|
print(traceback.format_exc())
|
|
|
|
|
|
|
|
super().register_next_step_handler(message, new_callback, *args, **kwargs)
|
|
|
|
|
|
|
|
def send_all_copy(self, message: telebot.types.Message):
|
|
|
|
num = 0
|
|
|
|
failures = 0
|
|
|
|
for user in self.iter_users():
|
|
|
|
try:
|
|
|
|
self.copy_message(user.user_id, message.chat.id, message.id)
|
|
|
|
num += 1
|
|
|
|
except:
|
|
|
|
print(traceback.format_exc())
|
|
|
|
failures += 1
|
|
|
|
print(f'Sent to {num} users. Failures: {failures}')
|
|
|
|
|
|
|
|
def user_growth(self) -> typing.List[int]:
|
|
|
|
timestamp = next(iter(self.db.users.find({}).sort('start_timestamp')))
|
|
|
|
data = list()
|
|
|
|
while timestamp < time.time():
|
|
|
|
data.append(self.db.users.find({'start_timestamp': {'$lt': timestamp}}).count_documents())
|
|
|
|
timestamp += 24 * 3600
|
|
|
|
return data
|
|
|
|
|
|
|
|
def stats(self) -> typing.Dict[str, int]:
|
|
|
|
total_users = self.db.users.count_documents({})
|
|
|
|
new_users_today = self.db.users.count_documents(
|
|
|
|
{'start_timestamp': {'$gt': int(time.time() - 24 * 3600)}}
|
|
|
|
)
|
|
|
|
new_users_month = self.db.users.count_documents(
|
|
|
|
{'start_timestamp': {'$gt': int(time.time() - 24 * 3600 * 30)}}
|
|
|
|
)
|
|
|
|
active_users_today = self.db.users.count_documents(
|
|
|
|
{'last_action_timestamp': {'$gt': int(time.time() - 24 * 3600)}}
|
|
|
|
)
|
|
|
|
active_users_month = self.db.users.count_documents(
|
|
|
|
{'last_action_timestamp': {'$gt': int(time.time() - 24 * 3600 * 30)}}
|
|
|
|
)
|
|
|
|
return {
|
|
|
|
'Total users': total_users,
|
|
|
|
'New users in the last 24 hours': new_users_today,
|
|
|
|
'New users last 30 days': new_users_month,
|
|
|
|
'Active users in the last 24 hours': active_users_today,
|
|
|
|
'Active users in the last 30 days': active_users_month
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|