comparison sat/plugins/plugin_comp_ap_gateway.py @ 3728:b15644cae50d

component AP gateway: JID/node ⟺ AP outbox conversion: - convert a combination of JID and optional pubsub node to an AP actor handle (see `getJIDAndNode` for details) and vice versa - the gateway now provides a Pubsub service - retrieve a pubsub node and convert it to an AP collection, AP pagination is converted to RSM - do the opposite: convert an AP collection to pubsub and handle RSM requests. Due to ActivityStream collection pagination limitations, some RSM requests produce inefficient requests, but caching should be used most of the time in the future and avoid the problem. - set a specific name for the HTTP server - new `local_only` setting (`True` by default) to indicate whether or not the gateway can request XMPP Pubsub nodes from other servers - disco info now specifies important features such as Pubsub RSM, and nodes metadata ticket 363
author Goffi <goffi@goffi.org>
date Tue, 25 Jan 2022 17:54:06 +0100
parents 8353cc3b8db9
children
comparison
equal deleted inserted replaced
3727:a6dfd3db372b 3728:b15644cae50d
14 # GNU Affero General Public License for more details. 14 # GNU Affero General Public License for more details.
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 import time
20 import json
21 import base64 19 import base64
22 import hashlib 20 import hashlib
21 import json
22 from pathlib import Path
23 import time
24 from typing import Optional, Dict, Tuple, List, Union
23 from urllib import parse 25 from urllib import parse
24 from typing import Tuple 26 import calendar
25 from pathlib import Path 27 import re
26 import shortuuid 28 import unicodedata
27 from cryptography.hazmat.primitives.asymmetric import rsa 29
30 import dateutil
28 from cryptography.hazmat.primitives import serialization 31 from cryptography.hazmat.primitives import serialization
29 from cryptography.hazmat.primitives import hashes 32 from cryptography.hazmat.primitives import hashes
33 from cryptography.hazmat.primitives.asymmetric import rsa
30 from cryptography.hazmat.primitives.asymmetric import padding 34 from cryptography.hazmat.primitives.asymmetric import padding
31 from twisted.internet import reactor, threads, defer 35 import shortuuid
32 from twisted.web import server, resource as web_resource, http
33 from twisted.words.protocols.jabber import jid
34 import treq 36 import treq
35 from treq.response import _Response as TReqResponse 37 from treq.response import _Response as TReqResponse
38 from twisted.internet import defer, reactor, threads
39 from twisted.web import http, resource as web_resource, server
40 from twisted.words.protocols.jabber import jid, error
41 from twisted.words.xish import domish
42 from wokkel import pubsub, rsm
43
44 from sat.core import exceptions
45 from sat.core.constants import Const as C
46 from sat.core.core_types import SatXMPPEntity
36 from sat.core.i18n import _ 47 from sat.core.i18n import _
37 from sat.core.constants import Const as C
38 from sat.core import exceptions
39 from sat.core.log import getLogger 48 from sat.core.log import getLogger
40 from sat.core.core_types import SatXMPPEntity
41 from sat.tools.common import tls, data_format
42 from sat.tools import utils 49 from sat.tools import utils
50 from sat.tools.common import data_format, tls
51 from sat.tools.common.async_utils import async_lru
52 from sat.tools.utils import ensure_deferred
43 53
44 54
45 log = getLogger(__name__) 55 log = getLogger(__name__)
46 56
47 IMPORT_NAME = "ap-gateway" 57 IMPORT_NAME = "ap-gateway"
50 C.PI_NAME: "ActivityPub Gateway component", 60 C.PI_NAME: "ActivityPub Gateway component",
51 C.PI_IMPORT_NAME: IMPORT_NAME, 61 C.PI_IMPORT_NAME: IMPORT_NAME,
52 C.PI_MODES: [C.PLUG_MODE_COMPONENT], 62 C.PI_MODES: [C.PLUG_MODE_COMPONENT],
53 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, 63 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
54 C.PI_PROTOCOLS: [], 64 C.PI_PROTOCOLS: [],
55 C.PI_DEPENDENCIES: ["XEP-0106"], 65 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"],
56 C.PI_RECOMMENDATIONS: [], 66 C.PI_RECOMMENDATIONS: [],
57 C.PI_MAIN: "APGateway", 67 C.PI_MAIN: "APGateway",
58 C.PI_HANDLER: C.BOOL_FALSE, 68 C.PI_HANDLER: C.BOOL_TRUE,
59 C.PI_DESCRIPTION: _( 69 C.PI_DESCRIPTION: _(
60 "Gateway for bidirectional communication between XMPP and ActivityPub." 70 "Gateway for bidirectional communication between XMPP and ActivityPub."
61 ), 71 ),
62 } 72 }
63 73
# Identification string sent in the HTTP "server" header: application name and
# version, normalised with NFKD
VERSION = unicodedata.normalize(
    "NFKD", f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
)
# Two hexadecimal digits, captured in the "hex" group.
# The upper-case range must be ``A-F``: ``A-f`` would also accept ``G-Z`` and the
# ASCII characters located between "Z" and "a" (``[``, ``\``, ``]``, ``^``, ``_``
# and the backtick), which are not hexadecimal digits.
HEXA_ENC = r"(?P<hex>[0-9a-fA-F]{2})"
# period encoded character (percent encoding where "." is used instead of "%")
RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}")
# percent encoded character
RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}")
# text made only of characters which never need to be period encoded
RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$")
64 CONF_SECTION = f"component {IMPORT_NAME}" 82 CONF_SECTION = f"component {IMPORT_NAME}"
65 CONTENT_TYPE_AP = "application/activity+json; charset=utf-8" 83 CONTENT_TYPE_AP = "application/activity+json; charset=utf-8"
66 TYPE_ACTOR = "actor" 84 TYPE_ACTOR = "actor"
67 TYPE_INBOX = "inbox" 85 TYPE_INBOX = "inbox"
86 TYPE_OUTBOX = "outbox"
68 TYPE_ITEM = "item" 87 TYPE_ITEM = "item"
69 MEDIA_TYPE_AP = "application/activity+json" 88 MEDIA_TYPE_AP = "application/activity+json"
# mapping from AP metadata to microblog data
AP_MB_MAP = {"content": "content_xhtml"}
# request types which can be dispatched to an ``AP<Type>Request`` method
AP_REQUEST_TYPES = {"actor", "outbox"}
# number of items requested per page (used as RSM "max")
PAGE_SIZE = 10

