changeset 995:f88325b56a6a

server: dynamic pages first draft: /!\ new dependency: autobahn This patch introduce server part of dynamic pages. Dynamic pages use websockets to establish constant connection with a Libervia page, allowing to receive real time data or update it. The feature is activated by specifying "dynamic = true" in the page. Once activated, page can implement "on_data" method which will be called when data are sent by the page. To send data the other way, the page can use request.sendData. The new "registerSignal" method allows to use an "on_signal" method to be called each time given signal is received, with automatic (and optional) filtering on profile. New renderPartial and renderAndUpdate method allow to append new HTML elements to the dynamic page.
author Goffi <goffi@goffi.org>
date Wed, 03 Jan 2018 01:10:12 +0100
parents b92b06f023cb
children d821c112e656
files setup.py src/server/pages.py src/server/server.py src/server/session_iface.py src/server/websockets.py
diffstat 5 files changed, 324 insertions(+), 5 deletions(-) [+]
line wrap: on
line diff
--- a/setup.py	Wed Dec 13 00:37:12 2017 +0100
+++ b/setup.py	Wed Jan 03 01:10:12 2018 +0100
@@ -302,6 +302,6 @@
                   for root, dirs, files in os.walk(C.THEMES_DIR)],
       scripts=[],
       zip_safe=False,
-      install_requires=['sat', 'twisted', 'txJSON-RPC==0.3.1', 'zope.interface', 'pyopenssl', 'jinja2', 'shortuuid'],
+      install_requires=['sat', 'twisted', 'txJSON-RPC==0.3.1', 'zope.interface', 'pyopenssl', 'jinja2', 'shortuuid', 'autobahn'],
       cmdclass={'install': CustomInstall},
       )
--- a/src/server/pages.py	Wed Dec 13 00:37:12 2017 +0100
+++ b/src/server/pages.py	Wed Jan 03 01:10:12 2018 +0100
@@ -33,10 +33,14 @@
 from libervia.server.utils import quote
 import libervia
 
+from collections import namedtuple
+import uuid
 import os.path
 import urllib
 import time
 
+WebsocketMeta = namedtuple("WebsocketMeta", ('url', 'token', 'debug'))
+
 
 class Cache(object):
 
@@ -66,6 +70,7 @@
     isLeaf = True  # we handle subpages ourself
     named_pages = {}
     uri_callbacks = {}
+    signals_handlers = {}
     pages_redirects = {}
     cache = {}
     # Set of tuples (service/node/sub_id) of nodes subscribed for caching
@@ -73,8 +78,9 @@
     cache_pubsub_sub = set()
     main_menu = None
 
-    def __init__(self, host, root_dir, url, name=None, redirect=None, access=None, parse_url=None,
-                 prepare_render=None, render=None, template=None, on_data_post=None):
+    def __init__(self, host, root_dir, url, name=None, redirect=None, access=None, dynamic=False, parse_url=None,
+                 prepare_render=None, render=None, template=None,
+                 on_data_post=None, on_data=None, on_signal=None):
         """initiate LiberviaPages
 
         LiberviaPages are the main resources of Libervia, using easy to set python files
@@ -96,6 +102,7 @@
             Pages inherit from parent pages: e.g. if a "settings" page is restricted to admins,
             and if "settings/blog" is public, it still can only be accessed by admins.
             see C.PAGES_ACCESS_* for details
+        @param dynamic(bool): if True, activate websocket for bidirectional communication
         @param parse_url(callable, None): if set it will be called to handle the URL path
             after this method, the page will be rendered if noting is left in path (request.postpath)
             else a the request will be transmitted to a subpage
@@ -110,6 +117,10 @@
             None if not post is handled
             on_data_post can return a string with following value:
                 - C.POST_NO_CONFIRM: confirm flag will not be set
+        @param on_data(callable, None): method to call when dynamic data is sent
+            this method is used with Libervia's websocket mechanism
+        @param on_signal(callable, None): method to call when a registered signal is received
+            this method is used with Libervia's websocket mechanism
         """
 
         web_resource.Resource.__init__(self)
