view libervia/web/pages/blog/view/page_meta.py @ 1598:86c7a3a625d5

server: always start a new session on connection: The session was kept when a user was connecting from service profile (but not from other profiles), this was leading to session fixation vulnerability (an attacker on the same machine could get service profile session cookie, and use it when a victim would log-in). This patch fixes it by always starting a new session on connection. fix 443
author Goffi <goffi@goffi.org>
date Fri, 23 Feb 2024 13:35:24 +0100
parents eb00d593801d
children
line wrap: on
line source

#!/usr/bin/env python3

import html
from typing import Any, Dict, Optional

from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import uri
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common import regex
from libervia.backend.tools.common.template import safe
from twisted.web import server
from twisted.words.protocols.jabber import jid

from libervia.web.server import utils
from libervia.web.server.constants import Const as C
from libervia.web.server.utils import SubPage

# module-level logger, named after this module
log = getLogger(__name__)

# NOTE: this bare string comes after the imports, so Python does not use it as
# the module docstring; it only acts as an inline description of the page.
"""generic blog (with service/node provided)"""
# page metadata (presumably read by the Libervia page loader — TODO confirm)
name = 'blog_view'
template = "blog/articles.html"
# maps a (URI type, subtype) pair to the name of the function of this module
# building the corresponding page URL (see ``microblog_uri``)
uri_handlers = {('pubsub', 'microblog'): 'microblog_uri'}

URL_LIMIT_MARK = 90  # if canonical URL is longer than that, text will not be appended


def microblog_uri(self, uri_data):
    """Build the URL of this page from parsed ``pubsub``/``microblog`` URI data

    @param uri_data: parsed URI data: "path" and "node" are always read, "item"
        is used only when present
    @return: value returned by ``self.get_url`` called with the path, the node
        and, when an item is specified, the "id" keyword followed by the item id
    """
    url_args = (uri_data['path'], uri_data['node'])
    if 'item' in uri_data:
        # a specific item is addressed with the "id" filter keyword
        url_args += ('id', uri_data['item'])
    return self.get_url(*url_args)

def parse_url(self, request):
    """URL is /[service]/[node]/[filter_keyword]/[item]|[other]

    if [node] is '@', default namespace is used
    if a value is unset, default one will be used
    keyword can be one of:
        id: next value is a item id
        tag: next value is a blog tag

    Parsed values are stored in request data ("service", "node",
    "filter_keyword", "item" and "tag" when relevant), and a link to the Atom
    feed is added to ``request.template_data`` ("atom_url" and "links").
    An invalid service, an unknown filter keyword or a missing value after a
    keyword results in a call to ``self.page_error`` with ``C.HTTP_BAD_REQUEST``.
    """
    data = self.get_r_data(request)

    try:
        service = self.next_path(request)
    except IndexError:
        # ``service`` must stay bound even when it is missing from the URL: it is
        # used below to build the Atom feed link (it was previously left
        # undefined here, resulting in an UnboundLocalError)
        service = data['service'] = ''
    else:
        try:
            data["service"] = jid.JID(service)
        except Exception:
            log.warning(_("bad service entered: {}").format(service))
            self.page_error(request, C.HTTP_BAD_REQUEST)

    try:
        node = self.next_path(request)
    except IndexError:
        node = '@'
    # the default node is stored as an empty string in request data, while the
    # raw value ("@") is kept in ``node`` to build the Atom feed URL
    data['node'] = '' if node == '@' else node

    try:
        filter_kw = data['filter_keyword'] = self.next_path(request)
    except IndexError:
        filter_kw = '@'
    else:
        if filter_kw == '@':
            # No filter, this is used when a subpage is needed, notably Atom feed
            pass
        elif filter_kw == 'id':
            try:
                data['item'] = self.next_path(request)
            except IndexError:
                self.page_error(request, C.HTTP_BAD_REQUEST)
            # we get one more argument in case text has been added to have a nice URL
            try:
                self.next_path(request)
            except IndexError:
                pass
        elif filter_kw == 'tag':
            try:
                data['tag'] = self.next_path(request)
            except IndexError:
                self.page_error(request, C.HTTP_BAD_REQUEST)
        else:
            # invalid filter keyword
            log.warning(_("invalid filter keyword: {filter_kw}").format(
                filter_kw=filter_kw))
            self.page_error(request, C.HTTP_BAD_REQUEST)

    # if URL is parsed here, we'll have atom.xml available and we need to
    # add the link to the page
    atom_url = self.get_url_by_path(
        SubPage('blog_view'),
        service,
        node,
        filter_kw,
        SubPage('blog_feed_atom'),
    )
    request.template_data['atom_url'] = atom_url
    request.template_data.setdefault('links', []).append({
        "href": atom_url,
        "type": "application/atom+xml",
        "rel": "alternate",
        "title": "{service}'s blog".format(service=service)})


