comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3729:86eea17cafa7

component AP gateway: split plugin in several files: constants, HTTP server and Pubsub service have been put in separated files. rel: 363
author Goffi <goffi@goffi.org>
date Mon, 31 Jan 2022 18:35:49 +0100
parents sat/plugins/plugin_comp_ap_gateway.py@b15644cae50d
children bf0505d41c09
comparison
equal deleted inserted replaced
3728:b15644cae50d 3729:86eea17cafa7
1 #!/usr/bin/env python3
2
3 # Libervia ActivityPub Gateway
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import base64
20 import hashlib
21 import json
22 from pathlib import Path
23 from typing import Optional, Dict, Tuple, List, Union
24 from urllib import parse
25 import calendar
26 import re
27
28 import dateutil
29 from cryptography.hazmat.primitives import serialization
30 from cryptography.hazmat.primitives import hashes
31 from cryptography.hazmat.primitives.asymmetric import rsa
32 from cryptography.hazmat.primitives.asymmetric import padding
33 import shortuuid
34 import treq
35 from treq.response import _Response as TReqResponse
36 from twisted.internet import defer, reactor, threads
37 from twisted.web import http
38 from twisted.words.protocols.jabber import jid, error
39 from twisted.words.xish import domish
40 from wokkel import rsm
41
42 from sat.core import exceptions
43 from sat.core.constants import Const as C
44 from sat.core.core_types import SatXMPPEntity
45 from sat.core.i18n import _
46 from sat.core.log import getLogger
47 from sat.tools import utils
48 from sat.tools.common import data_format, tls
49 from sat.tools.common.async_utils import async_lru
50
51 from .constants import (IMPORT_NAME, CONF_SECTION, TYPE_ACTOR, TYPE_ITEM, MEDIA_TYPE_AP,
52 AP_MB_MAP, LRU_MAX_SIZE)
53 from .http_server import HTTPServer
54 from .pubsub_service import APPubsubService
55
56
# Module-level logger, named after this module.
log = getLogger(__name__)

# NOTE(review): IMPORT_NAME is also imported from .constants at the top of the file;
# this assignment rebinds it for this module — confirm both values agree.
IMPORT_NAME = "ap-gateway"

# Plugin metadata: this plugin is declared as a component entry point whose main class
# is "APGateway" and which depends on the XEP-0106, XEP-0277 and XEP-0060 plugins.
PLUGIN_INFO = {
    C.PI_NAME: "ActivityPub Gateway component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "APGateway",
    C.PI_HANDLER: C.BOOL_TRUE,
    C.PI_DESCRIPTION: _(
        "Gateway for bidirectional communication between XMPP and ActivityPub."
    ),
}
75
# Two hexadecimal digits, captured in the "hex" group.
# The class must be "A-F": "A-f" is a range which also covers "G-Z" and the
# punctuation located between "Z" and "a" in ASCII.
HEXA_ENC = r"(?P<hex>[0-9a-fA-F]{2})"
# "period encoding" is percent encoding where "." is used instead of "%"
RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}")
RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}")
# texts matching this pattern only use ASCII letters, digits, "_" and "-"
RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$")
80
81
82 class APGateway:
83
def __init__(self, host):
    """Keep references to the plugins we rely on and expose "APSend" on the bridge

    @param host: backend main instance, giving access to ``plugins`` and ``bridge``
    """
    self.host = host
    # set to True by the first call to [init]
    self.initialised = False
    plugins = host.plugins
    self._m = plugins["XEP-0277"]
    self._p = plugins["XEP-0060"]

    bridge_options = {
        "in_sign": "sss",
        "out_sign": "",
        "method": self._publishMessage,
        "async_": True,
    }
    host.bridge.addMethod("APSend", ".plugin", **bridge_options)
98
def getHandler(self, __):
    """Build the handler attached to this component

    @param __: unused
    @return: a new Pubsub service bound to this gateway
    """
    pubsub_service = APPubsubService(self)
    return pubsub_service