@@ -130,6 +141,7 @@
         if access not in (C.PAGES_ACCESS_PUBLIC, C.PAGES_ACCESS_PROFILE, C.PAGES_ACCESS_NONE):
             raise NotImplementedError(_(u"{} access is not implemented yet").format(access))
         self.access = access
+        self.dynamic = dynamic
         if redirect is not None:
             # only page access and name make sense in case of full redirection
             # so we check that rendering methods/values are not set
@@ -145,6 +157,8 @@
         self.template = template
         self.render_method = render
         self.on_data_post = on_data_post
+        self.on_data = on_data
+        self.on_signal = on_signal
         if access == C.PAGES_ACCESS_NONE:
             # none pages just return a 404, no further check is needed
             return
@@ -200,11 +214,15 @@
                     name=page_data.get('name'),
                     redirect=page_data.get('redirect'),
                     access=page_data.get('access'),
+                    dynamic=page_data.get('dynamic', False),
                     parse_url=page_data.get('parse_url'),
                     prepare_render=page_data.get('prepare_render'),
                     render=page_data.get('render'),
                     template=page_data.get('template'),
-                    on_data_post=page_data.get('on_data_post'))
+                    on_data_post=page_data.get('on_data_post'),
+                    on_data=page_data.get('on_data'),
+                    on_signal=page_data.get('on_signal'),
+                    )
                 parent.putChild(d, resource)
                 log.info(u"Added /{path} page".format(path=u'[...]/'.join(new_path)))
                 if 'uri_handlers' in page_data:
@@ -258,6 +276,29 @@
         cls.uri_callbacks[uri_tuple] = {u'callback': get_uri_cb,
                                         u'pre_path': pre_path}
 
+    def registerSignal(self, request, signal, check_profile=True):
+        r"""register a signal handler
+
+        the page must be dynamic
+        when signal is received, self.on_signal will be called with:
+            - request
+            - signal name
+            - signal arguments
+        signal handler will be removed when connection with dynamic page will be lost
+        @param signal(unicode): name of the signal
+            last arg of signal must be profile, as it will be checked to filter signals
+        @param check_profile(bool): if True, signal profile (which MUST be last arg) will be
+            checked against session profile.
+            /!\ if False, profile will not be checked/filtered, be sure to know what you are doing
+                if you unset this option /!\
+        """
+        # FIXME: add a timeout, if socket is not opened before it, signal handler must be removed
+        if not self.dynamic:
+            log.error(_(u"You can't register signal if page is not dynamic"))
+            return
+        LiberviaPage.signals_handlers.setdefault(signal, {})[id(request)] = (self, request, check_profile)
+        request._signals_registered.append(signal)
+
     def getPagePathFromURI(self, uri):
         """Retrieve page URL from xmpp: URI
 
@@ -467,6 +508,61 @@
         else:
             cache.clear()
 
+    @classmethod
+    def onSignal(cls, host, signal, *args):
+        """Generic method which receive registered signals
+
+        if a callback is registered for this signal, call it
+        @param host: Libervia instance
+        @param signal(unicode): name of the signal
+        @param *args: args of the signals
+        """
+        for page, request, check_profile in cls.signals_handlers.get(signal, {}).itervalues():
+            if check_profile:
+                signal_profile = args[-1]
+                request_profile = page.getProfile(request)
+                if not request_profile:
+                    # if you want to use signal without session, unset check_profile
+                    # (be sure to know what you are doing)
+                    log.error(_(u"no session started, signal can't be checked"))
+                    continue
+                if signal_profile != request_profile:
+                    # we ignore the signal, it's not for our profile
+                    continue
+            if request._signals_cache is not None:
+                # socket is not yet opened, we cache the signal
+                request._signals_cache.append((request, signal, args))
+                log.debug(u"signal [{signal}] cached: {args}".format(
+                    signal = signal,
+                    args = args))
+            else:
+                page.on_signal(page, request, signal, *args)
+
+    def onSocketOpen(self, request):
+        """Called for dynamic pages when socket has just been opened
+
+        we send all cached signals
+        """
+        assert request._signals_cache is not None
+        cache = request._signals_cache
+        request._signals_cache = None
+        for request, signal, args in cache:
+            self.on_signal(self, request, signal, *args)
+
+    def onSocketClose(self, request):
+        """Called for dynamic pages when socket has just been closed
+
+        we remove signal handler
+        """
+        for signal in request._signals_registered:
+            try:
+                del LiberviaPage.signals_handlers[signal][id(request)]
+            except KeyError:
+                log.error(_(u"Can't find signal handler for [{signal}], this should not happen").format(
+                    signal = signal))
+            else:
+                log.debug(_(u"Removed signal handler"))
+
     def HTTPRedirect(self, request, url):
         """redirect to an URL using HTTP redirection
 
