Mercurial > libervia-web
view libervia/pages/blog/view/page_meta.py @ 1513:ff95501abe74
server (websocket): don't crash when a force-close is failing
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 22 May 2023 11:57:49 +0200 |
parents | 106bae41f5c8 |
children |
line wrap: on
line source
#!/usr/bin/env python3 import html from typing import Any, Dict, Optional from sat.core.i18n import D_, _ from sat.core.log import getLogger from sat.tools.common import uri from sat.tools.common import data_format from sat.tools.common import regex from sat.tools.common.template import safe from twisted.web import server from twisted.words.protocols.jabber import jid from libervia.server import utils from libervia.server.constants import Const as C from libervia.server.utils import SubPage log = getLogger(__name__) """generic blog (with service/node provided)""" name = 'blog_view' template = "blog/articles.html" uri_handlers = {('pubsub', 'microblog'): 'microblog_uri'} URL_LIMIT_MARK = 90 # if canonical URL is longer than that, text will not be appended def microblog_uri(self, uri_data): args = [uri_data['path'], uri_data['node']] if 'item' in uri_data: args.extend(['id', uri_data['item']]) return self.get_url(*args) def parse_url(self, request): """URL is /[service]/[node]/[filter_keyword]/[item]|[other] if [node] is '@', default namespace is used if a value is unset, default one will be used keyword can be one of: id: next value is a item id tag: next value is a blog tag """ data = self.get_r_data(request) try: service = self.next_path(request) except IndexError: data['service'] = '' else: try: data["service"] = jid.JID(service) except Exception: log.warning(_("bad service entered: {}").format(service)) self.page_error(request, C.HTTP_BAD_REQUEST) try: node = self.next_path(request) except IndexError: node = '@' data['node'] = '' if node == '@' else node try: filter_kw = data['filter_keyword'] = self.next_path(request) except IndexError: filter_kw = '@' else: if filter_kw == '@': # No filter, this is used when a subpage is needed, notably Atom feed pass elif filter_kw == 'id': try: data['item'] = self.next_path(request) except IndexError: self.page_error(request, C.HTTP_BAD_REQUEST) # we get one more argument in case text has been added to have a nice URL try: self.next_path(request) except IndexError: pass elif filter_kw == 'tag': try: data['tag'] = self.next_path(request) except IndexError: self.page_error(request, C.HTTP_BAD_REQUEST) else: # invalid filter keyword log.warning(_("invalid filter keyword: {filter_kw}").format( filter_kw=filter_kw)) self.page_error(request, C.HTTP_BAD_REQUEST) # if URL is parsed here, we'll have atom.xml available and we need to # add the link to the page atom_url = self.get_url_by_path( SubPage('blog_view'), service, node, filter_kw, SubPage('blog_feed_atom'), ) request.template_data['atom_url'] = atom_url request.template_data.setdefault('links', []).append({ "href": atom_url, "type": "application/atom+xml", "rel": "alternate", "title": "{service}'s blog".format(service=service)}) def add_breadcrumb(self, request, breadcrumbs): data = self.get_r_data(request) breadcrumbs.append({ "label": D_("Feed"), "url": self.get_url(data["service"].full(), data.get("node", "@")) }) if "item" in data: breadcrumbs.append({ "label": D_("Post"), }) async def append_comments( self, request: server.Request, blog_items: dict, profile: str, _seen: Optional[set] = None ) -> None: """Recursively download and append comments of items @param blog_items: items data @param profile: Libervia profile @param _seen: used to avoid infinite recursion. For internal use only """ if _seen is None: _seen = set() await self.fill_missing_identities( request, [i['author_jid'] for i in blog_items['items']]) extra: Dict[str, Any] = {C.KEY_ORDER_BY: C.ORDER_BY_CREATION} if not self.use_cache(request): extra[C.KEY_USE_CACHE] = False for blog_item in blog_items['items']: for comment_data in blog_item['comments']: service = comment_data['service'] node = comment_data['node'] service_node = (service, node) if service_node in _seen: log.warning( f"Items from {node!r} at {service} have already been retrieved, " "there is a recursion at this service" ) comment_data["items"] = [] continue else: _seen.add(service_node) try: comments_data = await self.host.bridge_call('mb_get', service, node, C.NO_LIMIT, [], data_format.serialise( extra ), profile) except Exception as e: log.warning( _("Can't get comments at {node} (service: {service}): {msg}").format( service=service, node=node, msg=e)) comment_data['items'] = [] continue comments = data_format.deserialise(comments_data) if comments is None: log.error(f"Comments should not be None: {comment_data}") comment_data["items"] = [] continue comment_data['items'] = comments['items'] await append_comments(self, request, comments, profile, _seen=_seen) async def get_blog_items( self, request: server.Request, service: jid.JID, node: str, item_id, extra: Dict[str, Any], profile: str ) -> dict: try: if item_id: items_id = [item_id] else: items_id = [] if not self.use_cache(request): extra[C.KEY_USE_CACHE] = False blog_data = await self.host.bridge_call('mb_get', service.userhost(), node, C.NO_LIMIT, items_id, data_format.serialise(extra), profile) except Exception as e: # FIXME: need a better way to test errors in bridge errback if "forbidden" in str(e): self.page_error(request, 401) else: log.warning(_("can't retrieve blog for [{service}]: {msg}".format( service = service.userhost(), msg=e))) blog_data = {"items": []} else: blog_data = data_format.deserialise(blog_data) return blog_data async def prepare_render(self, request): data = self.get_r_data(request) template_data = request.template_data page_max = data.get("page_max", 10) # if the comments are not explicitly hidden, we show them service, node, item_id, show_comments = ( data.get('service', ''), data.get('node', ''), data.get('item'), data.get('show_comments', True) ) profile = self.get_profile(request) if profile is None: profile = C.SERVICE_PROFILE profile_connected = False else: profile_connected = True ## pagination/filtering parameters if item_id: extra = {} else: extra = self.get_pubsub_extra(request, page_max=page_max) tag = data.get('tag') if tag: extra[f'mam_filter_{C.MAM_FILTER_CATEGORY}'] = tag self.handle_search(request, extra) ## main data ## # we get data from backend/XMPP here blog_items = await get_blog_items(self, request, service, node, item_id, extra, profile) ## navigation ## # no let's fill service, node and pagination URLs if 'service' not in template_data: template_data['service'] = service if 'node' not in template_data: template_data['node'] = node target_profile = template_data.get('target_profile') if blog_items: if item_id: template_data["previous_page_url"] = self.get_url( service.full(), node, before=item_id, page_max=1 ) template_data["next_page_url"] = self.get_url( service.full(), node, after=item_id, page_max=1 ) blog_items["rsm"] = { "last": item_id, "first": item_id, } blog_items["complete"] = False else: self.set_pagination(request, blog_items) else: if item_id: # if item id has been specified in URL and it's not found, # we must return an error self.page_error(request, C.HTTP_NOT_FOUND) ## identities ## # identities are used to show nice nickname or avatars await self.fill_missing_identities(request, [i['author_jid'] for i in blog_items['items']]) ## Comments ## # if comments are requested, we need to take them if show_comments: await append_comments(self, request, blog_items, profile) ## URLs ## # We will fill items_http_uri and tags_http_uri in template_data with suitable urls # if we know the profile, we use it instead of service + blog (nicer url) if target_profile is None: blog_base_url_item = self.get_page_by_name('blog_view').get_url(service.full(), node or '@', 'id') blog_base_url_tag = self.get_page_by_name('blog_view').get_url(service.full(), node or '@', 'tag') else: blog_base_url_item = self.get_url_by_names([('user', [target_profile]), ('user_blog', ['id'])]) blog_base_url_tag = self.get_url_by_names([('user', [target_profile]), ('user_blog', ['tag'])]) # we also set the background image if specified by user bg_img = await self.host.bridge_call('param_get_a_async', 'Background', 'Blog page', 'value', -1, template_data['target_profile']) if bg_img: template_data['dynamic_style'] = safe(""" :root { --bg-img: url("%s"); } """ % html.escape(bg_img, True)) template_data['blog_items'] = data['blog_items'] = blog_items if request.args.get(b'reverse') == ['1']: template_data['blog_items'].items.reverse() template_data['items_http_uri'] = items_http_uri = {} template_data['tags_http_uri'] = tags_http_uri = {} for item in blog_items['items']: blog_canonical_url = '/'.join([blog_base_url_item, utils.quote(item['id'])]) if len(blog_canonical_url) > URL_LIMIT_MARK: blog_url = blog_canonical_url elif '-' not in item['id']: # we add text from title or body at the end of URL # to make it more human readable # we do it only if there is no "-", as a "-" probably means that # item's id is already user friendly. # TODO: to be removed, this is only kept for a transition period until # user friendly item IDs are more common. text = regex.url_friendly_text(item.get('title', item['content'])) if text: blog_url = blog_canonical_url + '/' + text else: blog_url = blog_canonical_url else: blog_url = blog_canonical_url items_http_uri[item['id']] = self.host.get_ext_base_url(request, blog_url) for tag in item['tags']: if tag not in tags_http_uri: tag_url = '/'.join([blog_base_url_tag, utils.quote(tag)]) tags_http_uri[tag] = self.host.get_ext_base_url(request, tag_url) # if True, page should display a comment box template_data['allow_commenting'] = data.get('allow_commenting', profile_connected) # last but not least, we add a xmpp: link to the node uri_args = {'path': service.full()} if node: uri_args['node'] = node if item_id: uri_args['item'] = item_id template_data['xmpp_uri'] = uri.build_xmpp_uri( 'pubsub', subtype='microblog', **uri_args ) async def on_data_post(self, request): profile = self.get_profile(request) if profile is None: self.page_error(request, C.HTTP_FORBIDDEN) type_ = self.get_posted_data(request, 'type') if type_ == 'comment': service, node, body = self.get_posted_data(request, ('service', 'node', 'body')) if not body: self.page_error(request, C.HTTP_BAD_REQUEST) comment_data = {"content_rich": body} try: await self.host.bridge_call('mb_send', service, node, data_format.serialise(comment_data), profile) except Exception as e: if "forbidden" in str(e): self.page_error(request, 401) else: raise e else: log.warning(_("Unhandled data type: {}").format(type_))