101
async def init(self, client):
    """Initialise the gateway: RSA keys, configuration and HTTP server

    Only the first call does the work, following calls return immediately.
    If the public URL is missing or invalid, an error is logged and the HTTP server
    is not started.

    @param client: client whose ``profile`` is used to load/store the RSA private key
    @raise exceptions.ConfigError: "http_connection_type" is neither "http" nor
        "https"
    """
    if self.initialised:
        return

    self.initialised = True
    log.info(_("ActivityPub Gateway initialization"))

    # RSA keys
    stored_data = await self.host.memory.storage.getPrivates(
        IMPORT_NAME, ["rsa_key"], profile=client.profile
    )
    private_key_pem = stored_data.get("rsa_key")
    if private_key_pem is None:
        # no key stored yet: we generate one (in a thread) and save it
        self.private_key = await threads.deferToThread(
            rsa.generate_private_key,
            public_exponent=65537,
            key_size=4096,
        )
        private_key_pem = self.private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        ).decode()
        await self.host.memory.storage.setPrivateValue(
            IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
        )
    else:
        self.private_key = serialization.load_pem_private_key(
            private_key_pem.encode(),
            password=None,
        )
    self.public_key = self.private_key.public_key()
    self.public_key_pem = self.public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    ).decode()

    # params
    # URL and port
    self.public_url = self.host.memory.getConfig(
        CONF_SECTION, "public_url"
    ) or self.host.memory.getConfig(
        CONF_SECTION, "xmpp_domain"
    )
    # an empty value is as unusable as a missing one
    if not self.public_url:
        log.error(
            '"public_url" not set in configuration, this is mandatory to have '
            "ActivityPub Gateway running. Please set this option to public facing "
            f"url in {CONF_SECTION!r} configuration section."
        )
        return
    if parse.urlparse(self.public_url).scheme:
        log.error(
            "Scheme must not be specified in \"public_url\", please remove it from "
            "\"public_url\" configuration option. ActivityPub Gateway won't be run."
        )
        return
    self.http_port = int(self.host.memory.getConfig(
        CONF_SECTION, 'http_port', 8123))
    connection_type = self.host.memory.getConfig(
        CONF_SECTION, 'http_connection_type', 'https')
    if connection_type not in ('http', 'https'):
        raise exceptions.ConfigError(
            'bad ap-gateway http_connection_type, you must use one of "http" or '
            '"https"'
        )
    self.max_items = self.host.memory.getConfig(
        CONF_SECTION, 'new_node_max_items', 50
    )
    self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap')
    self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
    # True (default) if we provide gateway only to entities/services from our server
    self.local_only = C.bool(
        self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE)
    )

    # HTTP server launch
    self.server = HTTPServer(self)
    if connection_type == 'http':
        reactor.listenTCP(self.http_port, self.server)
    else:
        options = tls.getOptionsFromConfig(
            self.host.memory.config, CONF_SECTION)
        tls.TLSOptionsCheck(options)
        context_factory = tls.getTLSContextFactory(options)
        reactor.listenSSL(self.http_port, self.server, context_factory)
189
async def profileConnecting(self, client):
    """Store the connecting client, then run the gateway initialisation

    @param client: client being connected, kept as ``self.client``
    """
    self.client = client
    await self.init(client)
193
async def apGet(self, url: str) -> dict:
    """Request an URL with ActivityPub media type and return the decoded JSON

    @param url: location of the AP document
    @return: parsed JSON
    @raise error.StanzaError: "service-unavailable" is sent when something went wrong
        with AP server
    """
    ap_headers = {
        "Accept": [MEDIA_TYPE_AP],
        "Content-Type": [MEDIA_TYPE_AP],
    }
    try:
        response = await treq.get(url, headers=ap_headers)
        return await treq.json_content(response)
    except Exception as e:
        # any failure (network, HTTP, JSON decoding) is reported the same way
        raise error.StanzaError(
            "service-unavailable",
            text=f"Can't get AP data at {url}: {e}"
        )
213
def mustEncode(self, text: str) -> bool:
    """Tell if a text needs period encoding to be used in an AP account

    @param text: user part or node name to check
    @return: True if the text starts with ``___``, contains ``---``, or is not
        fully matched by RE_ALLOWED_UNQUOTED
    """
    # those sequences are markers in AP account handles
    if text.startswith("___") or "---" in text:
        return True
    return RE_ALLOWED_UNQUOTED.match(text) is None
