Mercurial > libervia-backend
view sat/plugins/plugin_xep_0080.py @ 3981:acc9dfc8ba8d
component AP gateway: parse body immediately on `POST` request:
the body is parsed immediately during a `POST` request: this avoids duplication of code,
and allows to check the body data before continuing (will be used to filter some requests
in a future patch).
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 15 Nov 2022 18:07:34 +0100 |
parents | ed470361ea2e |
children | 524856bd7b19 |
line wrap: on
line source
#!/usr/bin/env python3 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from typing import Dict, Any from twisted.words.xish import domish from sat.core.constants import Const as C from sat.core.i18n import _ from sat.core.log import getLogger from sat.core import exceptions from sat.tools import utils log = getLogger(__name__) PLUGIN_INFO = { C.PI_NAME: "User Location", C.PI_IMPORT_NAME: "XEP-0080", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0080"], C.PI_MAIN: "XEP_0080", C.PI_HANDLER: "no", C.PI_DESCRIPTION: _("""Implementation of XEP-0080 (User Location)"""), } NS_GEOLOC = "http://jabber.org/protocol/geoloc" KEYS_TYPES = { "accuracy": float, "alt": float, "altaccuracy": float, "area": str, "bearing": float, "building": str, "country": str, "countrycode": str, "datum": str, "description": str, "error": float, "floor": str, "lat": float, "locality": str, "lon": float, "postalcode": str, "region": str, "room": str, "speed": float, "street": str, "text": str, "timestamp": "datetime", "tzo": str, "uri": str } class XEP_0080: def __init__(self, host): log.info(_("XEP-0080 (User Location) plugin initialization")) host.registerNamespace("geoloc", NS_GEOLOC) def get_geoloc_elt( self, location_data: Dict[str, Any], ) -> domish.Element: """Generate the element describing the location @param geoloc: metadata description the location Keys correspond to ones found at https://xmpp.org/extensions/xep-0080.html#format, with following additional keys: - id (str): Identifier for this location - language (str): language of the human readable texts All keys are optional. @return: ``<geoloc/>`` element """ geoloc_elt = domish.Element((NS_GEOLOC, "geoloc")) for key, value in location_data.items(): try: key_type = KEYS_TYPES[key] except KeyError: if key == "id": # "id" attribute is not specified for XEP-0080's <geoloc/> element, # but it may be used in a parent element (that's the case for events) pass elif key == "language": geoloc_elt["xml:lang"] = value else: log.warning(f"Unknown location key {key}: {location_data}") continue if key_type == "datetime": content = utils.xmpp_date(value) else: content = str(value) geoloc_elt.addElement(key, content=content) return geoloc_elt def parse_geoloc_elt( self, geoloc_elt: domish.Element ) -> Dict[str, Any]: """Parse <geoloc/> element @param geoloc_elt: <geoloc/> element a parent element can also be used @return: geoloc data. It's a dict whose keys correspond to [get_geoloc_elt] parameters @raise exceptions.NotFound: no <geoloc/> element has been found """ if geoloc_elt.name != "geoloc": try: geoloc_elt = next(geoloc_elt.elements(NS_GEOLOC, "geoloc")) except StopIteration: raise exceptions.NotFound data: Dict[str, Any] = {} for elt in geoloc_elt.elements(): if elt.uri != NS_GEOLOC: log.warning(f"unmanaged geoloc element: {elt.toXml()}") continue try: data_type = KEYS_TYPES[elt.name] except KeyError: log.warning(f"unknown geoloc element: {elt.toXml()}") continue try: if data_type == "datetime": data[elt.name] = utils.parse_xmpp_date(str(elt)) else: data[elt.name] = data_type(str(elt)) except Exception as e: log.warning(f"can't parse element: {elt.toXml()}") continue return data