# maximum number of entries kept by ``async_lru`` caches
LRU_MAX_SIZE = 200
70 98
71 99
72 class HTTPAPGServer(web_resource.Resource): 100 class HTTPAPGServer(web_resource.Resource):
73 """HTTP Server handling ActivityPub S2S protocol""" 101 """HTTP Server handling ActivityPub S2S protocol"""
74 isLeaf = True 102 isLeaf = True
75 103
def __init__(self, ap_gateway):
    """Keep a reference to the gateway and initialise the web resource

    @param ap_gateway: gateway instance, stored as ``self.apg``
    """
    super().__init__()
    self.apg = ap_gateway
79 107
80 def webfinger(self, request): 108 async def webfinger(self, request):
81 url_parsed = parse.urlparse(request.uri.decode()) 109 url_parsed = parse.urlparse(request.uri.decode())
82 query = parse.parse_qs(url_parsed.query) 110 query = parse.parse_qs(url_parsed.query)
83 resource = query.get("resource", [""])[0] 111 resource = query.get("resource", [""])[0]
84 account = resource[5:].strip() 112 account = resource[5:].strip()
85 log.info(f"request pour {account}")
86 if not resource.startswith("acct:") or not account: 113 if not resource.startswith("acct:") or not account:
87 return web_resource.ErrorPage( 114 return web_resource.ErrorPage(
88 http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource" 115 http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
89 ).render(request) 116 ).render(request)
90 117
99 "href": actor_url 126 "href": actor_url
100 } 127 }
101 ] 128 ]
102 } 129 }
103 request.setHeader("content-type", CONTENT_TYPE_AP) 130 request.setHeader("content-type", CONTENT_TYPE_AP)
104 return json.dumps(resp).encode() 131 request.write(json.dumps(resp).encode())
105 132 request.finish()
async def APActorRequest(
    self,
    request: "HTTPRequest",
    account_jid: jid.JID,
    node: Optional[str],
    ap_account: str,
    actor_url: str
) -> dict:
    """Build the AP actor document for an account

    @param request: HTTP request being served (not used here)
    @param account_jid: JID mapped to the account (not used here)
    @param node: pubsub node mapped to the account (not used here)
    @param ap_account: AP account handle, used to build inbox and outbox URLs
    @param actor_url: URL of the actor, used as ID and as public key owner
    @return: actor data, ready to be serialised to JSON
    """
    inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
    outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)

    # we have to use AP account as preferredUsername because it is used to retrieve
    # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
    preferred_username, *__ = ap_account.split("@", 1)

    actor = {
        "@context": [
            "https://www.w3.org/ns/activitystreams",
            "https://w3id.org/security/v1"
        ],

        "id": actor_url,
        "type": "Person",
        "preferredUsername": preferred_username,
        "inbox": inbox_url,
        "outbox": outbox_url,
    }
    actor["publicKey"] = {
        "id": f"{actor_url}#main-key",
        "owner": actor_url,
        "publicKeyPem": self.apg.public_key_pem
    }
    return actor
165
def getCanonicalURL(self, request: "HTTPRequest") -> str:
    """Get the public URL of the requested path

    The query part is not kept, and trailing slashes of the path are removed.

    @param request: HTTP request being served
    @return: HTTPS URL built from the gateway public URL and the request path
    """
    root_url = f"https://{self.apg.public_url}"
    trimmed_path = request.path.decode().rstrip("/")
    return parse.urljoin(root_url, trimmed_path)
171
def queryData2RSMRequest(
    self,
    query_data: Dict[str, List[str]]
) -> rsm.RSMRequest:
    """Build the RSM request corresponding to parsed query data

    ``page=first`` and ``page=last`` are handled first, then the first key found
    among ``index``, ``before`` and ``after`` is used (with its first value).
    The page size is always ``PAGE_SIZE``.

    @param query_data: parsed query string
    @return: RSM request to use to retrieve pubsub items
    @raise ValueError: no usable key has been found in query data
    """
    requested_page = query_data.get("page")
    if requested_page == ["first"]:
        return rsm.RSMRequest(max_=PAGE_SIZE, before="")
    if requested_page == ["last"]:
        return rsm.RSMRequest(max_=PAGE_SIZE)

    for rsm_key in ("index", "before", "after"):
        try:
            rsm_value = query_data[rsm_key][0]
        except (KeyError, IndexError, ValueError):
            continue
        return rsm.RSMRequest(**{rsm_key: rsm_value, "max_": PAGE_SIZE})

    raise ValueError(f"Invalid query data: {query_data!r}")
192
async def APOutboxPageRequest(
    self,
    request: "HTTPRequest",
    account_jid: jid.JID,
    node: Optional[str],
    ap_account: str,
    actor_url: str,
    query_data: Dict[str, List[str]]
) -> dict:
    """Convert a page of pubsub items to an AP ``OrderedCollectionPage``

    @param request: HTTP request being served
    @param account_jid: pubsub service to request
    @param node: pubsub node to request
    @param ap_account: AP account handle (not used here)
    @param actor_url: URL of the actor (not used here)
    @param query_data: parsed query string of the request
    @return: page data, or an empty dict if pubsub items can't be retrieved
    @raise ValueError: query data can't be converted to a RSM request
    """
    # we only keep useful keys, and sort to have consistent URL which can
    # be used as ID
    url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
    query_data = {k: query_data[k] for k in url_keys}
    # the RSM request is built only once and reused below
    rsm_request = self.queryData2RSMRequest(query_data)
    try:
        items, metadata = await self.apg._p.getItems(
            client=self.apg.client,
            service=account_jid,
            node=node,
            rsm_request=rsm_request,
            extra={C.KEY_USE_CACHE: False}
        )
    except error.StanzaError as e:
        log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
        return {}

    base_url = self.getCanonicalURL(request)
    url = f"{base_url}?{parse.urlencode(query_data, True)}"
    # AP OrderedCollection must be in reversed chronological order, thus the opposite
    # of what we get with RSM (at least with Libervia Pubsub)
    ordered_items = [
        await self.apg.mbdata2APitem(
            self.apg.client,
            await self.apg._m.item2mbdata(
                self.apg.client,
                item,
                account_jid,
                node
            )
        )
        for item in reversed(items)
    ]
    data = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": url,
        "type": "OrderedCollectionPage",
        "partOf": base_url,
        "orderedItems": ordered_items
    }

    # RSM metadata may be missing or partial: a navigation link is only added when
    # the item ID needed to build it is known, otherwise we would publish links
    # such as "?after=None"
    rsm_data = metadata.get("rsm") or {}
    first = rsm_data.get("first")
    last = rsm_data.get("last")
    if not metadata["complete"] and last is not None:
        data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
    if rsm_data.get("index") != 0 and first is not None:
        data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"

    return data