221
def periodEncode(self, text: str) -> str:
    """Encode a text with period encoding

    The text is percent-encoded, the markers and characters left untouched by
    percent-encoding are escaped, then every ``%`` is replaced by ``.``.

    see [getJIDAndNode] for reasons of period encoding

    @param text: text to encode
    @return: period encoded text
    """
    # order matters: "%" must be replaced last, once all escapes are in place
    substitutions = (
        ("---", "%2d%2d%2d"),
        ("___", "%5f%5f%5f"),
        (".", "%2e"),
        ("~", "%7e"),
        ("%", "."),
    )
    encoded = parse.quote(text, safe="")
    for raw, escaped in substitutions:
        encoded = encoded.replace(raw, escaped)
    return encoded
235
async def getAPAccountFromJidAndNode(
    self,
    jid_: jid.JID,
    node: Optional[str]
) -> str:
    """Build the AP account handle matching a JID and a pubsub node

    Period encoding is used when the node or the user part requires it.

    @param jid_: JID of the entity or pubsub service
        its user part is set to the node when the node is moved there (see below)
    @param node: pubsub node, empty or microblog namespace for the default node
    @return: AP account handle
    @raise exceptions.InternalError: no user part can be determined
    """
    if not node or node == self._m.namespace:
        node = None

    if node and not jid_.user and not self.mustEncode(node):
        # when we have a pubsub service, the user part can be used to set the node
        # this produces more user-friendly AP accounts
        if await self.isPubsub(jid_):
            jid_.user = node
            node = None

    is_local = self.isLocal(jid_)
    # for non local entities, the whole bare JID is used as user part
    user = jid_.user if is_local else jid_.userhost()
    if user is None:
        user = ""

    needs_encoding = (node and self.mustEncode(node)) or self.mustEncode(user)
    parts = []
    if needs_encoding:
        parts.append("___")
        if node:
            node = self.periodEncode(node)
        user = self.periodEncode(user)

    if not user:
        raise exceptions.InternalError("there should be a user part")

    if node:
        parts += [node, "---"]

    domain = jid_.host if is_local else self.client.jid.userhost()
    parts += [user, "@", domain]
    return "".join(parts)
277
def isLocal(self, jid_: jid.JID) -> bool:
    """Tell if a JID belongs to the gateway's host or to one of its subdomains

    @param jid_: JID to check
    @return: True if the host of jid_ ends with the labels of the client's host
    """
    gateway_labels = self.client.host.split(".")
    assert gateway_labels
    jid_labels = jid_.host.split(".")
    # compare the rightmost labels of the JID's host with the gateway's host
    return jid_labels[-len(gateway_labels):] == gateway_labels
283
async def isPubsub(self, jid_: jid.JID) -> bool:
    """Tell if a JID is a Pubsub service which is not a PEP one

    @param jid_: JID of the entity to check with disco
    @return: True if disco identities include ("pubsub", "service") but not
        ("pubsub", "pep")
    """
    disco_infos = await self.host.getDiscoInfos(self.client, jid_)
    identities = disco_infos.identities
    if ("pubsub", "service") not in identities:
        return False
    return ("pubsub", "pep") not in identities
