Browse Source

Started writing - now the Community model is ready, bot welcomes the users and adds them communities

master
Lev 3 years ago
parent
commit
b47f8967f7
  1. 3
      README.md
  2. 30
      bot.py
  3. 57
      community.py
  4. 4
      main.py
  5. 19
      templates.txt
  6. 4
      user.py
  7. 2
      utils.py
  8. 23
      views.py

3
README.md

@ -1,9 +1,10 @@
# RandomTea # RandomTea
![](logo.png) ![](logo.png)
Random meetings for chats, kind of like RandomCoffee. Random meetings for chats, kind of like RandomCoffee.
### How it works ## How it works
1. The bot is invited to a chat 1. The bot is invited to a chat
2. The members of the chat open the bot 2. The members of the chat open the bot

30
bot.py

@ -1,6 +1,8 @@
import telebot import telebot
import pymongo import pymongo
from functools import wraps from functools import wraps
from community import Community
from user import User from user import User
from templates import TemplateProvider from templates import TemplateProvider
import emoji_data_python as edp import emoji_data_python as edp
@ -11,11 +13,12 @@ import traceback
import time import time
def locale_from_ietf(ietf_locale: str) -> str: def locale_from_ietf(_ietf_locale: str) -> str:
if not ietf_locale: # if not ietf_locale:
return 'en' # return 'en'
ietf_locale = ietf_locale.split('-')[0] # ietf_locale = ietf_locale.split('-')[0]
return 'ru' if ietf_locale in ['ru', 'be', 'uk'] else 'en' # return 'ru' if ietf_locale in ['ru', 'be', 'uk'] else 'en'
return 'ru'
class Bot(telebot.TeleBot): class Bot(telebot.TeleBot):
@ -77,6 +80,15 @@ class Bot(telebot.TeleBot):
return None return None
return User.from_dict(data) return User.from_dict(data)
def get_community_from_db(self, request: dict, create_if_not_found: bool = False) -> typing.Optional[Community]:
data = self.db.communities.find_one(request)
if data is None:
if create_if_not_found:
self.db.communities.insert_one(request)
return Community.from_dict(request)
return None
return Community.from_dict(data)
def get_user_by_msg(self, msg: typing.Union[telebot.types.Message, telebot.types.InlineQuery]) -> User: def get_user_by_msg(self, msg: typing.Union[telebot.types.Message, telebot.types.InlineQuery]) -> User:
tg_user: telebot.types.User = msg.from_user tg_user: telebot.types.User = msg.from_user
if tg_user.id == self.me.id and msg.chat.id > 0: if tg_user.id == self.me.id and msg.chat.id > 0:
@ -93,9 +105,12 @@ class Bot(telebot.TeleBot):
self.db.users.insert_one(user.dict()) self.db.users.insert_one(user.dict())
return user return user
def save(self, user: User): def save_user(self, user: User):
self.db.users.replace_one({'user_id': user.user_id}, user.dict()) self.db.users.replace_one({'user_id': user.user_id}, user.dict())
def save_community(self, community: Community):
self.db.communities.replace_one({'chat_id': community.chat_id}, community.dict())
def handle_commands(self, cmds: typing.List[str]): def handle_commands(self, cmds: typing.List[str]):
cmds = [edp.replace_colons(cmd) for cmd in cmds] cmds = [edp.replace_colons(cmd) for cmd in cmds]
@ -111,7 +126,10 @@ class Bot(telebot.TeleBot):
break break
args = cmd.join(args.split(cmd)[1:]).strip() args = cmd.join(args.split(cmd)[1:]).strip()
break break
try:
func(msg, user, args) func(msg, user, args)
except:
print(traceback.format_exc())
if cmds[0].startswith('/'): if cmds[0].startswith('/'):
self.message_handler(commands=[cmd[1:] for cmd in cmds])(func_wrapped) self.message_handler(commands=[cmd[1:] for cmd in cmds])(func_wrapped)

57
community.py

@ -0,0 +1,57 @@
from __future__ import annotations
from dataclasses import dataclass, field, asdict
import typing
from datetime import date
from telebot.types import ChatMember, Chat
import time
from threading import Thread
if typing.TYPE_CHECKING:
from bot import Bot
@dataclass
class Community:
chat_id: int # Telegram chat id
name: str = ''
members: typing.List[int] = field(default_factory=list)
pool: typing.List[typing.Tuple[int, int]] = field(default_factory=list) # (user_id, number_of_copies)
scheduled_meetings: typing.List[typing.Tuple[int, int, date]] = field(
default_factory=list) # (user_id_1, user_id_2, meeting date)
archived_meetings: typing.List[typing.Tuple[int, int, date]] = field(default_factory=list)
start_timestamp: int = field(default_factory=lambda: int(time.time()))
def dict(self) -> dict:
data = asdict(self)
return data
@classmethod
def from_dict(cls, data: dict) -> Community:
data = getattr(data, '__dict__', data)
data_ = {key: data[key] for key in data.keys() if
key in ['chat_id', 'name', 'members', 'pool', 'scheduled_meetings', 'archived_meetings',
'start_timestamp']}
self = cls(**data_)
return self
@classmethod
def by_id(cls, chat_id: int, bot: Bot, create_if_not_exists=True) -> typing.Optional[Community]:
data = bot.get_community_from_db({'chat_id': chat_id}, create_if_not_exists)
if data is None:
return None
def upd_data():
chat: Chat = bot.get_chat(chat_id)
data.name = chat.title
bot.save_community(data)
Thread(target=upd_data).start()
return data
def add_member(self, user_id: int, bot: Bot) -> bool:
chat_member_info: ChatMember = bot.get_chat_member(self.chat_id, user_id)
if chat_member_info.is_member or chat_member_info.status in ['creator', 'administrator', 'admin']:
self.members.append(user_id)
bot.save_community(self)
return True
return False