def add_breadcrumb(self, request, breadcrumbs):
    """Append the breadcrumbs of this page

    A "Feed" entry linking to the blog is always added, followed by a "Post"
    entry (without URL) when a specific item is displayed.

    @param request: request being handled, used to retrieve request data
    @param breadcrumbs: list of breadcrumbs, modified in place
    """
    data = self.get_r_data(request)
    breadcrumbs.append({
        "label": D_("Feed"),
        # parse_url stores the default node as an empty string, so a ``get``
        # default would never be used: fall back to "@" for any empty value, as
        # it is done when building items URLs in prepare_render
        "url": self.get_url(data["service"].full(), data.get("node") or "@")
    })
    if "item" in data:
        breadcrumbs.append({
            "label": D_("Post"),
        })


async def append_comments(
    self,
    request: server.Request,
    blog_items: dict,
    profile: str,
    _seen: Optional[set] = None
) -> None:
    """Fetch comments of each blog item, then comments of those comments, etc.

    Retrieved comments are stored under the "items" key of each comments
    location found in ``blog_item['comments']``. An empty list is stored when
    comments can't be retrieved or when the location has already been visited.

    @param blog_items: items data
    @param profile: Libervia profile
    @param _seen: (service, node) locations already visited, used to avoid
        infinite recursion. For internal use only
    """
    visited = set() if _seen is None else _seen

    authors = [entry['author_jid'] for entry in blog_items['items']]
    await self.fill_missing_identities(request, authors)

    extra: Dict[str, Any] = {C.KEY_ORDER_BY: C.ORDER_BY_CREATION}
    if not self.use_cache(request):
        extra[C.KEY_USE_CACHE] = False

    for blog_item in blog_items['items']:
        for comment_data in blog_item['comments']:
            service, node = comment_data['service'], comment_data['node']
            location = (service, node)

            if location in visited:
                # this location has already been retrieved: stop here to avoid
                # looping forever
                log.warning(
                    f"Items from {node!r} at {service} have already been retrieved, "
                    "there is a recursion at this service"
                )
                comment_data["items"] = []
                continue
            visited.add(location)

            try:
                raw_comments = await self.host.bridge_call(
                    'mb_get',
                    service,
                    node,
                    C.NO_LIMIT,
                    [],
                    data_format.serialise(extra),
                    profile,
                )
            except Exception as e:
                # best effort: a failure on one location must not prevent the
                # other comments from being retrieved
                log.warning(
                    _("Can't get comments at {node} (service: {service}): {msg}").format(
                        service=service,
                        node=node,
                        msg=e))
                comment_data['items'] = []
                continue

            comments = data_format.deserialise(raw_comments)
            if comments is None:
                log.error(f"Comments should not be None: {comment_data}")
                comment_data["items"] = []
                continue

            comment_data['items'] = comments['items']
            # comments may themselves have comments
            await append_comments(self, request, comments, profile, _seen=visited)