256
async def APOutboxRequest(
    self,
    request: "HTTPRequest",
    account_jid: jid.JID,
    node: Optional[str],
    ap_account: str,
    actor_url: str
) -> dict:
    """Serve the outbox of an actor as an AP ``OrderedCollection``

    If the request has a query part, the requested page is returned instead
    (see [APOutboxPageRequest]).

    @param request: HTTP request being served
    @param account_jid: pubsub service to request
    @param node: pubsub node to request
        None to use the microblog namespace
    @param ap_account: AP account handle
    @param actor_url: URL of the actor
    @return: collection (or page) data, or an empty dict if the pubsub node can't
        be requested
    """
    if node is None:
        node = self._m.namespace

    query_data = parse.parse_qs(parse.urlparse(request.uri.decode()).query)
    if query_data:
        page_data = await self.APOutboxPageRequest(
            request, account_jid, node, ap_account, actor_url, query_data
        )
        return page_data

    # XXX: we can't use disco#info here because this request won't work on a bare jid
    # due to security considerations of XEP-0030 (we don't have presence
    # subscription).
    # The current workaround is to do a request as if RSM was available, and actually
    # check its availability according to result.
    try:
        __, metadata = await self.apg._p.getItems(
            client=self.apg.client,
            service=account_jid,
            node=node,
            max_items=0,
            rsm_request=rsm.RSMRequest(max_=0)
        )
    except error.StanzaError as e:
        log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
        return {}

    try:
        total_items = metadata["rsm"]["count"]
    except KeyError:
        log.warning(
            f"No RSM metadata found when requesting pubsub node {node} at "
            f"{account_jid}, defaulting to items_count=20"
        )
        total_items = 20

    collection_url = self.getCanonicalURL(request)
    first_query = parse.urlencode({'page': 'first'})
    last_query = parse.urlencode({'page': 'last'})
    return {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": collection_url,
        "totalItems": total_items,
        "type": "OrderedCollection",
        "first": f"{collection_url}?{first_query}",
        "last": f"{collection_url}?{last_query}",
    }
311
async def APRequest(self, request):
    """Dispatch an AP request to the method handling its type, and write the result

    The method is selected from the request type found in the URL
    (``AP<Type>Request``), its result is serialised to JSON and written to the
    request, which is then finished.

    NOTE(review): exceptions raised here are not converted to an HTTP error in this
    method; check how callers handle a failure.

    @param request: HTTP request being served
    @raise exceptions.DataError: the request type is not in AP_REQUEST_TYPES
    """
    requested_url = parse.urljoin(
        f"https://{self.apg.public_url}",
        request.path.decode()
    )
    request_type, ap_account = self.apg.parseAPURL(requested_url)
    account_jid, node = await self.apg.getJIDAndNode(ap_account)
    if request_type not in AP_REQUEST_TYPES:
        raise exceptions.DataError(f"Invalid request type: {request_type!r}")
    handler = getattr(self, f"AP{request_type.title()}Request")
    payload = await handler(request, account_jid, node, ap_account, requested_url)
    request.setHeader("content-type", CONTENT_TYPE_AP)
    request.write(json.dumps(payload).encode())
    request.finish()
327
def render(self, request):
    """Set the "server" header, then render the request with the parent class"""
    request.setHeader("server", VERSION)
    rendered = super().render(request)
    return rendered
134 331
def render_GET(self, request):
    """Route GET requests to WebFinger or ActivityPub handlers

    Handlers are coroutines: they write the response themselves, thus
    ``NOT_DONE_YET`` is returned once they are scheduled. Any other path gets the
    rendering of a ``NoResource`` page.
    """
    stripped_path = request.path.decode().lstrip("/")
    if stripped_path.startswith(".well-known/webfinger"):
        handler = self.webfinger
    elif stripped_path.startswith(self.apg.ap_path):
        handler = self.APRequest
    else:
        return web_resource.NoResource().render(request)

    defer.ensureDeferred(handler(request))
    return server.NOT_DONE_YET
142 342
143 343
class HTTPRequest(server.Request):
    """Request class of the gateway, it adds nothing to ``server.Request``"""
155 class APGateway: 355 class APGateway:
156 356
def __init__(self, host):
    """Store plugin dependencies and register the ``APSend`` bridge method

    @param host: host instance, giving access to plugins and bridge
    """
    self.host = host
    self.initialised = False
    plugins = host.plugins
    self._m = plugins["XEP-0277"]
    self._p = plugins["XEP-0060"]

    bridge_method_options = dict(
        in_sign="sss",
        out_sign="",
        method=self._publishMessage,
        async_=True,
    )
    host.bridge.addMethod("APSend", ".plugin", **bridge_method_options)
169 371
def getHandler(self, __):
    """Create the pubsub service handler of the gateway

    The argument is ignored.
    """
    pubsub_service = APPubsubService(self)
    return pubsub_service