@@ -604,6 +700,7 @@
             self.template,
             root_path = '/templates/',
             media_path = '/' + C.MEDIA_DIR,
+            cache_path = session_data.cache_dir,
             main_menu = LiberviaPage.main_menu,
             **template_data)
 
@@ -765,6 +862,48 @@
 
         return data
 
+    def renderPartial(self, request, template, template_data):
+        """Render a template to be inserted in dynamic page
+
+        this is NOT the normal page rendering method, it is used only to update
+        dynamic pages
+        @param template(unicode): path of the template to render
+        @param template_data(dict): template_data to use
+        """
+        if not self.dynamic:
+            raise exceptions.InternalError(_(u"renderPartial must only be used with dynamic pages"))
+        session_data = self.host.getSessionData(request, session_iface.ISATSession)
+
+        return self.host.renderer.render(
+            template,
+            root_path = '/templates/',
+            media_path = '/' + C.MEDIA_DIR,
+            cache_path = session_data.cache_dir,
+            main_menu = LiberviaPage.main_menu,
+            **template_data)
+
+    def renderAndUpdate(self, request, template, selectors, template_data_update, update_type="append"):
+        """Helper method to render a partial page element and update the page
+
+        this is NOT the normal page rendering method, it is used only to update
+        dynamic pages
+        @param request(server.Request): current HTTP request
+        @param template: same as for [renderPartial]
+        @param selectors: CSS selectors to use
+        @param template_data_update: template data to use
+            template data cached in request will be copied then updated
+            with this data
+        @parap update_type(unicode): one of:
+            append: append rendered element to selected element
+        """
+        template_data = request.template_data.copy()
+        template_data.update(template_data_update)
+        html = self.renderPartial(request, template, template_data)
+        request.sendData(u'dom',
+                        selectors=selectors,
+                        update_type=update_type,
+                        html=html)
+
     def renderPage(self, request, skip_parse_url=False):
         """Main method to handle the workflow of a LiberviaPage"""
 
@@ -774,6 +913,18 @@
             csrf_token = session_data.csrf_token
             request.template_data = {u'profile': session_data.profile,
                                      u'csrf_token': csrf_token}
+            if self.dynamic:
+                # we need to activate dynamic page
+                # we set data for template, and create/register token
+                socket_token = unicode(uuid.uuid4())
+                socket_url = self.host.getWebsocketURL(request)
+                socket_debug = C.boolConst(self.host.debug)
+                request.template_data['websocket'] = WebsocketMeta(socket_url, socket_token, socket_debug)
+                self.host.registerWSToken(socket_token, self, request)
+                # we will keep track of handlers to remove
+                request._signals_registered = []
+                # we will cache registered signals until socket is opened
+                request._signals_cache = []
 
             # XXX: here is the code which need to be executed once
             #      at the beginning of the request hanling