291
async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]:
    """Decode raw AP account handle to get XMPP JID and Pubsub Node

    Username are case insensitive.

    By default, the username correspond to local username (i.e. username from
    component's server).

    If local name's domain is a pubsub service (and not PEP), the username is taken as
    a pubsub node.

    If ``---`` is present in username, the part before is used as pubsub node, and the
    rest as a JID user part.

    If username starts with ``___``, characters are encoded using period encoding
    (i.e. percent encoding where a ``.`` is used instead of ``%``).

    This horror is necessary due to limitation in some AP implementation (notably
    Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222

    examples:

    ``toto@example.org`` => JID = toto@example.org, node = None

    ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a
    non-local JID, and will work only if setting ``local_only`` is False), node = None

    ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) =>
    JID = pubsub.example.org, node = toto

    ``tata---toto@example.org`` => JID = toto@example.org, node = tata

    ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org
    being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0

    @param ap_account: ActivityPub account handle (``username@domain.tld``)
    @return: service JID and pubsub node
        if node is None, default microblog pubsub node (and possibly other nodes
        that plugins may handle) will be used
    @raise ValueError: invalid account
    @raise PermissionError: non local jid is used when gateway doesn't allow them
    """
    if ap_account.count("@") != 1:
        raise ValueError("Invalid AP account")
    if ap_account.startswith("___"):
        encoded = True
        ap_account = ap_account[3:]
    else:
        encoded = False

    username, domain = ap_account.split("@")

    if "---" in username:
        node, username = username.rsplit("---", 1)
    else:
        node = None

    if encoded:
        # period encoding is converted back to percent encoding, then decoded
        username = parse.unquote(
            RE_PERIOD_ENC.sub(r"%\g<hex>", username),
            errors="strict"
        )
        if node:
            node = parse.unquote(
                RE_PERIOD_ENC.sub(r"%\g<hex>", node),
                errors="strict"
            )

        # a decoded username may be a whole bare JID (non local entity)
        if "@" in username:
            username, domain = username.rsplit("@", 1)

    if not node:
        # we need to check host disco, because disco request to user may be
        # blocked for privacy reason (see
        # https://xmpp.org/extensions/xep-0030.html#security)
        try:
            domain_jid = jid.JID(domain)
        except (RuntimeError, jid.InvalidFormat) as e:
            raise ValueError(f"Invalid domain: {domain!r}") from e
        is_pubsub = await self.isPubsub(domain_jid)

        if is_pubsub:
            # if the host is a pubsub service and not a PEP, we consider that username
            # is in fact the node name
            node = username
            username = None

    jid_s = f"{username}@{domain}" if username else domain
    try:
        jid_ = jid.JID(jid_s)
    except (RuntimeError, jid.InvalidFormat) as e:
        # Twisted reports unparsable JIDs with jid.InvalidFormat, which is not a
        # RuntimeError: both are converted to the documented ValueError
        raise ValueError(f"Invalid jid: {jid_s!r}") from e

    if self.local_only and not self.isLocal(jid_):
        raise exceptions.PermissionError(
            "This gateway is configured to map only local entities and services"
        )

    return jid_, node
387
def parseAPURL(self, url: str) -> Tuple[str, str]:
    """Extract endpoint type and AP account from an URL of this gateway

    @param url: URL to parse (schema is not mandatory)
        its path is expected to start with ``self.ap_path``
    @return: endpoint type and AP account (percent decoded)
    """
    url_path = parse.urlparse(url).path.lstrip("/")
    # what follows the AP prefix is "<type>/<quoted account>"
    endpoint = url_path[len(self.ap_path):].lstrip("/")
    type_, quoted_account = endpoint.split("/", 1)
    return type_, parse.unquote(quoted_account)
397
def buildAPURL(self, type_:str , *args: str) -> str:
    """Build the URL of an AP endpoint of this gateway

    @param type_: type of AP endpoint
    @param args: endpoint dependant arguments, each one is quoted and used as a
        path element
    @return: URL made from ``self.base_ap_url``, the type and the arguments
    """
    quoted_args = [parse.quote_plus(arg) for arg in args]
    relative_path = Path(type_).joinpath(*quoted_args)
    return parse.urljoin(self.base_ap_url, str(relative_path))