374
170 async def init(self, client): 375 async def init(self, client):
171 if self.initialised: 376 if self.initialised:
172 return 377 return
173 378
174 self.initialised = True 379 self.initialised = True
202 self.public_key_pem = self.public_key.public_bytes( 407 self.public_key_pem = self.public_key.public_bytes(
203 encoding=serialization.Encoding.PEM, 408 encoding=serialization.Encoding.PEM,
204 format=serialization.PublicFormat.SubjectPublicKeyInfo 409 format=serialization.PublicFormat.SubjectPublicKeyInfo
205 ).decode() 410 ).decode()
206 411
207 # params (URL and port) 412 # params
413 # URL and port
208 self.public_url = self.host.memory.getConfig( 414 self.public_url = self.host.memory.getConfig(
209 CONF_SECTION, "public_url" 415 CONF_SECTION, "public_url"
210 ) or self.host.memory.getConfig( 416 ) or self.host.memory.getConfig(
211 CONF_SECTION, "xmpp_domain" 417 CONF_SECTION, "xmpp_domain"
212 ) 418 )
230 if connection_type not in ('http', 'https'): 436 if connection_type not in ('http', 'https'):
231 raise exceptions.ConfigError( 437 raise exceptions.ConfigError(
232 'bad ap-gateay http_connection_type, you must use one of "http" or ' 438 'bad ap-gateay http_connection_type, you must use one of "http" or '
233 '"https"' 439 '"https"'
234 ) 440 )
441 self.max_items = self.host.memory.getConfig(
442 CONF_SECTION, 'new_node_max_items', 50
443
444 )
235 self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') 445 self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap')
236 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") 446 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
447 # True (default) if we provide gateway only to entities/services from our server
448 self.local_only = C.bool(
449 self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE)
450 )
237 451
238 # HTTP server launch 452 # HTTP server launch
239 self.server = HTTPServer(self) 453 self.server = HTTPServer(self)
240 if connection_type == 'http': 454 if connection_type == 'http':
241 reactor.listenTCP(self.http_port, self.server) 455 reactor.listenTCP(self.http_port, self.server)
245 tls.TLSOptionsCheck(options) 459 tls.TLSOptionsCheck(options)
246 context_factory = tls.getTLSContextFactory(options) 460 context_factory = tls.getTLSContextFactory(options)
247 reactor.listenSSL(self.http_port, self.server, context_factory) 461 reactor.listenSSL(self.http_port, self.server, context_factory)
248 462
async def profileConnecting(self, client):
    """Keep the connecting client, and initialise the gateway with it

    @param client: client which is connecting, stored as ``self.client``
    """
    # the client is stored before ``init`` is awaited
    self.client = client
    await self.init(client)
466
async def apGet(self, url: str) -> dict:
    """Retrieve AP JSON from given URL

    @param url: URL to request, AP media type is used for the request
    @return: parsed JSON content of the response
    @raise error.StanzaError: "service-unavailable" is sent when something went wrong
        with AP server
    """
    ap_headers = {
        "Accept": [MEDIA_TYPE_AP],
        "Content-Type": [MEDIA_TYPE_AP],
    }
    try:
        response = await treq.get(url, headers=ap_headers)
        return await treq.json_content(response)
    except Exception as e:
        raise error.StanzaError(
            "service-unavailable",
            text=f"Can't get AP data at {url}: {e}"
        )
486
def mustEncode(self, text: str) -> bool:
    """Indicate if a text must be period encoded

    @param text: text to check
    @return: True if the text starts with ``___``, contains ``---``, or is not
        exclusively made of characters allowed by RE_ALLOWED_UNQUOTED
    """
    if text.startswith("___") or "---" in text:
        return True
    return RE_ALLOWED_UNQUOTED.match(text) is None
494
def periodEncode(self, text: str) -> str:
    """Period encode a text

    The text is percent encoded, then ``---``, ``___``, ``.`` and ``~`` are escaped
    too, and finally every ``%`` is replaced by a ``.``.

    see [getJIDAndNode] for reasons of period encoding

    @param text: text to encode
    @return: period encoded text
    """
    encoded = parse.quote(text, safe="")
    # order matters: "%" must be replaced last, once all escapes are in place
    for plain, escaped in (
        ("---", "%2d%2d%2d"),
        ("___", "%5f%5f%5f"),
        (".", "%2e"),
        ("~", "%7e"),
        ("%", "."),
    ):
        encoded = encoded.replace(plain, escaped)
    return encoded
508
async def getAPAccountFromJidAndNode(
    self,
    jid_: jid.JID,
    node: Optional[str]
) -> str:
    """Construct AP account from JID and node

    The account construction will use escaping when necessary

    @param jid_: JID of the entity or pubsub service (it is not modified)
    @param node: pubsub node
        an empty node or the microblog namespace are handled as no node
    @return: AP account handle
    @raise exceptions.InternalError: no user part could be determined
    """
    if not node or node == self._m.namespace:
        node = None

    if node and not jid_.user and not self.mustEncode(node):
        # isPubsub is a coroutine method: without "await" we would test the
        # coroutine object itself, which is always true
        if await self.isPubsub(jid_):
            # when we have a pubsub service, the user part can be used to set the node
            # this produces more user-friendly AP accounts
            # A new JID is built to avoid modifying the one owned by the caller.
            jid_ = jid.JID(tuple=(node, jid_.host, jid_.resource))
            node = None

    is_local = self.isLocal(jid_)
    user = jid_.user if is_local else jid_.userhost()
    if user is None:
        user = ""
    account_elts = []
    if (node and self.mustEncode(node)) or self.mustEncode(user):
        # "___" prefix marks a period encoded account
        account_elts = ["___"]
        if node:
            node = self.periodEncode(node)
        user = self.periodEncode(user)

    if not user:
        raise exceptions.InternalError("there should be a user part")

    if node:
        account_elts.extend((node, "---"))

    account_elts.extend((
        user, "@", jid_.host if is_local else self.client.jid.userhost()
    ))
    return "".join(account_elts)
550
def isLocal(self, jid_: jid.JID) -> bool:
    """Tell if a JID uses the domain of the gateway's host, or one of its subdomains

    @param jid_: JID to check
    @return: True if the last labels of the JID host are those of the client host
    """
    gateway_labels = self.client.host.split(".")
    assert gateway_labels
    candidate_labels = jid_.host.split(".")
    return candidate_labels[-len(gateway_labels):] == gateway_labels
