comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 4037:524856bd7b19

Massive refactoring to switch from camelCase to snake_case: historically, Libervia (formerly SàT) used camelCase, as PEP 8 allows for code that predates it, in order to match the coding style used in Twisted. However, snake_case is more readable and it is better to follow PEP 8 best practices, so it was decided to move to full snake_case. Because Libervia has a huge codebase, this ended up as an ugly mix of camelCase and snake_case. To fix that, this patch does a big refactoring, renaming every function and method (including bridge methods) that does not come from Twisted or Wokkel so that it uses snake_case throughout. This is a massive change and may result in some bugs.
author Goffi <goffi@goffi.org>
date Sat, 08 Apr 2023 13:54:42 +0200
parents 26c3e1bc7fb7
children c23cad65ae99
comparison
equal deleted inserted replaced
4036:c4464d7ae97b 4037:524856bd7b19
149 self._pa = host.plugins["XEP-0470"] 149 self._pa = host.plugins["XEP-0470"]
150 self._c = host.plugins["PUBSUB_CACHE"] 150 self._c = host.plugins["PUBSUB_CACHE"]
151 self._t = host.plugins["TEXT_SYNTAXES"] 151 self._t = host.plugins["TEXT_SYNTAXES"]
152 self._i = host.plugins["IDENTITY"] 152 self._i = host.plugins["IDENTITY"]
153 self._events = host.plugins["XEP-0471"] 153 self._events = host.plugins["XEP-0471"]
154 self._p.addManagedNode( 154 self._p.add_managed_node(
155 "", 155 "",
156 items_cb=self._itemsReceived, 156 items_cb=self._items_received,
157 # we want to be sure that the callbacks are launched before pubsub cache's 157 # we want to be sure that the callbacks are launched before pubsub cache's
158 # one, as we need to inspect items before they are actually removed from cache 158 # one, as we need to inspect items before they are actually removed from cache
159 # or updated 159 # or updated
160 priority=1000 160 priority=1000
161 ) 161 )
162 self.pubsub_service = APPubsubService(self) 162 self.pubsub_service = APPubsubService(self)
163 self.ad_hoc = APAdHocService(self) 163 self.ad_hoc = APAdHocService(self)
164 self.ap_events = APEvents(self) 164 self.ap_events = APEvents(self)
165 host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000) 165 host.trigger.add("messageReceived", self._message_received_trigger, priority=-1000)
166 host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract) 166 host.trigger.add("XEP-0424_retractReceived", self._on_message_retract)
167 host.trigger.add("XEP-0372_ref_received", self._onReferenceReceived) 167 host.trigger.add("XEP-0372_ref_received", self._on_reference_received)
168 168
169 host.bridge.addMethod( 169 host.bridge.add_method(
170 "APSend", 170 "ap_send",
171 ".plugin", 171 ".plugin",
172 in_sign="sss", 172 in_sign="sss",
173 out_sign="", 173 out_sign="",
174 method=self._publishMessage, 174 method=self._publish_message,
175 async_=True, 175 async_=True,
176 ) 176 )
177 177
178 def getHandler(self, __): 178 def get_handler(self, __):
179 return self.pubsub_service 179 return self.pubsub_service
180 180
181 async def init(self, client): 181 async def init(self, client):
182 if self.initialised: 182 if self.initialised:
183 return 183 return
184 184
185 self.initialised = True 185 self.initialised = True
186 log.info(_("ActivityPub Gateway initialization")) 186 log.info(_("ActivityPub Gateway initialization"))
187 187
188 # RSA keys 188 # RSA keys
189 stored_data = await self.host.memory.storage.getPrivates( 189 stored_data = await self.host.memory.storage.get_privates(
190 IMPORT_NAME, ["rsa_key"], profile=client.profile 190 IMPORT_NAME, ["rsa_key"], profile=client.profile
191 ) 191 )
192 private_key_pem = stored_data.get("rsa_key") 192 private_key_pem = stored_data.get("rsa_key")
193 if private_key_pem is None: 193 if private_key_pem is None:
194 self.private_key = await threads.deferToThread( 194 self.private_key = await threads.deferToThread(
199 private_key_pem = self.private_key.private_bytes( 199 private_key_pem = self.private_key.private_bytes(
200 encoding=serialization.Encoding.PEM, 200 encoding=serialization.Encoding.PEM,
201 format=serialization.PrivateFormat.PKCS8, 201 format=serialization.PrivateFormat.PKCS8,
202 encryption_algorithm=serialization.NoEncryption() 202 encryption_algorithm=serialization.NoEncryption()
203 ).decode() 203 ).decode()
204 await self.host.memory.storage.setPrivateValue( 204 await self.host.memory.storage.set_private_value(
205 IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile 205 IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
206 ) 206 )
207 else: 207 else:
208 self.private_key = serialization.load_pem_private_key( 208 self.private_key = serialization.load_pem_private_key(
209 private_key_pem.encode(), 209 private_key_pem.encode(),
215 format=serialization.PublicFormat.SubjectPublicKeyInfo 215 format=serialization.PublicFormat.SubjectPublicKeyInfo
216 ).decode() 216 ).decode()
217 217
218 # params 218 # params
219 # URL and port 219 # URL and port
220 self.public_url = self.host.memory.getConfig( 220 self.public_url = self.host.memory.config_get(
221 CONF_SECTION, "public_url" 221 CONF_SECTION, "public_url"
222 ) or self.host.memory.getConfig( 222 ) or self.host.memory.config_get(
223 CONF_SECTION, "xmpp_domain" 223 CONF_SECTION, "xmpp_domain"
224 ) 224 )
225 if self.public_url is None: 225 if self.public_url is None:
226 log.error( 226 log.error(
227 '"public_url" not set in configuration, this is mandatory to have' 227 '"public_url" not set in configuration, this is mandatory to have'
233 log.error( 233 log.error(
234 "Scheme must not be specified in \"public_url\", please remove it from " 234 "Scheme must not be specified in \"public_url\", please remove it from "
235 "\"public_url\" configuration option. ActivityPub Gateway won't be run." 235 "\"public_url\" configuration option. ActivityPub Gateway won't be run."
236 ) 236 )
237 return 237 return
238 self.http_port = int(self.host.memory.getConfig( 238 self.http_port = int(self.host.memory.config_get(
239 CONF_SECTION, 'http_port', 8123)) 239 CONF_SECTION, 'http_port', 8123))
240 connection_type = self.host.memory.getConfig( 240 connection_type = self.host.memory.config_get(
241 CONF_SECTION, 'http_connection_type', 'https') 241 CONF_SECTION, 'http_connection_type', 'https')
242 if connection_type not in ('http', 'https'): 242 if connection_type not in ('http', 'https'):
243 raise exceptions.ConfigError( 243 raise exceptions.ConfigError(
244 'bad ap-gateay http_connection_type, you must use one of "http" or ' 244 'bad ap-gateay http_connection_type, you must use one of "http" or '
245 '"https"' 245 '"https"'
246 ) 246 )
247 self.max_items = int(self.host.memory.getConfig( 247 self.max_items = int(self.host.memory.config_get(
248 CONF_SECTION, 'new_node_max_items', 50 248 CONF_SECTION, 'new_node_max_items', 50
249 249
250 )) 250 ))
251 self.comments_max_depth = int(self.host.memory.getConfig( 251 self.comments_max_depth = int(self.host.memory.config_get(
252 CONF_SECTION, 'comments_max_depth', 0 252 CONF_SECTION, 'comments_max_depth', 0
253 )) 253 ))
254 self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') 254 self.ap_path = self.host.memory.config_get(CONF_SECTION, 'ap_path', '_ap')
255 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") 255 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
256 # True (default) if we provide gateway only to entities/services from our server 256 # True (default) if we provide gateway only to entities/services from our server
257 self.local_only = C.bool( 257 self.local_only = C.bool(
258 self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE) 258 self.host.memory.config_get(CONF_SECTION, 'local_only', C.BOOL_TRUE)
259 ) 259 )
260 # if True (default), mention will be parsed in non-private content coming from 260 # if True (default), mention will be parsed in non-private content coming from
261 # XMPP. This is necessary as XEP-0372 are coming separately from item where the 261 # XMPP. This is necessary as XEP-0372 are coming separately from item where the
262 # mention is done, which is hard to impossible to translate to ActivityPub (where 262 # mention is done, which is hard to impossible to translate to ActivityPub (where
263 # mention specified inside the item directly). See documentation for details. 263 # mention specified inside the item directly). See documentation for details.
264 self.auto_mentions = C.bool( 264 self.auto_mentions = C.bool(
265 self.host.memory.getConfig(CONF_SECTION, "auto_mentions", C.BOOL_TRUE) 265 self.host.memory.config_get(CONF_SECTION, "auto_mentions", C.BOOL_TRUE)
266 ) 266 )
267 267
268 html_redirect: Dict[str, Union[str, dict]] = self.host.memory.getConfig( 268 html_redirect: Dict[str, Union[str, dict]] = self.host.memory.config_get(
269 CONF_SECTION, 'html_redirect_dict', {} 269 CONF_SECTION, 'html_redirect_dict', {}
270 ) 270 )
271 self.html_redirect: Dict[str, List[dict]] = {} 271 self.html_redirect: Dict[str, List[dict]] = {}
272 for url_type, target in html_redirect.items(): 272 for url_type, target in html_redirect.items():
273 if isinstance(target, str): 273 if isinstance(target, str):
289 # HTTP server launch 289 # HTTP server launch
290 self.server = HTTPServer(self) 290 self.server = HTTPServer(self)
291 if connection_type == 'http': 291 if connection_type == 'http':
292 reactor.listenTCP(self.http_port, self.server) 292 reactor.listenTCP(self.http_port, self.server)
293 else: 293 else:
294 options = tls.getOptionsFromConfig( 294 options = tls.get_options_from_config(
295 self.host.memory.config, CONF_SECTION) 295 self.host.memory.config, CONF_SECTION)
296 tls.TLSOptionsCheck(options) 296 tls.tls_options_check(options)
297 context_factory = tls.getTLSContextFactory(options) 297 context_factory = tls.get_tls_context_factory(options)
298 reactor.listenSSL(self.http_port, self.server, context_factory) 298 reactor.listenSSL(self.http_port, self.server, context_factory)
299 299
300 async def profileConnecting(self, client): 300 async def profile_connecting(self, client):
301 self.client = client 301 self.client = client
302 client.sendHistory = True 302 client.sendHistory = True
303 client._ap_storage = persistent.LazyPersistentBinaryDict( 303 client._ap_storage = persistent.LazyPersistentBinaryDict(
304 IMPORT_NAME, 304 IMPORT_NAME,
305 client.profile 305 client.profile
306 ) 306 )
307 await self.init(client) 307 await self.init(client)
308 308
309 def profileConnected(self, client): 309 def profile_connected(self, client):
310 self.ad_hoc.init(client) 310 self.ad_hoc.init(client)
311 311
312 async def _itemsReceived( 312 async def _items_received(
313 self, 313 self,
314 client: SatXMPPEntity, 314 client: SatXMPPEntity,
315 itemsEvent: pubsub.ItemsEvent 315 itemsEvent: pubsub.ItemsEvent
316 ) -> None: 316 ) -> None:
317 """Callback called when pubsub items are received 317 """Callback called when pubsub items are received
324 """ 324 """
325 if client != self.client: 325 if client != self.client:
326 return 326 return
327 # we need recipient as JID and not gateway own JID to be able to use methods such 327 # we need recipient as JID and not gateway own JID to be able to use methods such
328 # as "subscribe" 328 # as "subscribe"
329 client = self.client.getVirtualClient(itemsEvent.sender) 329 client = self.client.get_virtual_client(itemsEvent.sender)
330 recipient = itemsEvent.recipient 330 recipient = itemsEvent.recipient
331 if not recipient.user: 331 if not recipient.user:
332 log.debug("ignoring items event without local part specified") 332 log.debug("ignoring items event without local part specified")
333 return 333 return
334 334
335 ap_account = self._e.unescape(recipient.user) 335 ap_account = self._e.unescape(recipient.user)
336 336
337 if self._pa.isAttachmentNode(itemsEvent.nodeIdentifier): 337 if self._pa.is_attachment_node(itemsEvent.nodeIdentifier):
338 await self.convertAndPostAttachments( 338 await self.convert_and_post_attachments(
339 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, 339 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
340 itemsEvent.items 340 itemsEvent.items
341 ) 341 )
342 else: 342 else:
343 await self.convertAndPostItems( 343 await self.convert_and_post_items(
344 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, 344 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
345 itemsEvent.items 345 itemsEvent.items
346 ) 346 )
347 347
348 async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity: 348 async def get_virtual_client(self, actor_id: str) -> SatXMPPEntity:
349 """Get client for this component with a specified jid 349 """Get client for this component with a specified jid
350 350
351 This is needed to perform operations with the virtual JID corresponding to the AP 351 This is needed to perform operations with the virtual JID corresponding to the AP
352 actor instead of the JID of the gateway itself. 352 actor instead of the JID of the gateway itself.
353 @param actor_id: ID of the actor 353 @param actor_id: ID of the actor
354 @return: virtual client 354 @return: virtual client
355 """ 355 """
356 local_jid = await self.getJIDFromId(actor_id) 356 local_jid = await self.get_jid_from_id(actor_id)
357 return self.client.getVirtualClient(local_jid) 357 return self.client.get_virtual_client(local_jid)
358 358
359 def is_activity(self, data: dict) -> bool: 359 def is_activity(self, data: dict) -> bool:
360 """Return True if the data has an activity type""" 360 """Return True if the data has an activity type"""
361 try: 361 try:
362 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER 362 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER
363 except (KeyError, TypeError): 363 except (KeyError, TypeError):
364 return False 364 return False
365 365
366 async def apGet(self, url: str) -> dict: 366 async def ap_get(self, url: str) -> dict:
367 """Retrieve AP JSON from given URL 367 """Retrieve AP JSON from given URL
368 368
369 @raise error.StanzaError: "service-unavailable" is sent when something went wrong 369 @raise error.StanzaError: "service-unavailable" is sent when something went wrong
370 with AP server 370 with AP server
371 """ 371 """
390 "service-unavailable", 390 "service-unavailable",
391 text=f"Can't get AP data at {url}: {e}" 391 text=f"Can't get AP data at {url}: {e}"
392 ) 392 )
393 393
394 @overload 394 @overload
395 async def apGetObject(self, data: dict, key: str) -> Optional[dict]: 395 async def ap_get_object(self, data: dict, key: str) -> Optional[dict]:
396 ... 396 ...
397 397
398 @overload 398 @overload
399 async def apGetObject( 399 async def ap_get_object(
400 self, data: Union[str, dict], key: None = None 400 self, data: Union[str, dict], key: None = None
401 ) -> dict: 401 ) -> dict:
402 ... 402 ...
403 403
404 async def apGetObject(self, data, key = None): 404 async def ap_get_object(self, data, key = None):
405 """Retrieve an AP object, dereferencing when necessary 405 """Retrieve an AP object, dereferencing when necessary
406 406
407 This method is to be used with attributes marked as "Functional" in 407 This method is to be used with attributes marked as "Functional" in
408 https://www.w3.org/TR/activitystreams-vocabulary 408 https://www.w3.org/TR/activitystreams-vocabulary
409 @param data: AP object where an other object is looked for, or the object itself 409 @param data: AP object where an other object is looked for, or the object itself
414 value = data.get(key) 414 value = data.get(key)
415 else: 415 else:
416 value = data 416 value = data
417 if value is None: 417 if value is None:
418 if key is None: 418 if key is None:
419 raise ValueError("None can't be used with apGetObject is key is None") 419 raise ValueError("None can't be used with ap_get_object is key is None")
420 return None 420 return None
421 elif isinstance(value, dict): 421 elif isinstance(value, dict):
422 return value 422 return value
423 elif isinstance(value, str): 423 elif isinstance(value, str):
424 if self.isLocalURL(value): 424 if self.is_local_url(value):
425 return await self.apGetLocalObject(value) 425 return await self.ap_get_local_object(value)
426 else: 426 else:
427 return await self.apGet(value) 427 return await self.ap_get(value)
428 else: 428 else:
429 raise NotImplementedError( 429 raise NotImplementedError(
430 "was expecting a string or a dict, got {type(value)}: {value!r}}" 430 "was expecting a string or a dict, got {type(value)}: {value!r}}"
431 ) 431 )
432 432
433 async def apGetLocalObject( 433 async def ap_get_local_object(
434 self, 434 self,
435 url: str 435 url: str
436 ) -> dict: 436 ) -> dict:
437 """Retrieve or generate local object 437 """Retrieve or generate local object
438 438
439 for now, only handle XMPP items to convert to AP 439 for now, only handle XMPP items to convert to AP
440 """ 440 """
441 url_type, url_args = self.parseAPURL(url) 441 url_type, url_args = self.parse_apurl(url)
442 if url_type == TYPE_ITEM: 442 if url_type == TYPE_ITEM:
443 try: 443 try:
444 account, item_id = url_args 444 account, item_id = url_args
445 except ValueError: 445 except ValueError:
446 raise ValueError(f"invalid URL: {url}") 446 raise ValueError(f"invalid URL: {url}")
447 author_jid, node = await self.getJIDAndNode(account) 447 author_jid, node = await self.get_jid_and_node(account)
448 if node is None: 448 if node is None:
449 node = self._m.namespace 449 node = self._m.namespace
450 cached_node = await self.host.memory.storage.getPubsubNode( 450 cached_node = await self.host.memory.storage.get_pubsub_node(
451 self.client, author_jid, node 451 self.client, author_jid, node
452 ) 452 )
453 if not cached_node: 453 if not cached_node:
454 log.debug(f"node {node!r} at {author_jid} is not found in cache") 454 log.debug(f"node {node!r} at {author_jid} is not found in cache")
455 found_item = None 455 found_item = None
456 else: 456 else:
457 cached_items, __ = await self.host.memory.storage.getItems( 457 cached_items, __ = await self.host.memory.storage.get_items(
458 cached_node, item_ids=[item_id] 458 cached_node, item_ids=[item_id]
459 ) 459 )
460 if not cached_items: 460 if not cached_items:
461 log.debug( 461 log.debug(
462 f"item {item_id!r} of {node!r} at {author_jid} is not found in " 462 f"item {item_id!r} of {node!r} at {author_jid} is not found in "
466 else: 466 else:
467 found_item = cached_items[0].data 467 found_item = cached_items[0].data
468 468
469 if found_item is None: 469 if found_item is None:
470 # the node is not in cache, we have to make a request to retrieve the item 470 # the node is not in cache, we have to make a request to retrieve the item
471 # If the node doesn't exist, getItems will raise a NotFound exception 471 # If the node doesn't exist, get_items will raise a NotFound exception
472 found_items, __ = await self._p.getItems( 472 found_items, __ = await self._p.get_items(
473 self.client, author_jid, node, item_ids=[item_id] 473 self.client, author_jid, node, item_ids=[item_id]
474 ) 474 )
475 try: 475 try:
476 found_item = found_items[0] 476 found_item = found_items[0]
477 except IndexError: 477 except IndexError:
497 else: 497 else:
498 raise NotImplementedError( 498 raise NotImplementedError(
499 'only object from "item" URLs can be retrieved for now' 499 'only object from "item" URLs can be retrieved for now'
500 ) 500 )
501 501
502 async def apGetList( 502 async def ap_get_list(
503 self, 503 self,
504 data: dict, 504 data: dict,
505 key: str, 505 key: str,
506 only_ids: bool = False 506 only_ids: bool = False
507 ) -> Optional[List[Dict[str, Any]]]: 507 ) -> Optional[List[Dict[str, Any]]]:
508 """Retrieve a list of objects from AP data, dereferencing when necessary 508 """Retrieve a list of objects from AP data, dereferencing when necessary
509 509
510 This method is to be used with non functional vocabularies. Use ``apGetObject`` 510 This method is to be used with non functional vocabularies. Use ``ap_get_object``
511 otherwise. 511 otherwise.
512 If the value is a dictionary, it will be wrapped in a list 512 If the value is a dictionary, it will be wrapped in a list
513 @param data: AP object where a list of objects is looked for 513 @param data: AP object where a list of objects is looked for
514 @param key: key of the list to look for 514 @param key: key of the list to look for
515 @param only_ids: if Trye, only items IDs are retrieved 515 @param only_ids: if Trye, only items IDs are retrieved
517 """ 517 """
518 value = data.get(key) 518 value = data.get(key)
519 if value is None: 519 if value is None:
520 return None 520 return None
521 elif isinstance(value, str): 521 elif isinstance(value, str):
522 if self.isLocalURL(value): 522 if self.is_local_url(value):
523 value = await self.apGetLocalObject(value) 523 value = await self.ap_get_local_object(value)
524 else: 524 else:
525 value = await self.apGet(value) 525 value = await self.ap_get(value)
526 if isinstance(value, dict): 526 if isinstance(value, dict):
527 return [value] 527 return [value]
528 if not isinstance(value, list): 528 if not isinstance(value, list):
529 raise ValueError(f"A list was expected, got {type(value)}: {value!r}") 529 raise ValueError(f"A list was expected, got {type(value)}: {value!r}")
530 if only_ids: 530 if only_ids:
531 return [ 531 return [
532 {"id": v["id"]} if isinstance(v, dict) else {"id": v} 532 {"id": v["id"]} if isinstance(v, dict) else {"id": v}
533 for v in value 533 for v in value
534 ] 534 ]
535 else: 535 else:
536 return [await self.apGetObject(i) for i in value] 536 return [await self.ap_get_object(i) for i in value]
537 537
538 async def apGetActors( 538 async def ap_get_actors(
539 self, 539 self,
540 data: dict, 540 data: dict,
541 key: str, 541 key: str,
542 as_account: bool = True 542 as_account: bool = True
543 ) -> List[str]: 543 ) -> List[str]:
573 if not value: 573 if not value:
574 raise exceptions.DataError( 574 raise exceptions.DataError(
575 f"list of actors is empty" 575 f"list of actors is empty"
576 ) 576 )
577 if as_account: 577 if as_account:
578 return [await self.getAPAccountFromId(actor_id) for actor_id in value] 578 return [await self.get_ap_account_from_id(actor_id) for actor_id in value]
579 else: 579 else:
580 return value 580 return value
581 581
582 async def apGetSenderActor( 582 async def ap_get_sender_actor(
583 self, 583 self,
584 data: dict, 584 data: dict,
585 ) -> str: 585 ) -> str:
586 """Retrieve actor who sent data 586 """Retrieve actor who sent data
587 587
590 @param data: AP object 590 @param data: AP object
591 @return: actor id of the sender 591 @return: actor id of the sender
592 @raise exceptions.NotFound: no actor has been found in data 592 @raise exceptions.NotFound: no actor has been found in data
593 """ 593 """
594 try: 594 try:
595 actors = await self.apGetActors(data, "actor", as_account=False) 595 actors = await self.ap_get_actors(data, "actor", as_account=False)
596 except exceptions.DataError: 596 except exceptions.DataError:
597 actors = None 597 actors = None
598 if not actors: 598 if not actors:
599 try: 599 try:
600 actors = await self.apGetActors(data, "attributedTo", as_account=False) 600 actors = await self.ap_get_actors(data, "attributedTo", as_account=False)
601 except exceptions.DataError: 601 except exceptions.DataError:
602 raise exceptions.NotFound( 602 raise exceptions.NotFound(
603 'actor not specified in "actor" or "attributedTo"' 603 'actor not specified in "actor" or "attributedTo"'
604 ) 604 )
605 try: 605 try:
606 return actors[0] 606 return actors[0]
607 except IndexError: 607 except IndexError:
608 raise exceptions.NotFound("list of actors is empty") 608 raise exceptions.NotFound("list of actors is empty")
609 609
610 def mustEncode(self, text: str) -> bool: 610 def must_encode(self, text: str) -> bool:
611 """Indicate if a text must be period encoded""" 611 """Indicate if a text must be period encoded"""
612 return ( 612 return (
613 not RE_ALLOWED_UNQUOTED.match(text) 613 not RE_ALLOWED_UNQUOTED.match(text)
614 or text.startswith("___") 614 or text.startswith("___")
615 or "---" in text 615 or "---" in text
616 ) 616 )
617 617
618 def periodEncode(self, text: str) -> str: 618 def period_encode(self, text: str) -> str:
619 """Period encode a text 619 """Period encode a text
620 620
621 see [getJIDAndNode] for reasons of period encoding 621 see [get_jid_and_node] for reasons of period encoding
622 """ 622 """
623 return ( 623 return (
624 parse.quote(text, safe="") 624 parse.quote(text, safe="")
625 .replace("---", "%2d%2d%2d") 625 .replace("---", "%2d%2d%2d")
626 .replace("___", "%5f%5f%5f") 626 .replace("___", "%5f%5f%5f")
627 .replace(".", "%2e") 627 .replace(".", "%2e")
628 .replace("~", "%7e") 628 .replace("~", "%7e")
629 .replace("%", ".") 629 .replace("%", ".")
630 ) 630 )
631 631
632 async def getAPAccountFromJidAndNode( 632 async def get_ap_account_from_jid_and_node(
633 self, 633 self,
634 jid_: jid.JID, 634 jid_: jid.JID,
635 node: Optional[str] 635 node: Optional[str]
636 ) -> str: 636 ) -> str:
637 """Construct AP account from JID and node 637 """Construct AP account from JID and node
642 node = None 642 node = None
643 643
644 if self.client is None: 644 if self.client is None:
645 raise exceptions.InternalError("Client is not set yet") 645 raise exceptions.InternalError("Client is not set yet")
646 646
647 if self.isVirtualJID(jid_): 647 if self.is_virtual_jid(jid_):
648 # this is an proxy JID to an AP Actor 648 # this is an proxy JID to an AP Actor
649 return self._e.unescape(jid_.user) 649 return self._e.unescape(jid_.user)
650 650
651 if node and not jid_.user and not self.mustEncode(node): 651 if node and not jid_.user and not self.must_encode(node):
652 is_pubsub = await self.isPubsub(jid_) 652 is_pubsub = await self.is_pubsub(jid_)
653 # when we have a pubsub service, the user part can be used to set the node 653 # when we have a pubsub service, the user part can be used to set the node
654 # this produces more user-friendly AP accounts 654 # this produces more user-friendly AP accounts
655 if is_pubsub: 655 if is_pubsub:
656 jid_.user = node 656 jid_.user = node
657 node = None 657 node = None
658 658
659 is_local = self.isLocal(jid_) 659 is_local = self.is_local(jid_)
660 user = jid_.user if is_local else jid_.userhost() 660 user = jid_.user if is_local else jid_.userhost()
661 if user is None: 661 if user is None:
662 user = "" 662 user = ""
663 account_elts = [] 663 account_elts = []
664 if node and self.mustEncode(node) or self.mustEncode(user): 664 if node and self.must_encode(node) or self.must_encode(user):
665 account_elts = ["___"] 665 account_elts = ["___"]
666 if node: 666 if node:
667 node = self.periodEncode(node) 667 node = self.period_encode(node)
668 user = self.periodEncode(user) 668 user = self.period_encode(user)
669 669
670 if not user: 670 if not user:
671 raise exceptions.InternalError("there should be a user part") 671 raise exceptions.InternalError("there should be a user part")
672 672
673 if node: 673 if node:
676 account_elts.extend(( 676 account_elts.extend((
677 user, "@", jid_.host if is_local else self.client.jid.userhost() 677 user, "@", jid_.host if is_local else self.client.jid.userhost()
678 )) 678 ))
679 return "".join(account_elts) 679 return "".join(account_elts)
680 680
681 def isLocal(self, jid_: jid.JID) -> bool: 681 def is_local(self, jid_: jid.JID) -> bool:
682 """Returns True if jid_ use a domain or subdomain of gateway's host""" 682 """Returns True if jid_ use a domain or subdomain of gateway's host"""
683 local_host = self.client.host.split(".") 683 local_host = self.client.host.split(".")
684 assert local_host 684 assert local_host
685 return jid_.host.split(".")[-len(local_host):] == local_host 685 return jid_.host.split(".")[-len(local_host):] == local_host
686 686
687 async def isPubsub(self, jid_: jid.JID) -> bool: 687 async def is_pubsub(self, jid_: jid.JID) -> bool:
688 """Indicate if a JID is a Pubsub service""" 688 """Indicate if a JID is a Pubsub service"""
689 host_disco = await self.host.getDiscoInfos(self.client, jid_) 689 host_disco = await self.host.get_disco_infos(self.client, jid_)
690 return ( 690 return (
691 ("pubsub", "service") in host_disco.identities 691 ("pubsub", "service") in host_disco.identities
692 and not ("pubsub", "pep") in host_disco.identities 692 and not ("pubsub", "pep") in host_disco.identities
693 ) 693 )
694 694
695 async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]: 695 async def get_jid_and_node(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]:
696 """Decode raw AP account handle to get XMPP JID and Pubsub Node 696 """Decode raw AP account handle to get XMPP JID and Pubsub Node
697 697
698 Username are case insensitive. 698 Username are case insensitive.
699 699
700 By default, the username correspond to local username (i.e. username from 700 By default, the username correspond to local username (i.e. username from
765 765
766 if not node: 766 if not node:
767 # we need to check host disco, because disco request to user may be 767 # we need to check host disco, because disco request to user may be
768 # blocked for privacy reason (see 768 # blocked for privacy reason (see
769 # https://xmpp.org/extensions/xep-0030.html#security) 769 # https://xmpp.org/extensions/xep-0030.html#security)
770 is_pubsub = await self.isPubsub(jid.JID(domain)) 770 is_pubsub = await self.is_pubsub(jid.JID(domain))
771 771
772 if is_pubsub: 772 if is_pubsub:
773 # if the host is a pubsub service and not a PEP, we consider that username 773 # if the host is a pubsub service and not a PEP, we consider that username
774 # is in fact the node name 774 # is in fact the node name
775 node = username 775 node = username
779 try: 779 try:
780 jid_ = jid.JID(jid_s) 780 jid_ = jid.JID(jid_s)
781 except RuntimeError: 781 except RuntimeError:
782 raise ValueError(f"Invalid jid: {jid_s!r}") 782 raise ValueError(f"Invalid jid: {jid_s!r}")
783 783
784 if self.local_only and not self.isLocal(jid_): 784 if self.local_only and not self.is_local(jid_):
785 raise exceptions.PermissionError( 785 raise exceptions.PermissionError(
786 "This gateway is configured to map only local entities and services" 786 "This gateway is configured to map only local entities and services"
787 ) 787 )
788 788
789 return jid_, node 789 return jid_, node
790 790
791 def getLocalJIDFromAccount(self, account: str) -> jid.JID: 791 def get_local_jid_from_account(self, account: str) -> jid.JID:
792 """Compute JID linking to an AP account 792 """Compute JID linking to an AP account
793 793
794 The local jid is computer by escaping AP actor handle and using it as local part 794 The local jid is computer by escaping AP actor handle and using it as local part
795 of JID, where domain part is this gateway own JID 795 of JID, where domain part is this gateway own JID
796 """ 796 """
801 self.client.jid.host, 801 self.client.jid.host,
802 None 802 None
803 ) 803 )
804 ) 804 )
805 805
806 async def getJIDFromId(self, actor_id: str) -> jid.JID: 806 async def get_jid_from_id(self, actor_id: str) -> jid.JID:
807 """Compute JID linking to an AP Actor ID 807 """Compute JID linking to an AP Actor ID
808 808
809 The local jid is computer by escaping AP actor handle and using it as local part 809 The local jid is computer by escaping AP actor handle and using it as local part
810 of JID, where domain part is this gateway own JID 810 of JID, where domain part is this gateway own JID
811 If the actor_id comes from local server (checked with self.public_url), it means 811 If the actor_id comes from local server (checked with self.public_url), it means
812 that we have an XMPP entity, and the original JID is returned 812 that we have an XMPP entity, and the original JID is returned
813 """ 813 """
814 if self.isLocalURL(actor_id): 814 if self.is_local_url(actor_id):
815 request_type, extra_args = self.parseAPURL(actor_id) 815 request_type, extra_args = self.parse_apurl(actor_id)
816 if request_type != TYPE_ACTOR or len(extra_args) != 1: 816 if request_type != TYPE_ACTOR or len(extra_args) != 1:
817 raise ValueError(f"invalid actor id: {actor_id!r}") 817 raise ValueError(f"invalid actor id: {actor_id!r}")
818 actor_jid, __ = await self.getJIDAndNode(extra_args[0]) 818 actor_jid, __ = await self.get_jid_and_node(extra_args[0])
819 return actor_jid 819 return actor_jid
820 820
821 account = await self.getAPAccountFromId(actor_id) 821 account = await self.get_ap_account_from_id(actor_id)
822 return self.getLocalJIDFromAccount(account) 822 return self.get_local_jid_from_account(account)
823 823
824 def parseAPURL(self, url: str) -> Tuple[str, List[str]]: 824 def parse_apurl(self, url: str) -> Tuple[str, List[str]]:
825 """Parse an URL leading to an AP endpoint 825 """Parse an URL leading to an AP endpoint
826 826
827 @param url: URL to parse (schema is not mandatory) 827 @param url: URL to parse (schema is not mandatory)
828 @return: endpoint type and extra arguments 828 @return: endpoint type and extra arguments
829 """ 829 """
830 path = parse.urlparse(url).path.lstrip("/") 830 path = parse.urlparse(url).path.lstrip("/")
831 type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/") 831 type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/")
832 return type_, [parse.unquote(a) for a in extra_args] 832 return type_, [parse.unquote(a) for a in extra_args]
833 833
834 def buildAPURL(self, type_:str , *args: str) -> str: 834 def build_apurl(self, type_:str , *args: str) -> str:
835 """Build an AP endpoint URL 835 """Build an AP endpoint URL
836 836
837 @param type_: type of AP endpoing 837 @param type_: type of AP endpoing
838 @param arg: endpoint dependant arguments 838 @param arg: endpoint dependant arguments
839 """ 839 """
840 return parse.urljoin( 840 return parse.urljoin(
841 self.base_ap_url, 841 self.base_ap_url,
842 str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args))) 842 str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args)))
843 ) 843 )
844 844
845 def isLocalURL(self, url: str) -> bool: 845 def is_local_url(self, url: str) -> bool:
846 """Tells if an URL link to this component 846 """Tells if an URL link to this component
847 847
848 ``public_url`` and ``ap_path`` are used to check the URL 848 ``public_url`` and ``ap_path`` are used to check the URL
849 """ 849 """
850 return url.startswith(self.base_ap_url) 850 return url.startswith(self.base_ap_url)
851 851
852 def isVirtualJID(self, jid_: jid.JID) -> bool: 852 def is_virtual_jid(self, jid_: jid.JID) -> bool:
853 """Tell if a JID is an AP actor mapped through this gateway""" 853 """Tell if a JID is an AP actor mapped through this gateway"""
854 return jid_.host == self.client.jid.userhost() 854 return jid_.host == self.client.jid.userhost()
855 855
856 def buildSignatureHeader(self, values: Dict[str, str]) -> str: 856 def build_signature_header(self, values: Dict[str, str]) -> str:
857 """Build key="<value>" signature header from signature data""" 857 """Build key="<value>" signature header from signature data"""
858 fields = [] 858 fields = []
859 for key, value in values.items(): 859 for key, value in values.items():
860 if key not in ("(created)", "(expired)"): 860 if key not in ("(created)", "(expired)"):
861 if '"' in value: 861 if '"' in value:
866 value = f'"{value}"' 866 value = f'"{value}"'
867 fields.append(f"{key}={value}") 867 fields.append(f"{key}={value}")
868 868
869 return ",".join(fields) 869 return ",".join(fields)
870 870
871 def getDigest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]: 871 def get_digest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]:
872 """Get digest data to use in header and signature 872 """Get digest data to use in header and signature
873 873
874 @param body: body of the request 874 @param body: body of the request
875 @return: hash name and digest 875 @return: hash name and digest
876 """ 876 """
877 if algo != "SHA-256": 877 if algo != "SHA-256":
878 raise NotImplementedError("only SHA-256 is implemented for now") 878 raise NotImplementedError("only SHA-256 is implemented for now")
879 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() 879 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode()
880 880
881 @async_lru(maxsize=LRU_MAX_SIZE) 881 @async_lru(maxsize=LRU_MAX_SIZE)
882 async def getActorData(self, actor_id) -> dict: 882 async def get_actor_data(self, actor_id) -> dict:
883 """Retrieve actor data with LRU cache""" 883 """Retrieve actor data with LRU cache"""
884 return await self.apGet(actor_id) 884 return await self.ap_get(actor_id)
885 885
886 @async_lru(maxsize=LRU_MAX_SIZE) 886 @async_lru(maxsize=LRU_MAX_SIZE)
887 async def getActorPubKeyData( 887 async def get_actor_pub_key_data(
888 self, 888 self,
889 actor_id: str 889 actor_id: str
890 ) -> Tuple[str, str, rsa.RSAPublicKey]: 890 ) -> Tuple[str, str, rsa.RSAPublicKey]:
891 """Retrieve Public Key data from actor ID 891 """Retrieve Public Key data from actor ID
892 892
893 @param actor_id: actor ID (url) 893 @param actor_id: actor ID (url)
894 @return: key_id, owner and public_key 894 @return: key_id, owner and public_key
895 @raise KeyError: publicKey is missing from actor data 895 @raise KeyError: publicKey is missing from actor data
896 """ 896 """
897 actor_data = await self.getActorData(actor_id) 897 actor_data = await self.get_actor_data(actor_id)
898 pub_key_data = actor_data["publicKey"] 898 pub_key_data = actor_data["publicKey"]
899 key_id = pub_key_data["id"] 899 key_id = pub_key_data["id"]
900 owner = pub_key_data["owner"] 900 owner = pub_key_data["owner"]
901 pub_key_pem = pub_key_data["publicKeyPem"] 901 pub_key_pem = pub_key_data["publicKeyPem"]
902 pub_key = serialization.load_pem_public_key(pub_key_pem.encode()) 902 pub_key = serialization.load_pem_public_key(pub_key_pem.encode())
945 if target is not None: 945 if target is not None:
946 data["target"] = target 946 data["target"] = target
947 947
948 return data 948 return data
949 949
950 def getKeyId(self, actor_id: str) -> str: 950 def get_key_id(self, actor_id: str) -> str:
951 """Get local key ID from actor ID""" 951 """Get local key ID from actor ID"""
952 return f"{actor_id}#main-key" 952 return f"{actor_id}#main-key"
953 953
954 async def checkSignature( 954 async def check_signature(
955 self, 955 self,
956 signature: str, 956 signature: str,
957 key_id: str, 957 key_id: str,
958 headers: Dict[str, str] 958 headers: Dict[str, str]
959 ) -> str: 959 ) -> str:
969 @raise InvalidSignature: signature doesn't match headers 969 @raise InvalidSignature: signature doesn't match headers
970 """ 970 """
971 to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items()) 971 to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items())
972 if key_id.startswith("acct:"): 972 if key_id.startswith("acct:"):
973 actor = key_id[5:] 973 actor = key_id[5:]
974 actor_id = await self.getAPActorIdFromAccount(actor) 974 actor_id = await self.get_ap_actor_id_from_account(actor)
975 else: 975 else:
976 actor_id = key_id.split("#", 1)[0] 976 actor_id = key_id.split("#", 1)[0]
977 977
978 pub_key_id, pub_key_owner, pub_key = await self.getActorPubKeyData(actor_id) 978 pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data(actor_id)
979 if pub_key_id != key_id or pub_key_owner != actor_id: 979 if pub_key_id != key_id or pub_key_owner != actor_id:
980 raise exceptions.EncryptionError("Public Key mismatch") 980 raise exceptions.EncryptionError("Public Key mismatch")
981 981
982 try: 982 try:
983 pub_key.verify( 983 pub_key.verify(
992 "Invalid signature (using PKC0S1 v1.5 and SHA-256)" 992 "Invalid signature (using PKC0S1 v1.5 and SHA-256)"
993 ) 993 )
994 994
995 return actor_id 995 return actor_id
996 996
997 def getSignatureData( 997 def get_signature_data(
998 self, 998 self,
999 key_id: str, 999 key_id: str,
1000 headers: Dict[str, str] 1000 headers: Dict[str, str]
1001 ) -> Tuple[Dict[str, str], Dict[str, str]]: 1001 ) -> Tuple[Dict[str, str], Dict[str, str]]:
1002 """Generate and return signature and corresponding headers 1002 """Generate and return signature and corresponding headers
1026 "Algorithm": "rsa-sha256", 1026 "Algorithm": "rsa-sha256",
1027 "headers": " ".join(l_headers.keys()), 1027 "headers": " ".join(l_headers.keys()),
1028 "signature": signature 1028 "signature": signature
1029 } 1029 }
1030 new_headers = {k: v for k,v in headers.items() if not k.startswith("(")} 1030 new_headers = {k: v for k,v in headers.items() if not k.startswith("(")}
1031 new_headers["Signature"] = self.buildSignatureHeader(sign_data) 1031 new_headers["Signature"] = self.build_signature_header(sign_data)
1032 return new_headers, sign_data 1032 return new_headers, sign_data
1033 1033
1034 async def convertAndPostItems( 1034 async def convert_and_post_items(
1035 self, 1035 self,
1036 client: SatXMPPEntity, 1036 client: SatXMPPEntity,
1037 ap_account: str, 1037 ap_account: str,
1038 service: jid.JID, 1038 service: jid.JID,
1039 node: str, 1039 node: str,
1047 published 1047 published
1048 @param node: (virtual) node corresponding where the item has been published 1048 @param node: (virtual) node corresponding where the item has been published
1049 @param subscribe_extra_nodes: if True, extra data nodes will be automatically 1049 @param subscribe_extra_nodes: if True, extra data nodes will be automatically
1050 subscribed, that is comment nodes if present and attachments nodes. 1050 subscribed, that is comment nodes if present and attachments nodes.
1051 """ 1051 """
1052 actor_id = await self.getAPActorIdFromAccount(ap_account) 1052 actor_id = await self.get_ap_actor_id_from_account(ap_account)
1053 inbox = await self.getAPInboxFromId(actor_id) 1053 inbox = await self.get_ap_inbox_from_id(actor_id)
1054 for item in items: 1054 for item in items:
1055 if item.name == "item": 1055 if item.name == "item":
1056 cached_item = await self.host.memory.storage.searchPubsubItems({ 1056 cached_item = await self.host.memory.storage.search_pubsub_items({
1057 "profiles": [self.client.profile], 1057 "profiles": [self.client.profile],
1058 "services": [service], 1058 "services": [service],
1059 "nodes": [node], 1059 "nodes": [node],
1060 "names": [item["id"]] 1060 "names": [item["id"]]
1061 }) 1061 })
1068 except (KeyError, RuntimeWarning): 1068 except (KeyError, RuntimeWarning):
1069 root_elt = item 1069 root_elt = item
1070 while root_elt.parent is not None: 1070 while root_elt.parent is not None:
1071 root_elt = root_elt.parent 1071 root_elt = root_elt.parent
1072 author_jid = jid.JID(root_elt["from"]).userhostJID() 1072 author_jid = jid.JID(root_elt["from"]).userhostJID()
1073 if subscribe_extra_nodes and not self.isVirtualJID(author_jid): 1073 if subscribe_extra_nodes and not self.is_virtual_jid(author_jid):
1074 # we subscribe automatically to comment nodes if any 1074 # we subscribe automatically to comment nodes if any
1075 recipient_jid = self.getLocalJIDFromAccount(ap_account) 1075 recipient_jid = self.get_local_jid_from_account(ap_account)
1076 recipient_client = self.client.getVirtualClient(recipient_jid) 1076 recipient_client = self.client.get_virtual_client(recipient_jid)
1077 comments_data = event_data.get("comments") 1077 comments_data = event_data.get("comments")
1078 if comments_data: 1078 if comments_data:
1079 comment_service = jid.JID(comments_data["jid"]) 1079 comment_service = jid.JID(comments_data["jid"])
1080 comment_node = comments_data["node"] 1080 comment_node = comments_data["node"]
1081 await self._p.subscribe( 1081 await self._p.subscribe(
1095 ) 1095 )
1096 else: 1096 else:
1097 # blog item 1097 # blog item
1098 mb_data = await self._m.item_2_mb_data(client, item, service, node) 1098 mb_data = await self._m.item_2_mb_data(client, item, service, node)
1099 author_jid = jid.JID(mb_data["author_jid"]) 1099 author_jid = jid.JID(mb_data["author_jid"])
1100 if subscribe_extra_nodes and not self.isVirtualJID(author_jid): 1100 if subscribe_extra_nodes and not self.is_virtual_jid(author_jid):
1101 # we subscribe automatically to comment nodes if any 1101 # we subscribe automatically to comment nodes if any
1102 recipient_jid = self.getLocalJIDFromAccount(ap_account) 1102 recipient_jid = self.get_local_jid_from_account(ap_account)
1103 recipient_client = self.client.getVirtualClient(recipient_jid) 1103 recipient_client = self.client.get_virtual_client(recipient_jid)
1104 for comment_data in mb_data.get("comments", []): 1104 for comment_data in mb_data.get("comments", []):
1105 comment_service = jid.JID(comment_data["service"]) 1105 comment_service = jid.JID(comment_data["service"])
1106 if self.isVirtualJID(comment_service): 1106 if self.is_virtual_jid(comment_service):
1107 log.debug( 1107 log.debug(
1108 f"ignoring virtual comment service: {comment_data}" 1108 f"ignoring virtual comment service: {comment_data}"
1109 ) 1109 )
1110 continue 1110 continue
1111 comment_node = comment_data["node"] 1111 comment_node = comment_data["node"]
1123 ) 1123 )
1124 ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new) 1124 ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new)
1125 1125
1126 url_actor = ap_item["actor"] 1126 url_actor = ap_item["actor"]
1127 elif item.name == "retract": 1127 elif item.name == "retract":
1128 url_actor, ap_item = await self.apDeleteItem( 1128 url_actor, ap_item = await self.ap_delete_item(
1129 client.jid, node, item["id"] 1129 client.jid, node, item["id"]
1130 ) 1130 )
1131 else: 1131 else:
1132 raise exceptions.InternalError(f"unexpected element: {item.toXml()}") 1132 raise exceptions.InternalError(f"unexpected element: {item.toXml()}")
1133 await self.signAndPost(inbox, url_actor, ap_item) 1133 await self.sign_and_post(inbox, url_actor, ap_item)
1134 1134
1135 async def convertAndPostAttachments( 1135 async def convert_and_post_attachments(
1136 self, 1136 self,
1137 client: SatXMPPEntity, 1137 client: SatXMPPEntity,
1138 ap_account: str, 1138 ap_account: str,
1139 service: jid.JID, 1139 service: jid.JID,
1140 node: str, 1140 node: str,
1160 log.warning( 1160 log.warning(
1161 "we should get exactly one attachment item for an entity, got " 1161 "we should get exactly one attachment item for an entity, got "
1162 f"{len(items)})" 1162 f"{len(items)})"
1163 ) 1163 )
1164 1164
1165 actor_id = await self.getAPActorIdFromAccount(ap_account) 1165 actor_id = await self.get_ap_actor_id_from_account(ap_account)
1166 inbox = await self.getAPInboxFromId(actor_id) 1166 inbox = await self.get_ap_inbox_from_id(actor_id)
1167 1167
1168 item_elt = items[0] 1168 item_elt = items[0]
1169 item_id = item_elt["id"] 1169 item_id = item_elt["id"]
1170 1170
1171 if publisher is None: 1171 if publisher is None:
1177 "attachments item ID must be publisher's bare JID, ignoring: " 1177 "attachments item ID must be publisher's bare JID, ignoring: "
1178 f"{item_elt.toXml()}" 1178 f"{item_elt.toXml()}"
1179 ) 1179 )
1180 return 1180 return
1181 1181
1182 if self.isVirtualJID(publisher): 1182 if self.is_virtual_jid(publisher):
1183 log.debug(f"ignoring item coming from local virtual JID {publisher}") 1183 log.debug(f"ignoring item coming from local virtual JID {publisher}")
1184 return 1184 return
1185 1185
1186 if publisher is not None: 1186 if publisher is not None:
1187 item_elt["publisher"] = publisher.userhost() 1187 item_elt["publisher"] = publisher.userhost()
1188 1188
1189 item_service, item_node, item_id = self._pa.attachmentNode2Item(node) 1189 item_service, item_node, item_id = self._pa.attachment_node_2_item(node)
1190 item_account = await self.getAPAccountFromJidAndNode(item_service, item_node) 1190 item_account = await self.get_ap_account_from_jid_and_node(item_service, item_node)
1191 if self.isVirtualJID(item_service): 1191 if self.is_virtual_jid(item_service):
1192 # it's a virtual JID mapping to an external AP actor, we can use the 1192 # it's a virtual JID mapping to an external AP actor, we can use the
1193 # item_id directly 1193 # item_id directly
1194 item_url = item_id 1194 item_url = item_id
1195 if not item_url.startswith("https:"): 1195 if not item_url.startswith("https:"):
1196 log.warning( 1196 log.warning(
1197 "item ID of external AP actor is not an https link, ignoring: " 1197 "item ID of external AP actor is not an https link, ignoring: "
1198 f"{item_id!r}" 1198 f"{item_id!r}"
1199 ) 1199 )
1200 return 1200 return
1201 else: 1201 else:
1202 item_url = self.buildAPURL(TYPE_ITEM, item_account, item_id) 1202 item_url = self.build_apurl(TYPE_ITEM, item_account, item_id)
1203 1203
1204 old_attachment_pubsub_items = await self.host.memory.storage.searchPubsubItems({ 1204 old_attachment_pubsub_items = await self.host.memory.storage.search_pubsub_items({
1205 "profiles": [self.client.profile], 1205 "profiles": [self.client.profile],
1206 "services": [service], 1206 "services": [service],
1207 "nodes": [node], 1207 "nodes": [node],
1208 "names": [item_elt["id"]] 1208 "names": [item_elt["id"]]
1209 }) 1209 })
1210 if not old_attachment_pubsub_items: 1210 if not old_attachment_pubsub_items:
1211 old_attachment = {} 1211 old_attachment = {}
1212 else: 1212 else:
1213 old_attachment_items = [i.data for i in old_attachment_pubsub_items] 1213 old_attachment_items = [i.data for i in old_attachment_pubsub_items]
1214 old_attachments = self._pa.items2attachmentData(client, old_attachment_items) 1214 old_attachments = self._pa.items_2_attachment_data(client, old_attachment_items)
1215 try: 1215 try:
1216 old_attachment = old_attachments[0] 1216 old_attachment = old_attachments[0]
1217 except IndexError: 1217 except IndexError:
1218 # no known element was present in attachments 1218 # no known element was present in attachments
1219 old_attachment = {} 1219 old_attachment = {}
1220 publisher_account = await self.getAPAccountFromJidAndNode( 1220 publisher_account = await self.get_ap_account_from_jid_and_node(
1221 publisher, 1221 publisher,
1222 None 1222 None
1223 ) 1223 )
1224 publisher_actor_id = self.buildAPURL(TYPE_ACTOR, publisher_account) 1224 publisher_actor_id = self.build_apurl(TYPE_ACTOR, publisher_account)
1225 try: 1225 try:
1226 attachments = self._pa.items2attachmentData(client, [item_elt])[0] 1226 attachments = self._pa.items_2_attachment_data(client, [item_elt])[0]
1227 except IndexError: 1227 except IndexError:
1228 # no known element was present in attachments 1228 # no known element was present in attachments
1229 attachments = {} 1229 attachments = {}
1230 1230
1231 # noticed 1231 # noticed
1232 if "noticed" in attachments: 1232 if "noticed" in attachments:
1233 if not "noticed" in old_attachment: 1233 if not "noticed" in old_attachment:
1234 # new "noticed" attachment, we translate to "Like" activity 1234 # new "noticed" attachment, we translate to "Like" activity
1235 activity_id = self.buildAPURL("like", item_account, item_id) 1235 activity_id = self.build_apurl("like", item_account, item_id)
1236 activity = self.create_activity( 1236 activity = self.create_activity(
1237 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id 1237 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
1238 ) 1238 )
1239 activity["to"] = [ap_account] 1239 activity["to"] = [ap_account]
1240 activity["cc"] = [NS_AP_PUBLIC] 1240 activity["cc"] = [NS_AP_PUBLIC]
1241 await self.signAndPost(inbox, publisher_actor_id, activity) 1241 await self.sign_and_post(inbox, publisher_actor_id, activity)
1242 else: 1242 else:
1243 if "noticed" in old_attachment: 1243 if "noticed" in old_attachment:
1244 # "noticed" attachment has been removed, we undo the "Like" activity 1244 # "noticed" attachment has been removed, we undo the "Like" activity
1245 activity_id = self.buildAPURL("like", item_account, item_id) 1245 activity_id = self.build_apurl("like", item_account, item_id)
1246 activity = self.create_activity( 1246 activity = self.create_activity(
1247 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id 1247 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
1248 ) 1248 )
1249 activity["to"] = [ap_account] 1249 activity["to"] = [ap_account]
1250 activity["cc"] = [NS_AP_PUBLIC] 1250 activity["cc"] = [NS_AP_PUBLIC]
1251 undo = self.create_activity("Undo", publisher_actor_id, activity) 1251 undo = self.create_activity("Undo", publisher_actor_id, activity)
1252 await self.signAndPost(inbox, publisher_actor_id, undo) 1252 await self.sign_and_post(inbox, publisher_actor_id, undo)
1253 1253
1254 # reactions 1254 # reactions
1255 new_reactions = set(attachments.get("reactions", {}).get("reactions", [])) 1255 new_reactions = set(attachments.get("reactions", {}).get("reactions", []))
1256 old_reactions = set(old_attachment.get("reactions", {}).get("reactions", [])) 1256 old_reactions = set(old_attachment.get("reactions", {}).get("reactions", []))
1257 reactions_remove = old_reactions - new_reactions 1257 reactions_remove = old_reactions - new_reactions
1258 reactions_add = new_reactions - old_reactions 1258 reactions_add = new_reactions - old_reactions
1259 for reactions, undo in ((reactions_remove, True), (reactions_add, False)): 1259 for reactions, undo in ((reactions_remove, True), (reactions_add, False)):
1260 for reaction in reactions: 1260 for reaction in reactions:
1261 activity_id = self.buildAPURL( 1261 activity_id = self.build_apurl(
1262 "reaction", item_account, item_id, reaction.encode().hex() 1262 "reaction", item_account, item_id, reaction.encode().hex()
1263 ) 1263 )
1264 reaction_activity = self.create_activity( 1264 reaction_activity = self.create_activity(
1265 TYPE_REACTION, publisher_actor_id, item_url, 1265 TYPE_REACTION, publisher_actor_id, item_url,
1266 activity_id=activity_id 1266 activity_id=activity_id
1272 activy = self.create_activity( 1272 activy = self.create_activity(
1273 "Undo", publisher_actor_id, reaction_activity 1273 "Undo", publisher_actor_id, reaction_activity
1274 ) 1274 )
1275 else: 1275 else:
1276 activy = reaction_activity 1276 activy = reaction_activity
1277 await self.signAndPost(inbox, publisher_actor_id, activy) 1277 await self.sign_and_post(inbox, publisher_actor_id, activy)
1278 1278
1279 # RSVP 1279 # RSVP
1280 if "rsvp" in attachments: 1280 if "rsvp" in attachments:
1281 attending = attachments["rsvp"].get("attending", "no") 1281 attending = attachments["rsvp"].get("attending", "no")
1282 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") 1282 old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
1283 if attending != old_attending: 1283 if attending != old_attending:
1284 activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE 1284 activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE
1285 activity_id = self.buildAPURL(activity_type.lower(), item_account, item_id) 1285 activity_id = self.build_apurl(activity_type.lower(), item_account, item_id)
1286 activity = self.create_activity( 1286 activity = self.create_activity(
1287 activity_type, publisher_actor_id, item_url, activity_id=activity_id 1287 activity_type, publisher_actor_id, item_url, activity_id=activity_id
1288 ) 1288 )
1289 activity["to"] = [ap_account] 1289 activity["to"] = [ap_account]
1290 activity["cc"] = [NS_AP_PUBLIC] 1290 activity["cc"] = [NS_AP_PUBLIC]
1291 await self.signAndPost(inbox, publisher_actor_id, activity) 1291 await self.sign_and_post(inbox, publisher_actor_id, activity)
1292 else: 1292 else:
1293 if "rsvp" in old_attachment: 1293 if "rsvp" in old_attachment:
1294 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") 1294 old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
1295 if old_attending == "yes": 1295 if old_attending == "yes":
1296 activity_id = self.buildAPURL(TYPE_LEAVE.lower(), item_account, item_id) 1296 activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id)
1297 activity = self.create_activity( 1297 activity = self.create_activity(
1298 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id 1298 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id
1299 ) 1299 )
1300 activity["to"] = [ap_account] 1300 activity["to"] = [ap_account]
1301 activity["cc"] = [NS_AP_PUBLIC] 1301 activity["cc"] = [NS_AP_PUBLIC]
1302 await self.signAndPost(inbox, publisher_actor_id, activity) 1302 await self.sign_and_post(inbox, publisher_actor_id, activity)
1303 1303
1304 if service.user and self.isVirtualJID(service): 1304 if service.user and self.is_virtual_jid(service):
1305 # the item is on a virtual service, we need to store it in cache 1305 # the item is on a virtual service, we need to store it in cache
1306 log.debug("storing attachments item in cache") 1306 log.debug("storing attachments item in cache")
1307 cached_node = await self.host.memory.storage.getPubsubNode( 1307 cached_node = await self.host.memory.storage.get_pubsub_node(
1308 client, service, node, with_subscriptions=True, create=True 1308 client, service, node, with_subscriptions=True, create=True
1309 ) 1309 )
1310 await self.host.memory.storage.cachePubsubItems( 1310 await self.host.memory.storage.cache_pubsub_items(
1311 self.client, 1311 self.client,
1312 cached_node, 1312 cached_node,
1313 [item_elt], 1313 [item_elt],
1314 [attachments] 1314 [attachments]
1315 ) 1315 )
1316 1316
1317 async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse: 1317 async def sign_and_post(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
1318 """Sign a documentent and post it to AP server 1318 """Sign a documentent and post it to AP server
1319 1319
1320 @param url: AP server endpoint 1320 @param url: AP server endpoint
1321 @param actor_id: originating actor ID (URL) 1321 @param actor_id: originating actor ID (URL)
1322 @param doc: document to send 1322 @param doc: document to send
1323 """ 1323 """
1324 if self.verbose: 1324 if self.verbose:
1325 __, actor_args = self.parseAPURL(actor_id) 1325 __, actor_args = self.parse_apurl(actor_id)
1326 actor_account = actor_args[0] 1326 actor_account = actor_args[0]
1327 to_log = [ 1327 to_log = [
1328 "", 1328 "",
1329 f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}" 1329 f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}"
1330 ] 1330 ]
1331 1331
1332 p_url = parse.urlparse(url) 1332 p_url = parse.urlparse(url)
1333 body = json.dumps(doc).encode() 1333 body = json.dumps(doc).encode()
1334 digest_algo, digest_hash = self.getDigest(body) 1334 digest_algo, digest_hash = self.get_digest(body)
1335 digest = f"{digest_algo}={digest_hash}" 1335 digest = f"{digest_algo}={digest_hash}"
1336 1336
1337 headers = { 1337 headers = {
1338 "(request-target)": f"post {p_url.path}", 1338 "(request-target)": f"post {p_url.path}",
1339 "Host": p_url.hostname, 1339 "Host": p_url.hostname,
1341 "Digest": digest 1341 "Digest": digest
1342 } 1342 }
1343 headers["Content-Type"] = ( 1343 headers["Content-Type"] = (
1344 'application/activity+json' 1344 'application/activity+json'
1345 ) 1345 )
1346 headers, __ = self.getSignatureData(self.getKeyId(actor_id), headers) 1346 headers, __ = self.get_signature_data(self.get_key_id(actor_id), headers)
1347 1347
1348 if self.verbose: 1348 if self.verbose:
1349 if self.verbose>=3: 1349 if self.verbose>=3:
1350 h_to_log = "\n".join(f" {k}: {v}" for k,v in headers.items()) 1350 h_to_log = "\n".join(f" {k}: {v}" for k,v in headers.items())
1351 to_log.append(f" headers:\n{h_to_log}") 1351 to_log.append(f" headers:\n{h_to_log}")
1362 log.warning(f"POST request to {url} failed [{resp.code}]: {text}") 1362 log.warning(f"POST request to {url} failed [{resp.code}]: {text}")
1363 elif self.verbose: 1363 elif self.verbose:
1364 log.info(f"==> response code: {resp.code}") 1364 log.info(f"==> response code: {resp.code}")
1365 return resp 1365 return resp
1366 1366
1367 def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): 1367 def _publish_message(self, mess_data_s: str, service_s: str, profile: str):
1368 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore 1368 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
1369 service = jid.JID(service_s) 1369 service = jid.JID(service_s)
1370 client = self.host.getClient(profile) 1370 client = self.host.get_client(profile)
1371 return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) 1371 return defer.ensureDeferred(self.publish_message(client, mess_data, service))
1372 1372
1373 @async_lru(maxsize=LRU_MAX_SIZE) 1373 @async_lru(maxsize=LRU_MAX_SIZE)
1374 async def getAPActorIdFromAccount(self, account: str) -> str: 1374 async def get_ap_actor_id_from_account(self, account: str) -> str:
1375 """Retrieve account ID from it's handle using WebFinger 1375 """Retrieve account ID from it's handle using WebFinger
1376 1376
1377 Don't use this method to get local actor id from a local account derivated for 1377 Don't use this method to get local actor id from a local account derivated for
1378 JID: in this case, the actor ID is retrieve with 1378 JID: in this case, the actor ID is retrieve with
1379 ``self.buildAPURL(TYPE_ACTOR, ap_account)`` 1379 ``self.build_apurl(TYPE_ACTOR, ap_account)``
1380 1380
1381 @param account: AP handle (user@domain.tld) 1381 @param account: AP handle (user@domain.tld)
1382 @return: Actor ID (which is an URL) 1382 @return: Actor ID (which is an URL)
1383 """ 1383 """
1384 if account.count("@") != 1 or "/" in account: 1384 if account.count("@") != 1 or "/" in account:
1406 raise ValueError( 1406 raise ValueError(
1407 f"No ActivityPub link found for {account!r}" 1407 f"No ActivityPub link found for {account!r}"
1408 ) 1408 )
1409 return href 1409 return href
1410 1410
1411 async def getAPActorDataFromAccount(self, account: str) -> dict: 1411 async def get_ap_actor_data_from_account(self, account: str) -> dict:
1412 """Retrieve ActivityPub Actor data 1412 """Retrieve ActivityPub Actor data
1413 1413
1414 @param account: ActivityPub Actor identifier 1414 @param account: ActivityPub Actor identifier
1415 """ 1415 """
1416 href = await self.getAPActorIdFromAccount(account) 1416 href = await self.get_ap_actor_id_from_account(account)
1417 return await self.apGet(href) 1417 return await self.ap_get(href)
1418 1418
1419 async def getAPInboxFromId(self, actor_id: str, use_shared: bool = True) -> str: 1419 async def get_ap_inbox_from_id(self, actor_id: str, use_shared: bool = True) -> str:
1420 """Retrieve inbox of an actor_id 1420 """Retrieve inbox of an actor_id
1421 1421
1422 @param use_shared: if True, and a shared inbox exists, it will be used instead of 1422 @param use_shared: if True, and a shared inbox exists, it will be used instead of
1423 the user inbox 1423 the user inbox
1424 """ 1424 """
1425 data = await self.getActorData(actor_id) 1425 data = await self.get_actor_data(actor_id)
1426 if use_shared: 1426 if use_shared:
1427 try: 1427 try:
1428 return data["endpoints"]["sharedInbox"] 1428 return data["endpoints"]["sharedInbox"]
1429 except KeyError: 1429 except KeyError:
1430 pass 1430 pass
1431 return data["inbox"] 1431 return data["inbox"]
1432 1432
1433 @async_lru(maxsize=LRU_MAX_SIZE) 1433 @async_lru(maxsize=LRU_MAX_SIZE)
1434 async def getAPAccountFromId(self, actor_id: str) -> str: 1434 async def get_ap_account_from_id(self, actor_id: str) -> str:
1435 """Retrieve AP account from the ID URL 1435 """Retrieve AP account from the ID URL
1436 1436
1437 Works with external or local actor IDs. 1437 Works with external or local actor IDs.
1438 @param actor_id: AP ID of the actor (URL to the actor data) 1438 @param actor_id: AP ID of the actor (URL to the actor data)
1439 @return: AP handle 1439 @return: AP handle
1440 """ 1440 """
1441 if self.isLocalURL(actor_id): 1441 if self.is_local_url(actor_id):
1442 url_type, url_args = self.parseAPURL(actor_id) 1442 url_type, url_args = self.parse_apurl(actor_id)
1443 if url_type != "actor" or not url_args: 1443 if url_type != "actor" or not url_args:
1444 raise exceptions.DataError( 1444 raise exceptions.DataError(
1445 f"invalid local actor ID: {actor_id}" 1445 f"invalid local actor ID: {actor_id}"
1446 ) 1446 )
1447 account = url_args[0] 1447 account = url_args[0]
1456 f"{account!r} is not a valid local account (from {actor_id})" 1456 f"{account!r} is not a valid local account (from {actor_id})"
1457 ) 1457 )
1458 return account 1458 return account
1459 1459
1460 url_parsed = parse.urlparse(actor_id) 1460 url_parsed = parse.urlparse(actor_id)
1461 actor_data = await self.getActorData(actor_id) 1461 actor_data = await self.get_actor_data(actor_id)
1462 username = actor_data.get("preferredUsername") 1462 username = actor_data.get("preferredUsername")
1463 if not username: 1463 if not username:
1464 raise exceptions.DataError( 1464 raise exceptions.DataError(
1465 'No "preferredUsername" field found, can\'t retrieve actor account' 1465 'No "preferredUsername" field found, can\'t retrieve actor account'
1466 ) 1466 )
1467 account = f"{username}@{url_parsed.hostname}" 1467 account = f"{username}@{url_parsed.hostname}"
1468 # we try to retrieve the actor ID from the account to check it 1468 # we try to retrieve the actor ID from the account to check it
1469 found_id = await self.getAPActorIdFromAccount(account) 1469 found_id = await self.get_ap_actor_id_from_account(account)
1470 if found_id != actor_id: 1470 if found_id != actor_id:
1471 # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196 1471 # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196
1472 msg = ( 1472 msg = (
1473 f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID " 1473 f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID "
1474 f"({actor_id!r}). This AP instance doesn't seems to use " 1474 f"({actor_id!r}). This AP instance doesn't seems to use "
1476 ) 1476 )
1477 log.warning(msg) 1477 log.warning(msg)
1478 raise exceptions.DataError(msg) 1478 raise exceptions.DataError(msg)
1479 return account 1479 return account
1480 1480
1481 async def getAPItems( 1481 async def get_ap_items(
1482 self, 1482 self,
1483 collection: dict, 1483 collection: dict,
1484 max_items: Optional[int] = None, 1484 max_items: Optional[int] = None,
1485 chronological_pagination: bool = True, 1485 chronological_pagination: bool = True,
1486 after_id: Optional[str] = None, 1486 after_id: Optional[str] = None,
1550 # before "start_index" 1550 # before "start_index"
1551 previous_index = start_index - 1 1551 previous_index = start_index - 1
1552 retrieved_items = 0 1552 retrieved_items = 0
1553 current_page = collection["last"] 1553 current_page = collection["last"]
1554 while retrieved_items < count: 1554 while retrieved_items < count:
1555 page_data, items = await self.parseAPPage( 1555 page_data, items = await self.parse_ap_page(
1556 current_page, parser, only_ids 1556 current_page, parser, only_ids
1557 ) 1557 )
1558 if not items: 1558 if not items:
1559 log.warning(f"found an empty AP page at {current_page}") 1559 log.warning(f"found an empty AP page at {current_page}")
1560 return [], rsm_resp 1560 return [], rsm_resp
1586 page_items = [] 1586 page_items = []
1587 retrieved_items = 0 1587 retrieved_items = 0
1588 found_after_id = False 1588 found_after_id = False
1589 1589
1590 while retrieved_items < count: 1590 while retrieved_items < count:
1591 __, page_items = await self.parseAPPage(page, parser, only_ids) 1591 __, page_items = await self.parse_ap_page(page, parser, only_ids)
1592 if not page_items: 1592 if not page_items:
1593 break 1593 break
1594 retrieved_items += len(page_items) 1594 retrieved_items += len(page_items)
1595 if after_id is not None and not found_after_id: 1595 if after_id is not None and not found_after_id:
1596 # if we have an after_id, we ignore all items until the requested one is 1596 # if we have an after_id, we ignore all items until the requested one is
1659 async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element: 1659 async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element:
1660 """Convert AP item to XMPP item element""" 1660 """Convert AP item to XMPP item element"""
1661 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) 1661 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
1662 return item_elt 1662 return item_elt
1663 1663
1664 async def parseAPPage( 1664 async def parse_ap_page(
1665 self, 1665 self,
1666 page: Union[str, dict], 1666 page: Union[str, dict],
1667 parser: Callable[[dict], Awaitable[domish.Element]], 1667 parser: Callable[[dict], Awaitable[domish.Element]],
1668 only_ids: bool = False 1668 only_ids: bool = False
1669 ) -> Tuple[dict, List[domish.Element]]: 1669 ) -> Tuple[dict, List[domish.Element]]:
1672 @param page: Can be either url linking and AP page, or the page data directly 1672 @param page: Can be either url linking and AP page, or the page data directly
1673 @param parser: method to use to parse AP items and get XMPP item elements 1673 @param parser: method to use to parse AP items and get XMPP item elements
1674 @param only_ids: if True, only retrieve items IDs 1674 @param only_ids: if True, only retrieve items IDs
1675 @return: page data, pubsub items 1675 @return: page data, pubsub items
1676 """ 1676 """
1677 page_data = await self.apGetObject(page) 1677 page_data = await self.ap_get_object(page)
1678 if page_data is None: 1678 if page_data is None:
1679 log.warning('No data found in collection') 1679 log.warning('No data found in collection')
1680 return {}, [] 1680 return {}, []
1681 ap_items = await self.apGetList(page_data, "orderedItems", only_ids=only_ids) 1681 ap_items = await self.ap_get_list(page_data, "orderedItems", only_ids=only_ids)
1682 if ap_items is None: 1682 if ap_items is None:
1683 ap_items = await self.apGetList(page_data, "items", only_ids=only_ids) 1683 ap_items = await self.ap_get_list(page_data, "items", only_ids=only_ids)
1684 if not ap_items: 1684 if not ap_items:
1685 log.warning(f'No item field found in collection: {page_data!r}') 1685 log.warning(f'No item field found in collection: {page_data!r}')
1686 return page_data, [] 1686 return page_data, []
1687 else: 1687 else:
1688 log.warning( 1688 log.warning(
1697 except (exceptions.DataError, NotImplementedError, error.StanzaError): 1697 except (exceptions.DataError, NotImplementedError, error.StanzaError):
1698 continue 1698 continue
1699 1699
1700 return page_data, items 1700 return page_data, items
1701 1701
1702 async def getCommentsNodes( 1702 async def get_comments_nodes(
1703 self, 1703 self,
1704 item_id: str, 1704 item_id: str,
1705 parent_id: Optional[str] 1705 parent_id: Optional[str]
1706 ) -> Tuple[Optional[str], Optional[str]]: 1706 ) -> Tuple[Optional[str], Optional[str]]:
1707 """Get node where this item is and node to use for comments 1707 """Get node where this item is and node to use for comments
1717 returned when no comment node must be used (happens when we have reached 1717 returned when no comment node must be used (happens when we have reached
1718 "comments_max_depth") 1718 "comments_max_depth")
1719 """ 1719 """
1720 if parent_id is None or not self.comments_max_depth: 1720 if parent_id is None or not self.comments_max_depth:
1721 return ( 1721 return (
1722 self._m.getCommentsNode(parent_id) if parent_id is not None else None, 1722 self._m.get_comments_node(parent_id) if parent_id is not None else None,
1723 self._m.getCommentsNode(item_id) 1723 self._m.get_comments_node(item_id)
1724 ) 1724 )
1725 parent_url = parent_id 1725 parent_url = parent_id
1726 parents = [] 1726 parents = []
1727 for __ in range(COMMENTS_MAX_PARENTS): 1727 for __ in range(COMMENTS_MAX_PARENTS):
1728 parent_item = await self.apGet(parent_url) 1728 parent_item = await self.ap_get(parent_url)
1729 parents.insert(0, parent_item) 1729 parents.insert(0, parent_item)
1730 parent_url = parent_item.get("inReplyTo") 1730 parent_url = parent_item.get("inReplyTo")
1731 if parent_url is None: 1731 if parent_url is None:
1732 break 1732 break
1733 parent_limit = self.comments_max_depth-1 1733 parent_limit = self.comments_max_depth-1
1734 if len(parents) <= parent_limit: 1734 if len(parents) <= parent_limit:
1735 return ( 1735 return (
1736 self._m.getCommentsNode(parents[-1]["id"]), 1736 self._m.get_comments_node(parents[-1]["id"]),
1737 self._m.getCommentsNode(item_id) 1737 self._m.get_comments_node(item_id)
1738 ) 1738 )
1739 else: 1739 else:
1740 last_level_item = parents[parent_limit] 1740 last_level_item = parents[parent_limit]
1741 return ( 1741 return (
1742 self._m.getCommentsNode(last_level_item["id"]), 1742 self._m.get_comments_node(last_level_item["id"]),
1743 None 1743 None
1744 ) 1744 )
1745 1745
1746 async def ap_item_2_mb_data(self, ap_item: dict) -> dict: 1746 async def ap_item_2_mb_data(self, ap_item: dict) -> dict:
1747 """Convert AP activity or object to microblog data 1747 """Convert AP activity or object to microblog data
1753 @raise NotImplementedError: some AP data is not handled yet 1753 @raise NotImplementedError: some AP data is not handled yet
1754 @raise error.StanzaError: error while contacting the AP server 1754 @raise error.StanzaError: error while contacting the AP server
1755 """ 1755 """
1756 is_activity = self.is_activity(ap_item) 1756 is_activity = self.is_activity(ap_item)
1757 if is_activity: 1757 if is_activity:
1758 ap_object = await self.apGetObject(ap_item, "object") 1758 ap_object = await self.ap_get_object(ap_item, "object")
1759 if not ap_object: 1759 if not ap_object:
1760 log.warning(f'No "object" found in AP item {ap_item!r}') 1760 log.warning(f'No "object" found in AP item {ap_item!r}')
1761 raise exceptions.DataError 1761 raise exceptions.DataError
1762 else: 1762 else:
1763 ap_object = ap_item 1763 ap_object = ap_item
1813 attachment[key] = value 1813 attachment[key] = value
1814 attachments.append(attachment) 1814 attachments.append(attachment)
1815 1815
1816 # author 1816 # author
1817 if is_activity: 1817 if is_activity:
1818 authors = await self.apGetActors(ap_item, "actor") 1818 authors = await self.ap_get_actors(ap_item, "actor")
1819 else: 1819 else:
1820 authors = await self.apGetActors(ap_object, "attributedTo") 1820 authors = await self.ap_get_actors(ap_object, "attributedTo")
1821 if len(authors) > 1: 1821 if len(authors) > 1:
1822 # we only keep first item as author 1822 # we only keep first item as author
1823 # TODO: handle multiple actors 1823 # TODO: handle multiple actors
1824 log.warning("multiple actors are not managed") 1824 log.warning("multiple actors are not managed")
1825 1825
1826 account = authors[0] 1826 account = authors[0]
1827 author_jid = self.getLocalJIDFromAccount(account).full() 1827 author_jid = self.get_local_jid_from_account(account).full()
1828 1828
1829 mb_data["author"] = account.split("@", 1)[0] 1829 mb_data["author"] = account.split("@", 1)[0]
1830 mb_data["author_jid"] = author_jid 1830 mb_data["author_jid"] = author_jid
1831 1831
1832 # published/updated 1832 # published/updated
1846 if "_repeated" in ap_item: 1846 if "_repeated" in ap_item:
1847 mb_data["extra"]["repeated"] = ap_item["_repeated"] 1847 mb_data["extra"]["repeated"] = ap_item["_repeated"]
1848 1848
1849 # comments 1849 # comments
1850 in_reply_to = ap_object.get("inReplyTo") 1850 in_reply_to = ap_object.get("inReplyTo")
1851 __, comments_node = await self.getCommentsNodes(item_id, in_reply_to) 1851 __, comments_node = await self.get_comments_nodes(item_id, in_reply_to)
1852 if comments_node is not None: 1852 if comments_node is not None:
1853 comments_data = { 1853 comments_data = {
1854 "service": author_jid, 1854 "service": author_jid,
1855 "node": comments_node, 1855 "node": comments_node,
1856 "uri": uri.buildXMPPUri( 1856 "uri": uri.build_xmpp_uri(
1857 "pubsub", 1857 "pubsub",
1858 path=author_jid, 1858 path=author_jid,
1859 node=comments_node 1859 node=comments_node
1860 ) 1860 )
1861 } 1861 }
1862 mb_data["comments"] = [comments_data] 1862 mb_data["comments"] = [comments_data]
1863 1863
1864 return mb_data 1864 return mb_data
1865 1865
1866 async def getReplyToIdFromXMPPNode( 1866 async def get_reply_to_id_from_xmpp_node(
1867 self, 1867 self,
1868 client: SatXMPPEntity, 1868 client: SatXMPPEntity,
1869 ap_account: str, 1869 ap_account: str,
1870 parent_item: str, 1870 parent_item: str,
1871 mb_data: dict 1871 mb_data: dict
1883 @param mb_data: microblog data of the publication 1883 @param mb_data: microblog data of the publication
1884 @return: URL to use in ``inReplyTo`` field 1884 @return: URL to use in ``inReplyTo`` field
1885 """ 1885 """
1886 # FIXME: propose a protoXEP to properly get parent item, node and service 1886 # FIXME: propose a protoXEP to properly get parent item, node and service
1887 1887
1888 found_items = await self.host.memory.storage.searchPubsubItems({ 1888 found_items = await self.host.memory.storage.search_pubsub_items({
1889 "profiles": [client.profile], 1889 "profiles": [client.profile],
1890 "names": [parent_item] 1890 "names": [parent_item]
1891 }) 1891 })
1892 if not found_items: 1892 if not found_items:
1893 log.warning(f"parent item {parent_item!r} not found in cache") 1893 log.warning(f"parent item {parent_item!r} not found in cache")
1894 parent_ap_account = ap_account 1894 parent_ap_account = ap_account
1895 elif len(found_items) == 1: 1895 elif len(found_items) == 1:
1896 cached_node = found_items[0].node 1896 cached_node = found_items[0].node
1897 parent_ap_account = await self.getAPAccountFromJidAndNode( 1897 parent_ap_account = await self.get_ap_account_from_jid_and_node(
1898 cached_node.service, 1898 cached_node.service,
1899 cached_node.name 1899 cached_node.name
1900 ) 1900 )
1901 else: 1901 else:
1902 # we found several cached item with given ID, we check if there is one 1902 # we found several cached item with given ID, we check if there is one
1915 f"{parent_item!r}" 1915 f"{parent_item!r}"
1916 ) 1916 )
1917 parent_ap_account = ap_account 1917 parent_ap_account = ap_account
1918 else: 1918 else:
1919 cached_node = cached_item.node 1919 cached_node = cached_item.node
1920 parent_ap_account = await self.getAPAccountFromJidAndNode( 1920 parent_ap_account = await self.get_ap_account_from_jid_and_node(
1921 cached_node.service, 1921 cached_node.service,
1922 cached_node.name 1922 cached_node.name
1923 ) 1923 )
1924 1924
1925 return self.buildAPURL( 1925 return self.build_apurl(
1926 TYPE_ITEM, parent_ap_account, parent_item 1926 TYPE_ITEM, parent_ap_account, parent_item
1927 ) 1927 )
1928 1928
1929 async def repeated_mb_2_ap_item( 1929 async def repeated_mb_2_ap_item(
1930 self, 1930 self,
1935 @param mb_data: microblog metadata of an item repeating an other blog post 1935 @param mb_data: microblog metadata of an item repeating an other blog post
1936 @return: Announce activity linking to the repeated item 1936 @return: Announce activity linking to the repeated item
1937 """ 1937 """
1938 repeated = mb_data["extra"]["repeated"] 1938 repeated = mb_data["extra"]["repeated"]
1939 repeater = jid.JID(repeated["by"]) 1939 repeater = jid.JID(repeated["by"])
1940 repeater_account = await self.getAPAccountFromJidAndNode( 1940 repeater_account = await self.get_ap_account_from_jid_and_node(
1941 repeater, 1941 repeater,
1942 None 1942 None
1943 ) 1943 )
1944 repeater_id = self.buildAPURL(TYPE_ACTOR, repeater_account) 1944 repeater_id = self.build_apurl(TYPE_ACTOR, repeater_account)
1945 repeated_uri = repeated["uri"] 1945 repeated_uri = repeated["uri"]
1946 1946
1947 if not repeated_uri.startswith("xmpp:"): 1947 if not repeated_uri.startswith("xmpp:"):
1948 log.warning( 1948 log.warning(
1949 "Only xmpp: URL are handled for repeated item at the moment, ignoring " 1949 "Only xmpp: URL are handled for repeated item at the moment, ignoring "
1950 f"item {mb_data}" 1950 f"item {mb_data}"
1951 ) 1951 )
1952 raise NotImplementedError 1952 raise NotImplementedError
1953 parsed_url = uri.parseXMPPUri(repeated_uri) 1953 parsed_url = uri.parse_xmpp_uri(repeated_uri)
1954 if parsed_url["type"] != "pubsub": 1954 if parsed_url["type"] != "pubsub":
1955 log.warning( 1955 log.warning(
1956 "Only pubsub URL are handled for repeated item at the moment, ignoring " 1956 "Only pubsub URL are handled for repeated item at the moment, ignoring "
1957 f"item {mb_data}" 1957 f"item {mb_data}"
1958 ) 1958 )
1959 raise NotImplementedError 1959 raise NotImplementedError
1960 rep_service = jid.JID(parsed_url["path"]) 1960 rep_service = jid.JID(parsed_url["path"])
1961 rep_item = parsed_url["item"] 1961 rep_item = parsed_url["item"]
1962 activity_id = self.buildAPURL("item", repeater.userhost(), mb_data["id"]) 1962 activity_id = self.build_apurl("item", repeater.userhost(), mb_data["id"])
1963 1963
1964 if self.isVirtualJID(rep_service): 1964 if self.is_virtual_jid(rep_service):
1965 # it's an AP actor linked through this gateway 1965 # it's an AP actor linked through this gateway
1966 # in this case we can simply use the item ID 1966 # in this case we can simply use the item ID
1967 if not rep_item.startswith("https:"): 1967 if not rep_item.startswith("https:"):
1968 log.warning( 1968 log.warning(
1969 f"Was expecting an HTTPS url as item ID and got {rep_item!r}\n" 1969 f"Was expecting an HTTPS url as item ID and got {rep_item!r}\n"
1972 announced_uri = rep_item 1972 announced_uri = rep_item
1973 repeated_account = self._e.unescape(rep_service.user) 1973 repeated_account = self._e.unescape(rep_service.user)
1974 else: 1974 else:
1975 # the repeated item is an XMPP publication, we build the corresponding ID 1975 # the repeated item is an XMPP publication, we build the corresponding ID
1976 rep_node = parsed_url["node"] 1976 rep_node = parsed_url["node"]
1977 repeated_account = await self.getAPAccountFromJidAndNode( 1977 repeated_account = await self.get_ap_account_from_jid_and_node(
1978 rep_service, rep_node 1978 rep_service, rep_node
1979 ) 1979 )
1980 announced_uri = self.buildAPURL("item", repeated_account, rep_item) 1980 announced_uri = self.build_apurl("item", repeated_account, rep_item)
1981 1981
1982 announce = self.create_activity( 1982 announce = self.create_activity(
1983 "Announce", repeater_id, announced_uri, activity_id=activity_id 1983 "Announce", repeater_id, announced_uri, activity_id=activity_id
1984 ) 1984 )
1985 announce["to"] = [NS_AP_PUBLIC] 1985 announce["to"] = [NS_AP_PUBLIC]
1986 announce["cc"] = [ 1986 announce["cc"] = [
1987 self.buildAPURL(TYPE_FOLLOWERS, repeater_account), 1987 self.build_apurl(TYPE_FOLLOWERS, repeater_account),
1988 await self.getAPActorIdFromAccount(repeated_account) 1988 await self.get_ap_actor_id_from_account(repeated_account)
1989 ] 1989 ]
1990 return announce 1990 return announce
1991 1991
1992 async def mb_data_2_ap_item( 1992 async def mb_data_2_ap_item(
1993 self, 1993 self,
2018 return await self.repeated_mb_2_ap_item(mb_data) 2018 return await self.repeated_mb_2_ap_item(mb_data)
2019 if not mb_data.get("id"): 2019 if not mb_data.get("id"):
2020 mb_data["id"] = shortuuid.uuid() 2020 mb_data["id"] = shortuuid.uuid()
2021 if not mb_data.get("author_jid"): 2021 if not mb_data.get("author_jid"):
2022 mb_data["author_jid"] = client.jid.userhost() 2022 mb_data["author_jid"] = client.jid.userhost()
2023 ap_account = await self.getAPAccountFromJidAndNode( 2023 ap_account = await self.get_ap_account_from_jid_and_node(
2024 jid.JID(mb_data["author_jid"]), 2024 jid.JID(mb_data["author_jid"]),
2025 None 2025 None
2026 ) 2026 )
2027 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) 2027 url_actor = self.build_apurl(TYPE_ACTOR, ap_account)
2028 url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"]) 2028 url_item = self.build_apurl(TYPE_ITEM, ap_account, mb_data["id"])
2029 ap_object = { 2029 ap_object = {
2030 "id": url_item, 2030 "id": url_item,
2031 "type": "Note", 2031 "type": "Note",
2032 "published": utils.xmpp_date(mb_data.get("published")), 2032 "published": utils.xmpp_date(mb_data.get("published")),
2033 "attributedTo": url_actor, 2033 "attributedTo": url_actor,
2074 if m_host in (self.public_url, self.client.jid.host): 2074 if m_host in (self.public_url, self.client.jid.host):
2075 # we ignore mention of local users, they should be sent as XMPP 2075 # we ignore mention of local users, they should be sent as XMPP
2076 # references 2076 # references
2077 continue 2077 continue
2078 try: 2078 try:
2079 mentioned_id = await self.getAPActorIdFromAccount(mentioned) 2079 mentioned_id = await self.get_ap_actor_id_from_account(mentioned)
2080 except Exception as e: 2080 except Exception as e:
2081 log.warning(f"Can't add mention to {mentioned!r}: {e}") 2081 log.warning(f"Can't add mention to {mentioned!r}: {e}")
2082 else: 2082 else:
2083 ap_object["to"].append(mentioned_id) 2083 ap_object["to"].append(mentioned_id)
2084 ap_object.setdefault("tag", []).append({ 2084 ap_object.setdefault("tag", []).append({
2092 except KeyError: 2092 except KeyError:
2093 # node and service must always be specified when this method is used 2093 # node and service must always be specified when this method is used
2094 raise exceptions.InternalError( 2094 raise exceptions.InternalError(
2095 "node or service is missing in mb_data" 2095 "node or service is missing in mb_data"
2096 ) 2096 )
2097 target_ap_account = await self.getAPAccountFromJidAndNode( 2097 target_ap_account = await self.get_ap_account_from_jid_and_node(
2098 service, node 2098 service, node
2099 ) 2099 )
2100 if self.isVirtualJID(service): 2100 if self.is_virtual_jid(service):
2101 # service is a proxy JID for AP account 2101 # service is a proxy JID for AP account
2102 actor_data = await self.getAPActorDataFromAccount(target_ap_account) 2102 actor_data = await self.get_ap_actor_data_from_account(target_ap_account)
2103 followers = actor_data.get("followers") 2103 followers = actor_data.get("followers")
2104 else: 2104 else:
2105 # service is a real XMPP entity 2105 # service is a real XMPP entity
2106 followers = self.buildAPURL(TYPE_FOLLOWERS, target_ap_account) 2106 followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account)
2107 if followers: 2107 if followers:
2108 ap_object["cc"] = [followers] 2108 ap_object["cc"] = [followers]
2109 if self._m.isCommentNode(node): 2109 if self._m.is_comment_node(node):
2110 parent_item = self._m.getParentItem(node) 2110 parent_item = self._m.get_parent_item(node)
2111 if self.isVirtualJID(service): 2111 if self.is_virtual_jid(service):
2112 # the publication is on a virtual node (i.e. an XMPP node managed by 2112 # the publication is on a virtual node (i.e. an XMPP node managed by
2113 # this gateway and linking to an ActivityPub actor) 2113 # this gateway and linking to an ActivityPub actor)
2114 ap_object["inReplyTo"] = parent_item 2114 ap_object["inReplyTo"] = parent_item
2115 else: 2115 else:
2116 # the publication is from a followed real XMPP node 2116 # the publication is from a followed real XMPP node
2117 ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode( 2117 ap_object["inReplyTo"] = await self.get_reply_to_id_from_xmpp_node(
2118 client, 2118 client,
2119 ap_account, 2119 ap_account,
2120 parent_item, 2120 parent_item,
2121 mb_data 2121 mb_data
2122 ) 2122 )
2123 2123
2124 return self.create_activity( 2124 return self.create_activity(
2125 "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item 2125 "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item
2126 ) 2126 )
2127 2127
2128 async def publishMessage( 2128 async def publish_message(
2129 self, 2129 self,
2130 client: SatXMPPEntity, 2130 client: SatXMPPEntity,
2131 mess_data: dict, 2131 mess_data: dict,
2132 service: jid.JID 2132 service: jid.JID
2133 ) -> None: 2133 ) -> None:
2149 @param service: JID corresponding to the AP actor. 2149 @param service: JID corresponding to the AP actor.
2150 """ 2150 """
2151 if not service.user: 2151 if not service.user:
2152 raise ValueError("service must have a local part") 2152 raise ValueError("service must have a local part")
2153 account = self._e.unescape(service.user) 2153 account = self._e.unescape(service.user)
2154 ap_actor_data = await self.getAPActorDataFromAccount(account) 2154 ap_actor_data = await self.get_ap_actor_data_from_account(account)
2155 2155
2156 try: 2156 try:
2157 inbox_url = ap_actor_data["endpoints"]["sharedInbox"] 2157 inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
2158 except KeyError: 2158 except KeyError:
2159 raise exceptions.DataError("Can't get ActivityPub actor inbox") 2159 raise exceptions.DataError("Can't get ActivityPub actor inbox")
2160 2160
2161 item_data = await self.mb_data_2_ap_item(client, mess_data) 2161 item_data = await self.mb_data_2_ap_item(client, mess_data)
2162 url_actor = item_data["actor"] 2162 url_actor = item_data["actor"]
2163 resp = await self.signAndPost(inbox_url, url_actor, item_data) 2163 resp = await self.sign_and_post(inbox_url, url_actor, item_data)
2164 2164
2165 async def apDeleteItem( 2165 async def ap_delete_item(
2166 self, 2166 self,
2167 jid_: jid.JID, 2167 jid_: jid.JID,
2168 node: Optional[str], 2168 node: Optional[str],
2169 item_id: str, 2169 item_id: str,
2170 public: bool = True 2170 public: bool = True
2180 @return: actor_id of the entity deleting the item, activity to send 2180 @return: actor_id of the entity deleting the item, activity to send
2181 """ 2181 """
2182 if node is None: 2182 if node is None:
2183 node = self._m.namespace 2183 node = self._m.namespace
2184 2184
2185 author_account = await self.getAPAccountFromJidAndNode(jid_, node) 2185 author_account = await self.get_ap_account_from_jid_and_node(jid_, node)
2186 author_actor_id = self.buildAPURL(TYPE_ACTOR, author_account) 2186 author_actor_id = self.build_apurl(TYPE_ACTOR, author_account)
2187 2187
2188 items = await self.host.memory.storage.searchPubsubItems({ 2188 items = await self.host.memory.storage.search_pubsub_items({
2189 "profiles": [self.client.profile], 2189 "profiles": [self.client.profile],
2190 "services": [jid_], 2190 "services": [jid_],
2191 "names": [item_id] 2191 "names": [item_id]
2192 }) 2192 })
2193 if not items: 2193 if not items:
2208 log.debug( 2208 log.debug(
2209 f"Can't parse item, maybe it's not a blog item: {e}\n" 2209 f"Can't parse item, maybe it's not a blog item: {e}\n"
2210 f"{items[0].toXml()}" 2210 f"{items[0].toXml()}"
2211 ) 2211 )
2212 2212
2213 url_item = self.buildAPURL(TYPE_ITEM, author_account, item_id) 2213 url_item = self.build_apurl(TYPE_ITEM, author_account, item_id)
2214 ap_item = self.create_activity( 2214 ap_item = self.create_activity(
2215 "Delete", 2215 "Delete",
2216 author_actor_id, 2216 author_actor_id,
2217 { 2217 {
2218 "id": url_item, 2218 "id": url_item,
2221 ) 2221 )
2222 if public: 2222 if public:
2223 ap_item["to"] = [NS_AP_PUBLIC] 2223 ap_item["to"] = [NS_AP_PUBLIC]
2224 return author_actor_id, ap_item 2224 return author_actor_id, ap_item
2225 2225
2226 def _messageReceivedTrigger( 2226 def _message_received_trigger(
2227 self, 2227 self,
2228 client: SatXMPPEntity, 2228 client: SatXMPPEntity,
2229 message_elt: domish.Element, 2229 message_elt: domish.Element,
2230 post_treat: defer.Deferred 2230 post_treat: defer.Deferred
2231 ) -> bool: 2231 ) -> bool:
2246 if client != self.client: 2246 if client != self.client:
2247 return mess_data 2247 return mess_data
2248 if mess_data["type"] not in ("chat", "normal"): 2248 if mess_data["type"] not in ("chat", "normal"):
2249 log.warning(f"ignoring message with unexpected type: {mess_data}") 2249 log.warning(f"ignoring message with unexpected type: {mess_data}")
2250 return mess_data 2250 return mess_data
2251 if not self.isLocal(mess_data["from"]): 2251 if not self.is_local(mess_data["from"]):
2252 log.warning(f"ignoring non local message: {mess_data}") 2252 log.warning(f"ignoring non local message: {mess_data}")
2253 return mess_data 2253 return mess_data
2254 if not mess_data["to"].user: 2254 if not mess_data["to"].user:
2255 log.warning( 2255 log.warning(
2256 f"ignoring message addressed to gateway itself: {mess_data}" 2256 f"ignoring message addressed to gateway itself: {mess_data}"
2257 ) 2257 )
2258 return mess_data 2258 return mess_data
2259 2259
2260 actor_account = self._e.unescape(mess_data["to"].user) 2260 actor_account = self._e.unescape(mess_data["to"].user)
2261 actor_id = await self.getAPActorIdFromAccount(actor_account) 2261 actor_id = await self.get_ap_actor_id_from_account(actor_account)
2262 inbox = await self.getAPInboxFromId(actor_id, use_shared=False) 2262 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False)
2263 2263
2264 try: 2264 try:
2265 language, message = next(iter(mess_data["message"].items())) 2265 language, message = next(iter(mess_data["message"].items()))
2266 except (KeyError, StopIteration): 2266 except (KeyError, StopIteration):
2267 log.warning(f"ignoring empty message: {mess_data}") 2267 log.warning(f"ignoring empty message: {mess_data}")
2280 if attachments: 2280 if attachments:
2281 mb_data["extra"] = { 2281 mb_data["extra"] = {
2282 C.KEY_ATTACHMENTS: attachments 2282 C.KEY_ATTACHMENTS: attachments
2283 } 2283 }
2284 2284
2285 client = self.client.getVirtualClient(mess_data["from"]) 2285 client = self.client.get_virtual_client(mess_data["from"])
2286 ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False) 2286 ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False)
2287 ap_object = ap_item["object"] 2287 ap_object = ap_item["object"]
2288 ap_object["to"] = ap_item["to"] = [actor_id] 2288 ap_object["to"] = ap_item["to"] = [actor_id]
2289 # we add a mention to direct message, otherwise peer is not notified in some AP 2289 # we add a mention to direct message, otherwise peer is not notified in some AP
2290 # implementations (notably Mastodon), and the message may be missed easily. 2290 # implementations (notably Mastodon), and the message may be missed easily.
2292 "type": TYPE_MENTION, 2292 "type": TYPE_MENTION,
2293 "href": actor_id, 2293 "href": actor_id,
2294 "name": f"@{actor_account}", 2294 "name": f"@{actor_account}",
2295 }) 2295 })
2296 2296
2297 await self.signAndPost(inbox, ap_item["actor"], ap_item) 2297 await self.sign_and_post(inbox, ap_item["actor"], ap_item)
2298 return mess_data 2298 return mess_data
2299 2299
2300 async def _onMessageRetract( 2300 async def _on_message_retract(
2301 self, 2301 self,
2302 client: SatXMPPEntity, 2302 client: SatXMPPEntity,
2303 message_elt: domish.Element, 2303 message_elt: domish.Element,
2304 retract_elt: domish.Element, 2304 retract_elt: domish.Element,
2305 fastened_elts 2305 fastened_elts
2306 ) -> bool: 2306 ) -> bool:
2307 if client != self.client: 2307 if client != self.client:
2308 return True 2308 return True
2309 from_jid = jid.JID(message_elt["from"]) 2309 from_jid = jid.JID(message_elt["from"])
2310 if not self.isLocal(from_jid): 2310 if not self.is_local(from_jid):
2311 log.debug( 2311 log.debug(
2312 f"ignoring retract request from non local jid {from_jid}" 2312 f"ignoring retract request from non local jid {from_jid}"
2313 ) 2313 )
2314 return False 2314 return False
2315 to_jid = jid.JID(message_elt["to"]) 2315 to_jid = jid.JID(message_elt["to"])
2317 # to_jid should be a virtual JID from this gateway 2317 # to_jid should be a virtual JID from this gateway
2318 raise exceptions.InternalError( 2318 raise exceptions.InternalError(
2319 f"Invalid destinee's JID: {to_jid.full()}" 2319 f"Invalid destinee's JID: {to_jid.full()}"
2320 ) 2320 )
2321 ap_account = self._e.unescape(to_jid.user) 2321 ap_account = self._e.unescape(to_jid.user)
2322 actor_id = await self.getAPActorIdFromAccount(ap_account) 2322 actor_id = await self.get_ap_actor_id_from_account(ap_account)
2323 inbox = await self.getAPInboxFromId(actor_id, use_shared=False) 2323 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False)
2324 url_actor, ap_item = await self.apDeleteItem( 2324 url_actor, ap_item = await self.ap_delete_item(
2325 from_jid.userhostJID(), None, fastened_elts.id, public=False 2325 from_jid.userhostJID(), None, fastened_elts.id, public=False
2326 ) 2326 )
2327 resp = await self.signAndPost(inbox, url_actor, ap_item) 2327 resp = await self.sign_and_post(inbox, url_actor, ap_item)
2328 return False 2328 return False
2329 2329
2330 async def _onReferenceReceived( 2330 async def _on_reference_received(
2331 self, 2331 self,
2332 client: SatXMPPEntity, 2332 client: SatXMPPEntity,
2333 message_elt: domish.Element, 2333 message_elt: domish.Element,
2334 reference_data: Dict[str, Union[str, int]] 2334 reference_data: Dict[str, Union[str, int]]
2335 ) -> bool: 2335 ) -> bool:
2350 "account" 2350 "account"
2351 ) 2351 )
2352 return False 2352 return False
2353 2353
2354 ap_account = self._e.unescape(mentioned.user) 2354 ap_account = self._e.unescape(mentioned.user)
2355 actor_id = await self.getAPActorIdFromAccount(ap_account) 2355 actor_id = await self.get_ap_actor_id_from_account(ap_account)
2356 2356
2357 parsed_anchor: dict = reference_data.get("parsed_anchor") 2357 parsed_anchor: dict = reference_data.get("parsed_anchor")
2358 if not parsed_anchor: 2358 if not parsed_anchor:
2359 log.warning(f"no XMPP anchor, ignoring reference {reference_data!r}") 2359 log.warning(f"no XMPP anchor, ignoring reference {reference_data!r}")
2360 return False 2360 return False
2378 pubsub_item = parsed_anchor.get("item") 2378 pubsub_item = parsed_anchor.get("item")
2379 if not pubsub_item: 2379 if not pubsub_item:
2380 log.warning(f"missing pubsub item in anchor: {reference_data['anchor']}") 2380 log.warning(f"missing pubsub item in anchor: {reference_data['anchor']}")
2381 return False 2381 return False
2382 2382
2383 cached_node = await self.host.memory.storage.getPubsubNode( 2383 cached_node = await self.host.memory.storage.get_pubsub_node(
2384 client, pubsub_service, pubsub_node 2384 client, pubsub_service, pubsub_node
2385 ) 2385 )
2386 if not cached_node: 2386 if not cached_node:
2387 log.warning(f"Anchored node not found in cache: {reference_data['anchor']}") 2387 log.warning(f"Anchored node not found in cache: {reference_data['anchor']}")
2388 return False 2388 return False
2389 2389
2390 cached_items, __ = await self.host.memory.storage.getItems( 2390 cached_items, __ = await self.host.memory.storage.get_items(
2391 cached_node, item_ids=[pubsub_item] 2391 cached_node, item_ids=[pubsub_item]
2392 ) 2392 )
2393 if not cached_items: 2393 if not cached_items:
2394 log.warning( 2394 log.warning(
2395 f"Anchored pubsub item not found in cache: {reference_data['anchor']}" 2395 f"Anchored pubsub item not found in cache: {reference_data['anchor']}"
2408 "type": TYPE_MENTION, 2408 "type": TYPE_MENTION,
2409 "href": actor_id, 2409 "href": actor_id,
2410 "name": ap_account, 2410 "name": ap_account,
2411 }) 2411 })
2412 2412
2413 inbox = await self.getAPInboxFromId(actor_id, use_shared=False) 2413 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False)
2414 2414
2415 resp = await self.signAndPost(inbox, ap_item["actor"], ap_item) 2415 resp = await self.sign_and_post(inbox, ap_item["actor"], ap_item)
2416 2416
2417 return False 2417 return False
2418 2418
2419 async def newReplyToXMPPItem( 2419 async def new_reply_to_xmpp_item(
2420 self, 2420 self,
2421 client: SatXMPPEntity, 2421 client: SatXMPPEntity,
2422 ap_item: dict, 2422 ap_item: dict,
2423 targets: Dict[str, Set[str]], 2423 targets: Dict[str, Set[str]],
2424 mentions: List[Dict[str, str]], 2424 mentions: List[Dict[str, str]],
2425 ) -> None: 2425 ) -> None:
2426 """We got an AP item which is a reply to an XMPP item""" 2426 """We got an AP item which is a reply to an XMPP item"""
2427 in_reply_to = ap_item["inReplyTo"] 2427 in_reply_to = ap_item["inReplyTo"]
2428 url_type, url_args = self.parseAPURL(in_reply_to) 2428 url_type, url_args = self.parse_apurl(in_reply_to)
2429 if url_type != "item": 2429 if url_type != "item":
2430 log.warning( 2430 log.warning(
2431 "Ignoring AP item replying to an XMPP item with an unexpected URL " 2431 "Ignoring AP item replying to an XMPP item with an unexpected URL "
2432 f"type({url_type!r}):\n{pformat(ap_item)}" 2432 f"type({url_type!r}):\n{pformat(ap_item)}"
2433 ) 2433 )
2438 log.warning( 2438 log.warning(
2439 "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL " 2439 "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL "
2440 f"({in_reply_to!r}):\n{pformat(ap_item)}" 2440 f"({in_reply_to!r}):\n{pformat(ap_item)}"
2441 ) 2441 )
2442 return 2442 return
2443 parent_item_service, parent_item_node = await self.getJIDAndNode( 2443 parent_item_service, parent_item_node = await self.get_jid_and_node(
2444 parent_item_account 2444 parent_item_account
2445 ) 2445 )
2446 if parent_item_node is None: 2446 if parent_item_node is None:
2447 parent_item_node = self._m.namespace 2447 parent_item_node = self._m.namespace
2448 items, __ = await self._p.getItems( 2448 items, __ = await self._p.get_items(
2449 client, parent_item_service, parent_item_node, item_ids=[parent_item_id] 2449 client, parent_item_service, parent_item_node, item_ids=[parent_item_id]
2450 ) 2450 )
2451 try: 2451 try:
2452 parent_item_elt = items[0] 2452 parent_item_elt = items[0]
2453 except IndexError: 2453 except IndexError:
2461 try: 2461 try:
2462 comment_service = jid.JID(parent_item_parsed["comments"][0]["service"]) 2462 comment_service = jid.JID(parent_item_parsed["comments"][0]["service"])
2463 comment_node = parent_item_parsed["comments"][0]["node"] 2463 comment_node = parent_item_parsed["comments"][0]["node"]
2464 except (KeyError, IndexError): 2464 except (KeyError, IndexError):
2465 # we don't have a comment node set for this item 2465 # we don't have a comment node set for this item
2466 from sat.tools.xml_tools import ppElt 2466 from sat.tools.xml_tools import pp_elt
2467 log.info(f"{ppElt(parent_item_elt.toXml())}") 2467 log.info(f"{pp_elt(parent_item_elt.toXml())}")
2468 raise NotImplementedError() 2468 raise NotImplementedError()
2469 else: 2469 else:
2470 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) 2470 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
2471 await self._p.publish(client, comment_service, comment_node, [item_elt]) 2471 await self._p.publish(client, comment_service, comment_node, [item_elt])
2472 await self.notifyMentions( 2472 await self.notify_mentions(
2473 targets, mentions, comment_service, comment_node, item_elt["id"] 2473 targets, mentions, comment_service, comment_node, item_elt["id"]
2474 ) 2474 )
2475 2475
2476 def getAPItemTargets( 2476 def get_ap_item_targets(
2477 self, 2477 self,
2478 item: Dict[str, Any] 2478 item: Dict[str, Any]
2479 ) -> Tuple[bool, Dict[str, Set[str]], List[Dict[str, str]]]: 2479 ) -> Tuple[bool, Dict[str, Set[str]], List[Dict[str, str]]]:
2480 """Retrieve targets of an AP item, and indicate if it's a public one 2480 """Retrieve targets of an AP item, and indicate if it's a public one
2481 2481
2497 if value in PUBLIC_TUPLE: 2497 if value in PUBLIC_TUPLE:
2498 is_public = True 2498 is_public = True
2499 continue 2499 continue
2500 if not value: 2500 if not value:
2501 continue 2501 continue
2502 if not self.isLocalURL(value): 2502 if not self.is_local_url(value):
2503 continue 2503 continue
2504 target_type = self.parseAPURL(value)[0] 2504 target_type = self.parse_apurl(value)[0]
2505 if target_type != TYPE_ACTOR: 2505 if target_type != TYPE_ACTOR:
2506 log.debug(f"ignoring non actor type as a target: {href}") 2506 log.debug(f"ignoring non actor type as a target: {href}")
2507 else: 2507 else:
2508 targets.setdefault(target_type, set()).add(value) 2508 targets.setdefault(target_type, set()).add(value)
2509 2509
2515 continue 2515 continue
2516 href = tag.get("href") 2516 href = tag.get("href")
2517 if not href: 2517 if not href:
2518 log.warning('Missing "href" field from mention object: {tag!r}') 2518 log.warning('Missing "href" field from mention object: {tag!r}')
2519 continue 2519 continue
2520 if not self.isLocalURL(href): 2520 if not self.is_local_url(href):
2521 continue 2521 continue
2522 uri_type = self.parseAPURL(href)[0] 2522 uri_type = self.parse_apurl(href)[0]
2523 if uri_type != TYPE_ACTOR: 2523 if uri_type != TYPE_ACTOR:
2524 log.debug(f"ignoring non actor URI as a target: {href}") 2524 log.debug(f"ignoring non actor URI as a target: {href}")
2525 continue 2525 continue
2526 mention = {"uri": href} 2526 mention = {"uri": href}
2527 mentions.append(mention) 2527 mentions.append(mention)
2529 if name: 2529 if name:
2530 mention["content"] = name 2530 mention["content"] = name
2531 2531
2532 return is_public, targets, mentions 2532 return is_public, targets, mentions
2533 2533
2534 async def newAPItem( 2534 async def new_ap_item(
2535 self, 2535 self,
2536 client: SatXMPPEntity, 2536 client: SatXMPPEntity,
2537 destinee: Optional[jid.JID], 2537 destinee: Optional[jid.JID],
2538 node: str, 2538 node: str,
2539 item: dict, 2539 item: dict,
2542 2542
2543 @param destinee: jid of the destinee, 2543 @param destinee: jid of the destinee,
2544 @param node: XMPP pubsub node 2544 @param node: XMPP pubsub node
2545 @param item: AP object payload 2545 @param item: AP object payload
2546 """ 2546 """
2547 is_public, targets, mentions = self.getAPItemTargets(item) 2547 is_public, targets, mentions = self.get_ap_item_targets(item)
2548 if not is_public and targets.keys() == {TYPE_ACTOR}: 2548 if not is_public and targets.keys() == {TYPE_ACTOR}:
2549 # this is a direct message 2549 # this is a direct message
2550 await self.handle_message_ap_item( 2550 await self.handle_message_ap_item(
2551 client, targets, mentions, destinee, item 2551 client, targets, mentions, destinee, item
2552 ) 2552 )
2553 else: 2553 else:
2554 await self.handlePubsubAPItem( 2554 await self.handle_pubsub_ap_item(
2555 client, targets, mentions, destinee, node, item, is_public 2555 client, targets, mentions, destinee, node, item, is_public
2556 ) 2556 )
2557 2557
2558 async def handle_message_ap_item( 2558 async def handle_message_ap_item(
2559 self, 2559 self,
2568 @param targets: actors where the item must be delivered 2568 @param targets: actors where the item must be delivered
2569 @param destinee: jid of the destinee, 2569 @param destinee: jid of the destinee,
2570 @param item: AP object payload 2570 @param item: AP object payload
2571 """ 2571 """
2572 targets_jids = { 2572 targets_jids = {
2573 await self.getJIDFromId(t) 2573 await self.get_jid_from_id(t)
2574 for t_set in targets.values() 2574 for t_set in targets.values()
2575 for t in t_set 2575 for t in t_set
2576 } 2576 }
2577 if destinee is not None: 2577 if destinee is not None:
2578 targets_jids.add(destinee) 2578 targets_jids.add(destinee)
2594 extra=extra 2594 extra=extra
2595 ) 2595 )
2596 ) 2596 )
2597 await defer.DeferredList(defer_l) 2597 await defer.DeferredList(defer_l)
2598 2598
2599 async def notifyMentions( 2599 async def notify_mentions(
2600 self, 2600 self,
2601 targets: Dict[str, Set[str]], 2601 targets: Dict[str, Set[str]],
2602 mentions: List[Dict[str, str]], 2602 mentions: List[Dict[str, str]],
2603 service: jid.JID, 2603 service: jid.JID,
2604 node: str, 2604 node: str,
2610 2610
2611 Mentions are also sent to recipients as they are primary audience (see 2611 Mentions are also sent to recipients as they are primary audience (see
2612 https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes). 2612 https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes).
2613 2613
2614 """ 2614 """
2615 anchor = uri.buildXMPPUri("pubsub", path=service.full(), node=node, item=item_id) 2615 anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id)
2616 seen = set() 2616 seen = set()
2617 # we start with explicit mentions because mentions' content will be used in the 2617 # we start with explicit mentions because mentions' content will be used in the
2618 # future to fill "begin" and "end" reference attributes (we can't do it at the 2618 # future to fill "begin" and "end" reference attributes (we can't do it at the
2619 # moment as there is no way to specify the XML element to use in the blog item). 2619 # moment as there is no way to specify the XML element to use in the blog item).
2620 for mention in mentions: 2620 for mention in mentions:
2621 mentioned_jid = await self.getJIDFromId(mention["uri"]) 2621 mentioned_jid = await self.get_jid_from_id(mention["uri"])
2622 self._refs.sendReference( 2622 self._refs.send_reference(
2623 self.client, 2623 self.client,
2624 to_jid=mentioned_jid, 2624 to_jid=mentioned_jid,
2625 anchor=anchor 2625 anchor=anchor
2626 ) 2626 )
2627 seen.add(mentioned_jid) 2627 seen.add(mentioned_jid)
2628 2628
2629 remaining = { 2629 remaining = {
2630 await self.getJIDFromId(t) 2630 await self.get_jid_from_id(t)
2631 for t_set in targets.values() 2631 for t_set in targets.values()
2632 for t in t_set 2632 for t in t_set
2633 } - seen 2633 } - seen
2634 for target in remaining: 2634 for target in remaining:
2635 self._refs.sendReference( 2635 self._refs.send_reference(
2636 self.client, 2636 self.client,
2637 to_jid=target, 2637 to_jid=target,
2638 anchor=anchor 2638 anchor=anchor
2639 ) 2639 )
2640 2640
2641 async def handlePubsubAPItem( 2641 async def handle_pubsub_ap_item(
2642 self, 2642 self,
2643 client: SatXMPPEntity, 2643 client: SatXMPPEntity,
2644 targets: Dict[str, Set[str]], 2644 targets: Dict[str, Set[str]],
2645 mentions: List[Dict[str, str]], 2645 mentions: List[Dict[str, str]],
2646 destinee: Optional[jid.JID], 2646 destinee: Optional[jid.JID],
2661 in_reply_to = item.get("inReplyTo") 2661 in_reply_to = item.get("inReplyTo")
2662 2662
2663 if in_reply_to and isinstance(in_reply_to, list): 2663 if in_reply_to and isinstance(in_reply_to, list):
2664 in_reply_to = in_reply_to[0] 2664 in_reply_to = in_reply_to[0]
2665 if in_reply_to and isinstance(in_reply_to, str): 2665 if in_reply_to and isinstance(in_reply_to, str):
2666 if self.isLocalURL(in_reply_to): 2666 if self.is_local_url(in_reply_to):
2667 # this is a reply to an XMPP item 2667 # this is a reply to an XMPP item
2668 await self.newReplyToXMPPItem(client, item, targets, mentions) 2668 await self.new_reply_to_xmpp_item(client, item, targets, mentions)
2669 return 2669 return
2670 2670
2671 # this item is a reply to an AP item, we use or create a corresponding node 2671 # this item is a reply to an AP item, we use or create a corresponding node
2672 # for comments 2672 # for comments
2673 parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to) 2673 parent_node, __ = await self.get_comments_nodes(item["id"], in_reply_to)
2674 node = parent_node or node 2674 node = parent_node or node
2675 cached_node = await self.host.memory.storage.getPubsubNode( 2675 cached_node = await self.host.memory.storage.get_pubsub_node(
2676 client, service, node, with_subscriptions=True, create=True, 2676 client, service, node, with_subscriptions=True, create=True,
2677 create_kwargs={"subscribed": True} 2677 create_kwargs={"subscribed": True}
2678 ) 2678 )
2679 else: 2679 else:
2680 # it is a root item (i.e. not a reply to an other item) 2680 # it is a root item (i.e. not a reply to an other item)
2681 create = node == self._events.namespace 2681 create = node == self._events.namespace
2682 cached_node = await self.host.memory.storage.getPubsubNode( 2682 cached_node = await self.host.memory.storage.get_pubsub_node(
2683 client, service, node, with_subscriptions=True, create=create 2683 client, service, node, with_subscriptions=True, create=create
2684 ) 2684 )
2685 if cached_node is None: 2685 if cached_node is None:
2686 log.warning( 2686 log.warning(
2687 f"Received item in unknown node {node!r} at {service}. This may be " 2687 f"Received item in unknown node {node!r} at {service}. This may be "
2691 return 2691 return
2692 if item.get("type") == TYPE_EVENT: 2692 if item.get("type") == TYPE_EVENT:
2693 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item) 2693 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item)
2694 else: 2694 else:
2695 data, item_elt = await self.ap_item_2_mb_data_and_elt(item) 2695 data, item_elt = await self.ap_item_2_mb_data_and_elt(item)
2696 await self.host.memory.storage.cachePubsubItems( 2696 await self.host.memory.storage.cache_pubsub_items(
2697 client, 2697 client,
2698 cached_node, 2698 cached_node,
2699 [item_elt], 2699 [item_elt],
2700 [data] 2700 [data]
2701 ) 2701 )
2707 service, 2707 service,
2708 node, 2708 node,
2709 [(subscription.subscriber, None, [item_elt])] 2709 [(subscription.subscriber, None, [item_elt])]
2710 ) 2710 )
2711 2711
2712 await self.notifyMentions(targets, mentions, service, node, item_elt["id"]) 2712 await self.notify_mentions(targets, mentions, service, node, item_elt["id"])
2713 2713
2714 async def newAPDeleteItem( 2714 async def new_ap_delete_item(
2715 self, 2715 self,
2716 client: SatXMPPEntity, 2716 client: SatXMPPEntity,
2717 destinee: Optional[jid.JID], 2717 destinee: Optional[jid.JID],
2718 node: str, 2718 node: str,
2719 item: dict, 2719 item: dict,
2729 item_id = item.get("id") 2729 item_id = item.get("id")
2730 if not item_id: 2730 if not item_id:
2731 raise exceptions.DataError('"id" attribute is missing in item') 2731 raise exceptions.DataError('"id" attribute is missing in item')
2732 if not item_id.startswith("http"): 2732 if not item_id.startswith("http"):
2733 raise exceptions.DataError(f"invalid id: {item_id!r}") 2733 raise exceptions.DataError(f"invalid id: {item_id!r}")
2734 if self.isLocalURL(item_id): 2734 if self.is_local_url(item_id):
2735 raise ValueError("Local IDs should not be used") 2735 raise ValueError("Local IDs should not be used")
2736 2736
2737 # we have no way to know if a deleted item is a direct one (thus a message) or one 2737 # we have no way to know if a deleted item is a direct one (thus a message) or one
2738 # converted to pubsub. We check if the id is in message history to decide what to 2738 # converted to pubsub. We check if the id is in message history to decide what to
2739 # do. 2739 # do.
2753 f"not the original sender of the message ({history.source_jid}), " 2753 f"not the original sender of the message ({history.source_jid}), "
2754 "hack attemps?" 2754 "hack attemps?"
2755 ) 2755 )
2756 raise exceptions.PermissionError("forbidden") 2756 raise exceptions.PermissionError("forbidden")
2757 2757
2758 await self._r.retractByHistory(client, history) 2758 await self._r.retract_by_history(client, history)
2759 else: 2759 else:
2760 # no history in cache with this ID, it's probably a pubsub item 2760 # no history in cache with this ID, it's probably a pubsub item
2761 cached_node = await self.host.memory.storage.getPubsubNode( 2761 cached_node = await self.host.memory.storage.get_pubsub_node(
2762 client, client.jid, node, with_subscriptions=True 2762 client, client.jid, node, with_subscriptions=True
2763 ) 2763 )
2764 if cached_node is None: 2764 if cached_node is None:
2765 log.warning( 2765 log.warning(
2766 f"Received an item retract for node {node!r} at {client.jid} " 2766 f"Received an item retract for node {node!r} at {client.jid} "
2767 "which is not cached" 2767 "which is not cached"
2768 ) 2768 )
2769 raise exceptions.NotFound 2769 raise exceptions.NotFound
2770 await self.host.memory.storage.deletePubsubItems(cached_node, [item_id]) 2770 await self.host.memory.storage.delete_pubsub_items(cached_node, [item_id])
2771 # notifyRetract is expecting domish.Element instances 2771 # notifyRetract is expecting domish.Element instances
2772 item_elt = domish.Element((None, "item")) 2772 item_elt = domish.Element((None, "item"))
2773 item_elt["id"] = item_id 2773 item_elt["id"] = item_id
2774 for subscription in cached_node.subscriptions: 2774 for subscription in cached_node.subscriptions:
2775 if subscription.state != SubscriptionState.SUBSCRIBED: 2775 if subscription.state != SubscriptionState.SUBSCRIBED: