tgmsbot/tgmsbot.py
2019-01-14 21:31:42 +08:00

289 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from mscore import Board, check_params
from copy import deepcopy
from telegram import InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import Updater, CommandHandler, CallbackQueryHandler
from telegram.error import TimedOut as TimedOutError
from numpy import array_equal
import time
import logging
# Configure logging once for the whole process: every record is printed with
# its timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# NOTE(review): "token_here" looks like a placeholder for the real bot token.
token = "token_here"

# One updater for the whole bot, created with workers=16.
updater = Updater(token, workers=16)

# The job queue is started immediately; update_keyboard_request() uses it to
# schedule delayed keyboard refreshes.
job_queue = updater.job_queue
job_queue.start()
# Keyboard refresh throttling, in seconds: a refresh requested within
# KBD_MIN_INTERVAL of the previous one is postponed by KBD_DELAY_SECS.
KBD_MIN_INTERVAL = 0.5
KBD_DELAY_SECS = 0.5

# Board used when /mine is called without arguments.
HEIGHT = 8
WIDTH = 8
MINES = 9

# Button captions for the different cell states.
UNOPENED_CELL = "\u2588"
FLAGGED_CELL = "\u259a"
STEPPED_CELL = "*"

# End-of-game reports. Both take the same fields: s_op, s_is, s_3bv,
# ops_count, ops_list, last_player, time and timeouts.
WIN_TEXT_TEMPLATE = (
    "哇所有奇怪的地方都被你打开啦…好羞羞\n"
    "地图Op {s_op} / Is {s_is} / 3bv {s_3bv}\n操作总数 {ops_count}\n"
    "统计:\n{ops_list}\n{last_player} 你要对人家负责哟/// ///\n\n"
    "用时{time}秒,超时{timeouts}\n\n"
    "/mine 开始新游戏"
)
LOSE_TEXT_TEMPLATE = (
    "一道火光之后,你就在天上飞了呢…好奇怪喵\n"
    "地图Op {s_op} / Is {s_is} / 3bv {s_3bv}\n操作总数 {ops_count}\n"
    "统计:\n{ops_list}\n{last_player} 是我们中出的叛徒!\n\n"
    "用时{time}秒,超时{timeouts}\n\n"
    "/mine 开始新游戏"
)
def display_username(user, atuser=True, shorten=False, markdown=True):
    """
    Build the text used to refer to a user in bot messages.

    When ``markdown`` is true (the default) the result is whatever
    ``user.mention_markdown`` returns for the user's full name, and
    ``atuser`` / ``shorten`` are ignored.

    Otherwise the full name is returned, followed by the username in
    parentheses when the user has one and ``shorten`` is false. The
    username is prefixed with "@" when ``atuser`` is true.
    """
    full_name = user.full_name
    if markdown:
        return user.mention_markdown(name=user.full_name)
    if shorten or not user.username:
        return full_name
    suffix = " (@{})" if atuser else " ({})"
    return full_name + suffix.format(user.username)
class Game():
    """One minesweeper round together with its per-player action log."""

    def __init__(self, board, group, creator):
        """Store the board, the group and the creator and reset all counters."""
        self.board = board
        self.group = group
        self.creator = creator
        # user -> list of spots that user acted on, in order
        self.__actions = {}
        self.__last_player = None
        self.start_time = time.time()
        self.stopped = False
        # Timestamp of the most recent keyboard update request. It is used
        # to measure the gap between two requests and to tell requests apart.
        self.last_action = 0
        # How many timeout errors were caught while talking to Telegram.
        self.timeouts = 0

    def save_action(self, user, spot):
        '''Log that user acted on spot (expected to be a tuple).'''
        self.__last_player = user
        self.__actions.setdefault(user, []).append(spot)

    def actions_sum(self):
        '''Return the number of logged actions over all users.'''
        return sum(len(spots) for spots in self.__actions.values())

    def get_last_player(self):
        '''Return the display name of the user who acted last.'''
        return display_username(self.__last_player)

    def get_actions(self):
        '''Return one text line per user with that user's action count.'''
        report = ""
        for player, spots in self.__actions.items():
            report = "{}{} - {}项操作\n".format(report, display_username(player), len(spots))
        return report
class GameManager:
    """Registry of running games, looked up by board hash."""

    def __init__(self):
        # Kept per instance: a dict defined on the class body would be one
        # object shared (and mutated) by every GameManager.
        self.__games = {}

    def append(self, board, board_hash, group_id, creator_id):
        """Create a Game for board and register it under board_hash.

        An existing entry with the same hash is overwritten.
        """
        self.__games[board_hash] = Game(board, group_id, creator_id)

    def remove(self, board_hash):
        """Unregister the game stored under board_hash.

        Returns True when an entry existed and was removed, False otherwise.
        """
        # Decide on presence of the key, not on truthiness of the stored object.
        if board_hash in self.__games:
            del self.__games[board_hash]
            return True
        return False

    def get_game_from_hash(self, board_hash):
        """Return the game registered under board_hash, or None."""
        return self.__games.get(board_hash)

    def count(self):
        """Return the number of registered games."""
        return len(self.__games)