556
async def isPubsub(self, jid_: jid.JID) -> bool:
    """Indicate if a JID is a Pubsub service

    @param jid_: JID to check with a disco info request
    @return: True if the entity has a ``pubsub/service`` identity and no
        ``pubsub/pep`` one
    """
    identities = (await self.host.getDiscoInfos(self.client, jid_)).identities
    if ("pubsub", "service") not in identities:
        return False
    return ("pubsub", "pep") not in identities
564
async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]:
    """Decode raw AP account handle to get XMPP JID and Pubsub Node

    Username are case insensitive.

    By default, the username correspond to local username (i.e. username from
    component's server).

    If local name's domain is a pubsub service (and not PEP), the username is taken as
    a pubsub node.

    If ``---`` is present in username, the part before is used as pubsub node, and the
    rest as a JID user part.

    If username starts with ``___``, characters are encoded using period encoding
    (i.e. percent encoding where a ``.`` is used instead of ``%``).

    This horror is necessary due to limitation in some AP implementation (notably
    Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222

    examples::

        ``toto@example.org`` => JID = toto@example.org, node = None

        ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is
        a non-local JID, and will work only if setting ``local_only`` is False),
        node = None

        ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) =>
        JID = pubsub.example.org, node = toto

        ``tata---toto@example.org`` => JID = toto@example.org, node = tata

        ``___urn.3axmpp.3a.microblog.3a0@pubsub.example.org`` (with pubsub.example.org
        being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0

    @param ap_account: ActivityPub account handle (``username@domain.tld``)
    @return: service JID and pubsub node
        if pubsub node is None, default microblog pubsub node (and possibly other
        nodes that plugins may handle) will be used
    @raise ValueError: invalid account
    @raise PermissionError: non local jid is used when gateway doesn't allow them
    """
    if ap_account.count("@") != 1:
        raise ValueError("Invalid AP account")
    if ap_account.startswith("___"):
        encoded = True
        ap_account = ap_account[3:]
    else:
        encoded = False

    username, domain = ap_account.split("@")

    if "---" in username:
        node, username = username.rsplit("---", 1)
    else:
        node = None

    if encoded:
        # period encoding is converted back to percent encoding before unquoting
        # (invalid UTF-8 raises UnicodeDecodeError, which is a ValueError)
        username = parse.unquote(
            RE_PERIOD_ENC.sub(r"%\g<hex>", username),
            errors="strict"
        )
        if node:
            node = parse.unquote(
                RE_PERIOD_ENC.sub(r"%\g<hex>", node),
                errors="strict"
            )

        if "@" in username:
            username, domain = username.rsplit("@", 1)

    if not node:
        # we need to check host disco, because disco request to user may be
        # blocked for privacy reason (see
        # https://xmpp.org/extensions/xep-0030.html#security)
        try:
            domain_jid = jid.JID(domain)
        except (RuntimeError, jid.InvalidFormat) as e:
            raise ValueError(f"Invalid domain: {domain!r}") from e
        is_pubsub = await self.isPubsub(domain_jid)

        if is_pubsub:
            # if the host is a pubsub service and not a PEP, we consider that username
            # is in fact the node name
            node = username
            username = None

    jid_s = f"{username}@{domain}" if username else domain
    try:
        jid_ = jid.JID(jid_s)
    except (RuntimeError, jid.InvalidFormat) as e:
        # an unparsable JID is reported with jid.InvalidFormat, which is not a
        # RuntimeError, thus it must be caught explicitly
        raise ValueError(f"Invalid jid: {jid_s!r}") from e

    if self.local_only and not self.isLocal(jid_):
        raise exceptions.PermissionError(
            "This gateway is configured to map only local entities and services"
        )

    return jid_, node
251 659
252 def parseAPURL(self, url: str) -> Tuple[str, str]: 660 def parseAPURL(self, url: str) -> Tuple[str, str]:
253 """Parse an URL leading to an AP endpoint 661 """Parse an URL leading to an AP endpoint
254 662
255 @param url: URL to parse (schema is not mandatory) 663 @param url: URL to parse (schema is not mandatory)
311 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore 719 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
312 service = jid.JID(service_s) 720 service = jid.JID(service_s)
313 client = self.host.getClient(profile) 721 client = self.host.getClient(profile)
314 return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) 722 return defer.ensureDeferred(self.publishMessage(client, mess_data, service))
315 723
316 async def getAPActorData(self, account: str) -> dict: 724 async def getAPActorIdFromAccount(self, account: str) -> str:
317 """Retrieve ActivityPub Actor data 725 """Retrieve account ID from it's handle using WebFinger
318 726
319 @param account: ActivityPub Actor identifier 727 @param account: AP handle (user@domain.tld)
728 @return: Actor ID (which is an URL)
320 """ 729 """
321 if account.count("@") != 1 or "/" in account: 730 if account.count("@") != 1 or "/" in account:
322 raise ValueError("Invalid account: {account!r}") 731 raise ValueError("Invalid account: {account!r}")
323 host = account.split("@")[1] 732 host = account.split("@")[1]
324 try: 733 try:
341 break 750 break
342 else: 751 else:
343 raise ValueError( 752 raise ValueError(
344 f"No ActivityPub link found for {account!r}" 753 f"No ActivityPub link found for {account!r}"
345 ) 754 )
755 return href
756
async def getAPActorDataFromId(self, account: str) -> dict:
    """Retrieve ActivityPub Actor data

    The actor ID is first resolved from the account, then actor data are requested
    at this ID.

    @param account: ActivityPub Actor identifier
    @return: actor data
    """
    actor_id = await self.getAPActorIdFromAccount(account)
    actor_data = await self.apGet(actor_id)
    return actor_data
