Browse Source

Init

master
Lev 3 years ago
commit
015196d164
  1. 3
      README.md
  2. 218
      bot.py
  3. 21
      main.py
  4. 5
      requirements.txt
  5. 72
      templates.py
  6. 24
      templates.txt
  7. 40
      user.py
  8. 42
      utils.py
  9. 34
      views.py

3
README.md

@ -0,0 +1,3 @@
# RandomTea
Random meetings for chats

218
bot.py

@ -0,0 +1,218 @@
import telebot
import pymongo
from functools import wraps
from user import User
from templates import TemplateProvider
import emoji_data_python as edp
from threading import Thread
import utils
import typing
import traceback
import time
def locale_from_ietf(ietf_locale: str) -> str:
if not ietf_locale:
return 'en'
ietf_locale = ietf_locale.split('-')[0]
return 'ru' if ietf_locale in ['ru', 'be', 'uk'] else 'en'
class Bot(telebot.TeleBot):
def __init__(self, config: dict):
self.name = config['name']
super().__init__(config['token'])
self.me: telebot.types.User = self.get_me()
self.db = getattr(pymongo.MongoClient(config['mongo']), self.name)
self.db.users.create_index([('user_id', pymongo.ASCENDING)], unique=True)
self.config = config
self.templates = TemplateProvider()
for template_file in config.get('template_files', []):
self.templates.load_file(template_file)
self.main_keyboard = None
if config.get('main_keyboard') is not None:
kb = config.get('main_keyboard')
self.main_keyboard = {key: utils.create_keyboard(kb[key]) for key in kb.keys()}
def render_template(self, template_name: str, locale: str = 'ru', **kwargs) -> \
typing.Tuple[str, typing.Union[telebot.types.ReplyKeyboardMarkup, telebot.types.InlineKeyboardMarkup]]:
text: str = self.templates.render_template(template_name, locale, bot=self, **kwargs)
text, keyboard = self.templates.separate_text_and_keyboards(text)
keyboard = utils.create_keyboard(keyboard) \
if keyboard is not None \
else self.main_keyboard.get(locale, self.main_keyboard.get('ru'))
return text, keyboard
def render_message_for_user(self, user: User, template_name: str, **kwargs) -> \
typing.Tuple[str, typing.Union[telebot.types.ReplyKeyboardMarkup, telebot.types.InlineKeyboardMarkup]]:
text: str = self.templates.render_template(template_name, user.locale, user=user, **kwargs)
text, keyboard = self.templates.separate_text_and_keyboards(text)
keyboard = utils.create_keyboard(keyboard) if keyboard is not None \
else self.main_keyboard.get(user.locale, self.main_keyboard.get('ru'))
return text, keyboard
def edit_message_with_template(self, query: telebot.types.CallbackQuery, template: str, **kwargs):
msg = query.message
text, keyboard = self.render_message_for_user(self.get_user_by_msg(msg), template,
tg_user=msg.from_user,
**kwargs)
self.edit_message_text(text, msg.chat.id, msg.id, reply_markup=keyboard, parse_mode='HTML')
self.answer_callback_query(query)
def reply_with_template(self, msg: typing.Union[telebot.types.Message, telebot.types.CallbackQuery], template: str,
as_reply: bool = False, locale_overwrite: typing.Optional[str] = None, **kwargs):
if isinstance(msg, telebot.types.CallbackQuery):
msg = msg.message
text, keyboard = self.render_message_for_user(self.get_user_by_msg(msg), template,
tg_user=msg.from_user, locale_overwrite=locale_overwrite,
**kwargs)
if as_reply:
self.reply_to(msg, text, reply_markup=keyboard)
else:
self.send_message(msg.chat.id, text, reply_markup=keyboard, parse_mode='HTML')
def get_user_from_db(self, request: dict) -> typing.Optional[User]:
data = self.db.users.find_one(request)
if data is None:
return None
return User.from_dict(data)
def get_user_by_msg(self, msg: typing.Union[telebot.types.Message, telebot.types.InlineQuery]) -> User:
tg_user: telebot.types.User = msg.from_user
if tg_user.id == self.me.id and msg.chat.id > 0:
user = User.by_id(msg.chat.id, self) or User.by_id(msg.from_user.id, self)
else:
user = User.by_id(msg.from_user.id, self)
if user is not None:
if tg_user.language_code:
user.locale = locale_from_ietf(tg_user.language_code)
return user
else:
user = User(msg.from_user.id, msg.from_user.id)
user.locale = locale_from_ietf(tg_user.language_code)
self.db.users.insert_one(user.dict())
return user
def save(self, user: User):
self.db.users.replace_one({'user_id': user.user_id}, user.dict())
def handle_commands(self, cmds: typing.List[str]):
cmds = [edp.replace_colons(cmd) for cmd in cmds]
def wrapper(func):
@wraps(func)
def func_wrapped(msg):
user = self.get_user_by_msg(msg)
args: str = msg.text
for cmd in cmds:
if args.startswith(cmd):
if len(args.split(cmd)) <= 1:
args = ''
break
args = cmd.join(args.split(cmd)[1:]).strip()
break
func(msg, user, args)
if cmds[0].startswith('/'):
self.message_handler(commands=[cmd[1:] for cmd in cmds])(func_wrapped)
else:
self.message_handler(func=lambda msg: any(msg.text.startswith(cmd) for cmd in cmds))(func_wrapped)
return wrapper
def handle_callback(self, name: str):
def wrapper(func):
@wraps(func)
def func_wrapped(query):
user = self.get_user_by_msg(query)
args = query.data.strip()[len(name) + 1:]
try:
return func(query, user, args.strip())
except Exception:
print(f'An exception occured: {traceback.format_exc()}')
self.callback_query_handler(
lambda query: query.data.strip().startswith(name + '_') or query.data.strip() == name)(func_wrapped)
return func_wrapped
return wrapper
def inline_handler_(self, filter_func, **kwargs):
def wrapper(func):
@wraps(func)
def handler(*args, **kwargs_2):
def func_2():
try:
func(*args, **kwargs_2)
except Exception as e:
print('An exception occured:', e)
Thread(target=func_2).start()
self.inline_handler(filter_func, **kwargs)(func)
return handler
return wrapper
def iter_users(self) -> typing.Iterable[User]:
for user_data in self.db.users.find():
yield User.from_dict(user_data)
def register_next_step_handler(self, message, callback, *args, **kwargs):
if isinstance(message, telebot.types.CallbackQuery):
message = message.message
def new_callback(new_message, *new_args, **new_kwargs):
try:
if new_message.text in ['🛑 Cancel', '🛑 Отменить']:
# self.clear_step_handler_by_chat_id(new_message.chat.id)
self.reply_with_template(new_message, 'canceled')
return callback(new_message, *new_args, **new_kwargs)
except:
print(traceback.format_exc())
super().register_next_step_handler(message, new_callback, *args, **kwargs)
def send_all_copy(self, message: telebot.types.Message):
num = 0
failures = 0
for user in self.iter_users():
try:
self.copy_message(user.user_id, message.chat.id, message.id)
num += 1
except:
print(traceback.format_exc())
failures += 1
print(f'Sent to {num} users. Failures: {failures}')
def user_growth(self) -> typing.List[int]:
timestamp = next(iter(self.db.users.find({}).sort('start_timestamp')))
data = list()
while timestamp < time.time():
data.append(self.db.users.find({'start_timestamp': {'$lt': timestamp}}).count_documents())
timestamp += 24 * 3600
return data
def stats(self) -> typing.Dict[str, int]:
total_users = self.db.users.count_documents({})
new_users_today = self.db.users.count_documents(
{'start_timestamp': {'$gt': int(time.time() - 24 * 3600)}}
)
new_users_month = self.db.users.count_documents(
{'start_timestamp': {'$gt': int(time.time() - 24 * 3600 * 30)}}
)
active_users_today = self.db.users.count_documents(
{'last_action_timestamp': {'$gt': int(time.time() - 24 * 3600)}}
)
active_users_month = self.db.users.count_documents(
{'last_action_timestamp': {'$gt': int(time.time() - 24 * 3600 * 30)}}
)
return {
'Total users': total_users,
'New users in the last 24 hours': new_users_today,
'New users last 30 days': new_users_month,
'Active users in the last 24 hours': active_users_today,
'Active users in the last 30 days': active_users_month
}

21
main.py

@ -0,0 +1,21 @@
from bot import Bot
from views import views
import traceback
config = {
'mongo': __import__('os').getenv('DB'),
'name': 'test_bot',
'token': '1686277528:AAHHJgWfulqd9uGmK-RVOM-vQ60kbGgZRIg',
'template_files': ['templates.txt'],
'main_keyboard': {'en': [[' About']], 'ru': [[' О боте']]}
}
bot = Bot(config)
views(bot)
if __name__ == '__main__':
while True:
try:
bot.polling(none_stop=True)
except:
print(traceback.format_exc())

5
requirements.txt

@ -0,0 +1,5 @@
pyTelegramBotAPI
pymongo
attrs
jinja2
emoji_data_python

72
templates.py

@ -0,0 +1,72 @@
import attr
import typing
import jinja2
import emoji_data_python as edp
"""
Template format:
||<| template_name |>||
//| locale_1 |//
Text for locale 1
>>| Reply Keyboard: row 1, button 1;; Row 1, button 2 |<<
>>| Row 2, button 1 :-: callback_data_for_inline_keyboard |<<
//| locale_2 |//
Text for locale 2
"""
KeyboardMarkup = typing.List[typing.List[typing.Union[str, typing.Tuple[str, str]]]]
@attr.s(auto_attribs=True)
class TemplateProvider:
# {template: {locale: jinja template}}
templates: typing.Dict[str, typing.Dict[str, jinja2.Template]] = attr.Factory(dict)
locales: typing.Tuple[str] = ('', 'both', 'en', 'ru')
def get_template(self, template_name: str, locale: str) -> jinja2.Template:
locales = [locale] + list(self.locales)
template = self.templates[template_name]
for locale in locales:
if locale in template.keys():
return template[locale]
return next(template.values())
def render_template(self, template_name: str, locale: str, **kwargs) -> str:
return edp.replace_colons(
self.get_template(template_name, locale)
.render(**kwargs, **__builtins__)).strip()
def add_template(self, template_string: str):
template_name = template_string.split('||<|')[1].split('|>||')[0].strip()
template: typing.Dict[str, jinja2.Template] = dict()
for locale_string in template_string.split('//|')[1:]:
locale_name: str = locale_string.split('|//')[0].strip()
locale_text: str = '|//'.join(locale_string.split('|//')[1:]).strip()
template[locale_name] = jinja2.Environment(loader=jinja2.FileSystemLoader('./')).from_string(locale_text)
self.templates[template_name] = template
def load_file(self, filename: str):
file_content = open(filename).read()
for template_string in file_content.split('||<|')[1:]:
template_string = '||<|' + template_string
self.add_template(edp.replace_colons(template_string))
@staticmethod
def separate_text_and_keyboards(text: str) -> typing.Tuple[str, typing.Optional[KeyboardMarkup]]:
pure_text = text.split('>>|')[0].strip()
if len(text.split('>>|')) <= 1:
return pure_text, None
keyboard_rows = [row.split('|<<')[0].strip() for row in text.split('>>|')[1:]]
keyboard: KeyboardMarkup = list()
for row_raw in keyboard_rows:
row = list()
for btn in row_raw.split(';;'):
if ':-:' in btn:
row.append(tuple(btn.split(':-:')))
else:
row.append(btn)
keyboard.append(row)
return pure_text, keyboard