# Module-level registry of running games, used by the handlers below.
game_manager = GameManager()
def send_keyboard(bot, update, args):
    """
    Handle /mine: create a board, register the game and post its keyboard.

    ``args`` is the list of command arguments:

    * no arguments: a HEIGHT x WIDTH board with MINES mines;
    * three arguments: height, width and mines. A value that is not an
      integer leaves it and the values after it at their defaults;
    * any other count: an error reply, no game is created.

    A width above 8 is clamped to 8, boards with more than 100 cells are
    refused, and the parameters must pass ``check_params``.
    """
    msg = update.message
    logger.info("Mine from {0}".format(update.message.from_user.id))
    # create a game board
    if len(args) == 3:
        height = HEIGHT
        width = WIDTH
        mines = MINES
        try:
            height = int(args[0])
            width = int(args[1])
            mines = int(args[2])
        except ValueError:
            # Best effort on purpose: values parsed before the bad one are
            # kept, the remaining ones stay at their defaults.
            logger.debug('Non-integer /mine arguments: {}'.format(args))
        # telegram doesn't like keyboard width to exceed 8
        if width > 8:
            width = 8
            msg.reply_text('宽度太大已经帮您设置成8了')
        # telegram doesn't like keyboard keys to exceed 100
        if height * width > 100:
            msg.reply_text('格数不能超过100')
            return
        ck = check_params(height, width, mines)
        if not ck[0]:
            # ck[1] carries the text sent back to the user on rejection
            msg.reply_text(ck[1])
            return
        board = Board(height, width, mines)
    elif len(args) == 0:
        board = Board(HEIGHT, WIDTH, MINES)
    else:
        msg.reply_text('你输入的是什么鬼!')
        return
    bhash = hash(board)
    game_manager.append(board, bhash, msg.chat, msg.from_user)
    # create a new keyboard: one button per cell, all unopened; the callback
    # data identifies the game and the cell as "<hash> <row> <col>"
    keyboard = [
        [
            InlineKeyboardButton(text=UNOPENED_CELL,
                                 callback_data="{} {} {}".format(bhash, row, col))
            for col in range(board.width)
        ]
        for row in range(board.height)
    ]
    # send the keyboard
    bot.send_message(chat_id=msg.chat.id, text="路过的大爷~来扫个雷嘛~", reply_to_message_id=msg.message_id,
                     parse_mode="Markdown", reply_markup=InlineKeyboardMarkup(keyboard))
def send_help(bot, update):
    """Handle /start: reply with a short description of the bot."""
    message = update.message
    sender_id = message.from_user.id
    logger.debug("Start from {0}".format(sender_id))
    message.reply_text("这是一个扫雷bot\n\n/mine 开始新游戏")
def send_source(bot, update):
    """Handle /source: reply with the link to the bot's source code."""
    message = update.message
    logger.debug("Source from {0}".format(message.from_user.id))
    message.reply_text('Source code: https://git.jerryxiao.com/Jerry/tgmsbot')
def send_status(bot, update):
    """Handle /status: reply with the number of games currently registered."""
    message = update.message
    logger.info("Status from {0}".format(message.from_user.id))
    running = game_manager.count()
    message.reply_text('当前进行的游戏: {}'.format(running))
def update_keyboard_request(bot, bhash, game, chat_id, message_id):
    """
    Ask for the game's keyboard to be redrawn, throttling bursts of requests.

    If the previous request came at most KBD_MIN_INTERVAL seconds ago, the
    redraw is scheduled on the job queue KBD_DELAY_SECS from now; otherwise
    it is performed immediately. Either way ``game.last_action`` becomes
    this request's timestamp, which update_keyboard() compares against to
    drop scheduled redraws that were superseded by a newer request.
    """
    now = time.time()
    # Must be evaluated before last_action is overwritten below.
    too_soon = now - game.last_action <= KBD_MIN_INTERVAL
    game.last_action = now
    if not too_soon:
        update_keyboard(bot, None, noqueue=(bhash, game, chat_id, message_id))
        return
    logger.debug('Rate limit triggered.')
    job_queue.run_once(update_keyboard, KBD_DELAY_SECS,
                       context=(bhash, game, chat_id, message_id, now))