764
@async_lru(maxsize=LRU_MAX_SIZE)
async def getAPAccountFromId(self, actor_id: str):
    """Retrieve AP account from the ID URL

    The account is built from the "preferredUsername" of the actor and the hostname
    of its ID, then checked by resolving it back to an actor ID.

    @param actor_id: AP ID of the actor (URL to the actor data)
    @return: AP account handle
    @raise exceptions.DataError: "preferredUsername" is missing, or the account
        doesn't resolve to ``actor_id``
    """
    hostname = parse.urlparse(actor_id).hostname
    username = (await self.apGet(actor_id)).get("preferredUsername")
    if not username:
        raise exceptions.DataError(
            'No "preferredUsername" field found, can\'t retrieve actor account'
        )
    account = f"{username}@{hostname}"
    # we try to retrieve the actor ID from the account to check it
    found_id = await self.getAPActorIdFromAccount(account)
    if found_id == actor_id:
        return account

    # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196
    msg = (
        f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID "
        f"({actor_id!r}). This AP instance doesn't seems to use "
        '"preferredUsername" as we expect.'
    )
    log.warning(msg)
    raise exceptions.DataError(msg)
791
async def getAPItems(
    self,
    account: str,
    max_items: Optional[int] = None,
    chronological_pagination: bool = True,
    after_id: Optional[str] = None,
    start_index: Optional[int] = None,
) -> Tuple[List[domish.Element], rsm.RSMResponse]:
    """Retrieve AP items and convert them to XMPP items

    @param account: AP account to get items from
    @param max_items: maximum number of items to retrieve
        retrieve all items by default
    @param chronological_pagination: get pages in chronological order
        AP use reversed chronological order for pagination, "first" page returns more
        recent items. If "chronological_pagination" is True, "last" AP page will be
        retrieved first.
    @param after_id: if set, retrieve items starting from given ID
        Due to ActivityStream Collection Paging limitations, this is inefficient and
        if ``after_id`` is not already in cache, we have to retrieve every page until
        we find it.
        In most common cases, ``after_id`` should be in cache though (client usually
        use known ID when in-order pagination is used).
    @param start_index: start retrieving items from the one with given index
        Due to ActivityStream Collection Paging limitations, this is inefficient and
        all pages before the requested index will be retrieved to count items.
    @return: XMPP Pubsub items and corresponding RSM Response
        Items are always returned in chronological order in the result
    @raise exceptions.DataError: the actor has no outbox, or the initial page is
        missing from the outbox
    @raise error.StanzaError: ``after_id`` can't be found, ``start_index`` is too
        high, or a page link is missing while looking for ``start_index``
    """
    actor_data = await self.getAPActorDataFromId(account)
    outbox = actor_data.get("outbox")
    rsm_resp: Dict[str, Union[bool, int, str]] = {}
    if not outbox:
        raise exceptions.DataError(f"No outbox found for actor {account}")
    outbox_data = await self.apGet(outbox)
    try:
        count = outbox_data["totalItems"]
    except KeyError:
        log.warning(
            f'"totalItems" not found in outbox of {account}, defaulting to 20'
        )
        count = 20
    else:
        log.info(f"{account}'s outbox has {count} item(s)")
        rsm_resp["count"] = count

    if start_index is not None:
        assert chronological_pagination and after_id is None
        if start_index >= count:
            # the declared return type is always respected, even for empty results
            return [], rsm.RSMResponse(**rsm_resp)
        elif start_index == 0:
            # this is the default behaviour
            pass
        elif start_index > 5000:
            raise error.StanzaError(
                "feature-not-implemented",
                text="Maximum limit for previous_index has been reached, this limit "
                "is set to avoid DoS"
            )
        else:
            # we'll convert "start_index" to "after_id", thus we need the item just
            # before "start_index"
            previous_index = start_index - 1
            retrieved_items = 0
            current_page = outbox_data["last"]
            while retrieved_items < count:
                page_data, items = await self.parseAPPage(current_page)
                if not items:
                    log.warning(f"found an empty AP page at {current_page}")
                    return [], rsm.RSMResponse(**rsm_resp)
                page_start_idx = retrieved_items
                retrieved_items += len(items)
                # strict comparison: an index equal to "retrieved_items" belongs to
                # the next page
                if previous_index < retrieved_items:
                    after_id = items[previous_index - page_start_idx]["id"]
                    break
                try:
                    current_page = page_data["prev"]
                except KeyError:
                    log.warning(
                        f"missing previous page link at {current_page}: {page_data!r}"
                    )
                    # "text" must be a keyword: the second positional argument of
                    # StanzaError is the error type
                    raise error.StanzaError(
                        "service-unavailable",
                        text="Error while retrieving previous page from AP service at "
                        f"{current_page}"
                    )

    init_page = "last" if chronological_pagination else "first"
    page = outbox_data.get(init_page)
    if not page:
        raise exceptions.DataError(
            f"Initial page {init_page!r} not found for outbox {outbox}"
        )
    items = []
    page_items = []
    retrieved_items = 0
    found_after_id = False

    while retrieved_items < count:
        page_data, page_items = await self.parseAPPage(page)
        # index (in retrieval order) of the first item of this page
        page_start_idx = retrieved_items
        retrieved_items += len(page_items)
        if after_id is not None and not found_after_id:
            # if we have an after_id, we ignore all items until the requested one is
            # found
            try:
                # list.index raises ValueError when the value is missing, it never
                # returns -1
                limit_idx = [i["id"] for i in page_items].index(after_id)
            except ValueError:
                # if "after_id" is not found, we don't add any item from this page
                log.debug(f"{after_id!r} not found at {page}, skipping")
            else:
                found_after_id = True
                if chronological_pagination:
                    page_items = page_items[limit_idx+1:]
                    # index of the first kept item, computed from the position of
                    # the whole page (not from the truncated one)
                    start_index = page_start_idx + limit_idx + 1
                else:
                    page_items = page_items[:limit_idx]
                    # NOTE(review): computation kept unchanged for reversed
                    # pagination, to be confirmed
                    start_index = count - (retrieved_items - len(page_items) +
                                           limit_idx + 1)
                items.extend(page_items)
        else:
            items.extend(page_items)
        if max_items is not None and len(items) >= max_items:
            if chronological_pagination:
                items = items[:max_items]
            else:
                items = items[-max_items:]
            break
        # navigation links are in the page which has just been parsed, not in the
        # outbox collection
        page = page_data.get("prev" if chronological_pagination else "next")
        if not page:
            break

    if after_id is not None and not found_after_id:
        raise error.StanzaError("item-not-found")

    if start_index is not None:
        rsm_resp["index"] = start_index
    elif after_id is not None:
        log.warning("Can't determine index of first element")
    elif chronological_pagination:
        rsm_resp["index"] = 0
    else:
        rsm_resp["index"] = count - len(items)
    if items:
        rsm_resp.update({
            "first": items[0]["id"],
            "last": items[-1]["id"]
        })

    return items, rsm.RSMResponse(**rsm_resp)
