comparison libervia/web/pages/blog/view/page_meta.py @ 1518:eb00d593801d

refactoring: rename `libervia` to `libervia.web` + update imports following backend changes

author    Goffi <goffi@goffi.org>
date      Fri, 02 Jun 2023 16:49:28 +0200
parents   libervia/pages/blog/view/page_meta.py@106bae41f5c8
children  (none)
comparing 1517:b8ed9726525b with 1518:eb00d593801d

#!/usr/bin/env python3

import html
from typing import Any, Dict, Optional

from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import uri
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common import regex
from libervia.backend.tools.common.template import safe
from twisted.web import server
from twisted.words.protocols.jabber import jid

from libervia.web.server import utils
from libervia.web.server.constants import Const as C
from libervia.web.server.utils import SubPage

log = getLogger(__name__)

"""generic blog (with service/node provided)"""
name = 'blog_view'
template = "blog/articles.html"
uri_handlers = {('pubsub', 'microblog'): 'microblog_uri'}

URL_LIMIT_MARK = 90  # if canonical URL is longer than that, text will not be appended


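# registered in `uri_handlers` above for parsed xmpp: URIs of type
# ("pubsub", "microblog"): rebuilds a page URL from the parsed data (service in
# "path", pubsub "node", optional "item" id), giving a URL of the form
# <this page>/<service>/<node>/id/<item> (shape shown for illustration only)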
def microblog_uri(self, uri_data):
    args = [uri_data['path'], uri_data['node']]
    if 'item' in uri_data:
        args.extend(['id', uri_data['item']])
    return self.get_url(*args)

def parse_url(self, request):
    """URL is /[service]/[node]/[filter_keyword]/[item]|[other]

    if [node] is '@', the default namespace is used
    if a value is unset, the default one will be used
    keyword can be one of:
        id: next value is an item id
        tag: next value is a blog tag
    """
    data = self.get_r_data(request)

    try:
        service = self.next_path(request)
    except IndexError:
        data['service'] = ''
    else:
        try:
            data["service"] = jid.JID(service)
        except Exception:
            log.warning(_("bad service entered: {}").format(service))
            self.page_error(request, C.HTTP_BAD_REQUEST)

    try:
        node = self.next_path(request)
    except IndexError:
        node = '@'
    data['node'] = '' if node == '@' else node

    try:
        filter_kw = data['filter_keyword'] = self.next_path(request)
    except IndexError:
        filter_kw = '@'
    else:
        if filter_kw == '@':
            # no filter, '@' is used when a subpage is needed, notably the Atom feed
            pass
        elif filter_kw == 'id':
            try:
                data['item'] = self.next_path(request)
            except IndexError:
                self.page_error(request, C.HTTP_BAD_REQUEST)
            # consume one more path segment, in case human-readable text has been
            # appended to make a nicer URL
            try:
                self.next_path(request)
            except IndexError:
                pass
        elif filter_kw == 'tag':
            try:
                data['tag'] = self.next_path(request)
            except IndexError:
                self.page_error(request, C.HTTP_BAD_REQUEST)
        else:
            # invalid filter keyword
            log.warning(_("invalid filter keyword: {filter_kw}").format(
                filter_kw=filter_kw))
            self.page_error(request, C.HTTP_BAD_REQUEST)

    # when the URL is parsed by this page, atom.xml is available, so we add the
    # feed link to the page
    atom_url = self.get_url_by_path(
        SubPage('blog_view'),
        service,
        node,
        filter_kw,
        SubPage('blog_feed_atom'),
    )
    request.template_data['atom_url'] = atom_url
    request.template_data.setdefault('links', []).append({
        "href": atom_url,
        "type": "application/atom+xml",
        "rel": "alternate",
        "title": "{service}'s blog".format(service=service)})


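# add_breadcrumb feeds the template's breadcrumb trail: a "Feed" entry pointing
# back to the service/node view, plus a terminal "Post" entry when a single item
# is displayed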
def add_breadcrumb(self, request, breadcrumbs):
    data = self.get_r_data(request)
    breadcrumbs.append({
        "label": D_("Feed"),
        "url": self.get_url(data["service"].full(), data.get("node", "@"))
    })
    if "item" in data:
        breadcrumbs.append({
            "label": D_("Post"),
        })


async def append_comments(
    self,
    request: server.Request,
    blog_items: dict,
    profile: str,
    _seen: Optional[set] = None
) -> None:
    """Recursively download and append comments of items

    @param blog_items: items data
    @param profile: Libervia profile
    @param _seen: used to avoid infinite recursion. For internal use only
    """
    if _seen is None:
        _seen = set()
    await self.fill_missing_identities(
        request, [i['author_jid'] for i in blog_items['items']])
    extra: Dict[str, Any] = {C.KEY_ORDER_BY: C.ORDER_BY_CREATION}
    if not self.use_cache(request):
        extra[C.KEY_USE_CACHE] = False
    for blog_item in blog_items['items']:
        for comment_data in blog_item['comments']:
            service = comment_data['service']
            node = comment_data['node']
            service_node = (service, node)
            if service_node in _seen:
                log.warning(
                    f"Items from {node!r} at {service} have already been retrieved, "
                    "there is a recursion at this service"
                )
                comment_data["items"] = []
                continue
            else:
                _seen.add(service_node)
            try:
                comments_data = await self.host.bridge_call(
                    'mb_get',
                    service,
                    node,
                    C.NO_LIMIT,
                    [],
                    data_format.serialise(extra),
                    profile
                )
            except Exception as e:
                log.warning(
                    _("Can't get comments at {node} (service: {service}): {msg}").format(
                        service=service,
                        node=node,
                        msg=e))
                comment_data['items'] = []
                continue

            comments = data_format.deserialise(comments_data)
            if comments is None:
                log.error(f"Comments should not be None: {comment_data}")
                comment_data["items"] = []
                continue
            comment_data['items'] = comments['items']
            await append_comments(self, request, comments, profile, _seen=_seen)

async def get_blog_items(
    self,
    request: server.Request,
    service: jid.JID,
    node: str,
    item_id,
    extra: Dict[str, Any],
    profile: str
) -> dict:
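    """Retrieve blog items from the backend through the "mb_get" bridge call.

    Returns the deserialised blog data; a "forbidden" error from the backend is
    turned into an HTTP 401 page error, any other error is logged and an empty
    {"items": []} structure is returned.
    """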
    try:
        if item_id:
            items_id = [item_id]
        else:
            items_id = []
        if not self.use_cache(request):
            extra[C.KEY_USE_CACHE] = False
        blog_data = await self.host.bridge_call(
            'mb_get',
            service.userhost(),
            node,
            C.NO_LIMIT,
            items_id,
            data_format.serialise(extra),
            profile
        )
    except Exception as e:
        # FIXME: need a better way to test errors in bridge errback
        if "forbidden" in str(e):
            self.page_error(request, 401)
        else:
            log.warning(_("can't retrieve blog for [{service}]: {msg}").format(
                service=service.userhost(), msg=e))
            blog_data = {"items": []}
    else:
        blog_data = data_format.deserialise(blog_data)

    return blog_data

async def prepare_render(self, request):
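    """Prepare the template data used by blog/articles.html.

    Gathers the blog items (and, unless disabled, their comments), then fills
    identities, pagination and navigation URLs, per-item and per-tag HTTP URIs,
    and the xmpp: URI matching the displayed node.
    """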
    data = self.get_r_data(request)
    template_data = request.template_data
    page_max = data.get("page_max", 10)
    # if the comments are not explicitly hidden, we show them
    service, node, item_id, show_comments = (
        data.get('service', ''),
        data.get('node', ''),
        data.get('item'),
        data.get('show_comments', True)
    )
    profile = self.get_profile(request)
    if profile is None:
        profile = C.SERVICE_PROFILE
        profile_connected = False
    else:
        profile_connected = True

    ## pagination/filtering parameters ##
    if item_id:
        extra = {}
    else:
        extra = self.get_pubsub_extra(request, page_max=page_max)
        tag = data.get('tag')
        if tag:
            extra[f'mam_filter_{C.MAM_FILTER_CATEGORY}'] = tag
        self.handle_search(request, extra)

    ## main data ##
    # we get data from backend/XMPP here
    blog_items = await get_blog_items(
        self, request, service, node, item_id, extra, profile
    )

    ## navigation ##
    # now let's fill service, node and pagination URLs
    if 'service' not in template_data:
        template_data['service'] = service
    if 'node' not in template_data:
        template_data['node'] = node
    target_profile = template_data.get('target_profile')

    if blog_items:
        if item_id:
            template_data["previous_page_url"] = self.get_url(
                service.full(),
                node,
                before=item_id,
                page_max=1
            )
            template_data["next_page_url"] = self.get_url(
                service.full(),
                node,
                after=item_id,
                page_max=1
            )
            blog_items["rsm"] = {
                "last": item_id,
                "first": item_id,
            }
            blog_items["complete"] = False
        else:
            self.set_pagination(request, blog_items)
    else:
        if item_id:
            # if item id has been specified in URL and it's not found,
            # we must return an error
            self.page_error(request, C.HTTP_NOT_FOUND)

    ## identities ##
    # identities are used to show nice nicknames or avatars
    await self.fill_missing_identities(
        request, [i['author_jid'] for i in blog_items['items']])

    ## Comments ##
    # if comments are requested, we fetch them
    if show_comments:
        await append_comments(self, request, blog_items, profile)

    ## URLs ##
    # We will fill items_http_uri and tags_http_uri in template_data with suitable
    # URLs. If we know the profile, we use it instead of service + blog (nicer URL)
    if target_profile is None:
        blog_base_url_item = self.get_page_by_name('blog_view').get_url(
            service.full(), node or '@', 'id')
        blog_base_url_tag = self.get_page_by_name('blog_view').get_url(
            service.full(), node or '@', 'tag')
    else:
        blog_base_url_item = self.get_url_by_names(
            [('user', [target_profile]), ('user_blog', ['id'])])
        blog_base_url_tag = self.get_url_by_names(
            [('user', [target_profile]), ('user_blog', ['tag'])])
        # we also set the background image if specified by the user
        bg_img = await self.host.bridge_call(
            'param_get_a_async', 'Background', 'Blog page', 'value', -1,
            template_data['target_profile'])
        if bg_img:
            template_data['dynamic_style'] = safe("""
                :root {
                    --bg-img: url("%s");
                }
                """ % html.escape(bg_img, True))

    template_data['blog_items'] = data['blog_items'] = blog_items
    if request.args.get(b'reverse') == [b'1']:
        template_data['blog_items']['items'].reverse()
    template_data['items_http_uri'] = items_http_uri = {}
    template_data['tags_http_uri'] = tags_http_uri = {}


    for item in blog_items['items']:
        blog_canonical_url = '/'.join([blog_base_url_item, utils.quote(item['id'])])
        if len(blog_canonical_url) > URL_LIMIT_MARK:
            blog_url = blog_canonical_url
        elif '-' not in item['id']:
            # we add text from the title or body at the end of the URL to make it
            # more human readable. We only do it if there is no "-", as a "-"
            # probably means that the item's id is already user friendly.
            # TODO: to be removed, this is only kept for a transition period until
            # user friendly item IDs are more common.
            text = regex.url_friendly_text(item.get('title', item['content']))
            if text:
                blog_url = blog_canonical_url + '/' + text
            else:
                blog_url = blog_canonical_url
        else:
            blog_url = blog_canonical_url

        items_http_uri[item['id']] = self.host.get_ext_base_url(request, blog_url)
        for tag in item['tags']:
            if tag not in tags_http_uri:
                tag_url = '/'.join([blog_base_url_tag, utils.quote(tag)])
                tags_http_uri[tag] = self.host.get_ext_base_url(request, tag_url)

    # if True, the page should display a comment box
    template_data['allow_commenting'] = data.get('allow_commenting', profile_connected)

    # last but not least, we add an xmpp: link to the node
    uri_args = {'path': service.full()}
    if node:
        uri_args['node'] = node
    if item_id:
        uri_args['item'] = item_id
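    # the result is an XMPP URI for the node, roughly of this shape (illustrative
    # values, the exact format is up to uri.build_xmpp_uri):
    #   xmpp:louise@example.net?pubsub;node=urn:xmpp:microblog:0;item=123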
    template_data['xmpp_uri'] = uri.build_xmpp_uri(
        'pubsub', subtype='microblog', **uri_args
    )


async def on_data_post(self, request):
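    """Handle posted form data; currently only new comments are supported.

    Comments are published through the "mb_send" bridge call; a "forbidden"
    error from the backend is reported as an HTTP 401 page error.
    """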
    profile = self.get_profile(request)
    if profile is None:
        self.page_error(request, C.HTTP_FORBIDDEN)
    type_ = self.get_posted_data(request, 'type')
    if type_ == 'comment':
        service, node, body = self.get_posted_data(
            request, ('service', 'node', 'body'))

        if not body:
            self.page_error(request, C.HTTP_BAD_REQUEST)
        comment_data = {"content_rich": body}
        try:
            await self.host.bridge_call(
                'mb_send',
                service,
                node,
                data_format.serialise(comment_data),
                profile
            )
        except Exception as e:
            if "forbidden" in str(e):
                self.page_error(request, 401)
            else:
                raise e
    else:
        log.warning(_("Unhandled data type: {}").format(type_))