-
-
Notifications
You must be signed in to change notification settings - Fork 30.5k
/
notify.py
330 lines (274 loc) · 9.7 KB
/
notify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""Slack platform for notify component."""
from __future__ import annotations
import asyncio
import logging
import os
from typing import Any, TypedDict
from urllib.parse import urlparse
from aiohttp import BasicAuth, FormData
from aiohttp.client_exceptions import ClientError
from slack import WebClient
from slack.errors import SlackApiError
import voluptuous as vol
from homeassistant.components.notify import (
ATTR_DATA,
ATTR_TARGET,
ATTR_TITLE,
BaseNotificationService,
)
from homeassistant.const import ATTR_ICON, CONF_PATH
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import aiohttp_client, config_validation as cv, template
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import (
ATTR_BLOCKS,
ATTR_BLOCKS_TEMPLATE,
ATTR_FILE,
ATTR_PASSWORD,
ATTR_PATH,
ATTR_THREAD_TS,
ATTR_URL,
ATTR_USERNAME,
CONF_DEFAULT_CHANNEL,
DATA_CLIENT,
SLACK_DATA,
)
_LOGGER = logging.getLogger(__name__)
FILE_PATH_SCHEMA = vol.Schema({vol.Required(CONF_PATH): cv.isfile})
FILE_URL_SCHEMA = vol.Schema(
{
vol.Required(ATTR_URL): cv.url,
vol.Inclusive(ATTR_USERNAME, "credentials"): cv.string,
vol.Inclusive(ATTR_PASSWORD, "credentials"): cv.string,
}
)
DATA_FILE_SCHEMA = vol.Schema(
{
vol.Required(ATTR_FILE): vol.Any(FILE_PATH_SCHEMA, FILE_URL_SCHEMA),
vol.Optional(ATTR_THREAD_TS): cv.string,
}
)
DATA_TEXT_ONLY_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_USERNAME): cv.string,
vol.Optional(ATTR_ICON): cv.string,
vol.Optional(ATTR_BLOCKS): list,
vol.Optional(ATTR_BLOCKS_TEMPLATE): list,
vol.Optional(ATTR_THREAD_TS): cv.string,
}
)
DATA_SCHEMA = vol.All(
cv.ensure_list, [vol.Any(DATA_FILE_SCHEMA, DATA_TEXT_ONLY_SCHEMA)]
)
class AuthDictT(TypedDict, total=False):
"""Type for auth request data."""
auth: BasicAuth
class FormDataT(TypedDict, total=False):
"""Type for form data, file upload."""
channels: str
filename: str
initial_comment: str
title: str
token: str
thread_ts: str # Optional key
class MessageT(TypedDict, total=False):
"""Type for message data."""
link_names: bool
text: str
username: str # Optional key
icon_url: str # Optional key
icon_emoji: str # Optional key
blocks: list[Any] # Optional key
thread_ts: str # Optional key
async def async_get_service(
hass: HomeAssistant,
config: ConfigType,
discovery_info: DiscoveryInfoType | None = None,
) -> SlackNotificationService | None:
"""Set up the Slack notification service."""
if discovery_info:
return SlackNotificationService(
hass,
discovery_info[SLACK_DATA][DATA_CLIENT],
discovery_info,
)
return None
@callback
def _async_get_filename_from_url(url: str) -> str:
"""Return the filename of a passed URL."""
parsed_url = urlparse(url)
return os.path.basename(parsed_url.path)
@callback
def _async_sanitize_channel_names(channel_list: list[str]) -> list[str]:
"""Remove any # symbols from a channel list."""
return [channel.lstrip("#") for channel in channel_list]
class SlackNotificationService(BaseNotificationService):
"""Define the Slack notification logic."""
def __init__(
self,
hass: HomeAssistant,
client: WebClient,
config: dict[str, str],
) -> None:
"""Initialize."""
self._hass = hass
self._client = client
self._config = config
async def _async_send_local_file_message(
self,
path: str,
targets: list[str],
message: str,
title: str | None,
thread_ts: str | None,
) -> None:
"""Upload a local file (with message) to Slack."""
if not self._hass.config.is_allowed_path(path):
_LOGGER.error("Path does not exist or is not allowed: %s", path)
return
parsed_url = urlparse(path)
filename = os.path.basename(parsed_url.path)
try:
await self._client.files_upload(
channels=",".join(targets),
file=path,
filename=filename,
initial_comment=message,
title=title or filename,
thread_ts=thread_ts or "",
)
except (SlackApiError, ClientError) as err:
_LOGGER.error("Error while uploading file-based message: %r", err)
async def _async_send_remote_file_message(
self,
url: str,
targets: list[str],
message: str,
title: str | None,
thread_ts: str | None,
*,
username: str | None = None,
password: str | None = None,
) -> None:
"""Upload a remote file (with message) to Slack.
Note that we bypass the python-slackclient WebClient and use aiohttp directly,
as the former would require us to download the entire remote file into memory
first before uploading it to Slack.
"""
if not self._hass.config.is_allowed_external_url(url):
_LOGGER.error("URL is not allowed: %s", url)
return
filename = _async_get_filename_from_url(url)
session = aiohttp_client.async_get_clientsession(self._hass)
kwargs: AuthDictT = {}
if username and password is not None:
kwargs = {"auth": BasicAuth(username, password=password)}
resp = await session.request("get", url, **kwargs)
try:
resp.raise_for_status()
except ClientError as err:
_LOGGER.error("Error while retrieving %s: %r", url, err)
return
form_data: FormDataT = {
"channels": ",".join(targets),
"filename": filename,
"initial_comment": message,
"title": title or filename,
"token": self._client.token,
}
if thread_ts:
form_data["thread_ts"] = thread_ts
data = FormData(form_data, charset="utf-8")
data.add_field("file", resp.content, filename=filename)
try:
await session.post("https://slack.com/api/files.upload", data=data)
except ClientError as err:
_LOGGER.error("Error while uploading file message: %r", err)
async def _async_send_text_only_message(
self,
targets: list[str],
message: str,
title: str | None,
thread_ts: str | None,
*,
username: str | None = None,
icon: str | None = None,
blocks: Any | None = None,
) -> None:
"""Send a text-only message."""
message_dict: MessageT = {"link_names": True, "text": message}
if username:
message_dict["username"] = username
if icon:
if icon.lower().startswith(("http://", "https://")):
message_dict["icon_url"] = icon
else:
message_dict["icon_emoji"] = icon
if blocks:
message_dict["blocks"] = blocks
if thread_ts:
message_dict["thread_ts"] = thread_ts
tasks = {
target: self._client.chat_postMessage(**message_dict, channel=target)
for target in targets
}
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
for target, result in zip(tasks, results, strict=False):
if isinstance(result, SlackApiError):
_LOGGER.error(
"There was a Slack API error while sending to %s: %r",
target,
result,
)
elif isinstance(result, ClientError):
_LOGGER.error("Error while sending message to %s: %r", target, result)
async def async_send_message(self, message: str, **kwargs: Any) -> None:
"""Send a message to Slack."""
data = kwargs.get(ATTR_DATA) or {}
try:
DATA_SCHEMA(data)
except vol.Invalid as err:
_LOGGER.error("Invalid message data: %s", err)
data = {}
title = kwargs.get(ATTR_TITLE)
targets = _async_sanitize_channel_names(
kwargs.get(ATTR_TARGET, [self._config[CONF_DEFAULT_CHANNEL]])
)
# Message Type 1: A text-only message
if ATTR_FILE not in data:
if ATTR_BLOCKS_TEMPLATE in data:
value = cv.template_complex(data[ATTR_BLOCKS_TEMPLATE])
template.attach(self._hass, value)
blocks = template.render_complex(value)
elif ATTR_BLOCKS in data:
blocks = data[ATTR_BLOCKS]
else:
blocks = None
return await self._async_send_text_only_message(
targets,
message,
title,
username=data.get(ATTR_USERNAME, self._config.get(ATTR_USERNAME)),
icon=data.get(ATTR_ICON, self._config.get(ATTR_ICON)),
thread_ts=data.get(ATTR_THREAD_TS),
blocks=blocks,
)
# Message Type 2: A message that uploads a remote file
if ATTR_URL in data[ATTR_FILE]:
return await self._async_send_remote_file_message(
data[ATTR_FILE][ATTR_URL],
targets,
message,
title,
thread_ts=data.get(ATTR_THREAD_TS),
username=data[ATTR_FILE].get(ATTR_USERNAME),
password=data[ATTR_FILE].get(ATTR_PASSWORD),
)
# Message Type 3: A message that uploads a local file
return await self._async_send_local_file_message(
data[ATTR_FILE][ATTR_PATH],
targets,
message,
title,
thread_ts=data.get(ATTR_THREAD_TS),
)