async def get_blog_items(
    self,
    request: server.Request,
    service: jid.JID,
    node: str,
    item_id,
    extra: Dict[str, Any],
    profile: str
) -> dict:
    """Retrieve blog items from the backend with the ``mb_get`` bridge method

    @param request: request being handled
    @param service: service hosting the blog, its ``userhost()`` is sent to backend
    @param node: node to retrieve, sent as is to backend
    @param item_id: id of the only item to retrieve
        all items are requested if the value is falsy
    @param extra: extra options, serialised then sent to backend
        NOTE: this dict is modified in place when the cache must not be used
    @param profile: profile used to do the request
    @return: deserialised blog data, or ``{"items": []}`` if the retrieval failed
        for a reason other than a "forbidden" error
    """
    try:
        items_id = [item_id] if item_id else []
        if not self.use_cache(request):
            extra[C.KEY_USE_CACHE] = False
        blog_data = await self.host.bridge_call('mb_get',
                              service.userhost(),
                              node,
                              C.NO_LIMIT,
                              items_id,
                              data_format.serialise(extra),
                              profile)
    except Exception as e:
        # FIXME: need a better way to test errors in bridge errback
        if "forbidden" in str(e):
            # NOTE(review): page_error is expected to interrupt the workflow here,
            #   otherwise ``blog_data`` would be unbound below — TODO confirm
            self.page_error(request, 401)
        else:
            # the message must be translated *before* being formatted, otherwise
            # the lookup is done with the formatted text and can't match
            log.warning(_("can't retrieve blog for [{service}]: {msg}").format(
                service=service.userhost(), msg=e))
            blog_data = {"items": []}
    else:
        blog_data = data_format.deserialise(blog_data)

    return blog_data

