comparison sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3764:125c7043b277

comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:

This patch implements the following major features:

- `publish` is implemented on the virtual pubsub service, so XMPP entities can now publish to AP using this service
- replies to XMPP items are managed
- `inReplyTo` is filled when converting XMPP items to AP objects
- `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to XMPP's (un)subscribe. On subscription, the AP actor's `outbox` collection is converted to XMPP and put in cache. Subscriptions are always public.
- `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription (which should be XEP-0465, but the XEP was not yet published at the time of commit), in both directions
- new helper methods to check whether a URL is local and to get a JID from an actor ID

Documentation will follow to explain the behaviour.

rel 365
author Goffi <goffi@goffi.org>
date Fri, 13 May 2022 19:12:33 +0200
parents a8c7e5cef0cb
children b5c9021020df
comparison
equal deleted inserted replaced
3763:b2ade5ecdbab 3764:125c7043b277
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 import time 19 import time
20 from typing import Optional, Dict, List 20 from typing import Optional, Dict, List, Any
21 import json 21 import json
22 from urllib import parse 22 from urllib import parse
23 from collections import deque 23 from collections import deque
24 import unicodedata 24 import unicodedata
25 from pprint import pformat 25 from pprint import pformat
26 26
27 from twisted.web import http, resource as web_resource, server 27 from twisted.web import http, resource as web_resource, server
28 from twisted.python import failure
28 from twisted.internet import reactor, defer 29 from twisted.internet import reactor, defer
29 from twisted.words.protocols.jabber import jid, error 30 from twisted.words.protocols.jabber import jid, error
30 from wokkel import pubsub, rsm 31 from wokkel import pubsub, rsm
31 32
32 from sat.core import exceptions 33 from sat.core import exceptions
33 from sat.core.constants import Const as C 34 from sat.core.constants import Const as C
34 from sat.core.i18n import _ 35 from sat.core.i18n import _
35 from sat.core.log import getLogger 36 from sat.core.log import getLogger
37 from sat.tools import utils
36 from sat.tools.common import date_utils 38 from sat.tools.common import date_utils
37 from sat.memory.sqla_mapping import SubscriptionState 39 from sat.memory.sqla_mapping import SubscriptionState
38 40
39 from .constants import ( 41 from .constants import (
40 CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, 42 CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
41 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, 43 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED,
42 SIGN_HEADERS, HS2019, SIGN_EXP 44 SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING
43 ) 45 )
44 from .regex import RE_SIG_PARAM 46 from .regex import RE_SIG_PARAM
45 47
46 48
47 log = getLogger(__name__) 49 log = getLogger(__name__)
68 msg: Optional[str] = None 70 msg: Optional[str] = None
69 ) -> None: 71 ) -> None:
70 """Log and set HTTP return code and associated message""" 72 """Log and set HTTP return code and associated message"""
71 log.warning(msg) 73 log.warning(msg)
72 request.setResponseCode(http_code, None if msg is None else msg.encode()) 74 request.setResponseCode(http_code, None if msg is None else msg.encode())
75
def _onRequestError(self, failure_: failure.Failure, request: "HTTPRequest") -> None:
    """Answer a request whose handling failed unexpectedly with a HTTP 500

    The failure is logged, then the response code and message are set through
    ``responseCode`` and the request is finished.

    @param failure_: failure wrapping the exception raised while handling the request
    @param request: request which was being handled when the failure happened
        NOTE(review): assumed to behave like ``twisted.web.server.Request``
        (``finished`` attribute, ``finish`` method) — confirm against HTTPRequest
    """
    log.error(f"Internal error: {failure_.value}")
    if getattr(request, "finished", False):
        # the response has already been completed by the handler, nothing more can
        # be sent and finishing again would only trigger a "called twice" warning
        return
    # the message is sent as the reason phrase of the HTTP status line, which must
    # stay on a single line: any whitespace run (including CR/LF coming from the
    # exception text) is collapsed to a single space
    reason = " ".join(str(failure_.value).split())
    self.responseCode(
        request,
        http.INTERNAL_SERVER_ERROR,
        f"internal error: {reason}"
    )
    try:
        request.finish()
    except RuntimeError as e:
        # Twisted raises RuntimeError when finish() is called after the connection
        # has been lost; there is nobody left to answer to, so we only log it
        log.debug(f"can't finish request after internal error: {e}")