24
templates.txt

@ -0,0 +1,24 @@
||<| start |>||
//| ru |//
Здравствуйте, <b>{{ tg_user.first_name }}</b>!
//| en |//
Hello, <b>{{ tg_user.first_name }}</b>!
||<| help |>||
//| en |//
/help - Help
//| ru |//
/help - помощь
||<| admin_stats |>||
//| |//
<b>Stats:</b>
{% for key in stats.keys() %}<i>{{ key }}:</i> {{ stats[key] }}
{% endfor %}
||<| canceled |>||
//| en |//
Canceled
//| ru |//
Отменено

40
user.py

@ -0,0 +1,40 @@
from __future__ import annotations
from dataclasses import dataclass, field, asdict
import attr
import typing
import time
@dataclass
class User:
user_id: int
chat_id: int = 0
locale: str = ''
start_timestamp: int = field(default_factory=lambda: int(time.time()))
last_action_timestamp: int = field(default_factory=lambda: int(time.time()))
def dict(self) -> dict:
data = asdict(self)
data['last_action_timestamp'] = int(time.time())
return data
@classmethod
def from_dict(cls, data: dict) -> User:
data = getattr(data, '__dict__', data)
data_ = {key: data[key] for key in data.keys() if key in ['user_id', 'chat_id', 'locale']}
self = cls(**data_)
return self
@classmethod
def by_id(cls, user_id: int, bot) -> typing.Optional[User]:
data = bot.get_user_from_db({'user_id': user_id})
if data is None:
return None
return data
@property
def id(self):
return self._id
def is_admin(self) -> bool:
return self.user_id in [218952152]