--- a/src/server/server.py	Wed Dec 13 00:37:12 2017 +0100
+++ b/src/server/server.py	Wed Jan 03 01:10:12 2018 +0100
@@ -51,6 +51,7 @@
 import urllib
 from httplib import HTTPS_PORT
 import libervia
+from libervia.server import websockets
 from libervia.server.pages import LiberviaPage
 from libervia.server.utils import quote
 from functools import partial
@@ -830,7 +831,7 @@
             defer.returnValue(avatar)
         else:
             filename = os.path.basename(avatar)
-            avatar_url = os.path.join(C.CACHE_DIR, session_data.uuid, filename)
+            avatar_url = os.path.join(session_data.cache_dir, filename)
             defer.returnValue(avatar_url)
 
     def jsonrpc_getAccountDialogUI(self):
@@ -1364,6 +1365,7 @@
 
 
 class Libervia(service.Service):
+    debug = defer.Deferred.debug  # True if twistd/Libervia is launched in debug mode
 
     def __init__(self, options):
         self.options = options
@@ -1452,9 +1454,20 @@
         self.putChild('blog', MicroBlog(self))
         self.putChild(C.THEMES_URL, ProtectedFile(self.themes_dir))
 
+        # websocket
+        if self.options['connection_type'] in ('https', 'both'):
+            wss = websockets.LiberviaPageWSProtocol.getResource(self, secure=True)
+            self.putChild('wss', wss)
+        if self.options['connection_type'] in ('http', 'both'):
+            ws = websockets.LiberviaPageWSProtocol.getResource(self, secure=False)
+            self.putChild('ws', ws)
+
+        # Libervia pages
         LiberviaPage.importPages(self)
         LiberviaPage.setMenu(self.options['menu_json'])
+        ## following signal is needed for cache handling in Libervia pages
         self.bridge.register_signal("psEventRaw", partial(LiberviaPage.onNodeEvent, self), "plugin")
+        self.bridge.register_signal("messageNew", partial(LiberviaPage.onSignal, self, "messageNew"))
 
         # media dirs
         # FIXME: get rid of dirname and "/" in C.XXX_DIR
@@ -1794,6 +1807,31 @@
         else:
             return (iface(session) for iface in args)
 
+    ## Websocket (dynamic pages) ##
+
+    def getWebsocketURL(self, request):
+        if request.isSecure():
+            ws = 'wss'
+        else:
+            ws = 'ws'
+
+        if self.base_url_ext:
+            base_url = self.base_url_ext
+        else:
+            o = self.options
+            if request.isSecure():
+                port = o['port_https_ext'] or o['port_https']
+            else:
+                port = o['port']
+            base_url = request.getRequestHostname().decode('utf-8') + u':' + unicode(port)+ u'/'
+
+        return u'{ws}://{base_url}{ws}'.format(
+            ws = ws,
+            base_url = base_url)
+
+    def registerWSToken(self, token, page, request):
+        websockets.LiberviaPageWSProtocol.registerToken(token, page, request)
+
     ## TLS related methods ##
 
     def _TLSOptionsCheck(self):
--- a/src/server/session_iface.py	Wed Dec 13 00:37:12 2017 +0100
+++ b/src/server/session_iface.py	Wed Jan 03 01:10:12 2018 +0100
@@ -18,6 +18,8 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 from zope.interface import Interface, Attribute, implements
 from sat.tools.common import data_objects
+from libervia.server.constants import Const as C
+import os.path
 import shortuuid
 
 FLAGS_KEY = '_flags'
@@ -40,6 +42,10 @@
         self.csrf_token = unicode(shortuuid.uuid())
         self.pages_data = {}  # used to keep data accross reloads (key is page instance)
 