408
async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse:
    """Serialise a document, sign the request and POST it to an AP server

    The signature covers the request target, host, date and body digest, and is
    made with the gateway's RSA private key.

    @param url: AP server endpoint
    @param url_actor: URL generated by this gateway for local actor
        used as ``keyId`` of the signature
    @param doc: document to send
    @return: response of the AP server
    """
    p_url = parse.urlparse(url)
    date = http.datetimeToString().decode()
    body = json.dumps(doc).encode()

    body_hash = hashlib.sha256(body).digest()
    digest = f"sha-256={base64.b64encode(body_hash).decode()}"

    # lines follow the order declared in the "headers" field of the signature
    to_sign = "\n".join((
        f"(request-target): post {p_url.path}",
        f"host: {p_url.hostname}",
        f"date: {date}",
        f"digest: {digest}",
    ))
    raw_signature = self.private_key.sign(
        to_sign.encode(),
        # we have to use PKCS1v15 padding to be compatible with Mastodon
        padding.PKCS1v15(),
        hashes.SHA256()
    )
    signature = base64.b64encode(raw_signature).decode()
    h_signature = (
        f'keyId="{url_actor}",headers="(request-target) host date digest",'
        f'signature="{signature}"'
    )
    request_headers = {
        "Host": [p_url.hostname],
        "Date": [date],
        "Digest": [digest],
        "Signature": [h_signature],
    }
    return await treq.post(url, body, headers=request_headers)
445
def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
    """Bridge frontend of [publishMessage]

    @param mess_data_s: serialised message data
    @param service_s: JID corresponding to the AP actor, as a string
    @param profile: profile used to get the client
    @return: Deferred wrapping the [publishMessage] coroutine
    """
    mess_data: dict = data_format.deserialise(mess_data_s)  # type: ignore
    service = jid.JID(service_s)
    client = self.host.getClient(profile)
    publication = self.publishMessage(client, mess_data, service)
    return defer.ensureDeferred(publication)
451
async def getAPActorIdFromAccount(self, account: str) -> str:
    """Retrieve account ID from its handle using WebFinger

    @param account: AP handle (user@domain.tld)
    @return: Actor ID (which is an URL)
    @raise ValueError: the handle is invalid, or WebFinger data has no usable
        ActivityPub link
    @raise exceptions.DataError: WebFinger data can't be retrieved
    """
    if account.count("@") != 1 or "/" in account:
        raise ValueError(f"Invalid account: {account!r}")
    host = account.split("@")[1]
    try:
        finger_data = await treq.json_content(await treq.get(
            f"https://{host}/.well-known/webfinger?"
            f"resource=acct:{parse.quote_plus(account)}",
        ))
    except Exception as e:
        raise exceptions.DataError(f"Can't get webfinger data: {e}") from e
    # the actor ID is the "self" link with ActivityPub media type
    for link in finger_data.get("links", []):
        if (
            link.get("type") == "application/activity+json"
            and link.get("rel") == "self"
        ):
            href = link.get("href", "").strip()
            if not href:
                raise ValueError(
                    f"Invalid webfinger data for {account!r}: missing href"
                )
            break
    else:
        raise ValueError(
            f"No ActivityPub link found for {account!r}"
        )
    return href
484
async def getAPActorDataFromId(self, account: str) -> dict:
    """Retrieve ActivityPub Actor data

    The value is first resolved to an actor ID with [getAPActorIdFromAccount], then
    the actor document is requested.

    @param account: ActivityPub Actor identifier
    @return: actor data
    """
    actor_id = await self.getAPActorIdFromAccount(account)
    actor_data = await self.apGet(actor_id)
    return actor_data
492
@async_lru(maxsize=LRU_MAX_SIZE)
async def getAPAccountFromId(self, actor_id: str):
    """Find the AP account handle matching an actor ID

    The handle is built from the "preferredUsername" of the actor and the host of
    its ID, then checked with WebFinger.

    @param actor_id: AP ID of the actor (URL to the actor data)
    @return: AP account handle
    @raise exceptions.DataError: "preferredUsername" is missing, or WebFinger returns
        a different ID for the account
    """
    url_parsed = parse.urlparse(actor_id)
    actor_data = await self.apGet(actor_id)
    username = actor_data.get("preferredUsername")
    if not username:
        raise exceptions.DataError(
            'No "preferredUsername" field found, can\'t retrieve actor account'
        )
    account = f"{username}@{url_parsed.hostname}"
    # we try to retrieve the actor ID from the account to check it
    found_id = await self.getAPActorIdFromAccount(account)
    if found_id == actor_id:
        return account

    # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196
    msg = (
        f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID "
        f"({actor_id!r}). This AP instance doesn't seems to use "
        '"preferredUsername" as we expect.'
    )
    log.warning(msg)
    raise exceptions.DataError(msg)