42
utils.py

@ -0,0 +1,42 @@
import telebot
import typing
import random
import string
def generate_id() -> str:
return ''.join(random.choice(string.digits + string.ascii_lowercase) for _ in range(8))
def isint(st: str) -> bool:
try:
int(st)
return True
except ValueError:
return False
def create_inline_button(text: str, callback_data: str) -> telebot.types.InlineKeyboardButton:
if callback_data.strip().startswith('https://') or callback_data.startswith('tg://'):
return telebot.types.InlineKeyboardButton(text, url=callback_data)
if callback_data.strip().startswith('inline://'):
telebot.types.InlineKeyboardButton(text,
switch_inline_query_current_chat=callback_data.replace('inline://', ''))
if callback_data.strip().startswith('inline_other://'):
telebot.types.InlineKeyboardButton(text, switch_inline_query=callback_data.replace('inline_other://', ''))
return telebot.types.InlineKeyboardButton(text, callback_data=callback_data)
def create_keyboard(rows: typing.List[typing.List[str]]):
is_inline = not isinstance(rows[0][0], str)
markup = telebot.types.InlineKeyboardMarkup() if is_inline else telebot.types.ReplyKeyboardMarkup(
resize_keyboard=True)
for row in rows:
if is_inline:
markup.row(*(
create_inline_button(text, callback_data)
for text, callback_data in row
))
else:
markup.row(*(telebot.types.KeyboardButton(text) for text in row))
return markup

34
views.py

@ -0,0 +1,34 @@
from bot import Bot
def views(bot: Bot):
@bot.handle_commands(['/start'])
def handle_start(msg, _user, _args):
bot.reply_with_template(msg, 'start')
@bot.handle_commands(['/help', ' About', ' О боте'])
def handle_help(msg, _user, _args):
bot.reply_with_template(msg, 'help')
@bot.handle_commands(['/send_all'])
def send_spam(msg, user, _args):
if not user.is_admin():
return
def handler(msg_1):
bot.send_all_copy(msg_1)
bot.send_message(msg.chat.id, 'Send the message')
bot.register_next_step_handler(msg, handler)
@bot.handle_commands(['/all_stats'])
def all_stats(msg, user, _args):
if not user.is_admin():
return
stats = bot.stats()
bot.reply_with_template(msg, 'admin_stats', stats=stats)
@bot.handle_commands(['🛑 Cancel', '🛑 Отменить'])
def cancel_creation(msg, _user, _args):
bot.clear_step_handler_by_chat_id(msg.chat.id)
bot.reply_with_template(msg, 'canceled')
Loading…
Cancel
Save