73 84
74 async def webfinger(self, request): 85 async def webfinger(self, request):
75 url_parsed = parse.urlparse(request.uri.decode()) 86 url_parsed = parse.urlparse(request.uri.decode())
76 query = parse.parse_qs(url_parsed.query) 87 query = parse.parse_qs(url_parsed.query)
77 resource = query.get("resource", [""])[0] 88 resource = query.get("resource", [""])[0]
95 } 106 }
96 request.setHeader("content-type", CONTENT_TYPE_AP) 107 request.setHeader("content-type", CONTENT_TYPE_AP)
97 request.write(json.dumps(resp).encode()) 108 request.write(json.dumps(resp).encode())
98 request.finish() 109 request.finish()
99 110
100 async def handleFollowActivity( 111 async def handleUndoActivity(
101 self, 112 self,
102 request: "HTTPRequest", 113 request: "HTTPRequest",
103 data: dict, 114 data: dict,
104 account_jid: jid.JID, 115 account_jid: jid.JID,
105 node: Optional[str], 116 node: Optional[str],
108 signing_actor: str 119 signing_actor: str
109 ) -> None: 120 ) -> None:
110 if node is None: 121 if node is None:
111 node = self.apg._m.namespace 122 node = self.apg._m.namespace
112 client = await self.apg.getVirtualClient(signing_actor) 123 client = await self.apg.getVirtualClient(signing_actor)
124 objects = await self.apg.apGetList(data, "object")
125 for obj in objects:
126 type_ = obj.get("type")
127 actor = await self.apg.apGetSenderActor(obj)
128 if actor != signing_actor:
129 log.warning(f"ignoring object not attributed to signing actor: {data}")
130 continue
131 try:
132 target_account = obj["object"]
133 except KeyError:
134 log.warning(f'ignoring invalid object, missing "object" key: {data}')
135 continue
136 if not self.apg.isLocalURL(target_account):
137 log.warning(f"ignoring unfollow request to non local actor: {data}")
138 continue
139
140 if type_ == "Follow":
141 await self.apg._p.unsubscribe(
142 client,
143 account_jid,
144 node,
145 sender=client.jid,
146 )
147 else:
148 log.warning(f"Unmanaged undo type: {type_!r}")
149
150 async def handleFollowActivity(
151 self,
152 request: "HTTPRequest",
153 data: dict,
154 account_jid: jid.JID,
155 node: Optional[str],
156 ap_account: str,
157 ap_url: str,
158 signing_actor: str
159 ) -> None:
160 if node is None:
161 node = self.apg._m.namespace
162 client = await self.apg.getVirtualClient(signing_actor)
113 try: 163 try:
114 subscription = await self.apg._p.subscribe( 164 subscription = await self.apg._p.subscribe(
115 client, 165 client,
116 account_jid, 166 account_jid,
117 node 167 node,
168 # subscriptions from AP are always public
169 options=self.apg._pps.setPublicOpt()
118 ) 170 )
119 except pubsub.SubscriptionPending: 171 except pubsub.SubscriptionPending:
120 log.info(f"subscription to node {node!r} of {account_jid} is pending") 172 log.info(f"subscription to node {node!r} of {account_jid} is pending")
121 # TODO: manage SubscriptionUnconfigured 173 # TODO: manage SubscriptionUnconfigured
122 else: 174 else:
127 actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account) 179 actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account)
128 accept_data = self.apg.createActivity( 180 accept_data = self.apg.createActivity(
129 "Accept", actor_id, object_=data 181 "Accept", actor_id, object_=data
130 ) 182 )
131 await self.apg.signAndPost(inbox, actor_id, accept_data) 183 await self.apg.signAndPost(inbox, actor_id, accept_data)
184 await self.apg._c.synchronise(client, account_jid, node, resync=False)
132 185
133 async def handleAcceptActivity( 186 async def handleAcceptActivity(
134 self, 187 self,
135 request: "HTTPRequest", 188 request: "HTTPRequest",
136 data: dict, 189 data: dict,
214 node: Optional[str], 267 node: Optional[str],
215 ap_account: str, 268 ap_account: str,
216 actor_url: str, 269 actor_url: str,
217 signing_actor: Optional[str] 270 signing_actor: Optional[str]
218 ) -> dict: 271 ) -> dict:
219 inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account) 272 inbox = self.apg.buildAPURL(TYPE_INBOX, ap_account)
220 shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX) 273 shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX)
221 outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account) 274 outbox = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
275 followers = self.apg.buildAPURL(TYPE_FOLLOWERS, ap_account)
276 following = self.apg.buildAPURL(TYPE_FOLLOWING, ap_account)
222 277
223 # we have to use AP account as preferredUsername because it is used to retrieve 278 # we have to use AP account as preferredUsername because it is used to retrieve
224 # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) 279 # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
225 preferred_username = ap_account.split("@", 1)[0] 280 preferred_username = ap_account.split("@", 1)[0]
226 return { 281 return {
230 ], 285 ],
231 286
232 "id": actor_url, 287 "id": actor_url,
233 "type": "Person", 288 "type": "Person",
234 "preferredUsername": preferred_username, 289 "preferredUsername": preferred_username,
235 "inbox": inbox_url, 290 "inbox": inbox,
236 "outbox": outbox_url, 291 "outbox": outbox,
292 "followers": followers,
293 "following": following,
237 "publicKey": { 294 "publicKey": {
238 "id": f"{actor_url}#main-key", 295 "id": f"{actor_url}#main-key",
239 "owner": actor_url, 296 "owner": actor_url,
240 "publicKeyPem": self.apg.public_key_pem 297 "publicKeyPem": self.apg.public_key_pem
241 }, 298 },
278 node: Optional[str], 335 node: Optional[str],
279 ap_account: str, 336 ap_account: str,
280 ap_url: str, 337 ap_url: str,
281 query_data: Dict[str, List[str]] 338 query_data: Dict[str, List[str]]
282 ) -> dict: 339 ) -> dict:
340 if node is None:
341 node = self.apg._m.namespace
283 # we only keep useful keys, and sort to have consistent URL which can 342 # we only keep useful keys, and sort to have consistent URL which can
284 # be used as ID 343 # be used as ID
285 url_keys = sorted(set(query_data) & {"page", "index", "before", "after"}) 344 url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
286 query_data = {k: query_data[k] for k in url_keys} 345 query_data = {k: query_data[k] for k in url_keys}
287 try: 346 try:
441 else: 500 else:
442 await method( 501 await method(
443 request, data, account_jid, node, ap_account, ap_url, signing_actor 502 request, data, account_jid, node, ap_account, ap_url, signing_actor
444 ) 503 )
445 504
async def APFollowersRequest(
    self,
    request: "HTTPRequest",
    account_jid: jid.JID,
    node: Optional[str],
    ap_account: Optional[str],
    ap_url: str,
    signing_actor: Optional[str]
) -> dict:
    """Build the AP ``followers`` collection from public subscriptions to a node

    @param request: HTTP request, used to compute the canonical URL of the collection
    @param account_jid: JID of the entity whose followers are requested
    @param node: pubsub node to check, None to use ``self.apg._m.namespace``
    @param ap_account: not used by this handler
    @param ap_url: not used by this handler
    @param signing_actor: not used by this handler
    @return: an OrderedCollection whose single embedded page lists one AP account
        per public subscriber
    """
    node = self.apg._m.namespace if node is None else node
    # presumably a mapping keyed by subscriber JID (it is iterated with ``keys()``
    # and measured with ``len()``) — verify against getPublicNodeSubscriptions
    public_subs = await self.apg._pps.getPublicNodeSubscriptions(
        self.apg.client, account_jid, node
    )

    follower_accounts = []
    for subscriber_jid in public_subs.keys():
        via_gateway = subscriber_jid.host == self.apg.client.jid.userhost()
        if via_gateway:
            # the subscriber is an AP user subscribed with this gateway: the AP
            # account is the unescaped user part of its JID
            account = self.apg._e.unescape(subscriber_jid.user)
        else:
            # regular XMPP user
            account = await self.apg.getAPAccountFromJidAndNode(subscriber_jid, node)
        follower_accounts.append(account)

    collection_url = self.getCanonicalURL(request)
    # everything is returned in one page, which shares the ID of the collection
    first_page = {
        "type": "OrderedCollectionPage",
        "id": collection_url,
        "orderedItems": follower_accounts
    }
    return {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "OrderedCollection",
        "id": collection_url,
        "totalItems": len(public_subs),
        "first": first_page
    }