519
async def getAPItems(
    self,
    account: str,
    max_items: Optional[int] = None,
    chronological_pagination: bool = True,
    after_id: Optional[str] = None,
    start_index: Optional[int] = None,
) -> Tuple[List[domish.Element], rsm.RSMResponse]:
    """Retrieve AP items and convert them to XMPP items

    @param account: AP account handle to get items from
    @param max_items: maximum number of items to retrieve
        retrieve all items by default
    @param chronological_pagination: get pages in chronological order
        AP use reversed chronological order for pagination, "first" page returns more
        recent items. If "chronological_pagination" is True, "last" AP page will be
        retrieved first.
    @param after_id: if set, retrieve items starting from given ID
        Due to ActivityStream Collection Paging limitations, this is inefficient and
        if ``after_id`` is not already in cache, we have to retrieve every page until
        we find it.
        In most common cases, ``after_id`` should be in cache though (client usually
        use known ID when in-order pagination is used).
    @param start_index: start retrieving items from the one with given index
        Due to ActivityStream Collection Paging limitations, this is inefficient and
        all pages before the requested index will be retrieved to count items.
        Can only be used with chronological pagination and without ``after_id``.
    @return: XMPP Pubsub items and corresponding RSM Response
        Items are always returned in chronological order in the result
    @raise exceptions.DataError: outbox or initial page can't be found
    @raise error.StanzaError: requested item is not found, ``start_index`` is too
        high, or a page link is missing
    """
    actor_data = await self.getAPActorDataFromId(account)
    outbox = actor_data.get("outbox")
    rsm_resp: Dict[str, Union[str, int]] = {}
    if not outbox:
        raise exceptions.DataError(f"No outbox found for actor {account}")
    outbox_data = await self.apGet(outbox)
    try:
        count = outbox_data["totalItems"]
    except KeyError:
        log.warning(
            f'"totalItems" not found in outbox of {account}, defaulting to 20'
        )
        count = 20
    else:
        log.info(f"{account}'s outbox has {count} item(s)")
        rsm_resp["count"] = count

    if start_index is not None:
        assert chronological_pagination and after_id is None
        if start_index >= count:
            return [], rsm.RSMResponse(**rsm_resp)
        elif start_index == 0:
            # this is the default behaviour
            pass
        elif start_index > 5000:
            raise error.StanzaError(
                "feature-not-implemented",
                text="Maximum limit for previous_index has been reached, this limit "
                "is set to avoid DoS"
            )
        else:
            # we'll convert "start_index" to "after_id", thus we need the item just
            # before "start_index"
            previous_index = start_index - 1
            retrieved_items = 0
            current_page = outbox_data.get("last")
            if not current_page:
                raise exceptions.DataError(
                    f"Initial page 'last' not found for outbox {outbox}"
                )
            while retrieved_items < count:
                page_data, items = await self.parseAPPage(current_page)
                if not items:
                    log.warning(f"found an empty AP page at {current_page}")
                    return [], rsm.RSMResponse(**rsm_resp)
                page_start_idx = retrieved_items
                retrieved_items += len(items)
                # strict comparison: the item is in this page only if its index is
                # below the number of items retrieved so far
                if previous_index < retrieved_items:
                    after_id = items[previous_index - page_start_idx]["id"]
                    break
                try:
                    current_page = page_data["prev"]
                except KeyError:
                    log.warning(
                        f"missing previous page link at {current_page}: {page_data!r}"
                    )
                    raise error.StanzaError(
                        "service-unavailable",
                        text="Error while retrieving previous page from AP service "
                        f"at {current_page}"
                    )

    init_page = "last" if chronological_pagination else "first"
    page = outbox_data.get(init_page)
    if not page:
        raise exceptions.DataError(
            f"Initial page {init_page!r} not found for outbox {outbox}"
        )
    items = []
    page_items = []
    retrieved_items = 0
    found_after_id = False
    # link to follow to get the page coming after the current one
    next_page_key = "prev" if chronological_pagination else "next"

    while retrieved_items < count:
        page_data, page_items = await self.parseAPPage(page)
        retrieved_items += len(page_items)
        if after_id is not None and not found_after_id:
            # if we have an after_id, we ignore all items until the requested one is
            # found
            try:
                limit_idx = [i["id"] for i in page_items].index(after_id)
            except ValueError:
                # if "after_id" is not found, we don't add any item from this page
                log.debug(f"{after_id!r} not found at {page}, skipping")
            else:
                found_after_id = True
                if chronological_pagination:
                    start_index = retrieved_items - len(page_items) + limit_idx + 1
                    page_items = page_items[limit_idx+1:]
                else:
                    start_index = count - (retrieved_items - len(page_items) +
                                           limit_idx + 1)
                    page_items = page_items[:limit_idx]
                items.extend(page_items)
        else:
            items.extend(page_items)
        if max_items is not None and len(items) >= max_items:
            if chronological_pagination:
                items = items[:max_items]
            else:
                items = items[-max_items:]
            break
        # paging links belong to the page which has just been retrieved, not to the
        # outbox collection
        page = page_data.get(next_page_key)
        if not page:
            break

    if after_id is not None and not found_after_id:
        raise error.StanzaError("item-not-found")

    if start_index is not None:
        rsm_resp["index"] = start_index
    elif after_id is not None:
        log.warning("Can't determine index of first element")
    elif chronological_pagination:
        rsm_resp["index"] = 0
    else:
        rsm_resp["index"] = count - len(items)
    if items:
        rsm_resp.update({
            "first": items[0]["id"],
            "last": items[-1]["id"]
        })

    return items, rsm.RSMResponse(**rsm_resp)