+    @property
+    def cache_dir(self):
+        return os.path.join(u'/', C.CACHE_DIR, self.uuid) + u'/'
+
     def getPageData(self, page, key):
         """get session data for a page
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/server/websockets.py	Wed Jan 03 01:10:12 2018 +0100
@@ -0,0 +1,124 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+# Libervia: a Salut à Toi frontend
+# Copyright (C) 2011-2017 Jérôme Poisson <goffi@goffi.org>
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from sat.core.i18n import _
+from sat.core.log import getLogger
+log = getLogger(__name__)
+from sat.core import exceptions
+
+from autobahn.twisted import websocket
+from autobahn.twisted import resource as resource
+from autobahn.websocket import types
+
+import json
+
+LIBERVIA_PROTOCOL = 'libervia_page'
+
+
+class LiberviaPageWSProtocol(websocket.WebSocketServerProtocol):
+    host = None
+    tokens_map = {}
+
+    def onConnect(self, request):
+        prefix = LIBERVIA_PROTOCOL + u'_'
+        for protocol in request.protocols:
+            if protocol.startswith(prefix):
+                token = protocol[len(prefix):].strip()
+                if token:
+                    break
+        else:
+            raise types.ConnectionDeny(types.ConnectionDeny.NOT_IMPLEMENTED,
+                                       u"Can't use this subprotocol")
+
+        if token not in self.tokens_map:
+            log.warning(_(u"Can't activate page socket: unknown token"))
+            raise types.ConnectionDeny(types.ConnectionDeny.FORBIDDEN,
+                                       u"Bad token, please reload page")
+        self.token = token
+        self.page = self.tokens_map[token]['page']
+        self.request = self.tokens_map[token]['request']
+        return protocol
+
+    def onOpen(self):
+        log.debug(_(u"Websocket opened for {page} (token: {token})".format(
+            page = self.page,
+            token = self.token)))
+        self.request.sendData = self.sendJSONData
+        self.page.onSocketOpen(self.request)
+
+    def onMessage(self, payload, isBinary):
+        try:
+            data_json = json.loads(payload.decode('utf8'))
+        except ValueError as e:
+            log.warning(_(u"Not valid JSON, ignoring data: {msg}\n{data}").format(msg=e, data=payload))
+            return
+        # we request page first, to raise an AttributeError
+        # if it is not set (which should never happen)
+        page = self.page
+        try:
+            cb = page.on_data
+        except AttributeError:
+            log.warning(_(u'No "on_data" method set on dynamic page, ignoring data:\n{data}').format(data=data_json))
+        else:
+            cb(page, self.request, data_json)
+
+    def onClose(self, wasClean, code, reason):
+        try:
+            token = self.token
+        except AttributeError:
+            log.warning(_(u'Websocket closed but no token is associated'))
+            return
+
+        self.page.onSocketClose(self.request)
+
+        try:
+            del self.tokens_map[token]
+            del self.request.sendData
+        except (KeyError, AttributeError):
+            raise exceptions.InternalError(_(u"Token or sendData doesn't exist, this should never happen!"))
+        log.debug(_(u"Websocket closed for {page} (token: {token}). {reason}".format(
+            page = self.page,
+            token = self.token,
+            reason = u'' if wasClean else _(u'Reason: {reason}').format(reason=reason))))
+
+    def sendJSONData(self, type_, **data):
+        assert 'type' not in data
+        data['type'] = type_
+        self.sendMessage(json.dumps(data, ensure_ascii = False).encode('utf8'))
+
+    @classmethod
+    def getBaseURL(cls, host, secure):
+        return u"ws{sec}://localhost:{port}".format(
+               sec='s' if secure else '',
+               port=cls.host.options['port_https' if secure else 'port'])
+
+    @classmethod
+    def getResource(cls, host, secure):
+        if cls.host is None:
+            cls.host = host
+        factory = websocket.WebSocketServerFactory(cls.getBaseURL(host, secure))
+        factory.protocol = cls
+        return resource.WebSocketResource(factory)
+
+    @classmethod
+    def registerToken(cls, token, page, request):
+        if token in cls.tokens_map:
+            raise exceptions.ConflictError(_(u'This token is already registered'))
+        cls.tokens_map[token] = {'page': page,
+                                 'request': request}