542
async def APFollowingRequest(
    self,
    request: "HTTPRequest",
    account_jid: jid.JID,
    node: Optional[str],
    ap_account: Optional[str],
    ap_url: str,
    signing_actor: Optional[str]
) -> dict[str, Any]:
    """Build the AP ``following`` collection from the public subscriptions of an entity

    @param request: HTTP request, used to compute the canonical URL of the collection
    @param account_jid: JID of the entity whose subscriptions are requested
    @param node: passed as is to ``self.apg._pps.subscriptions`` (no default is
        applied here when it is None)
    @param ap_account: not used by this handler
    @param ap_url: not used by this handler
    @param signing_actor: not used by this handler
    @return: an OrderedCollection whose single embedded page lists one AP account
        per subscription
    """
    # each subscription is read as a dict with "service" and "node" keys
    public_subs = await self.apg._pps.subscriptions(
        self.apg.client, account_jid, node
    )

    followed_accounts = []
    for sub_data in public_subs:
        service_jid = jid.JID(sub_data["service"])
        if service_jid.host != self.apg.client.jid.userhost():
            # regular XMPP user
            account = await self.apg.getAPAccountFromJidAndNode(
                service_jid, sub_data["node"]
            )
        else:
            # the subscription is to an AP actor with this gateway: the AP account
            # is the unescaped user part of the service JID
            account = self.apg._e.unescape(service_jid.user)
        followed_accounts.append(account)

    collection_url = self.getCanonicalURL(request)
    # everything is returned in one page, which shares the ID of the collection
    first_page = {
        "type": "OrderedCollectionPage",
        "id": collection_url,
        "orderedItems": followed_accounts
    }
    return {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "OrderedCollection",
        "id": collection_url,
        "totalItems": len(public_subs),
        "first": first_page
    }