def update_keyboard(bot, job, noqueue=None):
    """
    Render the board as an inline keyboard and push it to the game message.

    The function is used in two ways:

    * called directly with ``noqueue=(bhash, game, chat_id, message_id)``;
      ``job`` is not looked at in that case;
    * run as a job-queue callback, where ``job.context`` holds the same four
      values followed by the timestamp of the request that scheduled the
      job. If ``game.last_action`` no longer equals that timestamp, a newer
      request exists and this one does nothing.

    A timeout while editing the message is logged and counted in
    ``game.timeouts``; it is not re-raised.
    """
    if not noqueue:
        bhash, game, chat_id, message_id, requested_at = job.context
        if requested_at != game.last_action:
            logger.debug('New update action requested, abort this one.')
            return
    else:
        bhash, game, chat_id, message_id = noqueue

    def cell_label(value):
        """Map one board.map entry to the caption of its button."""
        if value <= 9:
            return UNOPENED_CELL
        if value == 10:
            return " "
        if value == 19:
            return FLAGGED_CELL
        if value == 20:
            return STEPPED_CELL
        # every remaining value is displayed as (value - 10)
        return str(value - 10)

    board = game.board
    keyboard = [
        [
            InlineKeyboardButton(text=cell_label(board.map[row][col]),
                                 callback_data="{} {} {}".format(bhash, row, col))
            for col in range(board.width)
        ]
        for row in range(board.height)
    ]
    try:
        bot.edit_message_reply_markup(chat_id=chat_id, message_id=message_id,
                                      reply_markup=InlineKeyboardMarkup(keyboard))
    except TimedOutError:
        logger.debug('time out in game {}.'.format(bhash))
        game.timeouts += 1
def handle_button_click(bot, update):
    """
    Handle a press on one of the board buttons.

    The callback data is expected to be "<board hash> <row> <col>", three
    integers separated by single spaces. Anything else is logged and
    ignored, as are presses for unknown or already finished games.

    The move is applied to the board. When the game ends with this move
    (board.state is no longer 1), the game is marked stopped, a win
    (state 2) or lose report is sent as a reply, and the game is
    unregistered. Otherwise the action is recorded and the keyboard
    refreshed only if the move changed the board.
    """
    query = update.callback_query
    msg = query.message
    user = query.from_user
    chat_id = msg.chat.id
    data = query.data
    logger.debug('Button clicked by {}, data={}.'.format(user.id, data))
    bot.answer_callback_query(callback_query_id=query.id)
    try:
        (bhash, row, col) = [int(i) for i in data.split(' ')]
    except (ValueError, AttributeError):
        # ValueError: a non-integer field or the wrong number of fields.
        # AttributeError: the query carried no data (data is None).
        logger.info('Unknown callback data: {} from user {}'.format(data, user.id))
        return
    game = game_manager.get_game_from_hash(bhash)
    if game is None:
        logger.debug("No game found for hash {}".format(bhash))
        return
    if game.stopped:
        return
    board = game.board
    spot = (row, col)
    if board.state == 0:
        # First move of the game (see the mmap note below): there is no
        # earlier map to compare with, so the action always counts.
        mmap = None
        board.move(spot)
        game.save_action(user, spot)
        update_keyboard_request(bot, bhash, game, chat_id, msg.message_id)
    else:
        # Snapshot the map so a move that changes nothing can be detected.
        mmap = deepcopy(board.map)
        board.move(spot)
    if board.state != 1:
        game.stopped = True
        # if this is the first move, there's no mmap
        # (and the action was already recorded above)
        if mmap is not None:
            game.save_action(user, spot)
            update_keyboard_request(bot, bhash, game, chat_id, msg.message_id)
        (s_op, s_is, s_3bv) = board.gen_statistics()
        ops_count = game.actions_sum()
        ops_list = game.get_actions()
        last_player = game.get_last_player()
        time_used = time.time() - game.start_time
        timeouts = game.timeouts
        if board.state == 2:
            template = WIN_TEXT_TEMPLATE
        else:
            template = LOSE_TEXT_TEMPLATE
        myreply = template.format(s_op=s_op, s_is=s_is, s_3bv=s_3bv, ops_count=ops_count,
                                  ops_list=ops_list, last_player=last_player,
                                  time=round(time_used, 4), timeouts=timeouts)
        try:
            msg.reply_text(myreply, parse_mode="Markdown")
        except TimedOutError:
            logger.debug('timeout sending report for game {}'.format(bhash))
        game_manager.remove(bhash)
    elif mmap is not None and (not array_equal(board.map, mmap)):
        # Game still running: count the press only if it changed the board.
        game.save_action(user, spot)
        update_keyboard_request(bot, bhash, game, chat_id, msg.message_id)
# Register the command handlers and the inline-button callback, in this
# order. /mine is the only command that is handed its arguments.
for _handler in (
    CommandHandler('start', send_help),
    CommandHandler('mine', send_keyboard, pass_args=True),
    CommandHandler('status', send_status),
    CommandHandler('source', send_source),
    CallbackQueryHandler(handle_button_click),
):
    updater.dispatcher.add_handler(_handler)

# Start polling for updates, then hand the main thread over to the updater.
updater.start_polling()
updater.idle()