Mercurial > libervia-web
diff libervia/web/pages/blog/view/page_meta.py @ 1518:eb00d593801d
refactoring: rename `libervia` to `libervia.web` + update imports following backend changes
author   | Goffi <goffi@goffi.org>
date     | Fri, 02 Jun 2023 16:49:28 +0200
parents  | libervia/pages/blog/view/page_meta.py@106bae41f5c8
children |
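
The commit message summarises the change; concretely, the rename amounts to import rewrites such as the following. The "before" lines are inferred from the parent path libervia/pages/… and the former package names, so treat them as an approximation rather than as part of this diff:

# before the rename (inferred, not shown in this diff)
from sat.core.i18n import D_, _
from libervia.server.constants import Const as C

# after the rename (as in the new file below)
from libervia.backend.core.i18n import D_, _
from libervia.web.server.constants import Const as C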
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/web/pages/blog/view/page_meta.py	Fri Jun 02 16:49:28 2023 +0200
@@ -0,0 +1,381 @@
#!/usr/bin/env python3

import html
from typing import Any, Dict, Optional

from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import uri
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common import regex
from libervia.backend.tools.common.template import safe
from twisted.web import server
from twisted.words.protocols.jabber import jid

from libervia.web.server import utils
from libervia.web.server.constants import Const as C
from libervia.web.server.utils import SubPage

log = getLogger(__name__)

"""generic blog (with service/node provided)"""
name = 'blog_view'
template = "blog/articles.html"
uri_handlers = {('pubsub', 'microblog'): 'microblog_uri'}

URL_LIMIT_MARK = 90  # if the canonical URL is longer than this, text will not be appended


def microblog_uri(self, uri_data):
    args = [uri_data['path'], uri_data['node']]
    if 'item' in uri_data:
        args.extend(['id', uri_data['item']])
    return self.get_url(*args)

def parse_url(self, request):
    """URL is /[service]/[node]/[filter_keyword]/[item]|[other]

    if [node] is '@', the default namespace is used
    if a value is unset, the default one will be used
    keyword can be one of:
        id: next value is an item id
        tag: next value is a blog tag
    """
    data = self.get_r_data(request)

    try:
        service = self.next_path(request)
    except IndexError:
        data['service'] = ''
    else:
        try:
            data["service"] = jid.JID(service)
        except Exception:
            log.warning(_("bad service entered: {}").format(service))
            self.page_error(request, C.HTTP_BAD_REQUEST)

    try:
        node = self.next_path(request)
    except IndexError:
        node = '@'
    data['node'] = '' if node == '@' else node

    try:
        filter_kw = data['filter_keyword'] = self.next_path(request)
    except IndexError:
        filter_kw = '@'
    else:
        if filter_kw == '@':
            # no filter, this is used when a subpage is needed, notably the Atom feed
            pass
        elif filter_kw == 'id':
            try:
                data['item'] = self.next_path(request)
            except IndexError:
                self.page_error(request, C.HTTP_BAD_REQUEST)
            # we get one more argument in case text has been added to have a nice URL
            try:
                self.next_path(request)
            except IndexError:
                pass
        elif filter_kw == 'tag':
            try:
                data['tag'] = self.next_path(request)
            except IndexError:
                self.page_error(request, C.HTTP_BAD_REQUEST)
        else:
            # invalid filter keyword
            log.warning(_("invalid filter keyword: {filter_kw}").format(
                filter_kw=filter_kw))
            self.page_error(request, C.HTTP_BAD_REQUEST)

    # if the URL is parsed here, we'll have atom.xml available and we need to
    # add the link to the page
    atom_url = self.get_url_by_path(
        SubPage('blog_view'),
        service,
        node,
        filter_kw,
        SubPage('blog_feed_atom'),
    )
    request.template_data['atom_url'] = atom_url
    request.template_data.setdefault('links', []).append({
        "href": atom_url,
        "type": "application/atom+xml",
        "rel": "alternate",
        "title": "{service}'s blog".format(service=service)})
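
# Editor's note (not part of the changeset): as an illustration of the scheme
# documented in parse_url above, and assuming this page is mounted under a
# hypothetical /blog prefix, URLs would map to request data roughly like this:
#   /blog/louise@example.net/@              -> service=louise@example.net, default node, no filter
#   /blog/louise@example.net/@/id/some-item -> single item "some-item" (one extra trailing
#                                              segment of human-readable text is ignored)
#   /blog/louise@example.net/@/tag/xmpp     -> items filtered by the tag "xmpp"
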
def add_breadcrumb(self, request, breadcrumbs):
    data = self.get_r_data(request)
    breadcrumbs.append({
        "label": D_("Feed"),
        "url": self.get_url(data["service"].full(), data.get("node", "@"))
    })
    if "item" in data:
        breadcrumbs.append({
            "label": D_("Post"),
        })


async def append_comments(
    self,
    request: server.Request,
    blog_items: dict,
    profile: str,
    _seen: Optional[set] = None
) -> None:
    """Recursively download and append comments of items

    @param blog_items: items data
    @param profile: Libervia profile
    @param _seen: used to avoid infinite recursion. For internal use only.
    """
    if _seen is None:
        _seen = set()
    await self.fill_missing_identities(
        request, [i['author_jid'] for i in blog_items['items']])
    extra: Dict[str, Any] = {C.KEY_ORDER_BY: C.ORDER_BY_CREATION}
    if not self.use_cache(request):
        extra[C.KEY_USE_CACHE] = False
    for blog_item in blog_items['items']:
        for comment_data in blog_item['comments']:
            service = comment_data['service']
            node = comment_data['node']
            service_node = (service, node)
            if service_node in _seen:
                log.warning(
                    f"Items from {node!r} at {service} have already been retrieved, "
                    "there is a recursion at this service"
                )
                comment_data["items"] = []
                continue
            else:
                _seen.add(service_node)
            try:
                comments_data = await self.host.bridge_call(
                    'mb_get',
                    service,
                    node,
                    C.NO_LIMIT,
                    [],
                    data_format.serialise(extra),
                    profile)
            except Exception as e:
                log.warning(
                    _("Can't get comments at {node} (service: {service}): {msg}").format(
                        service=service,
                        node=node,
                        msg=e))
                comment_data['items'] = []
                continue

            comments = data_format.deserialise(comments_data)
            if comments is None:
                log.error(f"Comments should not be None: {comment_data}")
                comment_data["items"] = []
                continue
            comment_data['items'] = comments['items']
            await append_comments(self, request, comments, profile, _seen=_seen)

async def get_blog_items(
    self,
    request: server.Request,
    service: jid.JID,
    node: str,
    item_id,
    extra: Dict[str, Any],
    profile: str
) -> dict:
    try:
        if item_id:
            items_id = [item_id]
        else:
            items_id = []
        if not self.use_cache(request):
            extra[C.KEY_USE_CACHE] = False
        blog_data = await self.host.bridge_call(
            'mb_get',
            service.userhost(),
            node,
            C.NO_LIMIT,
            items_id,
            data_format.serialise(extra),
            profile)
    except Exception as e:
        # FIXME: need a better way to test errors in bridge errback
        if "forbidden" in str(e):
            self.page_error(request, 401)
        else:
            log.warning(_("can't retrieve blog for [{service}]: {msg}".format(
                service=service.userhost(), msg=e)))
            blog_data = {"items": []}
    else:
        blog_data = data_format.deserialise(blog_data)

    return blog_data
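
# Editor's note (not part of the changeset): a rough sketch of the deserialised
# `mb_get` payload as this page consumes it; the exact schema is defined by the
# backend and is only inferred here from the accesses made below:
#   {
#       "items": [  # one dict per blog post
#           {"id": ..., "author_jid": ..., "title": ..., "content": ...,
#            "tags": [...], "comments": [{"service": ..., "node": ...}, ...]},
#       ],
#       "rsm": {"first": ..., "last": ...},  # pagination markers
#       "complete": ...,                     # true when there is no further page
#   }
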
async def prepare_render(self, request):
    data = self.get_r_data(request)
    template_data = request.template_data
    page_max = data.get("page_max", 10)
    # if the comments are not explicitly hidden, we show them
    service, node, item_id, show_comments = (
        data.get('service', ''),
        data.get('node', ''),
        data.get('item'),
        data.get('show_comments', True)
    )
    profile = self.get_profile(request)
    if profile is None:
        profile = C.SERVICE_PROFILE
        profile_connected = False
    else:
        profile_connected = True

    ## pagination/filtering parameters
    if item_id:
        extra = {}
    else:
        extra = self.get_pubsub_extra(request, page_max=page_max)
        tag = data.get('tag')
        if tag:
            extra[f'mam_filter_{C.MAM_FILTER_CATEGORY}'] = tag
        self.handle_search(request, extra)

    ## main data ##
    # we get the data from the backend/XMPP here
    blog_items = await get_blog_items(self, request, service, node, item_id, extra, profile)

    ## navigation ##
    # now let's fill service, node and pagination URLs
    if 'service' not in template_data:
        template_data['service'] = service
    if 'node' not in template_data:
        template_data['node'] = node
    target_profile = template_data.get('target_profile')

    if blog_items:
        if item_id:
            template_data["previous_page_url"] = self.get_url(
                service.full(),
                node,
                before=item_id,
                page_max=1
            )
            template_data["next_page_url"] = self.get_url(
                service.full(),
                node,
                after=item_id,
                page_max=1
            )
            blog_items["rsm"] = {
                "last": item_id,
                "first": item_id,
            }
            blog_items["complete"] = False
        else:
            self.set_pagination(request, blog_items)
    else:
        if item_id:
            # if an item id has been specified in the URL and it's not found,
            # we must return an error
            self.page_error(request, C.HTTP_NOT_FOUND)

    ## identities ##
    # identities are used to show a nice nickname or avatars
    await self.fill_missing_identities(
        request, [i['author_jid'] for i in blog_items['items']])

    ## Comments ##
    # if comments are requested, we need to fetch them too
    if show_comments:
        await append_comments(self, request, blog_items, profile)

    ## URLs ##
    # we will fill items_http_uri and tags_http_uri in template_data with suitable URLs
    # if we know the profile, we use it instead of service + blog (nicer URL)
    if target_profile is None:
        blog_base_url_item = self.get_page_by_name('blog_view').get_url(
            service.full(), node or '@', 'id')
        blog_base_url_tag = self.get_page_by_name('blog_view').get_url(
            service.full(), node or '@', 'tag')
    else:
        blog_base_url_item = self.get_url_by_names(
            [('user', [target_profile]), ('user_blog', ['id'])])
        blog_base_url_tag = self.get_url_by_names(
            [('user', [target_profile]), ('user_blog', ['tag'])])
        # we also set the background image if specified by the user
        bg_img = await self.host.bridge_call(
            'param_get_a_async', 'Background', 'Blog page', 'value', -1,
            template_data['target_profile'])
        if bg_img:
            template_data['dynamic_style'] = safe("""
                :root {
                    --bg-img: url("%s");
                }
                """ % html.escape(bg_img, True))

    template_data['blog_items'] = data['blog_items'] = blog_items
    if request.args.get(b'reverse') == ['1']:
        template_data['blog_items']['items'].reverse()
    template_data['items_http_uri'] = items_http_uri = {}
    template_data['tags_http_uri'] = tags_http_uri = {}

    for item in blog_items['items']:
        blog_canonical_url = '/'.join([blog_base_url_item, utils.quote(item['id'])])
        if len(blog_canonical_url) > URL_LIMIT_MARK:
            blog_url = blog_canonical_url
        elif '-' not in item['id']:
            # we add text from the title or body at the end of the URL to make it more
            # human readable. We do it only if there is no "-", as a "-" probably means
            # that the item's id is already user friendly.
            # TODO: to be removed, this is only kept for a transition period until
            #   user friendly item IDs are more common.
            text = regex.url_friendly_text(item.get('title', item['content']))
            if text:
                blog_url = blog_canonical_url + '/' + text
            else:
                blog_url = blog_canonical_url
        else:
            blog_url = blog_canonical_url

        items_http_uri[item['id']] = self.host.get_ext_base_url(request, blog_url)
        for tag in item['tags']:
            if tag not in tags_http_uri:
                tag_url = '/'.join([blog_base_url_tag, utils.quote(tag)])
                tags_http_uri[tag] = self.host.get_ext_base_url(request, tag_url)

    # if True, the page should display a comment box
    template_data['allow_commenting'] = data.get('allow_commenting', profile_connected)

    # last but not least, we add an xmpp: link to the node
    uri_args = {'path': service.full()}
    if node:
        uri_args['node'] = node
    if item_id:
        uri_args['item'] = item_id
    template_data['xmpp_uri'] = uri.build_xmpp_uri(
        'pubsub', subtype='microblog', **uri_args
    )


async def on_data_post(self, request):
    profile = self.get_profile(request)
    if profile is None:
        self.page_error(request, C.HTTP_FORBIDDEN)
    type_ = self.get_posted_data(request, 'type')
    if type_ == 'comment':
        service, node, body = self.get_posted_data(request, ('service', 'node', 'body'))

        if not body:
            self.page_error(request, C.HTTP_BAD_REQUEST)
        comment_data = {"content_rich": body}
        try:
            await self.host.bridge_call(
                'mb_send',
                service,
                node,
                data_format.serialise(comment_data),
                profile)
        except Exception as e:
            if "forbidden" in str(e):
                self.page_error(request, 401)
            else:
                raise e
    else:
        log.warning(_("Unhandled data type: {}").format(type_))
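
For readers skimming the diff, the item-permalink rules used in prepare_render can be summarised by this standalone sketch. It is illustrative only: make_item_url, base_url and the quote/url_friendly_text parameters stand in for the page helpers used above (utils.quote, regex.url_friendly_text) and are not part of the changeset:

def make_item_url(base_url, item_id, title_or_body, quote, url_friendly_text,
                  url_limit_mark=90):
    """Illustrative re-statement of the permalink rules in prepare_render."""
    canonical = '/'.join([base_url, quote(item_id)])
    if len(canonical) > url_limit_mark:
        # already long: keep the canonical form untouched
        return canonical
    if '-' in item_id:
        # a "-" suggests the id is already human friendly, nothing to append
        return canonical
    text = url_friendly_text(title_or_body)
    # append a readable slug when one can be derived, otherwise keep the canonical URL
    return (canonical + '/' + text) if text else canonical

# e.g. make_item_url(blog_base_url_item, item['id'],
#                    item.get('title', item['content']),
#                    utils.quote, regex.url_friendly_text)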