async def prepare_render(self, request):
    """Retrieve blog items and fill template data before the page is rendered

    Request data ("service", "node", "item", "tag", "page_max", "show_comments",
    "allow_commenting") are used to select what to retrieve and to display.
    Following template data are set: "service" and "node" (if not already set),
    pagination URLs, "blog_items", "items_http_uri", "tags_http_uri",
    "allow_commenting", "xmpp_uri", and "dynamic_style" when a background image
    is set for the target profile.

    @param request: request being handled
    """
    data = self.get_r_data(request)
    template_data = request.template_data
    page_max = data.get("page_max", 10)
    # if the comments are not explicitly hidden, we show them
    service, node, item_id, show_comments = (
        data.get('service', ''),
        data.get('node', ''),
        data.get('item'),
        data.get('show_comments', True)
    )
    profile = self.get_profile(request)
    if profile is None:
        # nobody is logged in, the service profile is used to retrieve the items
        profile = C.SERVICE_PROFILE
        profile_connected = False
    else:
        profile_connected = True

    ## pagination/filtering parameters
    if item_id:
        extra = {}
    else:
        extra = self.get_pubsub_extra(request, page_max=page_max)
        tag = data.get('tag')
        if tag:
            extra[f'mam_filter_{C.MAM_FILTER_CATEGORY}'] = tag
        self.handle_search(request, extra)

    ## main data ##
    # we get data from backend/XMPP here
    blog_items = await get_blog_items(self, request, service, node, item_id, extra, profile)

    ## navigation ##
    # now let's fill service, node and pagination URLs
    if 'service' not in template_data:
        template_data['service'] = service
    if 'node' not in template_data:
        template_data['node'] = node
    target_profile = template_data.get('target_profile')

    # NOTE(review): a dict with an empty "items" list is still truthy, so the
    #   "not found" branch below is probably only reached when no data at all
    #   is returned — to be confirmed against backend behaviour
    if blog_items:
        if item_id:
            template_data["previous_page_url"] = self.get_url(
                service.full(),
                node,
                before=item_id,
                page_max=1
            )
            template_data["next_page_url"] = self.get_url(
                service.full(),
                node,
                after=item_id,
                page_max=1
            )
            blog_items["rsm"] = {
                "last": item_id,
                "first": item_id,
            }
            blog_items["complete"] = False
        else:
            self.set_pagination(request, blog_items)
    elif item_id:
        # if item id has been specified in URL and it's not found,
        # we must return an error
        self.page_error(request, C.HTTP_NOT_FOUND)

    ## identities ##
    # identities are used to show nice nickname or avatars
    await self.fill_missing_identities(request, [i['author_jid'] for i in blog_items['items']])

    ## Comments ##
    # if comments are requested, we need to take them
    if show_comments:
        await append_comments(self, request, blog_items, profile)

    ## URLs ##
    # We will fill items_http_uri and tags_http_uri in template_data with suitable urls
    # if we know the profile, we use it instead of service + blog (nicer url)
    if target_profile is None:
        blog_view_page = self.get_page_by_name('blog_view')
        blog_base_url_item = blog_view_page.get_url(service.full(), node or '@', 'id')
        blog_base_url_tag = blog_view_page.get_url(service.full(), node or '@', 'tag')
    else:
        blog_base_url_item = self.get_url_by_names([('user', [target_profile]), ('user_blog', ['id'])])
        blog_base_url_tag = self.get_url_by_names([('user', [target_profile]), ('user_blog', ['tag'])])
        # we also set the background image if specified by user
        bg_img = await self.host.bridge_call('param_get_a_async', 'Background', 'Blog page', 'value', -1, target_profile)
        if bg_img:
            # the value is escaped (quotes included) before being marked as safe
            template_data['dynamic_style'] = safe("""
                :root {
                    --bg-img: url("%s");
                }
                """ % html.escape(bg_img, True))

    template_data['blog_items'] = data['blog_items'] = blog_items
    # Twisted maps query arguments names (bytes) to lists of bytes values, and
    # the items are in the "items" list of the blog data dict
    if request.args.get(b'reverse') == [b'1']:
        blog_items['items'].reverse()
    template_data['items_http_uri'] = items_http_uri = {}
    template_data['tags_http_uri'] = tags_http_uri = {}

    for item in blog_items['items']:
        blog_canonical_url = '/'.join([blog_base_url_item, utils.quote(item['id'])])
        blog_url = blog_canonical_url
        if len(blog_canonical_url) <= URL_LIMIT_MARK and '-' not in item['id']:
            # we add text from title or body at the end of URL
            # to make it more human readable
            # we do it only if there is no "-", as a "-" probably means that
            # item's id is already user friendly.
            # TODO: to be removed,  this is only kept for a transition period until
            #   user friendly item IDs are more common.
            text = regex.url_friendly_text(item.get('title', item['content']))
            if text:
                blog_url = blog_canonical_url + '/' + text

        items_http_uri[item['id']] = self.host.get_ext_base_url(request, blog_url)
        for tag in item['tags']:
            if tag not in tags_http_uri:
                tag_url = '/'.join([blog_base_url_tag, utils.quote(tag)])
                tags_http_uri[tag] = self.host.get_ext_base_url(request, tag_url)

    # if True, page should display a comment box
    template_data['allow_commenting'] = data.get('allow_commenting', profile_connected)

    # last but not least, we add a xmpp: link to the node
    uri_args = {'path': service.full()}
    if node:
        uri_args['node'] = node
    if item_id:
        uri_args['item'] = item_id
    template_data['xmpp_uri'] = uri.build_xmpp_uri(
        'pubsub', subtype='microblog', **uri_args
    )


async def on_data_post(self, request):
    """Handle data posted to the blog page

    Only the "comment" type is handled: the posted body is sent with the
    ``mb_send`` bridge method to the posted service and node. Other types are
    only logged.
    ``self.page_error`` is called if no profile is available, if the body is
    empty, or if the backend reports a "forbidden" error.

    @param request: request being handled
    """
    profile = self.get_profile(request)
    if profile is None:
        self.page_error(request, C.HTTP_FORBIDDEN)

    type_ = self.get_posted_data(request, 'type')
    if type_ != 'comment':
        log.warning(_("Unhandled data type: {}").format(type_))
        return

    service, node, body = self.get_posted_data(request, ('service', 'node', 'body'))
    if not body:
        self.page_error(request, C.HTTP_BAD_REQUEST)

    comment_data = {"content_rich": body}
    try:
        await self.host.bridge_call(
            'mb_send',
            service,
            node,
            data_format.serialise(comment_data),
            profile,
        )
    except Exception as e:
        # only "forbidden" errors are converted to an error page
        if "forbidden" not in str(e):
            raise e
        self.page_error(request, 401)