-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
__init__.py
128 lines (115 loc) · 3.83 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import bleach
from datasette import hookimpl, __version__
from datasette.utils.asgi import Response
from feedgen.feed import FeedGenerator
import hashlib
import html
REQUIRED_COLUMNS = {"atom_id", "atom_updated", "atom_title"}
@hookimpl
def register_output_renderer():
return {"extension": "atom", "render": render_atom, "can_render": can_render_atom}
def render_atom(
datasette, request, sql, columns, rows, database, table, query_name, view_name, data
):
from datasette.views.base import DatasetteError
if not REQUIRED_COLUMNS.issubset(columns):
raise DatasetteError(
"SQL query must return columns {}".format(", ".join(REQUIRED_COLUMNS)),
status=400,
)
fg = FeedGenerator()
fg.generator(
generator="Datasette",
version=__version__,
uri="https://github.com/simonw/datasette",
)
fg.id(request.url)
fg.link(href=request.url, rel="self")
fg.updated(max(row["atom_updated"] for row in rows))
title = request.args.get("_feed_title", sql)
if table:
title += "/" + table
if data.get("human_description_en"):
title += ": " + data["human_description_en"]
# If this is a canned query the configured title for that over-rides all others
if query_name:
try:
title = datasette.metadata(database=database)["queries"][query_name][
"title"
]
except (KeyError, TypeError):
pass
fg.title(title)
clean_function = clean
if query_name:
# Check allow_unsafe_html_in_canned_queries
plugin_config = datasette.plugin_config("datasette-atom")
if plugin_config:
allow_unsafe_html_in_canned_queries = plugin_config.get(
"allow_unsafe_html_in_canned_queries"
)
if allow_unsafe_html_in_canned_queries is True:
clean_function = lambda s: s
elif isinstance(allow_unsafe_html_in_canned_queries, dict):
allowlist = allow_unsafe_html_in_canned_queries.get(database) or []
if query_name in allowlist:
clean_function = lambda s: s
# And the rows
for row in reversed(rows):
entry = fg.add_entry()
entry.id(str(row["atom_id"]))
if "atom_content_html" in columns:
entry.content(clean_function(row["atom_content_html"]), type="html")
elif "atom_content" in columns:
entry.content(row["atom_content"], type="text")
entry.updated(row["atom_updated"])
entry.title(str(row["atom_title"]))
# atom_link is optional
if "atom_link" in columns:
entry.link(href=row["atom_link"])
if "atom_author_name" in columns and row["atom_author_name"]:
author = {
"name": row["atom_author_name"],
}
for key in ("uri", "email"):
colname = "atom_author_{}".format(key)
if colname in columns and row[colname]:
author[key] = row[colname]
entry.author(author)
return Response(
fg.atom_str(pretty=True),
content_type="application/xml; charset=utf-8",
status=200,
)
def can_render_atom(columns):
return REQUIRED_COLUMNS.issubset(columns)
def clean(html):
cleaned = bleach.clean(
html,
tags=[
"a",
"abbr",
"acronym",
"b",
"blockquote",
"br",
"code",
"em",
"i",
"li",
"ol",
"strong",
"ul",
"pre",
"p",
"h1",
"h2",
"h3",
"h4",
"h5",
"h6",
"img",
],
attributes={"a": ["href", "title"], "img": ["alt", "src"]},
)
return cleaned