commit 015196d164535144bfc876e47b05db7df96d3120 Author: ennucore Date: Mon Jun 20 14:43:43 2022 +0300 Init diff --git a/README.md b/README.md new file mode 100644 index 0000000..ede58f7 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# RandomTea + +Random meetings for chats diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..3c00d6e --- /dev/null +++ b/bot.py @@ -0,0 +1,218 @@ +import telebot +import pymongo +from functools import wraps +from user import User +from templates import TemplateProvider +import emoji_data_python as edp +from threading import Thread +import utils +import typing +import traceback +import time + + +def locale_from_ietf(ietf_locale: str) -> str: + if not ietf_locale: + return 'en' + ietf_locale = ietf_locale.split('-')[0] + return 'ru' if ietf_locale in ['ru', 'be', 'uk'] else 'en' + + +class Bot(telebot.TeleBot): + def __init__(self, config: dict): + self.name = config['name'] + super().__init__(config['token']) + self.me: telebot.types.User = self.get_me() + self.db = getattr(pymongo.MongoClient(config['mongo']), self.name) + self.db.users.create_index([('user_id', pymongo.ASCENDING)], unique=True) + self.config = config + self.templates = TemplateProvider() + for template_file in config.get('template_files', []): + self.templates.load_file(template_file) + self.main_keyboard = None + if config.get('main_keyboard') is not None: + kb = config.get('main_keyboard') + self.main_keyboard = {key: utils.create_keyboard(kb[key]) for key in kb.keys()} + + def render_template(self, template_name: str, locale: str = 'ru', **kwargs) -> \ + typing.Tuple[str, typing.Union[telebot.types.ReplyKeyboardMarkup, telebot.types.InlineKeyboardMarkup]]: + text: str = self.templates.render_template(template_name, locale, bot=self, **kwargs) + text, keyboard = self.templates.separate_text_and_keyboards(text) + keyboard = utils.create_keyboard(keyboard) \ + if keyboard is not None \ + else self.main_keyboard.get(locale, self.main_keyboard.get('ru')) + return text, keyboard + + def render_message_for_user(self, user: User, template_name: str, **kwargs) -> \ + typing.Tuple[str, typing.Union[telebot.types.ReplyKeyboardMarkup, telebot.types.InlineKeyboardMarkup]]: + text: str = self.templates.render_template(template_name, user.locale, user=user, **kwargs) + text, keyboard = self.templates.separate_text_and_keyboards(text) + keyboard = utils.create_keyboard(keyboard) if keyboard is not None \ + else self.main_keyboard.get(user.locale, self.main_keyboard.get('ru')) + return text, keyboard + + def edit_message_with_template(self, query: telebot.types.CallbackQuery, template: str, **kwargs): + msg = query.message + text, keyboard = self.render_message_for_user(self.get_user_by_msg(msg), template, + tg_user=msg.from_user, + **kwargs) + self.edit_message_text(text, msg.chat.id, msg.id, reply_markup=keyboard, parse_mode='HTML') + self.answer_callback_query(query) + + def reply_with_template(self, msg: typing.Union[telebot.types.Message, telebot.types.CallbackQuery], template: str, + as_reply: bool = False, locale_overwrite: typing.Optional[str] = None, **kwargs): + if isinstance(msg, telebot.types.CallbackQuery): + msg = msg.message + text, keyboard = self.render_message_for_user(self.get_user_by_msg(msg), template, + tg_user=msg.from_user, locale_overwrite=locale_overwrite, + **kwargs) + if as_reply: + self.reply_to(msg, text, reply_markup=keyboard) + else: + self.send_message(msg.chat.id, text, reply_markup=keyboard, parse_mode='HTML') + + def get_user_from_db(self, request: dict) -> typing.Optional[User]: + data = self.db.users.find_one(request) + if data is None: + return None + return User.from_dict(data) + + def get_user_by_msg(self, msg: typing.Union[telebot.types.Message, telebot.types.InlineQuery]) -> User: + tg_user: telebot.types.User = msg.from_user + if tg_user.id == self.me.id and msg.chat.id > 0: + user = User.by_id(msg.chat.id, self) or User.by_id(msg.from_user.id, self) + else: + user = User.by_id(msg.from_user.id, self) + if user is not None: + if tg_user.language_code: + user.locale = locale_from_ietf(tg_user.language_code) + return user + else: + user = User(msg.from_user.id, msg.from_user.id) + user.locale = locale_from_ietf(tg_user.language_code) + self.db.users.insert_one(user.dict()) + return user + + def save(self, user: User): + self.db.users.replace_one({'user_id': user.user_id}, user.dict()) + + def handle_commands(self, cmds: typing.List[str]): + cmds = [edp.replace_colons(cmd) for cmd in cmds] + + def wrapper(func): + @wraps(func) + def func_wrapped(msg): + user = self.get_user_by_msg(msg) + args: str = msg.text + for cmd in cmds: + if args.startswith(cmd): + if len(args.split(cmd)) <= 1: + args = '' + break + args = cmd.join(args.split(cmd)[1:]).strip() + break + func(msg, user, args) + + if cmds[0].startswith('/'): + self.message_handler(commands=[cmd[1:] for cmd in cmds])(func_wrapped) + else: + self.message_handler(func=lambda msg: any(msg.text.startswith(cmd) for cmd in cmds))(func_wrapped) + return wrapper + + def handle_callback(self, name: str): + def wrapper(func): + @wraps(func) + def func_wrapped(query): + user = self.get_user_by_msg(query) + args = query.data.strip()[len(name) + 1:] + try: + return func(query, user, args.strip()) + except Exception: + print(f'An exception occured: {traceback.format_exc()}') + + self.callback_query_handler( + lambda query: query.data.strip().startswith(name + '_') or query.data.strip() == name)(func_wrapped) + return func_wrapped + + return wrapper + + def inline_handler_(self, filter_func, **kwargs): + def wrapper(func): + @wraps(func) + def handler(*args, **kwargs_2): + def func_2(): + try: + func(*args, **kwargs_2) + except Exception as e: + print('An exception occured:', e) + + Thread(target=func_2).start() + + self.inline_handler(filter_func, **kwargs)(func) + return handler + + return wrapper + + def iter_users(self) -> typing.Iterable[User]: + for user_data in self.db.users.find(): + yield User.from_dict(user_data) + + def register_next_step_handler(self, message, callback, *args, **kwargs): + if isinstance(message, telebot.types.CallbackQuery): + message = message.message + + def new_callback(new_message, *new_args, **new_kwargs): + try: + if new_message.text in ['🛑 Cancel', '🛑 Отменить']: + # self.clear_step_handler_by_chat_id(new_message.chat.id) + self.reply_with_template(new_message, 'canceled') + return callback(new_message, *new_args, **new_kwargs) + except: + print(traceback.format_exc()) + + super().register_next_step_handler(message, new_callback, *args, **kwargs) + + def send_all_copy(self, message: telebot.types.Message): + num = 0 + failures = 0 + for user in self.iter_users(): + try: + self.copy_message(user.user_id, message.chat.id, message.id) + num += 1 + except: + print(traceback.format_exc()) + failures += 1 + print(f'Sent to {num} users. Failures: {failures}') + + def user_growth(self) -> typing.List[int]: + timestamp = next(iter(self.db.users.find({}).sort('start_timestamp'))) + data = list() + while timestamp < time.time(): + data.append(self.db.users.find({'start_timestamp': {'$lt': timestamp}}).count_documents()) + timestamp += 24 * 3600 + return data + + def stats(self) -> typing.Dict[str, int]: + total_users = self.db.users.count_documents({}) + new_users_today = self.db.users.count_documents( + {'start_timestamp': {'$gt': int(time.time() - 24 * 3600)}} + ) + new_users_month = self.db.users.count_documents( + {'start_timestamp': {'$gt': int(time.time() - 24 * 3600 * 30)}} + ) + active_users_today = self.db.users.count_documents( + {'last_action_timestamp': {'$gt': int(time.time() - 24 * 3600)}} + ) + active_users_month = self.db.users.count_documents( + {'last_action_timestamp': {'$gt': int(time.time() - 24 * 3600 * 30)}} + ) + return { + 'Total users': total_users, + 'New users in the last 24 hours': new_users_today, + 'New users last 30 days': new_users_month, + 'Active users in the last 24 hours': active_users_today, + 'Active users in the last 30 days': active_users_month + } + + + diff --git a/main.py b/main.py new file mode 100644 index 0000000..e23c56d --- /dev/null +++ b/main.py @@ -0,0 +1,21 @@ +from bot import Bot +from views import views +import traceback + + +config = { + 'mongo': __import__('os').getenv('DB'), + 'name': 'test_bot', + 'token': '1686277528:AAHHJgWfulqd9uGmK-RVOM-vQ60kbGgZRIg', + 'template_files': ['templates.txt'], + 'main_keyboard': {'en': [['ℹ️ About']], 'ru': [['ℹ️ О боте']]} +} + +bot = Bot(config) +views(bot) +if __name__ == '__main__': + while True: + try: + bot.polling(none_stop=True) + except: + print(traceback.format_exc()) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..dc34332 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +pyTelegramBotAPI +pymongo +attrs +jinja2 +emoji_data_python \ No newline at end of file diff --git a/templates.py b/templates.py new file mode 100644 index 0000000..db0d290 --- /dev/null +++ b/templates.py @@ -0,0 +1,72 @@ +import attr +import typing +import jinja2 +import emoji_data_python as edp + + +""" +Template format: + +||<| template_name |>|| +//| locale_1 |// +Text for locale 1 +>>| Reply Keyboard: row 1, button 1;; Row 1, button 2 |<< +>>| Row 2, button 1 :-: callback_data_for_inline_keyboard |<< +//| locale_2 |// +Text for locale 2 +""" + + +KeyboardMarkup = typing.List[typing.List[typing.Union[str, typing.Tuple[str, str]]]] + + +@attr.s(auto_attribs=True) +class TemplateProvider: + # {template: {locale: jinja template}} + templates: typing.Dict[str, typing.Dict[str, jinja2.Template]] = attr.Factory(dict) + locales: typing.Tuple[str] = ('', 'both', 'en', 'ru') + + def get_template(self, template_name: str, locale: str) -> jinja2.Template: + locales = [locale] + list(self.locales) + template = self.templates[template_name] + for locale in locales: + if locale in template.keys(): + return template[locale] + return next(template.values()) + + def render_template(self, template_name: str, locale: str, **kwargs) -> str: + return edp.replace_colons( + self.get_template(template_name, locale) + .render(**kwargs, **__builtins__)).strip() + + def add_template(self, template_string: str): + template_name = template_string.split('||<|')[1].split('|>||')[0].strip() + template: typing.Dict[str, jinja2.Template] = dict() + for locale_string in template_string.split('//|')[1:]: + locale_name: str = locale_string.split('|//')[0].strip() + locale_text: str = '|//'.join(locale_string.split('|//')[1:]).strip() + template[locale_name] = jinja2.Environment(loader=jinja2.FileSystemLoader('./')).from_string(locale_text) + self.templates[template_name] = template + + def load_file(self, filename: str): + file_content = open(filename).read() + for template_string in file_content.split('||<|')[1:]: + template_string = '||<|' + template_string + self.add_template(edp.replace_colons(template_string)) + + @staticmethod + def separate_text_and_keyboards(text: str) -> typing.Tuple[str, typing.Optional[KeyboardMarkup]]: + pure_text = text.split('>>|')[0].strip() + if len(text.split('>>|')) <= 1: + return pure_text, None + keyboard_rows = [row.split('|<<')[0].strip() for row in text.split('>>|')[1:]] + keyboard: KeyboardMarkup = list() + for row_raw in keyboard_rows: + row = list() + for btn in row_raw.split(';;'): + if ':-:' in btn: + row.append(tuple(btn.split(':-:'))) + else: + row.append(btn) + keyboard.append(row) + return pure_text, keyboard diff --git a/templates.txt b/templates.txt new file mode 100644 index 0000000..1167203 --- /dev/null +++ b/templates.txt @@ -0,0 +1,24 @@ +||<| start |>|| +//| ru |// +Здравствуйте, {{ tg_user.first_name }}! +//| en |// +Hello, {{ tg_user.first_name }}! + +||<| help |>|| +//| en |// +/help - Help +//| ru |// +/help - помощь + +||<| admin_stats |>|| +//| |// +Stats: +{% for key in stats.keys() %}{{ key }}: {{ stats[key] }} +{% endfor %} + +||<| canceled |>|| +//| en |// +Canceled +//| ru |// +Отменено + diff --git a/user.py b/user.py new file mode 100644 index 0000000..61e9a16 --- /dev/null +++ b/user.py @@ -0,0 +1,40 @@ +from __future__ import annotations +from dataclasses import dataclass, field, asdict +import attr +import typing +import time + + +@dataclass +class User: + user_id: int + chat_id: int = 0 + locale: str = '' + start_timestamp: int = field(default_factory=lambda: int(time.time())) + last_action_timestamp: int = field(default_factory=lambda: int(time.time())) + + def dict(self) -> dict: + data = asdict(self) + data['last_action_timestamp'] = int(time.time()) + return data + + @classmethod + def from_dict(cls, data: dict) -> User: + data = getattr(data, '__dict__', data) + data_ = {key: data[key] for key in data.keys() if key in ['user_id', 'chat_id', 'locale']} + self = cls(**data_) + return self + + @classmethod + def by_id(cls, user_id: int, bot) -> typing.Optional[User]: + data = bot.get_user_from_db({'user_id': user_id}) + if data is None: + return None + return data + + @property + def id(self): + return self._id + + def is_admin(self) -> bool: + return self.user_id in [218952152] diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..edc297b --- /dev/null +++ b/utils.py @@ -0,0 +1,42 @@ +import telebot +import typing +import random +import string + + +def generate_id() -> str: + return ''.join(random.choice(string.digits + string.ascii_lowercase) for _ in range(8)) + + +def isint(st: str) -> bool: + try: + int(st) + return True + except ValueError: + return False + + +def create_inline_button(text: str, callback_data: str) -> telebot.types.InlineKeyboardButton: + if callback_data.strip().startswith('https://') or callback_data.startswith('tg://'): + return telebot.types.InlineKeyboardButton(text, url=callback_data) + if callback_data.strip().startswith('inline://'): + telebot.types.InlineKeyboardButton(text, + switch_inline_query_current_chat=callback_data.replace('inline://', '')) + if callback_data.strip().startswith('inline_other://'): + telebot.types.InlineKeyboardButton(text, switch_inline_query=callback_data.replace('inline_other://', '')) + return telebot.types.InlineKeyboardButton(text, callback_data=callback_data) + + +def create_keyboard(rows: typing.List[typing.List[str]]): + is_inline = not isinstance(rows[0][0], str) + markup = telebot.types.InlineKeyboardMarkup() if is_inline else telebot.types.ReplyKeyboardMarkup( + resize_keyboard=True) + for row in rows: + if is_inline: + markup.row(*( + create_inline_button(text, callback_data) + for text, callback_data in row + )) + else: + markup.row(*(telebot.types.KeyboardButton(text) for text in row)) + return markup diff --git a/views.py b/views.py new file mode 100644 index 0000000..515df68 --- /dev/null +++ b/views.py @@ -0,0 +1,34 @@ +from bot import Bot + + +def views(bot: Bot): + @bot.handle_commands(['/start']) + def handle_start(msg, _user, _args): + bot.reply_with_template(msg, 'start') + + @bot.handle_commands(['/help', 'ℹ️ About', 'ℹ️ О боте']) + def handle_help(msg, _user, _args): + bot.reply_with_template(msg, 'help') + + @bot.handle_commands(['/send_all']) + def send_spam(msg, user, _args): + if not user.is_admin(): + return + def handler(msg_1): + bot.send_all_copy(msg_1) + + bot.send_message(msg.chat.id, 'Send the message') + bot.register_next_step_handler(msg, handler) + + @bot.handle_commands(['/all_stats']) + def all_stats(msg, user, _args): + if not user.is_admin(): + return + stats = bot.stats() + bot.reply_with_template(msg, 'admin_stats', stats=stats) + + @bot.handle_commands(['🛑 Cancel', '🛑 Отменить']) + def cancel_creation(msg, _user, _args): + bot.clear_step_handler_by_chat_id(msg.chat.id) + bot.reply_with_template(msg, 'canceled') +