mau_mau_bot_bot/bot.py

397 lines
15 KiB
Python
Raw Normal View History

2018-12-20 13:12:30 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from telethon.sync import TelegramClient
from telethon import events
import re
import logging
import time
import sys
from telethon.tl.functions.messages import GetInlineBotResultsRequest, SendInlineBotResultRequest, \
SendMessageRequest, SetTypingRequest
from telethon.tl.types import SendMessageTypingAction, SendMessageCancelAction
from telethon.tl.types import PeerUser, PeerChat, PeerChannel, User as _User, Chat as _Chat, Channel as _Channel, \
InputPeerChannel, InputPeerChat, InputPeerUser
from telethon.errors.rpcbaseerrors import RPCError
import asyncio
from game import Game
game = Game()
from config import api_id, api_hash, PHONE, session_name, unobot_username, unobot_usernames, \
default_delay, print_cards, disable_all_commands, game_consts
game.delay = default_delay
SAFE_MODE = False
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
client = TelegramClient(session_name, api_id, api_hash, sequential_updates=True, use_ipv6=False)
while not client.start(phone=PHONE):
logger.info("Start failed. Retrying...")
time.sleep(5)
logger.info("Started!")
my_username = client.get_me().username
my_firstname = client.get_me().first_name
# parse game_consts
for key in game_consts:
game_consts[key] = game_consts[key].replace('{username}', my_username)
game_consts[key] = game_consts[key].replace('{firstname}', my_firstname)
logger.info("Getting dialogs")
for dialog in client.iter_dialogs():
pass
logger.info("Done getting dialogs")
unobot = client.get_input_entity(unobot_username)
unochat = None
async def inline_query(fail=None):
""" This part handles interaction with unobot.
"""
# typing @unobot and a space :)
async def set_typing(cancel=False):
'''let others see you are typing'''
if cancel:
try:
await client(SetTypingRequest(
peer = unochat,
action = SendMessageCancelAction()
))
except Exception as err:
print(err)
else:
try:
await client(SetTypingRequest(
peer = unochat,
action = SendMessageTypingAction()
))
except Exception as err:
print(err)
async def typing_sleep(seconds):
''' Telegram cancels your typing notification in several secs
we also call set_typing in the beginning
'''
INTERVAL = 5
seconds = int(seconds)
if seconds <= 0:
pass
else:
if seconds <= INTERVAL:
await asyncio.sleep(INTERVAL)
else:
rounds = int(seconds/INTERVAL)
for i in range(rounds):
await asyncio.sleep(INTERVAL)
await set_typing()
assert (seconds - rounds * INTERVAL) > 0
await asyncio.sleep(int(seconds - rounds * INTERVAL))
await set_typing()
for tries in range(10):
try:
bot_results = await client(GetInlineBotResultsRequest(
unobot, unochat, '', ''
))
except RPCError:
await asyncio.sleep(1)
else:
break
if bot_results.results:
query_id = bot_results.query_id
for result in bot_results.results:
# is it a grey card? if so, get it.
try:
if hasattr(result.document.attributes[1], 'stickerset'):
try:
(result_id, anti_cheat) = result.id.split(':')
except:
pass
else:
if len(result_id) == 36:
# uuid result for grey cards
sset = result.document.attributes[1].stickerset
2023-06-12 12:36:50 +08:00
game.add_grey_card(sset.id, result.document.id)
continue
2018-12-20 13:12:30 +08:00
except (AttributeError, IndexError):
pass
except Exception as err:
logger.exception("while getting grey cards")
# get ordinary cards
try:
(result_id, anti_cheat) = result.id.split(':')
except ValueError:
if fail is None:
fail = 1
await asyncio.sleep(1)
await inline_query(fail=fail)
return
elif fail < 5:
fail += 1
await asyncio.sleep(1)
await inline_query(fail=fail)
return
else:
return
game.add_card(result_id, anti_cheat)
if game.delay:
await typing_sleep(game.delay)
if print_cards:
print(game.print_cards())
callback_id = game.play_card()
if not callback_id:
callback_id = 'draw'
#await client(SendMessageRequest(unochat, 'Error: No card can be played. Leaving game'))
print(unochat, 'Error: No card can be played. Leaving game')
await client(SendMessageRequest(unochat, "/leave@{}".format(unobot_username)))
return
for tries in range(6):
try:
await client(SendInlineBotResultRequest(
unochat,
query_id,
"{}:{}".format(callback_id, game.anti_cheat)
))
except Exception as err:
logger.critical('Exception: {}'.format(err))
#await client(SendMessageRequest(unochat, 'Exception: {}'.format(err)))
else:
break
await set_typing(cancel=True)
game.rotate_deck()
return True
else:
logger.critical('Bad inline result from bot')
print(bot_results)
return None
def safety_check(chat_id, force=False):
if SAFE_MODE or force:
safe_ids = [-100000000000]
if chat_id in safe_ids:
return True
else:
return False
else:
return True
def commandify(text, my_commands=True, wild_card=True):
args = text.split()
if not args:
return [None]
match = re.match(r'/([^@]+$)', args[0])
if match:
command = match.group(1)
if not wild_card:
return [None]
username = my_username
args = args[1:]
return [command, username, args]
else:
match = re.match(r'/([^@]+)@([^@]+)$', args[0])
if match:
username = match.group(2)
command = match.group(1)
args = args[1:]
if username != my_username and my_commands == True:
return [None]
else:
return [command, username, args]
else:
return [None]
def get_peer_id(peer, reverse=False):
'''
reverse = False:
peer: telethon.tl.types.PeerUser, PeerChat, PeerChannel / User, Chat, Channel
return int (-100xxxx)
reverse = True:
peer: int (-100xxxx)
return int (xxxx)
'''
if reverse:
str_peer = str(peer)
if str_peer.startswith("-100"):
orig_id = str_peer[4:]
return orig_id
else:
return peer
else:
peerid = None
if type(peer) in (PeerChannel, InputPeerChannel):
peerid = getattr(peer, 'channel_id')
peerid = int('-100{}'.format(peerid))
elif type(peer) in [ _Channel, _Chat]:
peerid = getattr(peer, 'id')
peerid = int('-100{}'.format(peerid))
elif type(peer) in (PeerChat, InputPeerChat):
peerid = getattr(peer, 'chat_id')
peerid = int('-100{}'.format(peerid))
elif type(peer) in (PeerUser, InputPeerUser):
peerid = getattr(peer, 'user_id')
elif type(peer) is _User:
peerid = getattr(peer, 'id')
else:
print("Error: ", peer)
if not peerid:
print('W: Peer_id is none')
return peerid
class EmptyChat:
def __init__(self, title=None):
self.title = title
class EmptyUser:
def __init__(self, first_name="None", last_name=None, username=None):
self.first_name = first_name
self.last_name = last_name
self.username = username
def display_username(user, atuser=False, shorten=False):
if user.first_name and user.last_name:
name = "{} {}".format(user.first_name, user.last_name)
else:
name = user.first_name
if shorten:
return name
if user.username:
if atuser:
name += " (@{})".format(user.username)
else:
name += " ({})".format(user.username)
return name
@client.on(events.NewMessage)
async def new_msg_handler(event):
global unochat
global unobot_username, unobot
msg = event.message
group = await event.get_chat()
user = await event.get_sender()
if event.is_channel and msg.message and (not msg.media):
# Text handler
if not safety_check(get_peer_id(msg.to_id)):
return
logger.info("Group - {} - {}: {}".format(group.title, display_username(user), msg.message))
# react to commands
if not disable_all_commands:
c = commandify(event.raw_text, wild_card=False)
if c[0]:
if c[0] == 'hello':
await event.reply('hi!')
if c[0] in ['startgame', 'start', 'join']:
if game.is_playing:
await event.reply("I'm playing right now.")
else:
unochat = msg.to_id
# get unobot name from args
if len(c) == 3 and len(c[2]) >= 1 and c[2][0].endswith('bot'):
unobot_username = c[2][0]
unobot = await client.get_input_entity(unobot_username)
game.join_game(get_peer_id(unochat))
await client(SendMessageRequest(unochat, "/join@{}".format(unobot_username)))
elif c[0] in ['stopgame', 'stop', 'leave']:
if game.is_playing:
game.leave_game(get_peer_id(unochat))
game.stop_game()
await client(SendMessageRequest(unochat, "/leave@{}".format(unobot_username)))
elif game.joined and get_peer_id(unochat) in game.joined:
game.leave_game(get_peer_id(unochat))
await client(SendMessageRequest(unochat, "/leave@{}".format(unobot_username)))
else:
await event.reply("I'm not playing right now.")
elif c[0] in ['wait', 'delay']:
if game.delay or (not game.is_playing):
await event.reply("Nothing to do.")
else:
game.delay = 8
await event.reply("OK. {} seconds of delay has been set.".format(game.delay))
elif c[0] in ['nowait', 'nodelay']:
if game.delay and game.is_playing:
myreply = "OK. {} seconds of delay has been removed.".format(game.delay)
game.delay = None
await event.reply(myreply)
else:
await event.reply("Nothing to do.")
return
# react to unobot
if user.username and user.username in unobot_usernames:
if re.search(game_consts['myturn'], msg.message):
if re.search(game_consts['start'], msg.message):
# I'm the first player
if not game.is_playing:
unochat = msg.to_id
unobot_username = user.username
unobot = await client.get_input_entity(unobot_username)
if game.joined and get_peer_id(unochat) in game.joined:
logger.info('Bot: Game started. I\'m the first player.')
game.start_game()
logger.info('Bot: It\'s my turn.')
if game.joined and get_peer_id(msg.to_id) in game.joined:
if not game.is_playing:
logger.info('Bot: Game started a long time ago. I joined midway.')
unochat = msg.to_id
unobot_username = user.username
unobot = await client.get_input_entity(unobot_username)
game.start_game()
await inline_query()
else:
if not game.is_playing:
logger.info('I\'m not playing in {} - {}, play anyway.'.format(group.title, get_peer_id(msg.to_id)))
unochat = msg.to_id
unobot_username = user.username
unobot = await client.get_input_entity(unobot_username)
game.join_game(get_peer_id(unochat))
game.start_game()
await inline_query()
else:
logger.info('I\'m not playing in {} - {}'.format(group.title, get_peer_id(msg.to_id)))
await client(SendMessageRequest(msg.to_id, "/leave@{}".format(user.username)))
elif re.search(game_consts['end'], msg.message):
if game.is_playing:
logger.info('Bot: Game ended.')
game.clear_deck()
game.leave_game(get_peer_id(unochat))
game.stop_game()
game.delay = default_delay
elif re.search(game_consts['create'], msg.message):
if not game.is_playing:
logger.info('Bot: New game created.')
if default_delay:
await asyncio.sleep(default_delay)
unochat = msg.to_id
unobot_username = user.username
game.join_game(get_peer_id(unochat))
await client(SendMessageRequest(unochat, "/join@{}".format(unobot_username)))
elif re.search(game_consts['start'], msg.message):
if not game.is_playing:
unochat = msg.to_id
unobot_username = user.username
unobot = await client.get_input_entity(unobot_username)
if game.joined and get_peer_id(unochat) in game.joined:
logger.info('Bot: Game started.')
game.start_game()
else:
await client(SendMessageRequest(unochat, "/leave@{}".format(unobot_username)))
elif re.search(game_consts['win'], msg.message):
if game.is_playing:
logger.info('Bot: I win.')
game.clear_deck()
game.leave_game(get_peer_id(unochat))
game.stop_game()
async def main():
await client.run_until_disconnected()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
client.disconnect()