4
main.py

@ -5,8 +5,8 @@ import traceback
config = { config = {
'mongo': __import__('os').getenv('DB'), 'mongo': __import__('os').getenv('DB'),
'name': 'test_bot', 'name': 'ranteabot',
'token': '1686277528:AAHHJgWfulqd9uGmK-RVOM-vQ60kbGgZRIg', 'token': __import__('os').getenv('TOKEN'),
'template_files': ['templates.txt'], 'template_files': ['templates.txt'],
'main_keyboard': {'en': [[' About']], 'ru': [[' О боте']]} 'main_keyboard': {'en': [[' About']], 'ru': [[' О боте']]}
} }

19
templates.txt

@ -16,6 +16,25 @@ Hello, <b>{{ tg_user.first_name }}</b>!
{% for key in stats.keys() %}<i>{{ key }}:</i> {{ stats[key] }} {% for key in stats.keys() %}<i>{{ key }}:</i> {{ stats[key] }}
{% endfor %} {% endfor %}
||<| welcome |>||
//| en |//
Thank you for adding RandomTea to your community!
Now, the members of your community that wish to participate can press the button below👇.
>>| Join :-: https://t.me/ranteabot?start=comm-{{ community.chat_id }} |<<
//| ru |//
{# todo #}
Спасибо за добавление RandomTea в вашу сообщество!
Теперь пользователи вашего сообщества могут нажать на кнопку ниже👇, чтобы присоединиться к RandomTea.
>>| Присоединиться :-: https://t.me/ranteabot?start=comm-{{ community.chat_id }} |<<
||<| community_added |>||
//| en |//
{# todo: name #}
You have joined the community, congrats!
//| ru |//
Вы присоединились к сообществу, хорошего дня!
||<| canceled |>|| ||<| canceled |>||
//| en |// //| en |//
Canceled Canceled

4
user.py

@ -1,6 +1,5 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass, field, asdict from dataclasses import dataclass, field, asdict
import attr
import typing import typing
import time import time
@ -21,7 +20,8 @@ class User:
@classmethod @classmethod
def from_dict(cls, data: dict) -> User: def from_dict(cls, data: dict) -> User:
data = getattr(data, '__dict__', data) data = getattr(data, '__dict__', data)
data_ = {key: data[key] for key in data.keys() if key in ['user_id', 'chat_id', 'locale']} data_ = {key: data[key] for key in data.keys() if
key in ['user_id', 'chat_id', 'locale', 'start_timestamp', 'last_action_timestamp']}
self = cls(**data_) self = cls(**data_)
return self return self

2
utils.py

@ -18,7 +18,7 @@ def isint(st: str) -> bool:
def create_inline_button(text: str, callback_data: str) -> telebot.types.InlineKeyboardButton: def create_inline_button(text: str, callback_data: str) -> telebot.types.InlineKeyboardButton:
if callback_data.strip().startswith('https://') or callback_data.startswith('tg://'): if callback_data.strip().startswith('https://') or callback_data.startswith('tg://'):
return telebot.types.InlineKeyboardButton(text, url=callback_data) return telebot.types.InlineKeyboardButton(text, url=callback_data.strip())
if callback_data.strip().startswith('inline://'): if callback_data.strip().startswith('inline://'):
telebot.types.InlineKeyboardButton(text, telebot.types.InlineKeyboardButton(text,
switch_inline_query_current_chat=callback_data.replace('inline://', '')) switch_inline_query_current_chat=callback_data.replace('inline://', ''))

23
views.py

@ -1,9 +1,22 @@
from bot import Bot from bot import Bot
from community import Community
def views(bot: Bot): def views(bot: Bot):
@bot.handle_commands(['/start']) @bot.handle_commands(['/start'])
def handle_start(msg, _user, _args): def handle_start(msg, user, args):
if msg.chat.type in 'supergroup':
bot.reply_with_template(msg, 'welcome', community=Community.by_id(msg.chat.id, bot))
return
if args.strip().startswith('comm'):
community_id = int('-'.join(args.strip().split('-')[1:]))
community = Community.by_id(community_id, bot)
if user.user_id in community.members:
bot.reply_with_template(msg, 'community_added', community=community, already_member=True)
elif community.add_member(user.user_id, bot):
bot.reply_with_template(msg, 'community_added', community=community, already_member=False)
else:
bot.reply_with_template(msg, 'err_not_a_member', community=community)
bot.reply_with_template(msg, 'start') bot.reply_with_template(msg, 'start')
@bot.handle_commands(['/help', ' About', ' О боте']) @bot.handle_commands(['/help', ' About', ' О боте'])
@ -14,6 +27,7 @@ def views(bot: Bot):
def send_spam(msg, user, _args): def send_spam(msg, user, _args):
if not user.is_admin(): if not user.is_admin():
return return
def handler(msg_1): def handler(msg_1):
bot.send_all_copy(msg_1) bot.send_all_copy(msg_1)
@ -32,3 +46,10 @@ def views(bot: Bot):
bot.clear_step_handler_by_chat_id(msg.chat.id) bot.clear_step_handler_by_chat_id(msg.chat.id)
bot.reply_with_template(msg, 'canceled') bot.reply_with_template(msg, 'canceled')
@bot.my_chat_member_handler()
def handle_my_chat_member(upd):
chat_id: int = upd.chat.id
# get the community and add it if it doesn't exist
community: Community = Community.by_id(chat_id, bot)
# send welcome message to the chat (if we can) - and if we should
bot.reply_with_template(upd, 'welcome', community=community)

Loading…
Cancel
Save