Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway.py @ 3728:b15644cae50d
component AP gateway: JID/node ⟺ AP outbox conversion:
- convert a combination of JID and optional pubsub node to AP actor handle (see
`getJIDAndNode` for details) and vice versa
- the gateway now provides a Pubsub service
- retrieve pubsub node and convert it to AP collection, AP pagination is converted to RSM
- do the opposite: convert AP collection to pubsub and handle RSM requests. Due to
ActivityStream collection pagination limitations, some RSM requests produce inefficient
requests, but caching should be used most of the time in the future, avoiding the
problem.
- set specific name to HTTP Server
- new `local_only` setting (`True` by default) to indicate whether or not the gateway can
request XMPP Pubsub nodes from other servers
- disco info now specifies important features such as Pubsub RSM, and nodes metadata
ticket 363
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 25 Jan 2022 17:54:06 +0100 |
parents | 8353cc3b8db9 |
children |
comparison
equal
deleted
inserted
replaced
3727:a6dfd3db372b | 3728:b15644cae50d |
---|---|
14 # GNU Affero General Public License for more details. | 14 # GNU Affero General Public License for more details. |
15 | 15 |
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 import time | |
20 import json | |
21 import base64 | 19 import base64 |
22 import hashlib | 20 import hashlib |
21 import json | |
22 from pathlib import Path | |
23 import time | |
24 from typing import Optional, Dict, Tuple, List, Union | |
23 from urllib import parse | 25 from urllib import parse |
24 from typing import Tuple | 26 import calendar |
25 from pathlib import Path | 27 import re |
26 import shortuuid | 28 import unicodedata |
27 from cryptography.hazmat.primitives.asymmetric import rsa | 29 |
30 import dateutil | |
28 from cryptography.hazmat.primitives import serialization | 31 from cryptography.hazmat.primitives import serialization |
29 from cryptography.hazmat.primitives import hashes | 32 from cryptography.hazmat.primitives import hashes |
33 from cryptography.hazmat.primitives.asymmetric import rsa | |
30 from cryptography.hazmat.primitives.asymmetric import padding | 34 from cryptography.hazmat.primitives.asymmetric import padding |
31 from twisted.internet import reactor, threads, defer | 35 import shortuuid |
32 from twisted.web import server, resource as web_resource, http | |
33 from twisted.words.protocols.jabber import jid | |
34 import treq | 36 import treq |
35 from treq.response import _Response as TReqResponse | 37 from treq.response import _Response as TReqResponse |
38 from twisted.internet import defer, reactor, threads | |
39 from twisted.web import http, resource as web_resource, server | |
40 from twisted.words.protocols.jabber import jid, error | |
41 from twisted.words.xish import domish | |
42 from wokkel import pubsub, rsm | |
43 | |
44 from sat.core import exceptions | |
45 from sat.core.constants import Const as C | |
46 from sat.core.core_types import SatXMPPEntity | |
36 from sat.core.i18n import _ | 47 from sat.core.i18n import _ |
37 from sat.core.constants import Const as C | |
38 from sat.core import exceptions | |
39 from sat.core.log import getLogger | 48 from sat.core.log import getLogger |
40 from sat.core.core_types import SatXMPPEntity | |
41 from sat.tools.common import tls, data_format | |
42 from sat.tools import utils | 49 from sat.tools import utils |
50 from sat.tools.common import data_format, tls | |
51 from sat.tools.common.async_utils import async_lru | |
52 from sat.tools.utils import ensure_deferred | |
43 | 53 |
44 | 54 |
45 log = getLogger(__name__) | 55 log = getLogger(__name__) |
46 | 56 |
47 IMPORT_NAME = "ap-gateway" | 57 IMPORT_NAME = "ap-gateway" |
50 C.PI_NAME: "ActivityPub Gateway component", | 60 C.PI_NAME: "ActivityPub Gateway component", |
51 C.PI_IMPORT_NAME: IMPORT_NAME, | 61 C.PI_IMPORT_NAME: IMPORT_NAME, |
52 C.PI_MODES: [C.PLUG_MODE_COMPONENT], | 62 C.PI_MODES: [C.PLUG_MODE_COMPONENT], |
53 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, | 63 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, |
54 C.PI_PROTOCOLS: [], | 64 C.PI_PROTOCOLS: [], |
55 C.PI_DEPENDENCIES: ["XEP-0106"], | 65 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"], |
56 C.PI_RECOMMENDATIONS: [], | 66 C.PI_RECOMMENDATIONS: [], |
57 C.PI_MAIN: "APGateway", | 67 C.PI_MAIN: "APGateway", |
58 C.PI_HANDLER: C.BOOL_FALSE, | 68 C.PI_HANDLER: C.BOOL_TRUE, |
59 C.PI_DESCRIPTION: _( | 69 C.PI_DESCRIPTION: _( |
60 "Gateway for bidirectional communication between XMPP and ActivityPub." | 70 "Gateway for bidirectional communication between XMPP and ActivityPub." |
61 ), | 71 ), |
62 } | 72 } |
63 | 73 |
74 VERSION = unicodedata.normalize( | |
75 'NFKD', | |
76 f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}" | |
77 ) | |
78 HEXA_ENC = r"(?P<hex>[0-9a-fA-f]{2})" | |
79 RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}") | |
80 RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}") | |
81 RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$") | |
64 CONF_SECTION = f"component {IMPORT_NAME}" | 82 CONF_SECTION = f"component {IMPORT_NAME}" |
65 CONTENT_TYPE_AP = "application/activity+json; charset=utf-8" | 83 CONTENT_TYPE_AP = "application/activity+json; charset=utf-8" |
66 TYPE_ACTOR = "actor" | 84 TYPE_ACTOR = "actor" |
67 TYPE_INBOX = "inbox" | 85 TYPE_INBOX = "inbox" |
86 TYPE_OUTBOX = "outbox" | |
68 TYPE_ITEM = "item" | 87 TYPE_ITEM = "item" |
69 MEDIA_TYPE_AP = "application/activity+json" | 88 MEDIA_TYPE_AP = "application/activity+json" |
89 # mapping from AP metadata to microblog data | |
90 AP_MB_MAP = { | |
91 "content": "content_xhtml", | |
92 | |
93 } | |
94 AP_REQUEST_TYPES = {"actor", "outbox"} | |
95 PAGE_SIZE = 10 | |
96 | |
97 LRU_MAX_SIZE = 200 | |
70 | 98 |
71 | 99 |
72 class HTTPAPGServer(web_resource.Resource): | 100 class HTTPAPGServer(web_resource.Resource): |
73 """HTTP Server handling ActivityPub S2S protocol""" | 101 """HTTP Server handling ActivityPub S2S protocol""" |
74 isLeaf = True | 102 isLeaf = True |
75 | 103 |
76 def __init__(self, ap_gateway): | 104 def __init__(self, ap_gateway): |
77 self.apg = ap_gateway | 105 self.apg = ap_gateway |
78 super().__init__() | 106 super().__init__() |
79 | 107 |
80 def webfinger(self, request): | 108 async def webfinger(self, request): |
81 url_parsed = parse.urlparse(request.uri.decode()) | 109 url_parsed = parse.urlparse(request.uri.decode()) |
82 query = parse.parse_qs(url_parsed.query) | 110 query = parse.parse_qs(url_parsed.query) |
83 resource = query.get("resource", [""])[0] | 111 resource = query.get("resource", [""])[0] |
84 account = resource[5:].strip() | 112 account = resource[5:].strip() |
85 log.info(f"request pour {account}") | |
86 if not resource.startswith("acct:") or not account: | 113 if not resource.startswith("acct:") or not account: |
87 return web_resource.ErrorPage( | 114 return web_resource.ErrorPage( |
88 http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource" | 115 http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource" |
89 ).render(request) | 116 ).render(request) |
90 | 117 |
99 "href": actor_url | 126 "href": actor_url |
100 } | 127 } |
101 ] | 128 ] |
102 } | 129 } |
103 request.setHeader("content-type", CONTENT_TYPE_AP) | 130 request.setHeader("content-type", CONTENT_TYPE_AP) |
104 return json.dumps(resp).encode() | 131 request.write(json.dumps(resp).encode()) |
105 | 132 request.finish() |
106 def APRequest(self, request): | 133 |
107 path = request.path.decode() | 134 async def APActorRequest( |
108 actor_url = parse.urljoin( | 135 self, |
109 f"https://{self.apg.public_url}", | 136 request: "HTTPRequest", |
110 path | 137 account_jid: jid.JID, |
111 ) | 138 node: Optional[str], |
112 __, account = self.apg.parseAPURL(actor_url) | 139 ap_account: str, |
113 inbox_url = self.apg.buildAPURL(TYPE_INBOX, account) | 140 actor_url: str |
114 username = account.split("@", 1)[0] | 141 ) -> dict: |
115 actor = { | 142 inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account) |
143 outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account) | |
144 | |
145 # we have to use AP account as preferredUsername because it is used to retrieve | |
146 # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) | |
147 preferred_username = ap_account.split("@", 1)[0] | |
148 return { | |
116 "@context": [ | 149 "@context": [ |
117 "https://www.w3.org/ns/activitystreams", | 150 "https://www.w3.org/ns/activitystreams", |
118 "https://w3id.org/security/v1" | 151 "https://w3id.org/security/v1" |
119 ], | 152 ], |
120 | 153 |
121 "id": actor_url, | 154 "id": actor_url, |
122 "type": "Person", | 155 "type": "Person", |
123 "preferredUsername": username, | 156 "preferredUsername": preferred_username, |
124 "inbox": inbox_url, | 157 "inbox": inbox_url, |
125 | 158 "outbox": outbox_url, |
126 "publicKey": { | 159 "publicKey": { |
127 "id": f"{actor_url}#main-key", | 160 "id": f"{actor_url}#main-key", |
128 "owner": actor_url, | 161 "owner": actor_url, |
129 "publicKeyPem": self.apg.public_key_pem | 162 "publicKeyPem": self.apg.public_key_pem |
130 } | 163 } |
131 } | 164 } |
165 | |
166 def getCanonicalURL(self, request: "HTTPRequest") -> str: | |
167 return parse.urljoin( | |
168 f"https://{self.apg.public_url}", | |
169 request.path.decode().rstrip("/") | |
170 ) | |
171 | |
172 def queryData2RSMRequest( | |
173 self, | |
174 query_data: Dict[str, List[str]] | |
175 ) -> rsm.RSMRequest: | |
176 """Get RSM kwargs to use with RSMRequest from query data""" | |
177 page = query_data.get("page") | |
178 | |
179 if page == ["first"]: | |
180 return rsm.RSMRequest(max_=PAGE_SIZE, before="") | |
181 elif page == ["last"]: | |
182 return rsm.RSMRequest(max_=PAGE_SIZE) | |
183 else: | |
184 for query_key in ("index", "before", "after"): | |
185 try: | |
186 kwargs={query_key: query_data[query_key][0], "max_": PAGE_SIZE} | |
187 except (KeyError, IndexError, ValueError): | |
188 pass | |
189 else: | |
190 return rsm.RSMRequest(**kwargs) | |
191 raise ValueError(f"Invalid query data: {query_data!r}") | |
192 | |
193 async def APOutboxPageRequest( | |
194 self, | |
195 request: "HTTPRequest", | |
196 account_jid: jid.JID, | |
197 node: Optional[str], | |
198 ap_account: str, | |
199 actor_url: str, | |
200 query_data: Dict[str, List[str]] | |
201 ) -> dict: | |
202 # we only keep useful keys, and sort to have consistent URL which can | |
203 # be used as ID | |
204 url_keys = sorted(set(query_data) & {"page", "index", "before", "after"}) | |
205 query_data = {k: query_data[k] for k in url_keys} | |
206 rsm_kwargs = self.queryData2RSMRequest(query_data) | |
207 try: | |
208 items, metadata = await self.apg._p.getItems( | |
209 client=self.apg.client, | |
210 service=account_jid, | |
211 node=node, | |
212 rsm_request=self.queryData2RSMRequest(query_data), | |
213 extra = {C.KEY_USE_CACHE: False} | |
214 ) | |
215 except error.StanzaError as e: | |
216 log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") | |
217 return {} | |
218 | |
219 base_url = self.getCanonicalURL(request) | |
220 url = f"{base_url}?{parse.urlencode(query_data, True)}" | |
221 data = { | |
222 "@context": "https://www.w3.org/ns/activitystreams", | |
223 "id": url, | |
224 "type": "OrderedCollectionPage", | |
225 "partOf": base_url, | |
226 "orderedItems" : [ | |
227 await self.apg.mbdata2APitem( | |
228 self.apg.client, | |
229 await self.apg._m.item2mbdata( | |
230 self.apg.client, | |
231 item, | |
232 account_jid, | |
233 node | |
234 ) | |
235 ) | |
236 for item in reversed(items) | |
237 ] | |
238 } | |
239 | |
240 # AP OrderedCollection must be in reversed chronological order, thus the opposite | |
241 # of what we get with RSM (at least with Libervia Pubsub) | |
242 if not metadata["complete"]: | |
243 try: | |
244 last= metadata["rsm"]["last"] | |
245 except KeyError: | |
246 last = None | |
247 data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}" | |
248 if metadata["rsm"]["index"] != 0: | |
249 try: | |
250 first= metadata["rsm"]["first"] | |
251 except KeyError: | |
252 first = None | |
253 data["next"] = f"{base_url}?{parse.urlencode({'before': first})}" | |
254 | |
255 return data | |
256 | |
257 async def APOutboxRequest( | |
258 self, | |
259 request: "HTTPRequest", | |
260 account_jid: jid.JID, | |
261 node: Optional[str], | |
262 ap_account: str, | |
263 actor_url: str | |
264 ) -> dict: | |
265 if node is None: | |
266 node = self.apg._m.namespace | |
267 | |
268 parsed_url = parse.urlparse(request.uri.decode()) | |
269 query_data = parse.parse_qs(parsed_url.query) | |
270 if query_data: | |
271 return await self.APOutboxPageRequest( | |
272 request, account_jid, node, ap_account, actor_url, query_data | |
273 ) | |
274 | |
275 # XXX: we can't use disco#info here because this request won't work on a bare jid | |
276 # due to security considerations of XEP-0030 (we don't have presence | |
277 # subscription). | |
278 # The current workaround is to do a request as if RSM was available, and actually | |
279 # check its availability according to result. | |
280 try: | |
281 __, metadata = await self.apg._p.getItems( | |
282 client=self.apg.client, | |
283 service=account_jid, | |
284 node=node, | |
285 max_items=0, | |
286 rsm_request=rsm.RSMRequest(max_=0) | |
287 ) | |
288 except error.StanzaError as e: | |
289 log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") | |
290 return {} | |
291 try: | |
292 items_count = metadata["rsm"]["count"] | |
293 except KeyError: | |
294 log.warning( | |
295 f"No RSM metadata found when requesting pubsub node {node} at " | |
296 f"{account_jid}, defaulting to items_count=20" | |
297 ) | |
298 items_count = 20 | |
299 | |
300 url = self.getCanonicalURL(request) | |
301 url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}" | |
302 url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}" | |
303 return { | |
304 "@context": "https://www.w3.org/ns/activitystreams", | |
305 "id": url, | |
306 "totalItems": items_count, | |
307 "type": "OrderedCollection", | |
308 "first": url_first_page, | |
309 "last": url_last_page, | |
310 } | |
311 | |
312 async def APRequest(self, request): | |
313 path = request.path.decode() | |
314 actor_url = parse.urljoin( | |
315 f"https://{self.apg.public_url}", | |
316 path | |
317 ) | |
318 request_type, ap_account = self.apg.parseAPURL(actor_url) | |
319 account_jid, node = await self.apg.getJIDAndNode(ap_account) | |
320 if request_type not in AP_REQUEST_TYPES: | |
321 raise exceptions.DataError(f"Invalid request type: {request_type!r}") | |
322 method = getattr(self, f"AP{request_type.title()}Request") | |
323 ret_data = await method(request, account_jid, node, ap_account, actor_url) | |
132 request.setHeader("content-type", CONTENT_TYPE_AP) | 324 request.setHeader("content-type", CONTENT_TYPE_AP) |
133 return json.dumps(actor).encode() | 325 request.write(json.dumps(ret_data).encode()) |
326 request.finish() | |
327 | |
328 def render(self, request): | |
329 request.setHeader("server", VERSION) | |
330 return super().render(request) | |
134 | 331 |
135 def render_GET(self, request): | 332 def render_GET(self, request): |
136 path = request.path.decode().lstrip("/") | 333 path = request.path.decode().lstrip("/") |
137 if path.startswith(".well-known/webfinger"): | 334 if path.startswith(".well-known/webfinger"): |
138 return self.webfinger(request) | 335 defer.ensureDeferred(self.webfinger(request)) |
336 return server.NOT_DONE_YET | |
139 elif path.startswith(self.apg.ap_path): | 337 elif path.startswith(self.apg.ap_path): |
140 return self.APRequest(request) | 338 defer.ensureDeferred(self.APRequest(request)) |
339 return server.NOT_DONE_YET | |
340 | |
141 return web_resource.NoResource().render(request) | 341 return web_resource.NoResource().render(request) |
142 | 342 |
143 | 343 |
144 class HTTPRequest(server.Request): | 344 class HTTPRequest(server.Request): |
145 pass | 345 pass |
155 class APGateway: | 355 class APGateway: |
156 | 356 |
157 def __init__(self, host): | 357 def __init__(self, host): |
158 self.host = host | 358 self.host = host |
159 self.initialised = False | 359 self.initialised = False |
360 self._m = host.plugins["XEP-0277"] | |
361 self._p = host.plugins["XEP-0060"] | |
160 | 362 |
161 host.bridge.addMethod( | 363 host.bridge.addMethod( |
162 "APSend", | 364 "APSend", |
163 ".plugin", | 365 ".plugin", |
164 in_sign="sss", | 366 in_sign="sss", |
165 out_sign="", | 367 out_sign="", |
166 method=self._publishMessage, | 368 method=self._publishMessage, |
167 async_=True, | 369 async_=True, |
168 ) | 370 ) |
169 | 371 |
372 def getHandler(self, __): | |
373 return APPubsubService(self) | |
374 | |
170 async def init(self, client): | 375 async def init(self, client): |
171 if self.initialised: | 376 if self.initialised: |
172 return | 377 return |
173 | 378 |
174 self.initialised = True | 379 self.initialised = True |
202 self.public_key_pem = self.public_key.public_bytes( | 407 self.public_key_pem = self.public_key.public_bytes( |
203 encoding=serialization.Encoding.PEM, | 408 encoding=serialization.Encoding.PEM, |
204 format=serialization.PublicFormat.SubjectPublicKeyInfo | 409 format=serialization.PublicFormat.SubjectPublicKeyInfo |
205 ).decode() | 410 ).decode() |
206 | 411 |
207 # params (URL and port) | 412 # params |
413 # URL and port | |
208 self.public_url = self.host.memory.getConfig( | 414 self.public_url = self.host.memory.getConfig( |
209 CONF_SECTION, "public_url" | 415 CONF_SECTION, "public_url" |
210 ) or self.host.memory.getConfig( | 416 ) or self.host.memory.getConfig( |
211 CONF_SECTION, "xmpp_domain" | 417 CONF_SECTION, "xmpp_domain" |
212 ) | 418 ) |
230 if connection_type not in ('http', 'https'): | 436 if connection_type not in ('http', 'https'): |
231 raise exceptions.ConfigError( | 437 raise exceptions.ConfigError( |
232 'bad ap-gateay http_connection_type, you must use one of "http" or ' | 438 'bad ap-gateay http_connection_type, you must use one of "http" or ' |
233 '"https"' | 439 '"https"' |
234 ) | 440 ) |
441 self.max_items = self.host.memory.getConfig( | |
442 CONF_SECTION, 'new_node_max_items', 50 | |
443 | |
444 ) | |
235 self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') | 445 self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') |
236 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") | 446 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") |
447 # True (default) if we provide gateway only to entities/services from our server | |
448 self.local_only = C.bool( | |
449 self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE) | |
450 ) | |
237 | 451 |
238 # HTTP server launch | 452 # HTTP server launch |
239 self.server = HTTPServer(self) | 453 self.server = HTTPServer(self) |
240 if connection_type == 'http': | 454 if connection_type == 'http': |
241 reactor.listenTCP(self.http_port, self.server) | 455 reactor.listenTCP(self.http_port, self.server) |
245 tls.TLSOptionsCheck(options) | 459 tls.TLSOptionsCheck(options) |
246 context_factory = tls.getTLSContextFactory(options) | 460 context_factory = tls.getTLSContextFactory(options) |
247 reactor.listenSSL(self.http_port, self.server, context_factory) | 461 reactor.listenSSL(self.http_port, self.server, context_factory) |
248 | 462 |
249 async def profileConnecting(self, client): | 463 async def profileConnecting(self, client): |
464 self.client = client | |
250 await self.init(client) | 465 await self.init(client) |
466 | |
467 async def apGet(self, url: str) -> dict: | |
468 """Retrieve AP JSON from given URL | |
469 | |
470 @raise error.StanzaError: "service-unavailable" is sent when something went wrong | |
471 with AP server | |
472 """ | |
473 try: | |
474 return await treq.json_content(await treq.get( | |
475 url, | |
476 headers = { | |
477 "Accept": [MEDIA_TYPE_AP], | |
478 "Content-Type": [MEDIA_TYPE_AP], | |
479 } | |
480 )) | |
481 except Exception as e: | |
482 raise error.StanzaError( | |
483 "service-unavailable", | |
484 text=f"Can't get AP data at {url}: {e}" | |
485 ) | |
486 | |
487 def mustEncode(self, text: str) -> bool: | |
488 """Indicate if a text must be period encoded""" | |
489 return ( | |
490 not RE_ALLOWED_UNQUOTED.match(text) | |
491 or text.startswith("___") | |
492 or "---" in text | |
493 ) | |
494 | |
495 def periodEncode(self, text: str) -> str: | |
496 """Period encode a text | |
497 | |
498 see [getJIDAndNode] for reasons of period encoding | |
499 """ | |
500 return ( | |
501 parse.quote(text, safe="") | |
502 .replace("---", "%2d%2d%2d") | |
503 .replace("___", "%5f%5f%5f") | |
504 .replace(".", "%2e") | |
505 .replace("~", "%7e") | |
506 .replace("%", ".") | |
507 ) | |
508 | |
509 async def getAPAccountFromJidAndNode( | |
510 self, | |
511 jid_: jid.JID, | |
512 node: Optional[str] | |
513 ) -> str: | |
514 """Construct AP account from JID and node | |
515 | |
516 The account construction will use escaping when necessary | |
517 """ | |
518 if not node or node == self._m.namespace: | |
519 node = None | |
520 | |
521 if node and not jid_.user and not self.mustEncode(node): | |
522 is_pubsub = self.isPubsub(jid_) | |
523 # when we have a pubsub service, the user part can be used to set the node | |
524 # this produces more user-friendly AP accounts | |
525 if is_pubsub: | |
526 jid_.user = node | |
527 node = None | |
528 | |
529 is_local = self.isLocal(jid_) | |
530 user = jid_.user if is_local else jid_.userhost() | |
531 if user is None: | |
532 user = "" | |
533 account_elts = [] | |
534 if node and self.mustEncode(node) or self.mustEncode(user): | |
535 account_elts = ["___"] | |
536 if node: | |
537 node = self.periodEncode(node) | |
538 user = self.periodEncode(user) | |
539 | |
540 if not user: | |
541 raise exceptions.InternalError("there should be a user part") | |
542 | |
543 if node: | |
544 account_elts.extend((node, "---")) | |
545 | |
546 account_elts.extend(( | |
547 user, "@", jid_.host if is_local else self.client.jid.userhost() | |
548 )) | |
549 return "".join(account_elts) | |
550 | |
551 def isLocal(self, jid_: jid.JID) -> bool: | |
552 """Returns True if jid_ use a domain or subdomain of gateway's host""" | |
553 local_host = self.client.host.split(".") | |
554 assert local_host | |
555 return jid_.host.split(".")[-len(local_host):] == local_host | |
556 | |
557 async def isPubsub(self, jid_: jid.JID) -> bool: | |
558 """Indicate if a JID is a Pubsub service""" | |
559 host_disco = await self.host.getDiscoInfos(self.client, jid_) | |
560 return ( | |
561 ("pubsub", "service") in host_disco.identities | |
562 and not ("pubsub", "pep") in host_disco.identities | |
563 ) | |
564 | |
565 async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]: | |
566 """Decode raw AP account handle to get XMPP JID and Pubsub Node | |
567 | |
568 Username are case insensitive. | |
569 | |
570 By default, the username correspond to local username (i.e. username from | |
571 component's server). | |
572 | |
573 If local name's domain is a pubsub service (and not PEP), the username is taken as | |
574 a pubsub node. | |
575 | |
576 If ``---`` is present in username, the part before is used as pubsub node, and the | |
577 rest as a JID user part. | |
578 | |
579 If username starts with ``___``, characters are encoded using period encoding | |
580 (i.e. percent encoding where a ``.`` is used instead of ``%``). | |
581 | |
582 This horror is necessary due to limitation in some AP implementation (notably | |
583 Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222 | |
584 | |
585 examples:: | |
586 | |
587 ``toto@example.org`` => JID = toto@example.org, node = None | |
588 | |
589 ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a non-local JID, and will work only if setings ``local_only`` is False), node = None | |
590 | |
591 ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) => | |
592 JID = pubsub.example.org, node = toto | |
593 | |
594 ``tata---toto@example.org`` => JID = toto@example.org, node = tata | |
595 | |
596 ``___urn.3axmpp.3a.microblog.3a0@pubsub.example.org`` (with pubsub.example.org | |
597 being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0 | |
598 | |
599 @param ap_account: ActivityPub account handle (``username@domain.tld``) | |
600 @return: service JID and pubsub node | |
601 if pubsub is None, default microblog pubsub node (and possibly other nodes | |
602 that plugins may hanlde) will be used | |
603 @raise ValueError: invalid account | |
604 @raise PermissionError: non local jid is used when gateway doesn't allow them | |
605 """ | |
606 if ap_account.count("@") != 1: | |
607 raise ValueError("Invalid AP account") | |
608 if ap_account.startswith("___"): | |
609 encoded = True | |
610 ap_account = ap_account[3:] | |
611 else: | |
612 encoded = False | |
613 | |
614 username, domain = ap_account.split("@") | |
615 | |
616 if "---" in username: | |
617 node, username = username.rsplit("---", 1) | |
618 else: | |
619 node = None | |
620 | |
621 if encoded: | |
622 username = parse.unquote( | |
623 RE_PERIOD_ENC.sub(r"%\g<hex>", username), | |
624 errors="strict" | |
625 ) | |
626 if node: | |
627 node = parse.unquote( | |
628 RE_PERIOD_ENC.sub(r"%\g<hex>", node), | |
629 errors="strict" | |
630 ) | |
631 | |
632 if "@" in username: | |
633 username, domain = username.rsplit("@", 1) | |
634 | |
635 if not node: | |
636 # we need to check host disco, because disco request to user may be | |
637 # blocked for privacy reason (see | |
638 # https://xmpp.org/extensions/xep-0030.html#security) | |
639 is_pubsub = await self.isPubsub(jid.JID(domain)) | |
640 | |
641 if is_pubsub: | |
642 # if the host is a pubsub service and not a PEP, we consider that username | |
643 # is in fact the node name | |
644 node = username | |
645 username = None | |
646 | |
647 jid_s = f"{username}@{domain}" if username else domain | |
648 try: | |
649 jid_ = jid.JID(jid_s) | |
650 except RuntimeError: | |
651 raise ValueError(f"Invalid jid: {jid_s!r}") | |
652 | |
653 if self.local_only and not self.isLocal(jid_): | |
654 raise exceptions.PermissionError( | |
655 "This gateway is configured to map only local entities and services" | |
656 ) | |
657 | |
658 return jid_, node | |
251 | 659 |
252 def parseAPURL(self, url: str) -> Tuple[str, str]: | 660 def parseAPURL(self, url: str) -> Tuple[str, str]: |
253 """Parse an URL leading to an AP endpoint | 661 """Parse an URL leading to an AP endpoint |
254 | 662 |
255 @param url: URL to parse (schema is not mandatory) | 663 @param url: URL to parse (schema is not mandatory) |
311 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore | 719 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore |
312 service = jid.JID(service_s) | 720 service = jid.JID(service_s) |
313 client = self.host.getClient(profile) | 721 client = self.host.getClient(profile) |
314 return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) | 722 return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) |
315 | 723 |
316 async def getAPActorData(self, account: str) -> dict: | 724 async def getAPActorIdFromAccount(self, account: str) -> str: |
317 """Retrieve ActivityPub Actor data | 725 """Retrieve account ID from it's handle using WebFinger |
318 | 726 |
319 @param account: ActivityPub Actor identifier | 727 @param account: AP handle (user@domain.tld) |
728 @return: Actor ID (which is an URL) | |
320 """ | 729 """ |
321 if account.count("@") != 1 or "/" in account: | 730 if account.count("@") != 1 or "/" in account: |
322 raise ValueError("Invalid account: {account!r}") | 731 raise ValueError("Invalid account: {account!r}") |
323 host = account.split("@")[1] | 732 host = account.split("@")[1] |
324 try: | 733 try: |
341 break | 750 break |
342 else: | 751 else: |
343 raise ValueError( | 752 raise ValueError( |
344 f"No ActivityPub link found for {account!r}" | 753 f"No ActivityPub link found for {account!r}" |
345 ) | 754 ) |
755 return href | |
756 | |
757 async def getAPActorDataFromId(self, account: str) -> dict: | |
758 """Retrieve ActivityPub Actor data | |
759 | |
760 @param account: ActivityPub Actor identifier | |
761 """ | |
762 href = await self.getAPActorIdFromAccount(account) | |
763 return await self.apGet(href) | |
764 | |
765 @async_lru(maxsize=LRU_MAX_SIZE) | |
766 async def getAPAccountFromId(self, actor_id: str): | |
767 """Retrieve AP account from the ID URL | |
768 | |
769 @param actor_id: AP ID of the actor (URL to the actor data) | |
770 """ | |
771 url_parsed = parse.urlparse(actor_id) | |
772 actor_data = await self.apGet(actor_id) | |
773 username = actor_data.get("preferredUsername") | |
774 if not username: | |
775 raise exceptions.DataError( | |
776 'No "preferredUsername" field found, can\'t retrieve actor account' | |
777 ) | |
778 account = f"{username}@{url_parsed.hostname}" | |
779 # we try to retrieve the actor ID from the account to check it | |
780 found_id = await self.getAPActorIdFromAccount(account) | |
781 if found_id != actor_id: | |
782 # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196 | |
783 msg = ( | |
784 f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID " | |
785 f"({actor_id!r}). This AP instance doesn't seems to use " | |
786 '"preferredUsername" as we expect.' | |
787 ) | |
788 log.warning(msg) | |
789 raise exceptions.DataError(msg) | |
790 return account | |
791 | |
792 async def getAPItems( | |
793 self, | |
794 account: str, | |
795 max_items: Optional[int] = None, | |
796 chronological_pagination: bool = True, | |
797 after_id: Optional[str] = None, | |
798 start_index: Optional[int] = None, | |
799 ) -> Tuple[List[domish.Element], rsm.RSMResponse]: | |
800 """Retrieve AP items and convert them to XMPP items | |
801 | |
802 @param account: AP account to get items from | |
803 @param max_items: maximum number of items to retrieve | |
804 retrieve all items by default | |
805 @param chronological_pagination: get pages in chronological order | |
806 AP use reversed chronological order for pagination, "first" page returns more | |
807 recent items. If "chronological_pagination" is True, "last" AP page will be | |
808 retrieved first. | |
809 @param after_id: if set, retrieve items starting from given ID | |
810 Due to ActivityStream Collection Paging limitations, this is inefficient and | |
811 if ``after_id`` is not already in cache, we have to retrieve every page until | |
812 we find it. | |
813 In most common cases, ``after_id`` should be in cache though (client usually | |
814 use known ID when in-order pagination is used). | |
815 @param start_index: start retrieving items from the one with given index | |
816 Due to ActivityStream Collection Paging limitations, this is inefficient and | |
817 all pages before the requested index will be retrieved to count items. | |
818 @return: XMPP Pubsub items and corresponding RSM Response | |
819 Items are always returned in chronological order in the result | |
820 """ | |
821 actor_data = await self.getAPActorDataFromId(account) | |
822 outbox = actor_data.get("outbox") | |
823 rsm_resp: Dict[str, Union[bool, int]] = {} | |
824 if not outbox: | |
825 raise exceptions.DataError(f"No outbox found for actor {account}") | |
826 outbox_data = await self.apGet(outbox) | |
346 try: | 827 try: |
347 ap_actor_data = await treq.json_content(await treq.get( | 828 count = outbox_data["totalItems"] |
348 href, | 829 except KeyError: |
349 headers = { | 830 log.warning( |
350 "Accept": [MEDIA_TYPE_AP], | 831 f'"totalItems" not found in outbox of {account}, defaulting to 20' |
351 "Content-Type": [MEDIA_TYPE_AP], | 832 ) |
352 } | 833 count = 20 |
353 )) | 834 else: |
354 except Exception as e: | 835 log.info(f"{account}'s outbox has {count} item(s)") |
355 raise exceptions.DataError(f"Can't get ActivityPub actor data: {e}") | 836 rsm_resp["count"] = count |
356 | 837 |
357 return ap_actor_data | 838 if start_index is not None: |
839 assert chronological_pagination and after_id is None | |
840 if start_index >= count: | |
841 return [], rsm_resp | |
842 elif start_index == 0: | |
843 # this is the default behaviour | |
844 pass | |
845 elif start_index > 5000: | |
846 raise error.StanzaError( | |
847 "feature-not-implemented", | |
848 text="Maximum limit for previous_index has been reached, this limit" | |
849 "is set to avoid DoS" | |
850 ) | |
851 else: | |
852 # we'll convert "start_index" to "after_id", thus we need the item just | |
853 # before "start_index" | |
854 previous_index = start_index - 1 | |
855 retrieved_items = 0 | |
856 current_page = outbox_data["last"] | |
857 while retrieved_items < count: | |
858 page_data, items = await self.parseAPPage(current_page) | |
859 if not items: | |
860 log.warning(f"found an empty AP page at {current_page}") | |
861 return [], rsm_resp | |
862 page_start_idx = retrieved_items | |
863 retrieved_items += len(items) | |
864 if previous_index <= retrieved_items: | |
865 after_id = items[previous_index - page_start_idx]["id"] | |
866 break | |
867 try: | |
868 current_page = page_data["prev"] | |
869 except KeyError: | |
870 log.warning( | |
871 f"missing previous page link at {current_page}: {page_data!r}" | |
872 ) | |
873 raise error.StanzaError( | |
874 "service-unavailable", | |
875 "Error while retrieving previous page from AP service at " | |
876 f"{current_page}" | |
877 ) | |
878 | |
879 init_page = "last" if chronological_pagination else "first" | |
880 page = outbox_data.get(init_page) | |
881 if not page: | |
882 raise exceptions.DataError( | |
883 f"Initial page {init_page!r} not found for outbox {outbox}" | |
884 ) | |
885 items = [] | |
886 page_items = [] | |
887 retrieved_items = 0 | |
888 found_after_id = False | |
889 | |
890 while retrieved_items < count: | |
891 __, page_items = await self.parseAPPage(page) | |
892 retrieved_items += len(page_items) | |
893 if after_id is not None and not found_after_id: | |
894 # if we have an after_id, we ignore all items until the requested one is | |
895 # found | |
896 limit_idx = [i["id"] for i in page_items].index(after_id) | |
897 if limit_idx == -1: | |
898 # if "after_id" is not found, we don't add any item from this page | |
899 log.debug(f"{after_id!r} not found at {page}, skipping") | |
900 else: | |
901 found_after_id = True | |
902 if chronological_pagination: | |
903 page_items = page_items[limit_idx+1:] | |
904 start_index = retrieved_items - len(page_items) + limit_idx + 1 | |
905 else: | |
906 page_items = page_items[:limit_idx] | |
907 start_index = count - (retrieved_items - len(page_items) + | |
908 limit_idx + 1) | |
909 items.extend(page_items) | |
910 else: | |
911 items.extend(page_items) | |
912 if max_items is not None and len(items) >= max_items: | |
913 if chronological_pagination: | |
914 items = items[:max_items] | |
915 else: | |
916 items = items[-max_items:] | |
917 break | |
918 page = outbox_data.get("prev" if chronological_pagination else "next") | |
919 if not page: | |
920 break | |
921 | |
922 if after_id is not None and not found_after_id: | |
923 raise error.StanzaError("item-not-found") | |
924 | |
925 if after_id is None: | |
926 rsm_resp["index"] = 0 if chronological_pagination else count - len(items) | |
927 | |
928 if start_index is not None: | |
929 rsm_resp["index"] = start_index | |
930 elif after_id is not None: | |
931 log.warning("Can't determine index of first element") | |
932 elif chronological_pagination: | |
933 rsm_resp["index"] = 0 | |
934 else: | |
935 rsm_resp["index"] = count - len(items) | |
936 if items: | |
937 rsm_resp.update({ | |
938 "first": items[0]["id"], | |
939 "last": items[-1]["id"] | |
940 }) | |
941 | |
942 return items, rsm.RSMResponse(**rsm_resp) | |
943 | |
944 async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]: | |
945 """Convert AP objects from an AP page to XMPP items | |
946 | |
947 @param url: url linking and AP page | |
948 @return: page data, pubsub items | |
949 """ | |
950 page_data = await self.apGet(url) | |
951 ap_items = page_data.get("orderedItems") | |
952 if not ap_items: | |
953 log.warning('No "orderedItems" collection found') | |
954 return page_data, [] | |
955 items = [] | |
956 # AP Collections are in antichronological order, but we expect chronological in | |
957 # Pubsub, thus we reverse it | |
958 for ap_item in reversed(ap_items): | |
959 try: | |
960 ap_object, mb_data = await self.apItem2MBdata(ap_item) | |
961 except (exceptions.DataError, NotImplementedError, error.StanzaError): | |
962 continue | |
963 | |
964 item_elt = await self._m.data2entry( | |
965 self.client, mb_data, ap_object["id"], None, self._m.namespace | |
966 ) | |
967 item_elt["publisher"] = mb_data["author_jid"].full() | |
968 items.append(item_elt) | |
969 | |
970 return page_data, items | |
971 | |
    async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]:
        """Convert AP item to microblog data

        @param ap_item: AP activity (a "Create"-like item wrapping an object)
        @return: AP Item's Object and microblog data
        @raise exceptions.DataError: something is invalid in the AP item
        @raise NotImplemented: some AP data is not handled yet
        @raise error.StanzaError: error while contacting the AP server
        """
        ap_object = ap_item.get("object")
        if not ap_object:
            log.warning(f'No "object" found in AP item {ap_item!r}')
            raise exceptions.DataError
        if isinstance(ap_object, str):
            # the object may be given as a bare IRI: dereference it
            ap_object = await self.apGet(ap_object)
        obj_id = ap_object.get("id")
        if not obj_id:
            log.warning(f'No "id" found in AP object: {ap_object!r}')
            raise exceptions.DataError
        if ap_object.get("inReplyTo") is not None:
            # replies/comments are not handled yet
            raise NotImplementedError
        mb_data = {}
        # direct 1:1 field mapping (AP key -> microblog key)
        for ap_key, mb_key in AP_MB_MAP.items():
            data = ap_object.get(ap_key)
            if data is None:
                continue
            mb_data[mb_key] = data

        # content
        try:
            # "contentMap" maps a language to a content; only one entry is kept
            language, content_xhtml = ap_object["contentMap"].popitem()
        except (KeyError, AttributeError):
            # no usable "contentMap": fall back to plain "content" (copied above via
            # AP_MB_MAP if present) — assumed to be XHTML here; TODO confirm
            try:
                mb_data["content_xhtml"] = mb_data["content"]
            except KeyError:
                log.warning(f"no content found:\n{ap_object!r}")
                raise exceptions.DataError
        else:
            mb_data["language"] = language
            mb_data["content_xhtml"] = content_xhtml

        # author
        actor = ap_item.get("actor")
        if not actor:
            log.warning(f"no actor associated to object id {obj_id!r}")
            raise exceptions.DataError
        elif isinstance(actor, list):
            # we only keep first item of list as author
            # TODO: handle multiple actors
            if len(actor) > 1:
                log.warning("multiple actors are not managed")
            actor = actor[0]

        if isinstance(actor, dict):
            # inline actor object: reduce it to its ID
            actor = actor.get("id")
            if not actor:
                log.warning(f"no actor id found: {actor!r}")
                raise exceptions.DataError

        if isinstance(actor, str):
            account = await self.getAPAccountFromId(actor)
            mb_data["author"] = account.split("@", 1)[0]
            # build a local JID representing the remote AP actor: the AP account is
            # escaped (XEP-0106) and used as local part of the gateway's own domain
            author_jid = mb_data["author_jid"] = jid.JID(
                None,
                (
                    self.host.plugins["XEP-0106"].escape(account),
                    self.client.jid.host,
                    None
                )
            )
        else:
            log.warning(f"unknown actor type found: {actor!r}")
            raise exceptions.DataError

        # published/updated
        for field in ("published", "updated"):
            value = ap_object.get(field)
            if not value and field == "updated":
                # "updated" defaults to the publication date
                value = ap_object.get("published")
            if value:
                try:
                    # AP dates are ISO 8601 strings; convert to Unix time (UTC)
                    mb_data[field] = calendar.timegm(
                        dateutil.parser.parse(str(value)).utctimetuple()
                    )
                except dateutil.parser.ParserError as e:
                    log.warning(f"Can't parse {field!r} field: {e}")
        return ap_object, mb_data