672
async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]:
    """Retrieve an AP page and convert its objects to XMPP items

    AP items which can't be converted are skipped.

    @param url: url linking to an AP page
    @return: page data, pubsub items (in reverse order of "orderedItems")
    """
    page_data = await self.apGet(url)
    ap_items = page_data.get("orderedItems")
    if not ap_items:
        log.warning('No "orderedItems" collection found')
        return page_data, []

    pubsub_items = []
    # AP Collections are in antichronological order, but we expect chronological in
    # Pubsub, thus we reverse it
    for ap_item in reversed(ap_items):
        try:
            ap_object, mb_data = await self.apItem2MBdata(ap_item)
        except (exceptions.DataError, NotImplementedError, error.StanzaError):
            continue
        else:
            entry_elt = await self._m.data2entry(
                self.client, mb_data, ap_object["id"], None, self._m.namespace
            )
            entry_elt["publisher"] = mb_data["author_jid"].full()
            pubsub_items.append(entry_elt)

    return page_data, pubsub_items
700
async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]:
    """Convert AP item to microblog data

    @param ap_item: AP item (activity), its "object" may be embedded or be an URL
        which is then retrieved
    @return: AP Item's Object and microblog data
    @raise exceptions.DataError: something is invalid in the AP item
    @raise NotImplementedError: some AP data is not handled yet
    @raise error.StanzaError: error while contacting the AP server
    """
    # "parser" is a submodule of dateutil, importing the package alone doesn't
    # guarantee that it is loaded
    import dateutil.parser

    ap_object = ap_item.get("object")
    if not ap_object:
        log.warning(f'No "object" found in AP item {ap_item!r}')
        raise exceptions.DataError
    if isinstance(ap_object, str):
        ap_object = await self.apGet(ap_object)
    obj_id = ap_object.get("id")
    if not obj_id:
        log.warning(f'No "id" found in AP object: {ap_object!r}')
        raise exceptions.DataError
    if ap_object.get("inReplyTo") is not None:
        raise NotImplementedError
    mb_data = {}
    for ap_key, mb_key in AP_MB_MAP.items():
        data = ap_object.get(ap_key)
        if data is None:
            continue
        mb_data[mb_key] = data

    # content
    try:
        # the last entry of "contentMap" is used, the map itself is left untouched
        language, content_xhtml = list(ap_object["contentMap"].items())[-1]
    except (KeyError, AttributeError, IndexError):
        try:
            mb_data["content_xhtml"] = mb_data["content"]
        except KeyError:
            log.warning(f"no content found:\n{ap_object!r}")
            raise exceptions.DataError
    else:
        mb_data["language"] = language
        mb_data["content_xhtml"] = content_xhtml

    # author
    actor = ap_item.get("actor")
    if not actor:
        log.warning(f"no actor associated to object id {obj_id!r}")
        raise exceptions.DataError
    elif isinstance(actor, list):
        # we only keep first item of list as author
        # TODO: handle multiple actors
        if len(actor) > 1:
            log.warning("multiple actors are not managed")
        actor = actor[0]

    if isinstance(actor, dict):
        actor_id = actor.get("id")
        if not actor_id:
            # the whole actor object is logged to help debugging
            log.warning(f"no actor id found: {actor!r}")
            raise exceptions.DataError
        actor = actor_id

    if isinstance(actor, str):
        account = await self.getAPAccountFromId(actor)
        mb_data["author"] = account.split("@", 1)[0]
        # the AP account is escaped to be used as user part of a JID of the gateway
        mb_data["author_jid"] = jid.JID(
            None,
            (
                self.host.plugins["XEP-0106"].escape(account),
                self.client.jid.host,
                None
            )
        )
    else:
        log.warning(f"unknown actor type found: {actor!r}")
        raise exceptions.DataError

    # published/updated
    for field in ("published", "updated"):
        value = ap_object.get(field)
        if not value and field == "updated":
            value = ap_object.get("published")
        if value:
            try:
                mb_data[field] = calendar.timegm(
                    dateutil.parser.parse(str(value)).utctimetuple()
                )
            except dateutil.parser.ParserError as e:
                log.warning(f"Can't parse {field!r} field: {e}")
    return ap_object, mb_data
