From 015196d164535144bfc876e47b05db7df96d3120 Mon Sep 17 00:00:00 2001 From: ennucore Date: Mon, 20 Jun 2022 14:43:43 +0300 Subject: [PATCH] Init --- README.md | 3 + bot.py | 218 +++++++++++++++++++++++++++++++++++++++++++++++ main.py | 21 +++++ requirements.txt | 5 ++ templates.py | 72 ++++++++++++++++ templates.txt | 24 ++++++ user.py | 40 +++++++++ utils.py | 42 +++++++++ views.py | 34 ++++++++ 9 files changed, 459 insertions(+) create mode 100644 README.md create mode 100644 bot.py create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 templates.py create mode 100644 templates.txt create mode 100644 user.py create mode 100644 utils.py create mode 100644 views.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..ede58f7 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# RandomTea + +Random meetings for chats diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..3c00d6e --- /dev/null +++ b/bot.py @@ -0,0 +1,218 @@ +import telebot +import pymongo +from functools import wraps +from user import User +from templates import TemplateProvider +import emoji_data_python as edp +from threading import Thread +import utils +import typing +import traceback +import time + + +def locale_from_ietf(ietf_locale: str) -> str: + if not ietf_locale: + return 'en' + ietf_locale = ietf_locale.split('-')[0] + return 'ru' if ietf_locale in ['ru', 'be', 'uk'] else 'en' + + +class Bot(telebot.TeleBot): + def __init__(self, config: dict): + self.name = config['name'] + super().__init__(config['token']) + self.me: telebot.types.User = self.get_me() + self.db = getattr(pymongo.MongoClient(config['mongo']), self.name) + self.db.users.create_index([('user_id', pymongo.ASCENDING)], unique=True) + self.config = config + self.templates = TemplateProvider() + for template_file in config.get('template_files', []): + self.templates.load_file(template_file) + self.main_keyboard = None + if config.get('main_keyboard') is not None: + kb = config.get('main_keyboard') + self.main_keyboard = {key: utils.create_keyboard(kb[key]) for key in kb.keys()} + + def render_template(self, template_name: str, locale: str = 'ru', **kwargs) -> \ + typing.Tuple[str, typing.Union[telebot.types.ReplyKeyboardMarkup, telebot.types.InlineKeyboardMarkup]]: + text: str = self.templates.render_template(template_name, locale, bot=self, **kwargs) + text, keyboard = self.templates.separate_text_and_keyboards(text) + keyboard = utils.create_keyboard(keyboard) \ + if keyboard is not None \ + else self.main_keyboard.get(locale, self.main_keyboard.get('ru')) + return text, keyboard + + def render_message_for_user(self, user: User, template_name: str, **kwargs) -> \ + typing.Tuple[str, typing.Union[telebot.types.ReplyKeyboardMarkup, telebot.types.InlineKeyboardMarkup]]: + text: str = self.templates.render_template(template_name, user.locale, user=user, **kwargs) + text, keyboard = self.templates.separate_text_and_keyboards(text) + keyboard = utils.create_keyboard(keyboard) if keyboard is not None \ + else self.main_keyboard.get(user.locale, self.main_keyboard.get('ru')) + return text, keyboard + + def edit_message_with_template(self, query: telebot.types.CallbackQuery, template: str, **kwargs): + msg = query.message + text, keyboard = self.render_message_for_user(self.get_user_by_msg(msg), template, + tg_user=msg.from_user, + **kwargs) + self.edit_message_text(text, msg.chat.id, msg.id, reply_markup=keyboard, parse_mode='HTML') + self.answer_callback_query(query) + + def reply_with_template(self, msg: typing.Union[telebot.types.Message, telebot.types.CallbackQuery], template: str, + as_reply: bool = False, locale_overwrite: typing.Optional[str] = None, **kwargs): + if isinstance(msg, telebot.types.CallbackQuery): + msg = msg.message + text, keyboard = self.render_message_for_user(self.get_user_by_msg(msg), template, + tg_user=msg.from_user, locale_overwrite=locale_overwrite, + **kwargs) + if as_reply: + self.reply_to(msg, text, reply_markup=keyboard) + else: + self.send_message(msg.chat.id, text, reply_markup=keyboard, parse_mode='HTML') + + def get_user_from_db(self, request: dict) -> typing.Optional[User]: + data = self.db.users.find_one(request) + if data is None: + return None + return User.from_dict(data) + + def get_user_by_msg(self, msg: typing.Union[telebot.types.Message, telebot.types.InlineQuery]) -> User: + tg_user: telebot.types.User = msg.from_user + if tg_user.id == self.me.id and msg.chat.id > 0: + user = User.by_id(msg.chat.id, self) or User.by_id(msg.from_user.id, self) + else: + user = User.by_id(msg.from_user.id, self) + if user is not None: + if tg_user.language_code: + user.locale = locale_from_ietf(tg_user.language_code) + return user + else: + user = User(msg.from_user.id, msg.from_user.id) + user.locale = locale_from_ietf(tg_user.language_code) + self.db.users.insert_one(user.dict()) + return user + + def save(self, user: User): + self.db.users.replace_one({'user_id': user.user_id}, user.dict()) + + def handle_commands(self, cmds: typing.List[str]): + cmds = [edp.replace_colons(cmd) for cmd in cmds] + + def wrapper(func): + @wraps(func) + def func_wrapped(msg): + user = self.get_user_by_msg(msg) + args: str = msg.text + for cmd in cmds: + if args.startswith(cmd): + if len(args.split(cmd)) <= 1: + args = '' + break + args = cmd.join(args.split(cmd)[1:]).strip() + break + func(msg, user, args) + + if cmds[0].startswith('/'): + self.message_handler(commands=[cmd[1:] for cmd in cmds])(func_wrapped) + else: + self.message_handler(func=lambda msg: any(msg.text.startswith(cmd) for cmd in cmds))(func_wrapped) + return wrapper + + def handle_callback(self, name: str): + def wrapper(func): + @wraps(func) + def func_wrapped(query): + user = self.get_user_by_msg(query) + args = query.data.strip()[len(name) + 1:] + try: + return func(query, user, args.strip()) + except Exception: + print(f'An exception occured: {traceback.format_exc()}') + + self.callback_query_handler( + lambda query: query.data.strip().startswith(name + '_') or query.data.strip() == name)(func_wrapped) + return func_wrapped + + return wrapper + + def inline_handler_(self, filter_func, **kwargs): + def wrapper(func): + @wraps(func) + def handler(*args, **kwargs_2): + def func_2(): + try: + func(*args, **kwargs_2) + except Exception as e: + print('An exception occured:', e) + + Thread(target=func_2).start() + + self.inline_handler(filter_func, **kwargs)(func) + return handler + + return wrapper + + def iter_users(self) -> typing.Iterable[User]: + for user_data in self.db.users.find(): + yield User.from_dict(user_data) + + def register_next_step_handler(self, message, callback, *args, **kwargs): + if isinstance(message, telebot.types.CallbackQuery): + message = message.message + + def new_callback(new_message, *new_args, **new_kwargs): + try: + if new_message.text in ['🛑 Cancel', '🛑 Отменить']: + # self.clear_step_handler_by_chat_id(new_message.chat.id) + self.reply_with_template(new_message, 'canceled') + return callback(new_message, *new_args, **new_kwargs) + except: + print(traceback.format_exc()) + + super().register_next_step_handler(message, new_callback, *args, **kwargs) + + def send_all_copy(self, message: telebot.types.Message): + num = 0 + failures = 0 + for user in self.iter_users(): + try: + self.copy_message(user.user_id, message.chat.id, message.id) + num += 1 + except: + print(traceback.format_exc()) + failures += 1 + print(f'Sent to {num} users. Failures: {failures}') + + def user_growth(self) -> typing.List[int]: + timestamp = next(iter(self.db.users.find({}).sort('start_timestamp'))) + data = list() + while timestamp < time.time(): + data.append(self.db.users.find({'start_timestamp': {'$lt': timestamp}}).count_documents()) + timestamp += 24 * 3600 + return data + + def stats(self) -> typing.Dict[str, int]: + total_users = self.db.users.count_documents({}) + new_users_today = self.db.users.count_documents( + {'start_timestamp': {'$gt': int(time.time() - 24 * 3600)}} + ) + new_users_month = self.db.users.count_documents( + {'start_timestamp': {'$gt': int(time.time() - 24 * 3600 * 30)}} + ) + active_users_today = self.db.users.count_documents( + {'last_action_timestamp': {'$gt': int(time.time() - 24 * 3600)}} + ) + active_users_month = self.db.users.count_documents( + {'last_action_timestamp': {'$gt': int(time.time() - 24 * 3600 * 30)}} + ) + return { + 'Total users': total_users, + 'New users in the last 24 hours': new_users_today, + 'New users last 30 days': new_users_month, + 'Active users in the last 24 hours': active_users_today, + 'Active users in the last 30 days': active_users_month + } + + + diff --git a/main.py b/main.py new file mode 100644 index 0000000..e23c56d --- /dev/null +++ b/main.py @@ -0,0 +1,21 @@ +from bot import Bot +from views import views +import traceback + + +config = { + 'mongo': __import__('os').getenv('DB'), + 'name': 'test_bot', + 'token': '1686277528:AAHHJgWfulqd9uGmK-RVOM-vQ60kbGgZRIg', + 'template_files': ['templates.txt'], + 'main_keyboard': {'en': [['ℹ️ About']], 'ru': [['ℹ️ О боте']]} +} + +bot = Bot(config) +views(bot) +if __name__ == '__main__': + while True: + try: + bot.polling(none_stop=True) + except: + print(traceback.format_exc()) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..dc34332 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +pyTelegramBotAPI +pymongo +attrs +jinja2 +emoji_data_python \ No newline at end of file diff --git a/templates.py b/templates.py new file mode 100644 index 0000000..db0d290 --- /dev/null +++ b/templates.py @@ -0,0 +1,72 @@ +import attr +import typing +import jinja2 +import emoji_data_python as edp + + +""" +Template format: + +||<| template_name |>|| +//| locale_1 |// +Text for locale 1 +>>| Reply Keyboard: row 1, button 1;; Row 1, button 2 |<< +>>| Row 2, button 1 :-: callback_data_for_inline_keyboard |<< +//| locale_2 |// +Text for locale 2 +""" + + +KeyboardMarkup = typing.List[typing.List[typing.Union[str, typing.Tuple[str, str]]]] + + +@attr.s(auto_attribs=True) +class TemplateProvider: + # {template: {locale: jinja template}} + templates: typing.Dict[str, typing.Dict[str, jinja2.Template]] = attr.Factory(dict) + locales: typing.Tuple[str] = ('', 'both', 'en', 'ru') + + def get_template(self, template_name: str, locale: str) -> jinja2.Template: + locales = [locale] + list(self.locales) + template = self.templates[template_name] + for locale in locales: + if locale in template.keys(): + return template[locale] + return next(template.values()) + + def render_template(self, template_name: str, locale: str, **kwargs) -> str: + return edp.replace_colons( + self.get_template(template_name, locale) + .render(**kwargs, **__builtins__)).strip() + + def add_template(self, template_string: str): + template_name = template_string.split('||<|')[1].split('|>||')[0].strip() + template: typing.Dict[str, jinja2.Template] = dict() + for locale_string in template_string.split('//|')[1:]: + locale_name: str = locale_string.split('|//')[0].strip() + locale_text: str = '|//'.join(locale_string.split('|//')[1:]).strip() + template[locale_name] = jinja2.Environment(loader=jinja2.FileSystemLoader('./')).from_string(locale_text) + self.templates[template_name] = template + + def load_file(self, filename: str): + file_content = open(filename).read() + for template_string in file_content.split('||<|')[1:]: + template_string = '||<|' + template_string + self.add_template(edp.replace_colons(template_string)) + + @staticmethod + def separate_text_and_keyboards(text: str) -> typing.Tuple[str, typing.Optional[KeyboardMarkup]]: + pure_text = text.split('>>|')[0].strip() + if len(text.split('>>|')) <= 1: + return pure_text, None + keyboard_rows = [row.split('|<<')[0].strip() for row in text.split('>>|')[1:]] + keyboard: KeyboardMarkup = list() + for row_raw in keyboard_rows: + row = list() + for btn in row_raw.split(';;'): + if ':-:' in btn: + row.append(tuple(btn.split(':-:'))) + else: + row.append(btn) + keyboard.append(row) + return pure_text, keyboard diff --git a/templates.txt b/templates.txt new file mode 100644 index 0000000..1167203 --- /dev/null +++ b/templates.txt @@ -0,0 +1,24 @@ +||<| start |>|| +//| ru |// +Здравствуйте, {{ tg_user.first_name }}! +//| en |// +Hello, {{ tg_user.first_name }}! + +||<| help |>|| +//| en |// +/help - Help +//| ru |// +/help - помощь + +||<| admin_stats |>|| +//| |// +Stats: +{% for key in stats.keys() %}{{ key }}: {{ stats[key] }} +{% endfor %} + +||<| canceled |>|| +//| en |// +Canceled +//| ru |// +Отменено + diff --git a/user.py b/user.py new file mode 100644 index 0000000..61e9a16 --- /dev/null +++ b/user.py @@ -0,0 +1,40 @@ +from __future__ import annotations +from dataclasses import dataclass, field, asdict +import attr +import typing +import time + + +@dataclass +class User: + user_id: int + chat_id: int = 0 + locale: str = '' + start_timestamp: int = field(default_factory=lambda: int(time.time())) + last_action_timestamp: int = field(default_factory=lambda: int(time.time())) + + def dict(self) -> dict: + data = asdict(self) + data['last_action_timestamp'] = int(time.time()) + return data + + @classmethod + def from_dict(cls, data: dict) -> User: + data = getattr(data, '__dict__', data) + data_ = {key: data[key] for key in data.keys() if key in ['user_id', 'chat_id', 'locale']} + self = cls(**data_) + return self + + @classmethod + def by_id(cls, user_id: int, bot) -> typing.Optional[User]: + data = bot.get_user_from_db({'user_id': user_id}) + if data is None: + return None + return data + + @property + def id(self): + return self._id + + def is_admin(self) -> bool: + return self.user_id in [218952152] diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..edc297b --- /dev/null +++ b/utils.py @@ -0,0 +1,42 @@ +import telebot +import typing +import random +import string + + +def generate_id() -> str: + return ''.join(random.choice(string.digits + string.ascii_lowercase) for _ in range(8)) + + +def isint(st: str) -> bool: + try: + int(st) + return True + except ValueError: + return False + + +def create_inline_button(text: str, callback_data: str) -> telebot.types.InlineKeyboardButton: + if callback_data.strip().startswith('https://') or callback_data.startswith('tg://'): + return telebot.types.InlineKeyboardButton(text, url=callback_data) + if callback_data.strip().startswith('inline://'): + telebot.types.InlineKeyboardButton(text, + switch_inline_query_current_chat=callback_data.replace('inline://', '')) + if callback_data.strip().startswith('inline_other://'): + telebot.types.InlineKeyboardButton(text, switch_inline_query=callback_data.replace('inline_other://', '')) + return telebot.types.InlineKeyboardButton(text, callback_data=callback_data) + + +def create_keyboard(rows: typing.List[typing.List[str]]): + is_inline = not isinstance(rows[0][0], str) + markup = telebot.types.InlineKeyboardMarkup() if is_inline else telebot.types.ReplyKeyboardMarkup( + resize_keyboard=True) + for row in rows: + if is_inline: + markup.row(*( + create_inline_button(text, callback_data) + for text, callback_data in row + )) + else: + markup.row(*(telebot.types.KeyboardButton(text) for text in row)) + return markup diff --git a/views.py b/views.py new file mode 100644 index 0000000..515df68 --- /dev/null +++ b/views.py @@ -0,0 +1,34 @@ +from bot import Bot + + +def views(bot: Bot): + @bot.handle_commands(['/start']) + def handle_start(msg, _user, _args): + bot.reply_with_template(msg, 'start') + + @bot.handle_commands(['/help', 'ℹ️ About', 'ℹ️ О боте']) + def handle_help(msg, _user, _args): + bot.reply_with_template(msg, 'help') + + @bot.handle_commands(['/send_all']) + def send_spam(msg, user, _args): + if not user.is_admin(): + return + def handler(msg_1): + bot.send_all_copy(msg_1) + + bot.send_message(msg.chat.id, 'Send the message') + bot.register_next_step_handler(msg, handler) + + @bot.handle_commands(['/all_stats']) + def all_stats(msg, user, _args): + if not user.is_admin(): + return + stats = bot.stats() + bot.reply_with_template(msg, 'admin_stats', stats=stats) + + @bot.handle_commands(['🛑 Cancel', '🛑 Отменить']) + def cancel_creation(msg, _user, _args): + bot.clear_step_handler_by_chat_id(msg.chat.id) + bot.reply_with_template(msg, 'canceled') +