1058 | |
1059 async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: | |
1060 """Convert Libervia Microblog Data to ActivityPub item""" | |
1061 if not mb_data.get("id"): | |
1062 mb_data["id"] = shortuuid.uuid() | |
1063 if not mb_data.get("author_jid"): | |
1064 mb_data["author_jid"] = client.jid | |
1065 ap_account = await self.getAPAccountFromJidAndNode( | |
1066 jid.JID(mb_data["author_jid"]), | |
1067 None | |
1068 ) | |
1069 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) | |
1070 url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"]) | |
1071 return { | |
1072 "@context": "https://www.w3.org/ns/activitystreams", | |
1073 "id": url_item, | |
1074 "type": "Create", | |
1075 "actor": url_actor, | |
1076 | |
1077 "object": { | |
1078 "id": url_item, | |
1079 "type": "Note", | |
1080 "published": utils.xmpp_date(mb_data["published"]), | |
1081 "attributedTo": url_actor, | |
1082 "content": mb_data.get("content_xhtml") or mb_data["content"], | |
1083 "to": "https://www.w3.org/ns/activitystreams#Public" | |
1084 } | |
1085 } | |
358 | 1086 |
359 async def publishMessage( | 1087 async def publishMessage( |
360 self, | 1088 self, |
361 client: SatXMPPEntity, | 1089 client: SatXMPPEntity, |
362 mess_data: dict, | 1090 mess_data: dict, |
380 @param service: JID corresponding to the AP actor. | 1108 @param service: JID corresponding to the AP actor. |
381 """ | 1109 """ |
382 if not service.user: | 1110 if not service.user: |
383 raise ValueError("service must have a local part") | 1111 raise ValueError("service must have a local part") |
384 account = self.host.plugins["XEP-0106"].unescape(service.user) | 1112 account = self.host.plugins["XEP-0106"].unescape(service.user) |
385 ap_actor_data = await self.getAPActorData(account) | 1113 ap_actor_data = await self.getAPActorDataFromId(account) |
386 | 1114 |
387 try: | 1115 try: |
388 inbox_url = ap_actor_data["endpoints"]["sharedInbox"] | 1116 inbox_url = ap_actor_data["endpoints"]["sharedInbox"] |
389 except KeyError: | 1117 except KeyError: |
390 raise exceptions.DataError("Can't get ActivityPub actor inbox") | 1118 raise exceptions.DataError("Can't get ActivityPub actor inbox") |
391 | 1119 |
392 if not mess_data.get("id"): | 1120 item_data = await self.mbdata2APitem(client, mess_data) |
393 mess_data["id"] = shortuuid.uuid() | 1121 url_actor = item_data["object"]["attributedTo"] |
394 url_actor = self.buildAPURL(TYPE_ACTOR, client.jid.userhost()) | |
395 url_item = self.buildAPURL(TYPE_ITEM, client.jid.userhost(), mess_data["id"]) | |
396 now = time.time() | |
397 item_data = { | |
398 "@context": "https://www.w3.org/ns/activitystreams", | |
399 "id": url_item, | |
400 "type": "Create", | |
401 "actor": url_actor, | |
402 | |
403 "object": { | |
404 "id": url_item, | |
405 "type": "Note", | |
406 "published": utils.xmpp_date(now), | |
407 "attributedTo": url_actor, | |
408 "inReplyTo": mess_data["node"], | |
409 "content": mess_data.get("content_xhtml") or mess_data["content"], | |
410 "to": "https://www.w3.org/ns/activitystreams#Public" | |
411 } | |
412 } | |
413 resp = await self.signAndPost(inbox_url, url_actor, item_data) | 1122 resp = await self.signAndPost(inbox_url, url_actor, item_data) |
414 if resp.code != 202: | 1123 if resp.code != 202: |
415 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") | 1124 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") |
1125 | |
1126 | |
class APPubsubService(rsm.PubSubService):
    """Pubsub service answering XMPP requests for the AP gateway"""

    def __init__(self, apg):
        super().__init__()
        self.host = apg.host
        self.apg = apg
        self.discoIdentity = {
            "category": "pubsub",
            "type": "service",
            "name": "Libervia ActivityPub Gateway",
        }

    @ensure_deferred
    async def publish(self, requestor, service, nodeIdentifier, items):
        raise NotImplementedError

    @ensure_deferred
    async def items(
        self,
        requestor: jid.JID,
        service: jid.JID,
        node: str,
        maxItems: Optional[int],
        itemIdentifiers: Optional[List[str]],
        rsm_req: Optional[rsm.RSMRequest]
    ) -> List[domish.Element]:
        """Retrieve node items, translating RSM to AP Collection Paging"""
        if not service.user:
            return []
        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
        if ap_account.count("@") != 1:
            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
            return []
        if node != self.apg._m.namespace:
            raise error.StanzaError(
                "feature-not-implemented",
                text=f"{VERSION} only supports {self.apg._m.namespace} "
                "node for now"
            )
        if rsm_req is None:
            # no RSM: return the latest items (antichronological retrieval)
            kwargs = {
                "max_items": 20 if maxItems is None else maxItems,
                "chronological_pagination": False,
            }
        else:
            used_params = [
                v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
                if v is not None
            ]
            if len(used_params) > 1:
                raise error.StanzaError(
                    "bad-request",
                    text="You can't use after, before and index at the same time"
                )
            kwargs = {"max_items": rsm_req.max}
            if rsm_req.after is not None:
                kwargs["after_id"] = rsm_req.after
            elif rsm_req.before is not None:
                # "before" means going backward; an empty value requests the last page
                kwargs["chronological_pagination"] = False
                if rsm_req.before != "":
                    kwargs["after_id"] = rsm_req.before
            elif rsm_req.index is not None:
                kwargs["start_index"] = rsm_req.index

        log.info(
            f"No cache found for node {node} at {service} (AP account {ap_account}), "
            "using Collection Paging to RSM translation"
        )
        return await self.apg.getAPItems(ap_account, **kwargs)

    @ensure_deferred
    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        raise NotImplementedError

    def getNodeInfo(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str,
        pep: bool = False,
        recipient: Optional[jid.JID] = None
    ) -> Optional[dict]:
        """Expose every node as an open, persistent leaf node"""
        if not nodeIdentifier:
            return None
        return {
            "type": "leaf",
            "meta-data": [
                {"var": "pubsub#persist_items", "type": "boolean", "value": True},
                {"var": "pubsub#max_items", "value": "max"},
                {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
                {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
            ]
        }