943
async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]:
    """Retrieve an AP page and build XMPP pubsub items from its objects

    AP items which can't be converted (invalid data, unhandled feature or error
    while contacting the AP server) are left out of the result.

    @param url: URL of the AP page to retrieve
    @return: raw page data, and pubsub items built from the page's "orderedItems"
        (in reverse order of the AP collection)
    """
    page_data = await self.apGet(url)
    ordered_items = page_data.get("orderedItems")
    if not ordered_items:
        log.warning('No "orderedItems" collection found')
        return page_data, []

    microblog = self._m
    pubsub_items = []
    # AP Collections are in antichronological order while Pubsub expects
    # chronological order, hence the page is walked backwards
    for activity in reversed(ordered_items):
        try:
            ap_object, mb_data = await self.apItem2MBdata(activity)
        except (exceptions.DataError, NotImplementedError, error.StanzaError):
            # this item can't be converted, we go on with the next one
            continue
        else:
            entry_elt = await microblog.data2entry(
                self.client,
                mb_data,
                ap_object["id"],
                None,
                microblog.namespace,
            )
            entry_elt["publisher"] = mb_data["author_jid"].full()
            pubsub_items.append(entry_elt)

    return page_data, pubsub_items
971
async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]:
    """Convert AP item to microblog data

    @param ap_item: AP item (activity); its "object" can be embedded or be an URL
        which is then retrieved with apGet
    @return: AP Item's Object and microblog data
    @raise exceptions.DataError: something is invalid in the AP item
    @raise NotImplementedError: some AP data is not handled yet
    @raise error.StanzaError: error while contacting the AP server
    """
    # "import dateutil" alone doesn't guarantee that the "parser" submodule is
    # loaded, we import it explicitly to avoid an AttributeError below
    import dateutil.parser

    ap_object = ap_item.get("object")
    if not ap_object:
        log.warning(f'No "object" found in AP item {ap_item!r}')
        raise exceptions.DataError
    if isinstance(ap_object, str):
        # the object is only referenced, we have to retrieve it
        ap_object = await self.apGet(ap_object)
    obj_id = ap_object.get("id")
    if not obj_id:
        log.warning(f'No "id" found in AP object: {ap_object!r}')
        raise exceptions.DataError
    if ap_object.get("inReplyTo") is not None:
        # replies are not handled
        raise NotImplementedError
    mb_data = {}
    for ap_key, mb_key in AP_MB_MAP.items():
        data = ap_object.get(ap_key)
        if data is None:
            continue
        mb_data[mb_key] = data

    # content
    content_map = ap_object.get("contentMap")
    if isinstance(content_map, dict) and content_map:
        # we read the last entry (the one "popitem" would return) without
        # removing it: ap_object is returned to the caller and must stay intact
        language, content_xhtml = list(content_map.items())[-1]
        mb_data["language"] = language
        mb_data["content_xhtml"] = content_xhtml
    else:
        try:
            mb_data["content_xhtml"] = mb_data["content"]
        except KeyError:
            log.warning(f"no content found:\n{ap_object!r}")
            raise exceptions.DataError

    # author
    actor = ap_item.get("actor")
    if not actor:
        log.warning(f"no actor associated to object id {obj_id!r}")
        raise exceptions.DataError
    elif isinstance(actor, list):
        # we only keep first item of list as author
        # TODO: handle multiple actors
        if len(actor) > 1:
            log.warning("multiple actors are not managed")
        actor = actor[0]

    if isinstance(actor, dict):
        # we keep the whole actor data so the warning shows what was received
        actor_data = actor
        actor = actor_data.get("id")
        if not actor:
            log.warning(f"no actor id found: {actor_data!r}")
            raise exceptions.DataError

    if isinstance(actor, str):
        account = await self.getAPAccountFromId(actor)
        mb_data["author"] = account.split("@", 1)[0]
        mb_data["author_jid"] = jid.JID(
            None,
            (
                self.host.plugins["XEP-0106"].escape(account),
                self.client.jid.host,
                None
            )
        )
    else:
        log.warning(f"unknown actor type found: {actor!r}")
        raise exceptions.DataError

    # published/updated
    for field in ("published", "updated"):
        value = ap_object.get(field)
        if not value and field == "updated":
            # no update date, we use the publication date instead
            value = ap_object.get("published")
        if value:
            try:
                mb_data[field] = calendar.timegm(
                    dateutil.parser.parse(str(value)).utctimetuple()
                )
            except dateutil.parser.ParserError as e:
                log.warning(f"Can't parse {field!r} field: {e}")
    return ap_object, mb_data
1058
async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
    """Convert Libervia Microblog Data to ActivityPub item

    @param client: client whose JID is used as author when mb_data has no
        "author_jid"
    @param mb_data: microblog data; "id" and "author_jid" are set in place when
        they are missing. If "published" is missing, current time is used.
    @return: AP "Create" activity with the "Note" object built from mb_data
    @raise KeyError: neither "content_xhtml" nor "content" is available
    """
    if not mb_data.get("id"):
        mb_data["id"] = shortuuid.uuid()

    author_jid = mb_data.get("author_jid")
    if not author_jid:
        mb_data["author_jid"] = client.jid
        # the bare JID is used for the AP account, as it was done when the item
        # was built directly from the client's "userhost"
        author_jid = client.jid.userhostJID()
    elif not isinstance(author_jid, jid.JID):
        # jid.JID() expects a string, an existing JID instance must not be
        # converted again
        author_jid = jid.JID(author_jid)

    published = mb_data.get("published")
    if published is None:
        # no publication date given, the item is published now
        published = time.time()

    ap_account = await self.getAPAccountFromJidAndNode(author_jid, None)
    url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
    url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"])
    return {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": url_item,
        "type": "Create",
        "actor": url_actor,

        "object": {
            "id": url_item,
            "type": "Note",
            "published": utils.xmpp_date(published),
            "attributedTo": url_actor,
            "content": mb_data.get("content_xhtml") or mb_data["content"],
            "to": "https://www.w3.org/ns/activitystreams#Public"
        }
    }