787
async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
    """Convert Libervia Microblog Data to ActivityPub item

    @param client: client whose JID is used as author when none is set
    @param mb_data: microblog data
        "id" and "author_jid" are filled in place when missing
    @return: "Create" activity wrapping a public "Note"
    """
    if not mb_data.get("id"):
        mb_data["id"] = shortuuid.uuid()
    if not mb_data.get("author_jid"):
        mb_data["author_jid"] = client.jid
    author_jid = jid.JID(mb_data["author_jid"])
    ap_account = await self.getAPAccountFromJidAndNode(author_jid, None)
    url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
    url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"])

    published = utils.xmpp_date(mb_data["published"])
    # XHTML content is preferred over plain text one
    content = mb_data.get("content_xhtml") or mb_data["content"]
    note = {
        "id": url_item,
        "type": "Note",
        "published": published,
        "attributedTo": url_actor,
        "content": content,
        "to": "https://www.w3.org/ns/activitystreams#Public"
    }
    return {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": url_item,
        "type": "Create",
        "actor": url_actor,
        "object": note,
    }
815
async def publishMessage(
    self,
    client: SatXMPPEntity,
    mess_data: dict,
    service: jid.JID
) -> None:
    """Send an AP message

    .. note::

        This is a temporary method used for development only

    @param client: client used to build the AP item
    @param mess_data: message data. Following keys must be set:

        ``node``
          identifier of message which is being replied (this will
          correspond to pubsub node in the future)

        ``content_xhtml`` or ``content``
          message body (respectively in XHTML or plain text)

    @param service: JID corresponding to the AP actor.
    @raise ValueError: service has no local part
    @raise exceptions.DataError: the shared inbox of the actor can't be found
    @raise exceptions.NetworkError: the AP server didn't answer with a 202 code
    """
    escaped_account = service.user
    if not escaped_account:
        raise ValueError("service must have a local part")
    account = self.host.plugins["XEP-0106"].unescape(escaped_account)
    ap_actor_data = await self.getAPActorDataFromId(account)

    try:
        inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
    except KeyError:
        raise exceptions.DataError("Can't get ActivityPub actor inbox")

    item_data = await self.mbdata2APitem(client, mess_data)
    # the local actor URL is used as key ID for the signature
    url_actor = item_data["object"]["attributedTo"]
    resp = await self.signAndPost(inbox_url, url_actor, item_data)
    if resp.code == 202:
        return
    raise exceptions.NetworkError(f"unexpected return code: {resp.code}")