988 lines
37 KiB
Python
988 lines
37 KiB
Python
"""Support to send and receive Telegram messages."""
|
|
from functools import partial
|
|
import importlib
|
|
import io
|
|
from ipaddress import ip_network
|
|
import logging
|
|
|
|
import requests
|
|
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
|
|
from telegram import (
|
|
Bot,
|
|
InlineKeyboardButton,
|
|
InlineKeyboardMarkup,
|
|
ReplyKeyboardMarkup,
|
|
ReplyKeyboardRemove,
|
|
)
|
|
from telegram.error import TelegramError
|
|
from telegram.parsemode import ParseMode
|
|
from telegram.utils.request import Request
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.const import (
|
|
ATTR_COMMAND,
|
|
ATTR_LATITUDE,
|
|
ATTR_LONGITUDE,
|
|
CONF_API_KEY,
|
|
CONF_PLATFORM,
|
|
CONF_URL,
|
|
HTTP_BEARER_AUTHENTICATION,
|
|
HTTP_DIGEST_AUTHENTICATION,
|
|
)
|
|
from homeassistant.core import HomeAssistant, ServiceCall
|
|
from homeassistant.exceptions import TemplateError
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.helpers.typing import ConfigType
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
ATTR_DATA = "data"
|
|
ATTR_MESSAGE = "message"
|
|
ATTR_TITLE = "title"
|
|
|
|
ATTR_ARGS = "args"
|
|
ATTR_AUTHENTICATION = "authentication"
|
|
ATTR_CALLBACK_QUERY = "callback_query"
|
|
ATTR_CALLBACK_QUERY_ID = "callback_query_id"
|
|
ATTR_CAPTION = "caption"
|
|
ATTR_CHAT_ID = "chat_id"
|
|
ATTR_CHAT_INSTANCE = "chat_instance"
|
|
ATTR_DISABLE_NOTIF = "disable_notification"
|
|
ATTR_DISABLE_WEB_PREV = "disable_web_page_preview"
|
|
ATTR_EDITED_MSG = "edited_message"
|
|
ATTR_FILE = "file"
|
|
ATTR_FROM_FIRST = "from_first"
|
|
ATTR_FROM_LAST = "from_last"
|
|
ATTR_KEYBOARD = "keyboard"
|
|
ATTR_KEYBOARD_INLINE = "inline_keyboard"
|
|
ATTR_MESSAGEID = "message_id"
|
|
ATTR_MSG = "message"
|
|
ATTR_MSGID = "id"
|
|
ATTR_PARSER = "parse_mode"
|
|
ATTR_PASSWORD = "password"
|
|
ATTR_REPLY_TO_MSGID = "reply_to_message_id"
|
|
ATTR_REPLYMARKUP = "reply_markup"
|
|
ATTR_SHOW_ALERT = "show_alert"
|
|
ATTR_STICKER_ID = "sticker_id"
|
|
ATTR_TARGET = "target"
|
|
ATTR_TEXT = "text"
|
|
ATTR_URL = "url"
|
|
ATTR_USER_ID = "user_id"
|
|
ATTR_USERNAME = "username"
|
|
ATTR_VERIFY_SSL = "verify_ssl"
|
|
ATTR_TIMEOUT = "timeout"
|
|
ATTR_MESSAGE_TAG = "message_tag"
|
|
ATTR_CHANNEL_POST = "channel_post"
|
|
|
|
CONF_ALLOWED_CHAT_IDS = "allowed_chat_ids"
|
|
CONF_PROXY_URL = "proxy_url"
|
|
CONF_PROXY_PARAMS = "proxy_params"
|
|
CONF_TRUSTED_NETWORKS = "trusted_networks"
|
|
|
|
DOMAIN = "telegram_bot"
|
|
|
|
SERVICE_SEND_MESSAGE = "send_message"
|
|
SERVICE_SEND_PHOTO = "send_photo"
|
|
SERVICE_SEND_STICKER = "send_sticker"
|
|
SERVICE_SEND_ANIMATION = "send_animation"
|
|
SERVICE_SEND_VIDEO = "send_video"
|
|
SERVICE_SEND_VOICE = "send_voice"
|
|
SERVICE_SEND_DOCUMENT = "send_document"
|
|
SERVICE_SEND_LOCATION = "send_location"
|
|
SERVICE_EDIT_MESSAGE = "edit_message"
|
|
SERVICE_EDIT_CAPTION = "edit_caption"
|
|
SERVICE_EDIT_REPLYMARKUP = "edit_replymarkup"
|
|
SERVICE_ANSWER_CALLBACK_QUERY = "answer_callback_query"
|
|
SERVICE_DELETE_MESSAGE = "delete_message"
|
|
SERVICE_LEAVE_CHAT = "leave_chat"
|
|
|
|
EVENT_TELEGRAM_CALLBACK = "telegram_callback"
|
|
EVENT_TELEGRAM_COMMAND = "telegram_command"
|
|
EVENT_TELEGRAM_TEXT = "telegram_text"
|
|
EVENT_TELEGRAM_SENT = "telegram_sent"
|
|
|
|
PARSER_HTML = "html"
|
|
PARSER_MD = "markdown"
|
|
|
|
DEFAULT_TRUSTED_NETWORKS = [ip_network("149.154.160.0/20"), ip_network("91.108.4.0/22")]
|
|
|
|
CONFIG_SCHEMA = vol.Schema(
|
|
{
|
|
DOMAIN: vol.All(
|
|
cv.ensure_list,
|
|
[
|
|
vol.Schema(
|
|
{
|
|
vol.Required(CONF_PLATFORM): vol.In(
|
|
("broadcast", "polling", "webhooks")
|
|
),
|
|
vol.Required(CONF_API_KEY): cv.string,
|
|
vol.Required(CONF_ALLOWED_CHAT_IDS): vol.All(
|
|
cv.ensure_list, [vol.Coerce(int)]
|
|
),
|
|
vol.Optional(ATTR_PARSER, default=PARSER_MD): cv.string,
|
|
vol.Optional(CONF_PROXY_URL): cv.string,
|
|
vol.Optional(CONF_PROXY_PARAMS): dict,
|
|
# webhooks
|
|
vol.Optional(CONF_URL): cv.url,
|
|
vol.Optional(
|
|
CONF_TRUSTED_NETWORKS, default=DEFAULT_TRUSTED_NETWORKS
|
|
): vol.All(cv.ensure_list, [ip_network]),
|
|
}
|
|
)
|
|
],
|
|
)
|
|
},
|
|
extra=vol.ALLOW_EXTRA,
|
|
)
|
|
|
|
BASE_SERVICE_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Optional(ATTR_TARGET): vol.All(cv.ensure_list, [vol.Coerce(int)]),
|
|
vol.Optional(ATTR_PARSER): cv.string,
|
|
vol.Optional(ATTR_DISABLE_NOTIF): cv.boolean,
|
|
vol.Optional(ATTR_DISABLE_WEB_PREV): cv.boolean,
|
|
vol.Optional(ATTR_KEYBOARD): vol.All(cv.ensure_list, [cv.string]),
|
|
vol.Optional(ATTR_KEYBOARD_INLINE): cv.ensure_list,
|
|
vol.Optional(ATTR_TIMEOUT): cv.positive_int,
|
|
vol.Optional(ATTR_MESSAGE_TAG): cv.string,
|
|
},
|
|
extra=vol.ALLOW_EXTRA,
|
|
)
|
|
|
|
SERVICE_SCHEMA_SEND_MESSAGE = BASE_SERVICE_SCHEMA.extend(
|
|
{vol.Required(ATTR_MESSAGE): cv.template, vol.Optional(ATTR_TITLE): cv.template}
|
|
)
|
|
|
|
SERVICE_SCHEMA_SEND_FILE = BASE_SERVICE_SCHEMA.extend(
|
|
{
|
|
vol.Optional(ATTR_URL): cv.template,
|
|
vol.Optional(ATTR_FILE): cv.template,
|
|
vol.Optional(ATTR_CAPTION): cv.template,
|
|
vol.Optional(ATTR_USERNAME): cv.string,
|
|
vol.Optional(ATTR_PASSWORD): cv.string,
|
|
vol.Optional(ATTR_AUTHENTICATION): cv.string,
|
|
vol.Optional(ATTR_VERIFY_SSL): cv.boolean,
|
|
}
|
|
)
|
|
|
|
SERVICE_SCHEMA_SEND_STICKER = SERVICE_SCHEMA_SEND_FILE.extend(
|
|
{vol.Optional(ATTR_STICKER_ID): cv.string}
|
|
)
|
|
|
|
SERVICE_SCHEMA_SEND_LOCATION = BASE_SERVICE_SCHEMA.extend(
|
|
{
|
|
vol.Required(ATTR_LONGITUDE): cv.template,
|
|
vol.Required(ATTR_LATITUDE): cv.template,
|
|
}
|
|
)
|
|
|
|
SERVICE_SCHEMA_EDIT_MESSAGE = SERVICE_SCHEMA_SEND_MESSAGE.extend(
|
|
{
|
|
vol.Required(ATTR_MESSAGEID): vol.Any(
|
|
cv.positive_int, vol.All(cv.string, "last")
|
|
),
|
|
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
|
|
}
|
|
)
|
|
|
|
SERVICE_SCHEMA_EDIT_CAPTION = vol.Schema(
|
|
{
|
|
vol.Required(ATTR_MESSAGEID): vol.Any(
|
|
cv.positive_int, vol.All(cv.string, "last")
|
|
),
|
|
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
|
|
vol.Required(ATTR_CAPTION): cv.template,
|
|
vol.Optional(ATTR_KEYBOARD_INLINE): cv.ensure_list,
|
|
},
|
|
extra=vol.ALLOW_EXTRA,
|
|
)
|
|
|
|
SERVICE_SCHEMA_EDIT_REPLYMARKUP = vol.Schema(
|
|
{
|
|
vol.Required(ATTR_MESSAGEID): vol.Any(
|
|
cv.positive_int, vol.All(cv.string, "last")
|
|
),
|
|
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
|
|
vol.Required(ATTR_KEYBOARD_INLINE): cv.ensure_list,
|
|
},
|
|
extra=vol.ALLOW_EXTRA,
|
|
)
|
|
|
|
SERVICE_SCHEMA_ANSWER_CALLBACK_QUERY = vol.Schema(
|
|
{
|
|
vol.Required(ATTR_MESSAGE): cv.template,
|
|
vol.Required(ATTR_CALLBACK_QUERY_ID): vol.Coerce(int),
|
|
vol.Optional(ATTR_SHOW_ALERT): cv.boolean,
|
|
},
|
|
extra=vol.ALLOW_EXTRA,
|
|
)
|
|
|
|
SERVICE_SCHEMA_DELETE_MESSAGE = vol.Schema(
|
|
{
|
|
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
|
|
vol.Required(ATTR_MESSAGEID): vol.Any(
|
|
cv.positive_int, vol.All(cv.string, "last")
|
|
),
|
|
},
|
|
extra=vol.ALLOW_EXTRA,
|
|
)
|
|
|
|
SERVICE_SCHEMA_LEAVE_CHAT = vol.Schema({vol.Required(ATTR_CHAT_ID): vol.Coerce(int)})
|
|
|
|
SERVICE_MAP = {
|
|
SERVICE_SEND_MESSAGE: SERVICE_SCHEMA_SEND_MESSAGE,
|
|
SERVICE_SEND_PHOTO: SERVICE_SCHEMA_SEND_FILE,
|
|
SERVICE_SEND_STICKER: SERVICE_SCHEMA_SEND_STICKER,
|
|
SERVICE_SEND_ANIMATION: SERVICE_SCHEMA_SEND_FILE,
|
|
SERVICE_SEND_VIDEO: SERVICE_SCHEMA_SEND_FILE,
|
|
SERVICE_SEND_VOICE: SERVICE_SCHEMA_SEND_FILE,
|
|
SERVICE_SEND_DOCUMENT: SERVICE_SCHEMA_SEND_FILE,
|
|
SERVICE_SEND_LOCATION: SERVICE_SCHEMA_SEND_LOCATION,
|
|
SERVICE_EDIT_MESSAGE: SERVICE_SCHEMA_EDIT_MESSAGE,
|
|
SERVICE_EDIT_CAPTION: SERVICE_SCHEMA_EDIT_CAPTION,
|
|
SERVICE_EDIT_REPLYMARKUP: SERVICE_SCHEMA_EDIT_REPLYMARKUP,
|
|
SERVICE_ANSWER_CALLBACK_QUERY: SERVICE_SCHEMA_ANSWER_CALLBACK_QUERY,
|
|
SERVICE_DELETE_MESSAGE: SERVICE_SCHEMA_DELETE_MESSAGE,
|
|
SERVICE_LEAVE_CHAT: SERVICE_SCHEMA_LEAVE_CHAT,
|
|
}
|
|
|
|
|
|
def load_data(
|
|
hass,
|
|
url=None,
|
|
filepath=None,
|
|
username=None,
|
|
password=None,
|
|
authentication=None,
|
|
num_retries=5,
|
|
verify_ssl=None,
|
|
):
|
|
"""Load data into ByteIO/File container from a source."""
|
|
try:
|
|
if url is not None:
|
|
# Load data from URL
|
|
params = {"timeout": 15}
|
|
if authentication == HTTP_BEARER_AUTHENTICATION and password is not None:
|
|
params["headers"] = {"Authorization": f"Bearer {password}"}
|
|
elif username is not None and password is not None:
|
|
if authentication == HTTP_DIGEST_AUTHENTICATION:
|
|
params["auth"] = HTTPDigestAuth(username, password)
|
|
else:
|
|
params["auth"] = HTTPBasicAuth(username, password)
|
|
if verify_ssl is not None:
|
|
params["verify"] = verify_ssl
|
|
retry_num = 0
|
|
while retry_num < num_retries:
|
|
req = requests.get(url, **params)
|
|
if not req.ok:
|
|
_LOGGER.warning(
|
|
"Status code %s (retry #%s) loading %s",
|
|
req.status_code,
|
|
retry_num + 1,
|
|
url,
|
|
)
|
|
else:
|
|
data = io.BytesIO(req.content)
|
|
if data.read():
|
|
data.seek(0)
|
|
data.name = url
|
|
return data
|
|
_LOGGER.warning("Empty data (retry #%s) in %s)", retry_num + 1, url)
|
|
retry_num += 1
|
|
_LOGGER.warning("Can't load data in %s after %s retries", url, retry_num)
|
|
elif filepath is not None:
|
|
if hass.config.is_allowed_path(filepath):
|
|
return open(filepath, "rb")
|
|
|
|
_LOGGER.warning("'%s' are not secure to load data from!", filepath)
|
|
else:
|
|
_LOGGER.warning("Can't load data. No data found in params!")
|
|
|
|
except (OSError, TypeError) as error:
|
|
_LOGGER.error("Can't load data into ByteIO: %s", error)
|
|
|
|
return None
|
|
|
|
|
|
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|
"""Set up the Telegram bot component."""
|
|
if not config[DOMAIN]:
|
|
return False
|
|
|
|
for p_config in config[DOMAIN]:
|
|
|
|
p_type = p_config.get(CONF_PLATFORM)
|
|
|
|
platform = importlib.import_module(f".{p_config[CONF_PLATFORM]}", __name__)
|
|
|
|
_LOGGER.info("Setting up %s.%s", DOMAIN, p_type)
|
|
try:
|
|
receiver_service = await platform.async_setup_platform(hass, p_config)
|
|
if receiver_service is False:
|
|
_LOGGER.error("Failed to initialize Telegram bot %s", p_type)
|
|
return False
|
|
|
|
except Exception: # pylint: disable=broad-except
|
|
_LOGGER.exception("Error setting up platform %s", p_type)
|
|
return False
|
|
|
|
bot = initialize_bot(p_config)
|
|
notify_service = TelegramNotificationService(
|
|
hass, bot, p_config.get(CONF_ALLOWED_CHAT_IDS), p_config.get(ATTR_PARSER)
|
|
)
|
|
|
|
async def async_send_telegram_message(service: ServiceCall) -> None:
|
|
"""Handle sending Telegram Bot message service calls."""
|
|
|
|
def _render_template_attr(data, attribute):
|
|
if attribute_templ := data.get(attribute):
|
|
if any(
|
|
isinstance(attribute_templ, vtype) for vtype in (float, int, str)
|
|
):
|
|
data[attribute] = attribute_templ
|
|
else:
|
|
attribute_templ.hass = hass
|
|
try:
|
|
data[attribute] = attribute_templ.async_render(
|
|
parse_result=False
|
|
)
|
|
except TemplateError as exc:
|
|
_LOGGER.error(
|
|
"TemplateError in %s: %s -> %s",
|
|
attribute,
|
|
attribute_templ.template,
|
|
exc,
|
|
)
|
|
data[attribute] = attribute_templ.template
|
|
|
|
msgtype = service.service
|
|
kwargs = dict(service.data)
|
|
for attribute in (
|
|
ATTR_MESSAGE,
|
|
ATTR_TITLE,
|
|
ATTR_URL,
|
|
ATTR_FILE,
|
|
ATTR_CAPTION,
|
|
ATTR_LONGITUDE,
|
|
ATTR_LATITUDE,
|
|
):
|
|
_render_template_attr(kwargs, attribute)
|
|
_LOGGER.debug("New telegram message %s: %s", msgtype, kwargs)
|
|
|
|
if msgtype == SERVICE_SEND_MESSAGE:
|
|
await hass.async_add_executor_job(
|
|
partial(notify_service.send_message, **kwargs)
|
|
)
|
|
elif msgtype in [
|
|
SERVICE_SEND_PHOTO,
|
|
SERVICE_SEND_ANIMATION,
|
|
SERVICE_SEND_VIDEO,
|
|
SERVICE_SEND_VOICE,
|
|
SERVICE_SEND_DOCUMENT,
|
|
]:
|
|
await hass.async_add_executor_job(
|
|
partial(notify_service.send_file, msgtype, **kwargs)
|
|
)
|
|
elif msgtype == SERVICE_SEND_STICKER:
|
|
await hass.async_add_executor_job(
|
|
partial(notify_service.send_sticker, **kwargs)
|
|
)
|
|
elif msgtype == SERVICE_SEND_LOCATION:
|
|
await hass.async_add_executor_job(
|
|
partial(notify_service.send_location, **kwargs)
|
|
)
|
|
elif msgtype == SERVICE_ANSWER_CALLBACK_QUERY:
|
|
await hass.async_add_executor_job(
|
|
partial(notify_service.answer_callback_query, **kwargs)
|
|
)
|
|
elif msgtype == SERVICE_DELETE_MESSAGE:
|
|
await hass.async_add_executor_job(
|
|
partial(notify_service.delete_message, **kwargs)
|
|
)
|
|
else:
|
|
await hass.async_add_executor_job(
|
|
partial(notify_service.edit_message, msgtype, **kwargs)
|
|
)
|
|
|
|
# Register notification services
|
|
for service_notif, schema in SERVICE_MAP.items():
|
|
hass.services.async_register(
|
|
DOMAIN, service_notif, async_send_telegram_message, schema=schema
|
|
)
|
|
|
|
return True
|
|
|
|
|
|
def initialize_bot(p_config):
|
|
"""Initialize telegram bot with proxy support."""
|
|
|
|
api_key = p_config.get(CONF_API_KEY)
|
|
proxy_url = p_config.get(CONF_PROXY_URL)
|
|
proxy_params = p_config.get(CONF_PROXY_PARAMS)
|
|
|
|
if proxy_url is not None:
|
|
request = Request(
|
|
con_pool_size=8, proxy_url=proxy_url, urllib3_proxy_kwargs=proxy_params
|
|
)
|
|
else:
|
|
request = Request(con_pool_size=8)
|
|
return Bot(token=api_key, request=request)
|
|
|
|
|
|
class TelegramNotificationService:
|
|
"""Implement the notification services for the Telegram Bot domain."""
|
|
|
|
def __init__(self, hass, bot, allowed_chat_ids, parser):
|
|
"""Initialize the service."""
|
|
|
|
self.allowed_chat_ids = allowed_chat_ids
|
|
self._default_user = self.allowed_chat_ids[0]
|
|
self._last_message_id = {user: None for user in self.allowed_chat_ids}
|
|
self._parsers = {PARSER_HTML: ParseMode.HTML, PARSER_MD: ParseMode.MARKDOWN}
|
|
self._parse_mode = self._parsers.get(parser)
|
|
self.bot = bot
|
|
self.hass = hass
|
|
|
|
def _get_msg_ids(self, msg_data, chat_id):
|
|
"""Get the message id to edit.
|
|
|
|
This can be one of (message_id, inline_message_id) from a msg dict,
|
|
returning a tuple.
|
|
**You can use 'last' as message_id** to edit
|
|
the message last sent in the chat_id.
|
|
"""
|
|
message_id = inline_message_id = None
|
|
if ATTR_MESSAGEID in msg_data:
|
|
message_id = msg_data[ATTR_MESSAGEID]
|
|
if (
|
|
isinstance(message_id, str)
|
|
and (message_id == "last")
|
|
and (self._last_message_id[chat_id] is not None)
|
|
):
|
|
message_id = self._last_message_id[chat_id]
|
|
else:
|
|
inline_message_id = msg_data["inline_message_id"]
|
|
return message_id, inline_message_id
|
|
|
|
def _get_target_chat_ids(self, target):
|
|
"""Validate chat_id targets or return default target (first).
|
|
|
|
:param target: optional list of integers ([12234, -12345])
|
|
:return list of chat_id targets (integers)
|
|
"""
|
|
if target is not None:
|
|
if isinstance(target, int):
|
|
target = [target]
|
|
chat_ids = [t for t in target if t in self.allowed_chat_ids]
|
|
if chat_ids:
|
|
return chat_ids
|
|
_LOGGER.warning(
|
|
"Disallowed targets: %s, using default: %s", target, self._default_user
|
|
)
|
|
return [self._default_user]
|
|
|
|
def _get_msg_kwargs(self, data):
|
|
"""Get parameters in message data kwargs."""
|
|
|
|
def _make_row_inline_keyboard(row_keyboard):
|
|
"""Make a list of InlineKeyboardButtons.
|
|
|
|
It can accept:
|
|
- a list of tuples like:
|
|
`[(text_b1, data_callback_b1),
|
|
(text_b2, data_callback_b2), ...]
|
|
- a string like: `/cmd1, /cmd2, /cmd3`
|
|
- or a string like: `text_b1:/cmd1, text_b2:/cmd2`
|
|
"""
|
|
|
|
buttons = []
|
|
if isinstance(row_keyboard, str):
|
|
for key in row_keyboard.split(","):
|
|
if ":/" in key:
|
|
# commands like: 'Label:/cmd' become ('Label', '/cmd')
|
|
label = key.split(":/")[0]
|
|
command = key[len(label) + 1 :]
|
|
buttons.append(
|
|
InlineKeyboardButton(label, callback_data=command)
|
|
)
|
|
else:
|
|
# commands like: '/cmd' become ('CMD', '/cmd')
|
|
label = key.strip()[1:].upper()
|
|
buttons.append(InlineKeyboardButton(label, callback_data=key))
|
|
elif isinstance(row_keyboard, list):
|
|
for entry in row_keyboard:
|
|
text_btn, data_btn = entry
|
|
buttons.append(
|
|
InlineKeyboardButton(text_btn, callback_data=data_btn)
|
|
)
|
|
else:
|
|
raise ValueError(str(row_keyboard))
|
|
return buttons
|
|
|
|
# Defaults
|
|
params = {
|
|
ATTR_PARSER: self._parse_mode,
|
|
ATTR_DISABLE_NOTIF: False,
|
|
ATTR_DISABLE_WEB_PREV: None,
|
|
ATTR_REPLY_TO_MSGID: None,
|
|
ATTR_REPLYMARKUP: None,
|
|
ATTR_TIMEOUT: None,
|
|
ATTR_MESSAGE_TAG: None,
|
|
}
|
|
if data is not None:
|
|
if ATTR_PARSER in data:
|
|
params[ATTR_PARSER] = self._parsers.get(
|
|
data[ATTR_PARSER], self._parse_mode
|
|
)
|
|
if ATTR_TIMEOUT in data:
|
|
params[ATTR_TIMEOUT] = data[ATTR_TIMEOUT]
|
|
if ATTR_DISABLE_NOTIF in data:
|
|
params[ATTR_DISABLE_NOTIF] = data[ATTR_DISABLE_NOTIF]
|
|
if ATTR_DISABLE_WEB_PREV in data:
|
|
params[ATTR_DISABLE_WEB_PREV] = data[ATTR_DISABLE_WEB_PREV]
|
|
if ATTR_REPLY_TO_MSGID in data:
|
|
params[ATTR_REPLY_TO_MSGID] = data[ATTR_REPLY_TO_MSGID]
|
|
if ATTR_MESSAGE_TAG in data:
|
|
params[ATTR_MESSAGE_TAG] = data[ATTR_MESSAGE_TAG]
|
|
# Keyboards:
|
|
if ATTR_KEYBOARD in data:
|
|
keys = data.get(ATTR_KEYBOARD)
|
|
keys = keys if isinstance(keys, list) else [keys]
|
|
if keys:
|
|
params[ATTR_REPLYMARKUP] = ReplyKeyboardMarkup(
|
|
[[key.strip() for key in row.split(",")] for row in keys]
|
|
)
|
|
else:
|
|
params[ATTR_REPLYMARKUP] = ReplyKeyboardRemove(True)
|
|
|
|
elif ATTR_KEYBOARD_INLINE in data:
|
|
keys = data.get(ATTR_KEYBOARD_INLINE)
|
|
keys = keys if isinstance(keys, list) else [keys]
|
|
params[ATTR_REPLYMARKUP] = InlineKeyboardMarkup(
|
|
[_make_row_inline_keyboard(row) for row in keys]
|
|
)
|
|
return params
|
|
|
|
def _send_msg(self, func_send, msg_error, message_tag, *args_msg, **kwargs_msg):
|
|
"""Send one message."""
|
|
|
|
try:
|
|
out = func_send(*args_msg, **kwargs_msg)
|
|
if not isinstance(out, bool) and hasattr(out, ATTR_MESSAGEID):
|
|
chat_id = out.chat_id
|
|
message_id = out[ATTR_MESSAGEID]
|
|
self._last_message_id[chat_id] = message_id
|
|
_LOGGER.debug(
|
|
"Last message ID: %s (from chat_id %s)",
|
|
self._last_message_id,
|
|
chat_id,
|
|
)
|
|
|
|
event_data = {
|
|
ATTR_CHAT_ID: chat_id,
|
|
ATTR_MESSAGEID: message_id,
|
|
}
|
|
if message_tag is not None:
|
|
event_data[ATTR_MESSAGE_TAG] = message_tag
|
|
self.hass.bus.fire(EVENT_TELEGRAM_SENT, event_data)
|
|
elif not isinstance(out, bool):
|
|
_LOGGER.warning(
|
|
"Update last message: out_type:%s, out=%s", type(out), out
|
|
)
|
|
return out
|
|
except TelegramError as exc:
|
|
_LOGGER.error(
|
|
"%s: %s. Args: %s, kwargs: %s", msg_error, exc, args_msg, kwargs_msg
|
|
)
|
|
|
|
def send_message(self, message="", target=None, **kwargs):
|
|
"""Send a message to one or multiple pre-allowed chat IDs."""
|
|
title = kwargs.get(ATTR_TITLE)
|
|
text = f"{title}\n{message}" if title else message
|
|
params = self._get_msg_kwargs(kwargs)
|
|
for chat_id in self._get_target_chat_ids(target):
|
|
_LOGGER.debug("Send message in chat ID %s with params: %s", chat_id, params)
|
|
self._send_msg(
|
|
self.bot.send_message,
|
|
"Error sending message",
|
|
params[ATTR_MESSAGE_TAG],
|
|
chat_id,
|
|
text,
|
|
parse_mode=params[ATTR_PARSER],
|
|
disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV],
|
|
disable_notification=params[ATTR_DISABLE_NOTIF],
|
|
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
|
|
reply_markup=params[ATTR_REPLYMARKUP],
|
|
timeout=params[ATTR_TIMEOUT],
|
|
)
|
|
|
|
def delete_message(self, chat_id=None, **kwargs):
|
|
"""Delete a previously sent message."""
|
|
chat_id = self._get_target_chat_ids(chat_id)[0]
|
|
message_id, _ = self._get_msg_ids(kwargs, chat_id)
|
|
_LOGGER.debug("Delete message %s in chat ID %s", message_id, chat_id)
|
|
deleted = self._send_msg(
|
|
self.bot.delete_message, "Error deleting message", None, chat_id, message_id
|
|
)
|
|
# reduce message_id anyway:
|
|
if self._last_message_id[chat_id] is not None:
|
|
# change last msg_id for deque(n_msgs)?
|
|
self._last_message_id[chat_id] -= 1
|
|
return deleted
|
|
|
|
def edit_message(self, type_edit, chat_id=None, **kwargs):
|
|
"""Edit a previously sent message."""
|
|
chat_id = self._get_target_chat_ids(chat_id)[0]
|
|
message_id, inline_message_id = self._get_msg_ids(kwargs, chat_id)
|
|
params = self._get_msg_kwargs(kwargs)
|
|
_LOGGER.debug(
|
|
"Edit message %s in chat ID %s with params: %s",
|
|
message_id or inline_message_id,
|
|
chat_id,
|
|
params,
|
|
)
|
|
if type_edit == SERVICE_EDIT_MESSAGE:
|
|
message = kwargs.get(ATTR_MESSAGE)
|
|
title = kwargs.get(ATTR_TITLE)
|
|
text = f"{title}\n{message}" if title else message
|
|
_LOGGER.debug("Editing message with ID %s", message_id or inline_message_id)
|
|
return self._send_msg(
|
|
self.bot.edit_message_text,
|
|
"Error editing text message",
|
|
params[ATTR_MESSAGE_TAG],
|
|
text,
|
|
chat_id=chat_id,
|
|
message_id=message_id,
|
|
inline_message_id=inline_message_id,
|
|
parse_mode=params[ATTR_PARSER],
|
|
disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV],
|
|
reply_markup=params[ATTR_REPLYMARKUP],
|
|
timeout=params[ATTR_TIMEOUT],
|
|
)
|
|
if type_edit == SERVICE_EDIT_CAPTION:
|
|
return self._send_msg(
|
|
self.bot.edit_message_caption,
|
|
"Error editing message attributes",
|
|
params[ATTR_MESSAGE_TAG],
|
|
chat_id=chat_id,
|
|
message_id=message_id,
|
|
inline_message_id=inline_message_id,
|
|
caption=kwargs.get(ATTR_CAPTION),
|
|
reply_markup=params[ATTR_REPLYMARKUP],
|
|
timeout=params[ATTR_TIMEOUT],
|
|
parse_mode=params[ATTR_PARSER],
|
|
)
|
|
|
|
return self._send_msg(
|
|
self.bot.edit_message_reply_markup,
|
|
"Error editing message attributes",
|
|
params[ATTR_MESSAGE_TAG],
|
|
chat_id=chat_id,
|
|
message_id=message_id,
|
|
inline_message_id=inline_message_id,
|
|
reply_markup=params[ATTR_REPLYMARKUP],
|
|
timeout=params[ATTR_TIMEOUT],
|
|
)
|
|
|
|
def answer_callback_query(
|
|
self, message, callback_query_id, show_alert=False, **kwargs
|
|
):
|
|
"""Answer a callback originated with a press in an inline keyboard."""
|
|
params = self._get_msg_kwargs(kwargs)
|
|
_LOGGER.debug(
|
|
"Answer callback query with callback ID %s: %s, alert: %s",
|
|
callback_query_id,
|
|
message,
|
|
show_alert,
|
|
)
|
|
self._send_msg(
|
|
self.bot.answer_callback_query,
|
|
"Error sending answer callback query",
|
|
params[ATTR_MESSAGE_TAG],
|
|
callback_query_id,
|
|
text=message,
|
|
show_alert=show_alert,
|
|
timeout=params[ATTR_TIMEOUT],
|
|
)
|
|
|
|
def send_file(self, file_type=SERVICE_SEND_PHOTO, target=None, **kwargs):
|
|
"""Send a photo, sticker, video, or document."""
|
|
params = self._get_msg_kwargs(kwargs)
|
|
file_content = load_data(
|
|
self.hass,
|
|
url=kwargs.get(ATTR_URL),
|
|
filepath=kwargs.get(ATTR_FILE),
|
|
username=kwargs.get(ATTR_USERNAME),
|
|
password=kwargs.get(ATTR_PASSWORD),
|
|
authentication=kwargs.get(ATTR_AUTHENTICATION),
|
|
verify_ssl=kwargs.get(ATTR_VERIFY_SSL),
|
|
)
|
|
|
|
if file_content:
|
|
for chat_id in self._get_target_chat_ids(target):
|
|
_LOGGER.debug("Sending file to chat ID %s", chat_id)
|
|
|
|
if file_type == SERVICE_SEND_PHOTO:
|
|
self._send_msg(
|
|
self.bot.send_photo,
|
|
"Error sending photo",
|
|
params[ATTR_MESSAGE_TAG],
|
|
chat_id=chat_id,
|
|
photo=file_content,
|
|
caption=kwargs.get(ATTR_CAPTION),
|
|
disable_notification=params[ATTR_DISABLE_NOTIF],
|
|
reply_markup=params[ATTR_REPLYMARKUP],
|
|
timeout=params[ATTR_TIMEOUT],
|
|
parse_mode=params[ATTR_PARSER],
|
|
)
|
|
|
|
elif file_type == SERVICE_SEND_STICKER:
|
|
self._send_msg(
|
|
self.bot.send_sticker,
|
|
"Error sending sticker",
|
|
params[ATTR_MESSAGE_TAG],
|
|
chat_id=chat_id,
|
|
sticker=file_content,
|
|
disable_notification=params[ATTR_DISABLE_NOTIF],
|
|
reply_markup=params[ATTR_REPLYMARKUP],
|
|
timeout=params[ATTR_TIMEOUT],
|
|
)
|
|
|
|
elif file_type == SERVICE_SEND_VIDEO:
|
|
self._send_msg(
|
|
self.bot.send_video,
|
|
"Error sending video",
|
|
params[ATTR_MESSAGE_TAG],
|
|
chat_id=chat_id,
|
|
video=file_content,
|
|
caption=kwargs.get(ATTR_CAPTION),
|
|
disable_notification=params[ATTR_DISABLE_NOTIF],
|
|
reply_markup=params[ATTR_REPLYMARKUP],
|
|
timeout=params[ATTR_TIMEOUT],
|
|
parse_mode=params[ATTR_PARSER],
|
|
)
|
|
elif file_type == SERVICE_SEND_DOCUMENT:
|
|
self._send_msg(
|
|
self.bot.send_document,
|
|
"Error sending document",
|
|
params[ATTR_MESSAGE_TAG],
|
|
chat_id=chat_id,
|
|
document=file_content,
|
|
caption=kwargs.get(ATTR_CAPTION),
|
|
disable_notification=params[ATTR_DISABLE_NOTIF],
|
|
reply_markup=params[ATTR_REPLYMARKUP],
|
|
timeout=params[ATTR_TIMEOUT],
|
|
parse_mode=params[ATTR_PARSER],
|
|
)
|
|
elif file_type == SERVICE_SEND_VOICE:
|
|
self._send_msg(
|
|
self.bot.send_voice,
|
|
"Error sending voice",
|
|
params[ATTR_MESSAGE_TAG],
|
|
chat_id=chat_id,
|
|
voice=file_content,
|
|
caption=kwargs.get(ATTR_CAPTION),
|
|
disable_notification=params[ATTR_DISABLE_NOTIF],
|
|
reply_markup=params[ATTR_REPLYMARKUP],
|
|
timeout=params[ATTR_TIMEOUT],
|
|
)
|
|
elif file_type == SERVICE_SEND_ANIMATION:
|
|
self._send_msg(
|
|
self.bot.send_animation,
|
|
"Error sending animation",
|
|
params[ATTR_MESSAGE_TAG],
|
|
chat_id=chat_id,
|
|
animation=file_content,
|
|
caption=kwargs.get(ATTR_CAPTION),
|
|
disable_notification=params[ATTR_DISABLE_NOTIF],
|
|
reply_markup=params[ATTR_REPLYMARKUP],
|
|
timeout=params[ATTR_TIMEOUT],
|
|
parse_mode=params[ATTR_PARSER],
|
|
)
|
|
|
|
file_content.seek(0)
|
|
else:
|
|
_LOGGER.error("Can't send file with kwargs: %s", kwargs)
|
|
|
|
def send_sticker(self, target=None, **kwargs):
|
|
"""Send a sticker from a telegram sticker pack."""
|
|
params = self._get_msg_kwargs(kwargs)
|
|
stickerid = kwargs.get(ATTR_STICKER_ID)
|
|
if stickerid:
|
|
for chat_id in self._get_target_chat_ids(target):
|
|
self._send_msg(
|
|
self.bot.send_sticker,
|
|
"Error sending sticker",
|
|
params[ATTR_MESSAGE_TAG],
|
|
chat_id=chat_id,
|
|
sticker=stickerid,
|
|
disable_notification=params[ATTR_DISABLE_NOTIF],
|
|
reply_markup=params[ATTR_REPLYMARKUP],
|
|
timeout=params[ATTR_TIMEOUT],
|
|
)
|
|
else:
|
|
self.send_file(SERVICE_SEND_STICKER, target, **kwargs)
|
|
|
|
def send_location(self, latitude, longitude, target=None, **kwargs):
|
|
"""Send a location."""
|
|
latitude = float(latitude)
|
|
longitude = float(longitude)
|
|
params = self._get_msg_kwargs(kwargs)
|
|
for chat_id in self._get_target_chat_ids(target):
|
|
_LOGGER.debug(
|
|
"Send location %s/%s to chat ID %s", latitude, longitude, chat_id
|
|
)
|
|
self._send_msg(
|
|
self.bot.send_location,
|
|
"Error sending location",
|
|
params[ATTR_MESSAGE_TAG],
|
|
chat_id=chat_id,
|
|
latitude=latitude,
|
|
longitude=longitude,
|
|
disable_notification=params[ATTR_DISABLE_NOTIF],
|
|
timeout=params[ATTR_TIMEOUT],
|
|
)
|
|
|
|
def leave_chat(self, chat_id=None):
|
|
"""Remove bot from chat."""
|
|
chat_id = self._get_target_chat_ids(chat_id)[0]
|
|
_LOGGER.debug("Leave from chat ID %s", chat_id)
|
|
leaved = self._send_msg(
|
|
self.bot.leave_chat, "Error leaving chat", None, chat_id
|
|
)
|
|
return leaved
|
|
|
|
|
|
class BaseTelegramBotEntity:
|
|
"""The base class for the telegram bot."""
|
|
|
|
def __init__(self, hass, allowed_chat_ids):
|
|
"""Initialize the bot base class."""
|
|
self.allowed_chat_ids = allowed_chat_ids
|
|
self.hass = hass
|
|
|
|
def _get_message_data(self, msg_data):
|
|
"""Return boolean msg_data_is_ok and dict msg_data."""
|
|
if not msg_data:
|
|
return False, None
|
|
bad_fields = (
|
|
"text" not in msg_data and "data" not in msg_data and "chat" not in msg_data
|
|
)
|
|
if bad_fields or "from" not in msg_data:
|
|
# Message is not correct.
|
|
_LOGGER.error("Incoming message does not have required data (%s)", msg_data)
|
|
return False, None
|
|
|
|
if (
|
|
msg_data["from"].get("id") not in self.allowed_chat_ids
|
|
and msg_data["chat"].get("id") not in self.allowed_chat_ids
|
|
# and msg_data["message"]["chat"].get("id") not in self.allowed_chat_ids
|
|
):
|
|
# Neither from id nor chat id was in allowed_chat_ids,
|
|
# origin is not allowed.
|
|
_LOGGER.error("Incoming message is not allowed (%s)", msg_data)
|
|
return True, None
|
|
|
|
data = {
|
|
ATTR_USER_ID: msg_data["from"]["id"],
|
|
ATTR_FROM_FIRST: msg_data["from"]["first_name"],
|
|
}
|
|
if "message_id" in msg_data:
|
|
data[ATTR_MSGID] = msg_data["message_id"]
|
|
if "last_name" in msg_data["from"]:
|
|
data[ATTR_FROM_LAST] = msg_data["from"]["last_name"]
|
|
if "chat" in msg_data:
|
|
data[ATTR_CHAT_ID] = msg_data["chat"]["id"]
|
|
elif ATTR_MESSAGE in msg_data and "chat" in msg_data[ATTR_MESSAGE]:
|
|
data[ATTR_CHAT_ID] = msg_data[ATTR_MESSAGE]["chat"]["id"]
|
|
|
|
return True, data
|
|
|
|
def _get_channel_post_data(self, msg_data):
|
|
"""Return boolean msg_data_is_ok and dict msg_data."""
|
|
if not msg_data:
|
|
return False, None
|
|
|
|
if "sender_chat" in msg_data and "chat" in msg_data and "text" in msg_data:
|
|
if (
|
|
msg_data["sender_chat"].get("id") not in self.allowed_chat_ids
|
|
and msg_data["chat"].get("id") not in self.allowed_chat_ids
|
|
):
|
|
# Neither sender_chat id nor chat id was in allowed_chat_ids,
|
|
# origin is not allowed.
|
|
_LOGGER.error("Incoming message is not allowed (%s)", msg_data)
|
|
return True, None
|
|
|
|
data = {
|
|
ATTR_MSGID: msg_data["message_id"],
|
|
ATTR_CHAT_ID: msg_data["chat"]["id"],
|
|
ATTR_TEXT: msg_data["text"],
|
|
}
|
|
return True, data
|
|
|
|
_LOGGER.error("Incoming message does not have required data (%s)", msg_data)
|
|
return False, None
|
|
|
|
def process_message(self, data):
|
|
"""Check for basic message rules and fire an event if message is ok."""
|
|
if ATTR_MSG in data or ATTR_EDITED_MSG in data:
|
|
event = EVENT_TELEGRAM_COMMAND
|
|
if ATTR_MSG in data:
|
|
data = data.get(ATTR_MSG)
|
|
else:
|
|
data = data.get(ATTR_EDITED_MSG)
|
|
message_ok, event_data = self._get_message_data(data)
|
|
if event_data is None:
|
|
return message_ok
|
|
|
|
if ATTR_MSGID in data:
|
|
event_data[ATTR_MSGID] = data[ATTR_MSGID]
|
|
|
|
if "text" in data:
|
|
if data["text"][0] == "/":
|
|
pieces = data["text"].split(" ")
|
|
event_data[ATTR_COMMAND] = pieces[0]
|
|
event_data[ATTR_ARGS] = pieces[1:]
|
|
else:
|
|
event_data[ATTR_TEXT] = data["text"]
|
|
event = EVENT_TELEGRAM_TEXT
|
|
else:
|
|
_LOGGER.warning("Message without text data received: %s", data)
|
|
event_data[ATTR_TEXT] = str(data)
|
|
event = EVENT_TELEGRAM_TEXT
|
|
|
|
self.hass.bus.async_fire(event, event_data)
|
|
return True
|
|
if ATTR_CALLBACK_QUERY in data:
|
|
event = EVENT_TELEGRAM_CALLBACK
|
|
data = data.get(ATTR_CALLBACK_QUERY)
|
|
message_ok, event_data = self._get_message_data(data)
|
|
if event_data is None:
|
|
return message_ok
|
|
|
|
query_data = event_data[ATTR_DATA] = data[ATTR_DATA]
|
|
|
|
if query_data[0] == "/":
|
|
pieces = query_data.split(" ")
|
|
event_data[ATTR_COMMAND] = pieces[0]
|
|
event_data[ATTR_ARGS] = pieces[1:]
|
|
|
|
event_data[ATTR_MSG] = data[ATTR_MSG]
|
|
event_data[ATTR_CHAT_INSTANCE] = data[ATTR_CHAT_INSTANCE]
|
|
event_data[ATTR_MSGID] = data[ATTR_MSGID]
|
|
|
|
self.hass.bus.async_fire(event, event_data)
|
|
return True
|
|
if ATTR_CHANNEL_POST in data:
|
|
event = EVENT_TELEGRAM_TEXT
|
|
data = data.get(ATTR_CHANNEL_POST)
|
|
message_ok, event_data = self._get_channel_post_data(data)
|
|
if event_data is None:
|
|
return message_ok
|
|
|
|
self.hass.bus.async_fire(event, event_data)
|
|
return True
|
|
|
|
_LOGGER.warning("Message with unknown data received: %s", data)
|
|
return True |