imaginaryfriend/src/handler/command_handler.py

168 lines
6.3 KiB
Python
Raw Normal View History

2016-11-16 20:42:00 +01:00
import random
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
2016-11-12 13:37:14 +01:00
from telegram.ext import Handler
from src.entity.chat import Chat
2016-11-20 19:20:53 +01:00
from src.entity.pair import Pair
2016-11-12 13:37:14 +01:00
from src.domain.command import Command
2016-11-16 20:42:00 +01:00
from src.entity.word import Word
2016-11-12 13:37:14 +01:00
class CommandHandler(Handler):
    """Telegram update handler that dispatches slash-commands to bot actions.

    Incoming messages whose text starts with ``/`` and whose parsed command
    name is a key of ``self.commands`` are routed to the matching private
    ``__*_command`` method. Replies are sent through the injected
    ``message_sender``.
    """

    # Candidate replies for /ping; one is picked at random per request.
    _PING_ANSWERS = ('echo', 'pong', 'ACK', 'reply', 'pingback')

    def __init__(self, message_sender):
        """Register the command table and keep the reply sender.

        :param message_sender: object used to answer commands; this class
            calls its ``reply(command, text)`` and
            ``send_reply_markup(command, message=..., reply_markup=...)``.
        """
        super(CommandHandler, self).__init__(self.handle)

        # Command name (without the leading slash) -> bound handler method.
        self.commands = {
            'start': self.__start_command,
            'help': self.__help_command,
            'ping': self.__ping_command,
            'set_chance': self.__set_chance_command,
            'get_chance': self.__get_chance_command,
            'get_stats': self.__get_stats_command,
            'moderate': self.__moderate_command,
        }
        self.message_sender = message_sender

    def check_update(self, update):
        """Tell whether ``update`` carries a command this handler knows.

        :param update: object received from the dispatcher.
        :return: ``True`` when ``update`` is an ``Update`` with a message
            whose text starts with ``/`` and whose parsed command name is
            registered in ``self.commands``; ``False`` otherwise.
        """
        if not isinstance(update, Update) or not update.message:
            return False

        message = update.message
        text = message.text
        # bool() keeps the result a real boolean even when text is None or ''.
        return (
            bool(text)
            and text.startswith('/')
            and Command.parse_name(message) in self.commands
        )

    def handle_update(self, update, dispatcher):
        """Invoke the callback with the dispatcher's bot and the update.

        :param update: the update accepted by :meth:`check_update`.
        :param dispatcher: dispatcher that provides ``bot`` and from which
            the optional callback arguments are collected.
        :return: whatever the callback returns.
        """
        optional_args = self.collect_optional_args(dispatcher, update)
        return self.callback(dispatcher.bot, update, **optional_args)

    def handle(self, bot, update):
        """Build a ``Command`` from the update and run its handler.

        An ``IndexError`` or ``ValueError`` escaping the command handler is
        answered with a generic "invalid command" reply instead of
        propagating.

        :param bot: bot instance passed through to the command handler.
        :param update: update whose ``message`` holds the command.
        """
        chat = Chat.get_chat(update.message)
        command = Command(chat=chat, message=update.message)

        try:
            self.commands[command.name](bot, command)
        except (IndexError, ValueError):
            self.message_sender.reply(command, 'Invalid command! Type /help')

    def __start_command(self, bot, command):
        """Reply to /start with a greeting."""
        self.message_sender.reply(command, 'Hi! :3')

    def __help_command(self, bot, command):
        """Reply to /help with the usage text."""
        self.message_sender.reply(
            command,
            """Add me to your group and let me listen to your chat for a while.
When I learn enough word pairs, I'll start bringing fun and absurdity to your conversations.
Available commands:
/ping,
/get_stats: get the number of word pairs I've learned in this chat,
/set_chance: set the chance that I'll reply to a random message (must be in range 1-50, default: 5),
/get_chance: get the current chance of my random reply.
If you get tired of me, you can kick me from the group.
In 12 hours, I'll forget everything that have been learned in your chat, so you can add me again and teach me new things!
"""
        )

    def __ping_command(self, bot, command):
        """Reply to /ping with a randomly chosen acknowledgement."""
        self.message_sender.reply(command, random.choice(self._PING_ANSWERS))

    def __set_chance_command(self, bot, command):
        """Handle ``/set_chance <n>``: store a new random-reply chance.

        The first argument must parse as an integer in the range 1-50;
        otherwise (or when it is missing) a usage hint is sent and nothing
        is stored.
        """
        # Only the parsing can legitimately fail on user input, so only it
        # is guarded.
        try:
            random_chance = int(command.args[0])
        except (IndexError, ValueError):
            random_chance = None

        if random_chance is None or not 1 <= random_chance <= 50:
            self.message_sender.reply(command, 'Usage: /set_chance 1-50.')
            return

        command.chat.update(random_chance=random_chance)
        self.message_sender.reply(command, 'Set chance to: {}'.format(random_chance))

    def __get_chance_command(self, bot, command):
        """Reply to /get_chance with the chat's current ``random_chance``."""
        self.message_sender.reply(
            command,
            'Current chance: {}'.format(command.chat.random_chance)
        )

    def __get_stats_command(self, bot, command):
        """Reply to /get_stats with the count of the chat's pairs."""
        self.message_sender.reply(
            command,
            'Pairs: {}'.format(command.chat.pairs().count())
        )

    def __moderate_command(self, bot, command):
        """Handle ``/moderate <word>``: offer matching words for deletion.

        Only chat administrators may use it. Words starting with the given
        prefix that occur in this chat's pairs are presented as an inline
        keyboard; a missing argument yields a usage hint.

        :return: the result of ``message_sender.reply`` when the sender is
            not an administrator, otherwise ``None``.
        """
        if not self.__is_admin(bot, command):
            return self.message_sender.reply(command, 'You don\'t have admin privileges!')

        try:
            search_word = command.args[0]
        except IndexError:
            self.message_sender.reply(command, 'Usage: /moderate <word>')
            return

        found_words = self.__find_current_chat_words(command, search_word)
        if not found_words:
            self.message_sender.reply(command, 'No words found!')
            return

        self.message_sender.send_reply_markup(
            command,
            message="Mark all words to delete and press OK, "
                    "or press Cancel to close this window",
            reply_markup=self.__generate_keyboard(found_words)
        )

    @staticmethod
    def __is_admin(bot, command):
        """Return True when the command's sender administers its chat."""
        user_id = command.message.from_user.id
        administrators = bot.get_chat_administrators(chat_id=command.chat.telegram_id)
        return any(member.user.id == user_id for member in administrators)

    @staticmethod
    def __generate_keyboard(words):
        """Build the moderation inline keyboard.

        :param words: mapping of word id -> word text.
        :return: ``InlineKeyboardMarkup`` with one row per word (the word
            itself plus a checkbox button whose callback data is the word
            id) followed by a Cancel / OK row.
        """
        rows = [
            [
                InlineKeyboardButton(text=word, callback_data='nothing'),
                InlineKeyboardButton(text='🔲', callback_data=str(word_id)),
            ]
            for word_id, word in words.items()
        ]
        rows.append([
            InlineKeyboardButton(text='Cancel', callback_data='cancel'),
            InlineKeyboardButton(text='OK', callback_data='remove_all_marked_words'),
        ])
        return InlineKeyboardMarkup(rows)

    @staticmethod
    def __find_current_chat_words(command, search_word):
        """Find words with the given prefix that are used in this chat.

        At most 10 words starting with ``search_word`` (ordered by word)
        are looked up, then narrowed to those referenced as ``first_id`` or
        ``second_id`` by a pair of the command's chat.

        :param command: command whose ``chat.id`` scopes the pair lookup.
        :param search_word: prefix to match words against.
        :return: dict of word id -> word; empty when nothing matches.
        """
        found_words = (
            Word.where('word', 'like', search_word + '%')
            .order_by('word', 'asc')
            .limit(10)
            .lists('word', 'id')
        )
        # Explicit len(): the ORM result type is not defined in this file,
        # so its truthiness is not relied upon.
        if len(found_words) == 0:
            return {}

        word_ids = list(found_words.keys())
        pairs_in_chat = (
            Pair.select('first_id', 'second_id')
            .where('chat_id', command.chat.id)
            .where(
                Pair.query()
                .where_in('first_id', word_ids)
                .or_where_in('second_id', word_ids)
            )
            .get()
        )

        # Ids referenced by this chat's pairs; ids that are not among the
        # found words are filtered out by the comprehension below.
        used_ids = set()
        for pair in pairs_in_chat:
            used_ids.update((pair.first_id, pair.second_id))

        return {
            word_id: found_words[word_id]
            for word_id in found_words
            if word_id in used_ids
        }