358 1086
359 async def publishMessage( 1087 async def publishMessage(
360 self, 1088 self,
361 client: SatXMPPEntity, 1089 client: SatXMPPEntity,
362 mess_data: dict, 1090 mess_data: dict,
380 @param service: JID corresponding to the AP actor. 1108 @param service: JID corresponding to the AP actor.
381 """ 1109 """
382 if not service.user: 1110 if not service.user:
383 raise ValueError("service must have a local part") 1111 raise ValueError("service must have a local part")
384 account = self.host.plugins["XEP-0106"].unescape(service.user) 1112 account = self.host.plugins["XEP-0106"].unescape(service.user)
385 ap_actor_data = await self.getAPActorData(account) 1113 ap_actor_data = await self.getAPActorDataFromId(account)
386 1114
387 try: 1115 try:
388 inbox_url = ap_actor_data["endpoints"]["sharedInbox"] 1116 inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
389 except KeyError: 1117 except KeyError:
390 raise exceptions.DataError("Can't get ActivityPub actor inbox") 1118 raise exceptions.DataError("Can't get ActivityPub actor inbox")
391 1119
392 if not mess_data.get("id"): 1120 item_data = await self.mbdata2APitem(client, mess_data)
393 mess_data["id"] = shortuuid.uuid() 1121 url_actor = item_data["object"]["attributedTo"]
394 url_actor = self.buildAPURL(TYPE_ACTOR, client.jid.userhost())
395 url_item = self.buildAPURL(TYPE_ITEM, client.jid.userhost(), mess_data["id"])
396 now = time.time()
397 item_data = {
398 "@context": "https://www.w3.org/ns/activitystreams",
399 "id": url_item,
400 "type": "Create",
401 "actor": url_actor,
402
403 "object": {
404 "id": url_item,
405 "type": "Note",
406 "published": utils.xmpp_date(now),
407 "attributedTo": url_actor,
408 "inReplyTo": mess_data["node"],
409 "content": mess_data.get("content_xhtml") or mess_data["content"],
410 "to": "https://www.w3.org/ns/activitystreams#Public"
411 }
412 }
413 resp = await self.signAndPost(inbox_url, url_actor, item_data) 1122 resp = await self.signAndPost(inbox_url, url_actor, item_data)
414 if resp.code != 202: 1123 if resp.code != 202:
415 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") 1124 raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
1125
1126
class APPubsubService(rsm.PubSubService):
    """Pubsub service answering XMPP requests for the AP gateway"""

    def __init__(self, apg):
        """
        @param apg: AP gateway instance; its "host" is kept as self.host
        """
        super().__init__()
        self.host = apg.host
        self.apg = apg
        self.discoIdentity = dict(
            category="pubsub",
            type="service",
            name="Libervia ActivityPub Gateway",
        )

    @ensure_deferred
    async def publish(self, requestor, service, nodeIdentifier, items):
        """Publishing is not implemented"""
        raise NotImplementedError

    @ensure_deferred
    async def items(
        self,
        requestor: jid.JID,
        service: jid.JID,
        node: str,
        maxItems: Optional[int],
        itemIdentifiers: Optional[List[str]],
        rsm_req: Optional[rsm.RSMRequest]
    ) -> List[domish.Element]:
        """Retrieve items of an AP account, translating RSM request to getAPItems

        @param requestor: entity doing the request (only used for logging)
        @param service: JID whose local part is the escaped AP account
        @param node: requested node, only the microblog namespace is accepted
        @param maxItems: maximum number of items when no RSM is used (20 if None)
        @param itemIdentifiers: not used
        @param rsm_req: RSM request, at most one of "after", "before" and "index"
            may be set
        @return: result of self.apg.getAPItems, or an empty list if the service
            has no local part or if the AP account is invalid
        @raise error.StanzaError: unsupported node ("feature-not-implemented") or
            several RSM selectors used together ("bad-request")
        """
        if not service.user:
            return []

        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
        if ap_account.count("@") != 1:
            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
            return []

        if node != self.apg._m.namespace:
            raise error.StanzaError(
                "feature-not-implemented",
                text=f"{VERSION} only supports {self.apg._m.namespace} "
                     "node for now"
            )

        if rsm_req is not None:
            after, before, index = rsm_req.after, rsm_req.before, rsm_req.index
            used_selectors = sum(
                1 for selector in (after, before, index) if selector is not None
            )
            if used_selectors > 1:
                raise error.StanzaError(
                    "bad-request",
                    text="You can't use after, before and index at the same time"
                )
            kwargs = {"max_items": rsm_req.max}
            if after is not None:
                kwargs["after_id"] = after
            elif before is not None:
                kwargs["chronological_pagination"] = False
                # an empty "before" only selects the pagination direction, there
                # is no item ID to forward in this case
                if before != "":
                    kwargs["after_id"] = before
            elif index is not None:
                kwargs["start_index"] = index
        else:
            kwargs = {
                "max_items": 20 if maxItems is None else maxItems,
                "chronological_pagination": False,
            }

        log.info(
            f"No cache found for node {node} at {service} (AP account {ap_account}), "
            "using Collection Paging to RSM translation"
        )
        return await self.apg.getAPItems(ap_account, **kwargs)

    @ensure_deferred
    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """Retracting is not implemented"""
        raise NotImplementedError

    def getNodeInfo(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str,
        pep: bool = False,
        recipient: Optional[jid.JID] = None
    ) -> Optional[dict]:
        """Give the (static) metadata of a node

        @param nodeIdentifier: name of the node, other arguments are not used
        @return: None if nodeIdentifier is empty, otherwise a "leaf" node
            description with its "meta-data" fields
        """
        if not nodeIdentifier:
            return None

        meta_data = [
            {"var": "pubsub#persist_items", "type": "boolean", "value": True},
            {"var": "pubsub#max_items", "value": "max"},
            {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
            {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
        ]
        return {"type": "leaf", "meta-data": meta_data}