Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3764:125c7043b277
comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
this patch implements the following major features:
- `publish` is implemented on virtual pubsub service, thus XMPP entities can now publish
to AP using this service
- replies to XMPP items are managed
- `inReplyTo` is filled when converting XMPP items to AP objects
- `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to
XMPP's (un)subscribe. On subscription, AP actor's `outbox` collection is converted to
XMPP and put in cache. Subscriptions are always public.
- `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription
(which should be XEP-0465, but the XEP is not yet published at the time of commit), in
both directions.
- new helper methods to check if a URL is local and to get JID from actor ID
doc will follow to explain behaviour
rel 365
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 13 May 2022 19:12:33 +0200 |
parents | a8c7e5cef0cb |
children | b5c9021020df |
comparison
equal
deleted
inserted
replaced
3763:b2ade5ecdbab | 3764:125c7043b277 |
---|---|
15 | 15 |
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 import time | 19 import time |
20 from typing import Optional, Dict, List | 20 from typing import Optional, Dict, List, Any |
21 import json | 21 import json |
22 from urllib import parse | 22 from urllib import parse |
23 from collections import deque | 23 from collections import deque |
24 import unicodedata | 24 import unicodedata |
25 from pprint import pformat | 25 from pprint import pformat |
26 | 26 |
27 from twisted.web import http, resource as web_resource, server | 27 from twisted.web import http, resource as web_resource, server |
28 from twisted.python import failure | |
28 from twisted.internet import reactor, defer | 29 from twisted.internet import reactor, defer |
29 from twisted.words.protocols.jabber import jid, error | 30 from twisted.words.protocols.jabber import jid, error |
30 from wokkel import pubsub, rsm | 31 from wokkel import pubsub, rsm |
31 | 32 |
32 from sat.core import exceptions | 33 from sat.core import exceptions |
33 from sat.core.constants import Const as C | 34 from sat.core.constants import Const as C |
34 from sat.core.i18n import _ | 35 from sat.core.i18n import _ |
35 from sat.core.log import getLogger | 36 from sat.core.log import getLogger |
37 from sat.tools import utils | |
36 from sat.tools.common import date_utils | 38 from sat.tools.common import date_utils |
37 from sat.memory.sqla_mapping import SubscriptionState | 39 from sat.memory.sqla_mapping import SubscriptionState |
38 | 40 |
39 from .constants import ( | 41 from .constants import ( |
40 CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, | 42 CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, |
41 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, | 43 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, |
42 SIGN_HEADERS, HS2019, SIGN_EXP | 44 SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING |
43 ) | 45 ) |
44 from .regex import RE_SIG_PARAM | 46 from .regex import RE_SIG_PARAM |
45 | 47 |
46 | 48 |
47 log = getLogger(__name__) | 49 log = getLogger(__name__) |
68 msg: Optional[str] = None | 70 msg: Optional[str] = None |
69 ) -> None: | 71 ) -> None: |
70 """Log and set HTTP return code and associated message""" | 72 """Log and set HTTP return code and associated message""" |
71 log.warning(msg) | 73 log.warning(msg) |
72 request.setResponseCode(http_code, None if msg is None else msg.encode()) | 74 request.setResponseCode(http_code, None if msg is None else msg.encode()) |
75 | |
76 def _onRequestError(self, failure_: failure.Failure, request: "HTTPRequest") -> None: | |
77 log.error(f"Internal error: {failure_.value}") | |
78 self.responseCode( | |
79 request, | |
80 http.INTERNAL_SERVER_ERROR, | |
81 f"internal error: {failure_.value}" | |
82 ) | |
83 request.finish() | |
73 | 84 |
74 async def webfinger(self, request): | 85 async def webfinger(self, request): |
75 url_parsed = parse.urlparse(request.uri.decode()) | 86 url_parsed = parse.urlparse(request.uri.decode()) |
76 query = parse.parse_qs(url_parsed.query) | 87 query = parse.parse_qs(url_parsed.query) |
77 resource = query.get("resource", [""])[0] | 88 resource = query.get("resource", [""])[0] |
95 } | 106 } |
96 request.setHeader("content-type", CONTENT_TYPE_AP) | 107 request.setHeader("content-type", CONTENT_TYPE_AP) |
97 request.write(json.dumps(resp).encode()) | 108 request.write(json.dumps(resp).encode()) |
98 request.finish() | 109 request.finish() |
99 | 110 |
100 async def handleFollowActivity( | 111 async def handleUndoActivity( |
101 self, | 112 self, |
102 request: "HTTPRequest", | 113 request: "HTTPRequest", |
103 data: dict, | 114 data: dict, |
104 account_jid: jid.JID, | 115 account_jid: jid.JID, |
105 node: Optional[str], | 116 node: Optional[str], |
108 signing_actor: str | 119 signing_actor: str |
109 ) -> None: | 120 ) -> None: |
110 if node is None: | 121 if node is None: |
111 node = self.apg._m.namespace | 122 node = self.apg._m.namespace |
112 client = await self.apg.getVirtualClient(signing_actor) | 123 client = await self.apg.getVirtualClient(signing_actor) |
124 objects = await self.apg.apGetList(data, "object") | |
125 for obj in objects: | |
126 type_ = obj.get("type") | |
127 actor = await self.apg.apGetSenderActor(obj) | |
128 if actor != signing_actor: | |
129 log.warning(f"ignoring object not attributed to signing actor: {data}") | |
130 continue | |
131 try: | |
132 target_account = obj["object"] | |
133 except KeyError: | |
134 log.warning(f'ignoring invalid object, missing "object" key: {data}') | |
135 continue | |
136 if not self.apg.isLocalURL(target_account): | |
137 log.warning(f"ignoring unfollow request to non local actor: {data}") | |
138 continue | |
139 | |
140 if type_ == "Follow": | |
141 await self.apg._p.unsubscribe( | |
142 client, | |
143 account_jid, | |
144 node, | |
145 sender=client.jid, | |
146 ) | |
147 else: | |
148 log.warning(f"Unmanaged undo type: {type_!r}") | |
149 | |
150 async def handleFollowActivity( | |
151 self, | |
152 request: "HTTPRequest", | |
153 data: dict, | |
154 account_jid: jid.JID, | |
155 node: Optional[str], | |
156 ap_account: str, | |
157 ap_url: str, | |
158 signing_actor: str | |
159 ) -> None: | |
160 if node is None: | |
161 node = self.apg._m.namespace | |
162 client = await self.apg.getVirtualClient(signing_actor) | |
113 try: | 163 try: |
114 subscription = await self.apg._p.subscribe( | 164 subscription = await self.apg._p.subscribe( |
115 client, | 165 client, |
116 account_jid, | 166 account_jid, |
117 node | 167 node, |
168 # subscriptions from AP are always public | |
169 options=self.apg._pps.setPublicOpt() | |
118 ) | 170 ) |
119 except pubsub.SubscriptionPending: | 171 except pubsub.SubscriptionPending: |
120 log.info(f"subscription to node {node!r} of {account_jid} is pending") | 172 log.info(f"subscription to node {node!r} of {account_jid} is pending") |
121 # TODO: manage SubscriptionUnconfigured | 173 # TODO: manage SubscriptionUnconfigured |
122 else: | 174 else: |
127 actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account) | 179 actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account) |
128 accept_data = self.apg.createActivity( | 180 accept_data = self.apg.createActivity( |
129 "Accept", actor_id, object_=data | 181 "Accept", actor_id, object_=data |
130 ) | 182 ) |
131 await self.apg.signAndPost(inbox, actor_id, accept_data) | 183 await self.apg.signAndPost(inbox, actor_id, accept_data) |
184 await self.apg._c.synchronise(client, account_jid, node, resync=False) | |
132 | 185 |
133 async def handleAcceptActivity( | 186 async def handleAcceptActivity( |
134 self, | 187 self, |
135 request: "HTTPRequest", | 188 request: "HTTPRequest", |
136 data: dict, | 189 data: dict, |
214 node: Optional[str], | 267 node: Optional[str], |
215 ap_account: str, | 268 ap_account: str, |
216 actor_url: str, | 269 actor_url: str, |
217 signing_actor: Optional[str] | 270 signing_actor: Optional[str] |
218 ) -> dict: | 271 ) -> dict: |
219 inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account) | 272 inbox = self.apg.buildAPURL(TYPE_INBOX, ap_account) |
220 shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX) | 273 shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX) |
221 outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account) | 274 outbox = self.apg.buildAPURL(TYPE_OUTBOX, ap_account) |
275 followers = self.apg.buildAPURL(TYPE_FOLLOWERS, ap_account) | |
276 following = self.apg.buildAPURL(TYPE_FOLLOWING, ap_account) | |
222 | 277 |
223 # we have to use AP account as preferredUsername because it is used to retrieve | 278 # we have to use AP account as preferredUsername because it is used to retrieve |
224 # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) | 279 # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) |
225 preferred_username = ap_account.split("@", 1)[0] | 280 preferred_username = ap_account.split("@", 1)[0] |
226 return { | 281 return { |
230 ], | 285 ], |
231 | 286 |
232 "id": actor_url, | 287 "id": actor_url, |
233 "type": "Person", | 288 "type": "Person", |
234 "preferredUsername": preferred_username, | 289 "preferredUsername": preferred_username, |
235 "inbox": inbox_url, | 290 "inbox": inbox, |
236 "outbox": outbox_url, | 291 "outbox": outbox, |
292 "followers": followers, | |
293 "following": following, | |
237 "publicKey": { | 294 "publicKey": { |
238 "id": f"{actor_url}#main-key", | 295 "id": f"{actor_url}#main-key", |
239 "owner": actor_url, | 296 "owner": actor_url, |
240 "publicKeyPem": self.apg.public_key_pem | 297 "publicKeyPem": self.apg.public_key_pem |
241 }, | 298 }, |
278 node: Optional[str], | 335 node: Optional[str], |
279 ap_account: str, | 336 ap_account: str, |
280 ap_url: str, | 337 ap_url: str, |
281 query_data: Dict[str, List[str]] | 338 query_data: Dict[str, List[str]] |
282 ) -> dict: | 339 ) -> dict: |
340 if node is None: | |
341 node = self.apg._m.namespace | |
283 # we only keep useful keys, and sort to have consistent URL which can | 342 # we only keep useful keys, and sort to have consistent URL which can |
284 # be used as ID | 343 # be used as ID |
285 url_keys = sorted(set(query_data) & {"page", "index", "before", "after"}) | 344 url_keys = sorted(set(query_data) & {"page", "index", "before", "after"}) |
286 query_data = {k: query_data[k] for k in url_keys} | 345 query_data = {k: query_data[k] for k in url_keys} |
287 try: | 346 try: |
441 else: | 500 else: |
442 await method( | 501 await method( |
443 request, data, account_jid, node, ap_account, ap_url, signing_actor | 502 request, data, account_jid, node, ap_account, ap_url, signing_actor |
444 ) | 503 ) |
445 | 504 |
505 async def APFollowersRequest( | |
506 self, | |
507 request: "HTTPRequest", | |
508 account_jid: jid.JID, | |
509 node: Optional[str], | |
510 ap_account: Optional[str], | |
511 ap_url: str, | |
512 signing_actor: Optional[str] | |
513 ) -> dict: | |
514 if node is None: | |
515 node = self.apg._m.namespace | |
516 client = self.apg.client | |
517 subscribers = await self.apg._pps.getPublicNodeSubscriptions( | |
518 client, account_jid, node | |
519 ) | |
520 followers = [] | |
521 for subscriber in subscribers.keys(): | |
522 if subscriber.host == self.apg.client.jid.userhost(): | |
523 # the subscriber is an AP user subscribed with this gateway | |
524 ap_account = self.apg._e.unescape(subscriber.user) | |
525 else: | |
526 # regular XMPP user | |
527 ap_account = await self.apg.getAPAccountFromJidAndNode(subscriber, node) | |
528 followers.append(ap_account) | |
529 | |
530 url = self.getCanonicalURL(request) | |
531 return { | |
532 "@context": "https://www.w3.org/ns/activitystreams", | |
533 "type": "OrderedCollection", | |
534 "id": url, | |
535 "totalItems": len(subscribers), | |
536 "first": { | |
537 "type": "OrderedCollectionPage", | |
538 "id": url, | |
539 "orderedItems": followers | |
540 } | |
541 } | |
542 | |
543 async def APFollowingRequest( | |
544 self, | |
545 request: "HTTPRequest", | |
546 account_jid: jid.JID, | |
547 node: Optional[str], | |
548 ap_account: Optional[str], | |
549 ap_url: str, | |
550 signing_actor: Optional[str] | |
551 ) -> dict[str, Any]: | |
552 client = self.apg.client | |
553 subscriptions = await self.apg._pps.subscriptions( | |
554 client, account_jid, node | |
555 ) | |
556 following = [] | |
557 for sub_dict in subscriptions: | |
558 service = jid.JID(sub_dict["service"]) | |
559 if service.host == self.apg.client.jid.userhost(): | |
560 # the subscription is to an AP actor with this gateway | |
561 ap_account = self.apg._e.unescape(service.user) | |
562 else: | |
563 # regular XMPP user | |
564 ap_account = await self.apg.getAPAccountFromJidAndNode( | |
565 service, sub_dict["node"] | |
566 ) | |
567 following.append(ap_account) | |
568 | |
569 url = self.getCanonicalURL(request) | |
570 return { | |
571 "@context": "https://www.w3.org/ns/activitystreams", | |
572 "type": "OrderedCollection", | |
573 "id": url, | |
574 "totalItems": len(subscriptions), | |
575 "first": { | |
576 "type": "OrderedCollectionPage", | |
577 "id": url, | |
578 "orderedItems": following | |
579 } | |
580 } | |
581 | |
446 async def APRequest( | 582 async def APRequest( |
447 self, | 583 self, |
448 request: "HTTPRequest", | 584 request: "HTTPRequest", |
449 signing_actor: Optional[str] = None | 585 signing_actor: Optional[str] = None |
450 ) -> None: | 586 ) -> None: |
488 f"invalid signature: {e}" | 624 f"invalid signature: {e}" |
489 ) | 625 ) |
490 request.finish() | 626 request.finish() |
491 return | 627 return |
492 | 628 |
493 return await self.APRequest(request, signing_actor) | 629 try: |
630 return await self.APRequest(request, signing_actor) | |
631 except Exception as e: | |
632 self._onRequestError(failure.Failure(e), request) | |
494 | 633 |
495 async def checkSigningActor(self, data: dict, signing_actor: str) -> None: | 634 async def checkSigningActor(self, data: dict, signing_actor: str) -> None: |
496 """That that signing actor correspond to actor declared in data | 635 """That that signing actor correspond to actor declared in data |
497 | 636 |
498 @param data: request payload | 637 @param data: request payload |
661 path = request.path.decode().lstrip("/") | 800 path = request.path.decode().lstrip("/") |
662 if path.startswith(".well-known/webfinger"): | 801 if path.startswith(".well-known/webfinger"): |
663 defer.ensureDeferred(self.webfinger(request)) | 802 defer.ensureDeferred(self.webfinger(request)) |
664 return server.NOT_DONE_YET | 803 return server.NOT_DONE_YET |
665 elif path.startswith(self.apg.ap_path): | 804 elif path.startswith(self.apg.ap_path): |
666 defer.ensureDeferred(self.APRequest(request)) | 805 d = defer.ensureDeferred(self.APRequest(request)) |
806 d.addErrback(self._onRequestError, request) | |
667 return server.NOT_DONE_YET | 807 return server.NOT_DONE_YET |
668 | 808 |
669 return web_resource.NoResource().render(request) | 809 return web_resource.NoResource().render(request) |
670 | 810 |
671 def render_POST(self, request): | 811 def render_POST(self, request): |