581
446 async def APRequest( 582 async def APRequest(
447 self, 583 self,
448 request: "HTTPRequest", 584 request: "HTTPRequest",
449 signing_actor: Optional[str] = None 585 signing_actor: Optional[str] = None
450 ) -> None: 586 ) -> None:
488 f"invalid signature: {e}" 624 f"invalid signature: {e}"
489 ) 625 )
490 request.finish() 626 request.finish()
491 return 627 return
492 628
493 return await self.APRequest(request, signing_actor) 629 try:
630 return await self.APRequest(request, signing_actor)
631 except Exception as e:
632 self._onRequestError(failure.Failure(e), request)
494 633
495 async def checkSigningActor(self, data: dict, signing_actor: str) -> None: 634 async def checkSigningActor(self, data: dict, signing_actor: str) -> None:
496 """Check that signing actor corresponds to the actor declared in data 635 """Check that signing actor corresponds to the actor declared in data
497 636
498 @param data: request payload 637 @param data: request payload
661 path = request.path.decode().lstrip("/") 800 path = request.path.decode().lstrip("/")
662 if path.startswith(".well-known/webfinger"): 801 if path.startswith(".well-known/webfinger"):
663 defer.ensureDeferred(self.webfinger(request)) 802 defer.ensureDeferred(self.webfinger(request))
664 return server.NOT_DONE_YET 803 return server.NOT_DONE_YET
665 elif path.startswith(self.apg.ap_path): 804 elif path.startswith(self.apg.ap_path):
666 defer.ensureDeferred(self.APRequest(request)) 805 d = defer.ensureDeferred(self.APRequest(request))
806 d.addErrback(self._onRequestError, request)
667 return server.NOT_DONE_YET 807 return server.NOT_DONE_YET
668 808
669 return web_resource.NoResource().render(request) 809 return web_resource.NoResource().render(request)
670 810
671 def render_POST(self, request): 811 def render_POST(self, request):