comparison sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 4037:524856bd7b19

massive refactoring to switch from camelCase to snake_case: historically, Libervia (SàT before) was using camelCase as allowed by PEP8 when using a pre-PEP8 code, to use the same coding style as in Twisted. However, snake_case is more readable and it's better to follow PEP8 best practices, so it has been decided to move on full snake_case. Because Libervia has a huge codebase, this ended with a ugly mix of camelCase and snake_case. To fix that, this patch does a big refactoring by renaming every function and method (including bridge) that are not coming from Twisted or Wokkel, to use fully snake_case. This is a massive change, and may result in some bugs.
author Goffi <goffi@goffi.org>
date Sat, 08 Apr 2023 13:54:42 +0200
parents 78b5f356900c
children
comparison
equal deleted inserted replaced
4036:c4464d7ae97b 4037:524856bd7b19
32 from sat.core.core_types import SatXMPPEntity 32 from sat.core.core_types import SatXMPPEntity
33 from sat.core.log import getLogger 33 from sat.core.log import getLogger
34 from sat.core.constants import Const as C 34 from sat.core.constants import Const as C
35 from sat.tools import image 35 from sat.tools import image
36 from sat.tools.utils import ensure_deferred 36 from sat.tools.utils import ensure_deferred
37 from sat.tools.web import downloadFile 37 from sat.tools.web import download_file
38 from sat.memory.sqla_mapping import PubsubSub, SubscriptionState 38 from sat.memory.sqla_mapping import PubsubSub, SubscriptionState
39 39
40 from .constants import ( 40 from .constants import (
41 TYPE_ACTOR, 41 TYPE_ACTOR,
42 ST_AVATAR, 42 ST_AVATAR,
72 "category": "pubsub", 72 "category": "pubsub",
73 "type": "service", 73 "type": "service",
74 "name": "Libervia ActivityPub Gateway", 74 "name": "Libervia ActivityPub Gateway",
75 } 75 }
76 76
77 async def getAPActorIdsAndInbox( 77 async def get_ap_actor_ids_and_inbox(
78 self, 78 self,
79 requestor: jid.JID, 79 requestor: jid.JID,
80 recipient: jid.JID, 80 recipient: jid.JID,
81 ) -> Tuple[str, str, str]: 81 ) -> Tuple[str, str, str]:
82 """Get AP actor IDs from requestor and destinee JIDs 82 """Get AP actor IDs from requestor and destinee JIDs
90 if not recipient.user: 90 if not recipient.user:
91 raise error.StanzaError( 91 raise error.StanzaError(
92 "item-not-found", 92 "item-not-found",
93 text="No user part specified" 93 text="No user part specified"
94 ) 94 )
95 requestor_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost()) 95 requestor_actor_id = self.apg.build_apurl(TYPE_ACTOR, requestor.userhost())
96 recipient_account = self.apg._e.unescape(recipient.user) 96 recipient_account = self.apg._e.unescape(recipient.user)
97 recipient_actor_id = await self.apg.getAPActorIdFromAccount(recipient_account) 97 recipient_actor_id = await self.apg.get_ap_actor_id_from_account(recipient_account)
98 inbox = await self.apg.getAPInboxFromId(recipient_actor_id, use_shared=False) 98 inbox = await self.apg.get_ap_inbox_from_id(recipient_actor_id, use_shared=False)
99 return requestor_actor_id, recipient_actor_id, inbox 99 return requestor_actor_id, recipient_actor_id, inbox
100 100
101 101
102 @ensure_deferred 102 @ensure_deferred
103 async def publish(self, requestor, service, nodeIdentifier, items): 103 async def publish(self, requestor, service, nodeIdentifier, items):
104 if self.apg.local_only and not self.apg.isLocal(requestor): 104 if self.apg.local_only and not self.apg.is_local(requestor):
105 raise error.StanzaError( 105 raise error.StanzaError(
106 "forbidden", 106 "forbidden",
107 "Only local users can publish on this gateway." 107 "Only local users can publish on this gateway."
108 ) 108 )
109 if not service.user: 109 if not service.user:
116 raise error.StanzaError( 116 raise error.StanzaError(
117 "bad-request", 117 "bad-request",
118 f"{ap_account!r} is not a valid ActivityPub actor account." 118 f"{ap_account!r} is not a valid ActivityPub actor account."
119 ) 119 )
120 120
121 client = self.apg.client.getVirtualClient(requestor) 121 client = self.apg.client.get_virtual_client(requestor)
122 if self.apg._pa.isAttachmentNode(nodeIdentifier): 122 if self.apg._pa.is_attachment_node(nodeIdentifier):
123 await self.apg.convertAndPostAttachments( 123 await self.apg.convert_and_post_attachments(
124 client, ap_account, service, nodeIdentifier, items, publisher=requestor 124 client, ap_account, service, nodeIdentifier, items, publisher=requestor
125 ) 125 )
126 else: 126 else:
127 await self.apg.convertAndPostItems( 127 await self.apg.convert_and_post_items(
128 client, ap_account, service, nodeIdentifier, items 128 client, ap_account, service, nodeIdentifier, items
129 ) 129 )
130 cached_node = await self.host.memory.storage.getPubsubNode( 130 cached_node = await self.host.memory.storage.get_pubsub_node(
131 client, service, nodeIdentifier, with_subscriptions=True, create=True 131 client, service, nodeIdentifier, with_subscriptions=True, create=True
132 ) 132 )
133 await self.host.memory.storage.cachePubsubItems( 133 await self.host.memory.storage.cache_pubsub_items(
134 client, 134 client,
135 cached_node, 135 cached_node,
136 items 136 items
137 ) 137 )
138 for subscription in cached_node.subscriptions: 138 for subscription in cached_node.subscriptions:
142 service, 142 service,
143 nodeIdentifier, 143 nodeIdentifier,
144 [(subscription.subscriber, None, items)] 144 [(subscription.subscriber, None, items)]
145 ) 145 )
146 146
147 async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: 147 async def ap_following_2_elt(self, ap_item: dict) -> domish.Element:
148 """Convert actor ID from following collection to XMPP item""" 148 """Convert actor ID from following collection to XMPP item"""
149 actor_id = ap_item["id"] 149 actor_id = ap_item["id"]
150 actor_jid = await self.apg.getJIDFromId(actor_id) 150 actor_jid = await self.apg.get_jid_from_id(actor_id)
151 subscription_elt = self.apg._pps.buildSubscriptionElt( 151 subscription_elt = self.apg._pps.build_subscription_elt(
152 self.apg._m.namespace, actor_jid 152 self.apg._m.namespace, actor_jid
153 ) 153 )
154 item_elt = pubsub.Item(id=actor_id, payload=subscription_elt) 154 item_elt = pubsub.Item(id=actor_id, payload=subscription_elt)
155 return item_elt 155 return item_elt
156 156
157 async def apFollower2Elt(self, ap_item: dict) -> domish.Element: 157 async def ap_follower_2_elt(self, ap_item: dict) -> domish.Element:
158 """Convert actor ID from followers collection to XMPP item""" 158 """Convert actor ID from followers collection to XMPP item"""
159 actor_id = ap_item["id"] 159 actor_id = ap_item["id"]
160 actor_jid = await self.apg.getJIDFromId(actor_id) 160 actor_jid = await self.apg.get_jid_from_id(actor_id)
161 subscriber_elt = self.apg._pps.buildSubscriberElt(actor_jid) 161 subscriber_elt = self.apg._pps.build_subscriber_elt(actor_jid)
162 item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt) 162 item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt)
163 return item_elt 163 return item_elt
164 164
165 async def generateVCard(self, ap_account: str) -> domish.Element: 165 async def generate_v_card(self, ap_account: str) -> domish.Element:
166 """Generate vCard4 (XEP-0292) item element from ap_account's metadata""" 166 """Generate vCard4 (XEP-0292) item element from ap_account's metadata"""
167 actor_data = await self.apg.getAPActorDataFromAccount(ap_account) 167 actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)
168 identity_data = {} 168 identity_data = {}
169 169
170 summary = actor_data.get("summary") 170 summary = actor_data.get("summary")
171 # summary is HTML, we have to convert it to text 171 # summary is HTML, we have to convert it to text
172 if summary: 172 if summary:
179 179
180 for field in ("name", "preferredUsername"): 180 for field in ("name", "preferredUsername"):
181 value = actor_data.get(field) 181 value = actor_data.get(field)
182 if value: 182 if value:
183 identity_data.setdefault("nicknames", []).append(value) 183 identity_data.setdefault("nicknames", []).append(value)
184 vcard_elt = self.apg._v.dict2VCard(identity_data) 184 vcard_elt = self.apg._v.dict_2_v_card(identity_data)
185 item_elt = domish.Element((pubsub.NS_PUBSUB, "item")) 185 item_elt = domish.Element((pubsub.NS_PUBSUB, "item"))
186 item_elt.addChild(vcard_elt) 186 item_elt.addChild(vcard_elt)
187 item_elt["id"] = self.apg._p.ID_SINGLETON 187 item_elt["id"] = self.apg._p.ID_SINGLETON
188 return item_elt 188 return item_elt
189 189
190 async def getAvatarData( 190 async def get_avatar_data(
191 self, 191 self,
192 client: SatXMPPEntity, 192 client: SatXMPPEntity,
193 ap_account: str 193 ap_account: str
194 ) -> Dict[str, Any]: 194 ) -> Dict[str, Any]:
195 """Retrieve actor's avatar if any, cache it and file actor_data 195 """Retrieve actor's avatar if any, cache it and file actor_data
196 196
197 ``cache_uid``, `path``` and ``media_type`` keys are always files 197 ``cache_uid``, `path``` and ``media_type`` keys are always files
198 ``base64`` key is only filled if the file was not already in cache 198 ``base64`` key is only filled if the file was not already in cache
199 """ 199 """
200 actor_data = await self.apg.getAPActorDataFromAccount(ap_account) 200 actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)
201 201
202 for icon in await self.apg.apGetList(actor_data, "icon"): 202 for icon in await self.apg.ap_get_list(actor_data, "icon"):
203 url = icon.get("url") 203 url = icon.get("url")
204 if icon["type"] != "Image" or not url: 204 if icon["type"] != "Image" or not url:
205 continue 205 continue
206 parsed_url = urlparse(url) 206 parsed_url = urlparse(url)
207 if not parsed_url.scheme in ("http", "https"): 207 if not parsed_url.scheme in ("http", "https"):
219 cache_uid = await client._ap_storage.get(key) 219 cache_uid = await client._ap_storage.get(key)
220 220
221 if cache_uid is None: 221 if cache_uid is None:
222 cache = None 222 cache = None
223 else: 223 else:
224 cache = self.apg.host.common_cache.getMetadata(cache_uid) 224 cache = self.apg.host.common_cache.get_metadata(cache_uid)
225 225
226 if cache is None: 226 if cache is None:
227 with tempfile.TemporaryDirectory() as dir_name: 227 with tempfile.TemporaryDirectory() as dir_name:
228 dest_path = Path(dir_name, filename) 228 dest_path = Path(dir_name, filename)
229 await downloadFile(url, dest_path, max_size=MAX_AVATAR_SIZE) 229 await download_file(url, dest_path, max_size=MAX_AVATAR_SIZE)
230 avatar_data = { 230 avatar_data = {
231 "path": dest_path, 231 "path": dest_path,
232 "filename": filename, 232 "filename": filename,
233 'media_type': image.guess_type(dest_path), 233 'media_type': image.guess_type(dest_path),
234 } 234 }
235 235
236 await self.apg._i.cacheAvatar( 236 await self.apg._i.cache_avatar(
237 self.apg.IMPORT_NAME, 237 self.apg.IMPORT_NAME,
238 avatar_data 238 avatar_data
239 ) 239 )
240 else: 240 else:
241 avatar_data = { 241 avatar_data = {
244 "media_type": cache["mime_type"] 244 "media_type": cache["mime_type"]
245 } 245 }
246 246
247 return avatar_data 247 return avatar_data
248 248
249 async def generateAvatarMetadata( 249 async def generate_avatar_metadata(
250 self, 250 self,
251 client: SatXMPPEntity, 251 client: SatXMPPEntity,
252 ap_account: str 252 ap_account: str
253 ) -> domish.Element: 253 ) -> domish.Element:
254 """Generate the metadata element for user avatar 254 """Generate the metadata element for user avatar
255 255
256 @raise StanzaError("item-not-found"): no avatar is present in actor data (in 256 @raise StanzaError("item-not-found"): no avatar is present in actor data (in
257 ``icon`` field) 257 ``icon`` field)
258 """ 258 """
259 avatar_data = await self.getAvatarData(client, ap_account) 259 avatar_data = await self.get_avatar_data(client, ap_account)
260 return self.apg._a.buildItemMetadataElt(avatar_data) 260 return self.apg._a.build_item_metadata_elt(avatar_data)
261 261
262 def _blockingB64EncodeAvatar(self, avatar_data: Dict[str, Any]) -> None: 262 def _blocking_b_6_4_encode_avatar(self, avatar_data: Dict[str, Any]) -> None:
263 with avatar_data["path"].open("rb") as f: 263 with avatar_data["path"].open("rb") as f:
264 avatar_data["base64"] = b64encode(f.read()).decode() 264 avatar_data["base64"] = b64encode(f.read()).decode()
265 265
266 async def generateAvatarData( 266 async def generate_avatar_data(
267 self, 267 self,
268 client: SatXMPPEntity, 268 client: SatXMPPEntity,
269 ap_account: str, 269 ap_account: str,
270 itemIdentifiers: Optional[List[str]], 270 itemIdentifiers: Optional[List[str]],
271 ) -> domish.Element: 271 ) -> domish.Element:
272 """Generate the data element for user avatar 272 """Generate the data element for user avatar
273 273
274 @raise StanzaError("item-not-found"): no avatar cached with requested ID 274 @raise StanzaError("item-not-found"): no avatar cached with requested ID
275 """ 275 """
276 if not itemIdentifiers: 276 if not itemIdentifiers:
277 avatar_data = await self.getAvatarData(client, ap_account) 277 avatar_data = await self.get_avatar_data(client, ap_account)
278 if "base64" not in avatar_data: 278 if "base64" not in avatar_data:
279 await threads.deferToThread(self._blockingB64EncodeAvatar, avatar_data) 279 await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data)
280 else: 280 else:
281 if len(itemIdentifiers) > 1: 281 if len(itemIdentifiers) > 1:
282 # only a single item ID is supported 282 # only a single item ID is supported
283 raise error.StanzaError("item-not-found") 283 raise error.StanzaError("item-not-found")
284 item_id = itemIdentifiers[0] 284 item_id = itemIdentifiers[0]
285 # just to be sure that that we don't have an empty string 285 # just to be sure that that we don't have an empty string
286 assert item_id 286 assert item_id
287 cache_data = self.apg.host.common_cache.getMetadata(item_id) 287 cache_data = self.apg.host.common_cache.get_metadata(item_id)
288 if cache_data is None: 288 if cache_data is None:
289 raise error.StanzaError("item-not-found") 289 raise error.StanzaError("item-not-found")
290 avatar_data = { 290 avatar_data = {
291 "cache_uid": item_id, 291 "cache_uid": item_id,
292 "path": cache_data["path"] 292 "path": cache_data["path"]
293 } 293 }
294 await threads.deferToThread(self._blockingB64EncodeAvatar, avatar_data) 294 await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data)
295 295
296 return self.apg._a.buildItemDataElt(avatar_data) 296 return self.apg._a.build_item_data_elt(avatar_data)
297 297
298 @ensure_deferred 298 @ensure_deferred
299 async def items( 299 async def items(
300 self, 300 self,
301 requestor: jid.JID, 301 requestor: jid.JID,
318 client = self.apg.client 318 client = self.apg.client
319 kwargs = {} 319 kwargs = {}
320 320
321 if node == self.apg._pps.subscriptions_node: 321 if node == self.apg._pps.subscriptions_node:
322 collection_name = "following" 322 collection_name = "following"
323 parser = self.apFollowing2Elt 323 parser = self.ap_following_2_elt
324 kwargs["only_ids"] = True 324 kwargs["only_ids"] = True
325 use_cache = False 325 use_cache = False
326 elif node.startswith(self.apg._pps.subscribers_node_prefix): 326 elif node.startswith(self.apg._pps.subscribers_node_prefix):
327 collection_name = "followers" 327 collection_name = "followers"
328 parser = self.apFollower2Elt 328 parser = self.ap_follower_2_elt
329 kwargs["only_ids"] = True 329 kwargs["only_ids"] = True
330 use_cache = False 330 use_cache = False
331 elif node == self.apg._v.node: 331 elif node == self.apg._v.node:
332 # vCard4 request 332 # vCard4 request
333 item_elt = await self.generateVCard(ap_account) 333 item_elt = await self.generate_v_card(ap_account)
334 return [item_elt], None 334 return [item_elt], None
335 elif node == self.apg._a.namespace_metadata: 335 elif node == self.apg._a.namespace_metadata:
336 item_elt = await self.generateAvatarMetadata(self.apg.client, ap_account) 336 item_elt = await self.generate_avatar_metadata(self.apg.client, ap_account)
337 return [item_elt], None 337 return [item_elt], None
338 elif node == self.apg._a.namespace_data: 338 elif node == self.apg._a.namespace_data:
339 item_elt = await self.generateAvatarData( 339 item_elt = await self.generate_avatar_data(
340 self.apg.client, ap_account, itemIdentifiers 340 self.apg.client, ap_account, itemIdentifiers
341 ) 341 )
342 return [item_elt], None 342 return [item_elt], None
343 elif self.apg._pa.isAttachmentNode(node): 343 elif self.apg._pa.is_attachment_node(node):
344 use_cache = True 344 use_cache = True
345 # we check cache here because we emit an item-not-found error if the node is 345 # we check cache here because we emit an item-not-found error if the node is
346 # not in cache, as we are not dealing with real AP items 346 # not in cache, as we are not dealing with real AP items
347 cached_node = await self.host.memory.storage.getPubsubNode( 347 cached_node = await self.host.memory.storage.get_pubsub_node(
348 client, service, node 348 client, service, node
349 ) 349 )
350 if cached_node is None: 350 if cached_node is None:
351 raise error.StanzaError("item-not-found") 351 raise error.StanzaError("item-not-found")
352 else: 352 else:
363 collection_name = "outbox" 363 collection_name = "outbox"
364 use_cache = True 364 use_cache = True
365 365
366 if use_cache: 366 if use_cache:
367 if cached_node is None: 367 if cached_node is None:
368 cached_node = await self.host.memory.storage.getPubsubNode( 368 cached_node = await self.host.memory.storage.get_pubsub_node(
369 client, service, node 369 client, service, node
370 ) 370 )
371 # TODO: check if node is synchronised 371 # TODO: check if node is synchronised
372 if cached_node is not None: 372 if cached_node is not None:
373 # the node is cached, we return items from cache 373 # the node is cached, we return items from cache
374 log.debug(f"node {node!r} from {service} is in cache") 374 log.debug(f"node {node!r} from {service} is in cache")
375 pubsub_items, metadata = await self.apg._c.getItemsFromCache( 375 pubsub_items, metadata = await self.apg._c.get_items_from_cache(
376 client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req 376 client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
377 ) 377 )
378 try: 378 try:
379 rsm_resp = rsm.RSMResponse(**metadata["rsm"]) 379 rsm_resp = rsm.RSMResponse(**metadata["rsm"])
380 except KeyError: 380 except KeyError:
382 return [i.data for i in pubsub_items], rsm_resp 382 return [i.data for i in pubsub_items], rsm_resp
383 383
384 if itemIdentifiers: 384 if itemIdentifiers:
385 items = [] 385 items = []
386 for item_id in itemIdentifiers: 386 for item_id in itemIdentifiers:
387 item_data = await self.apg.apGet(item_id) 387 item_data = await self.apg.ap_get(item_id)
388 item_elt = await parser(item_data) 388 item_elt = await parser(item_data)
389 items.append(item_elt) 389 items.append(item_elt)
390 return items, None 390 return items, None
391 else: 391 else:
392 if rsm_req is None: 392 if rsm_req is None:
417 417
418 log.info( 418 log.info(
419 f"No cache found for node {node} at {service} (AP account {ap_account}), " 419 f"No cache found for node {node} at {service} (AP account {ap_account}), "
420 "using Collection Paging to RSM translation" 420 "using Collection Paging to RSM translation"
421 ) 421 )
422 if self.apg._m.isCommentNode(node): 422 if self.apg._m.is_comment_node(node):
423 parent_item = self.apg._m.getParentItem(node) 423 parent_item = self.apg._m.get_parent_item(node)
424 try: 424 try:
425 parent_data = await self.apg.apGet(parent_item) 425 parent_data = await self.apg.ap_get(parent_item)
426 collection = await self.apg.apGetObject( 426 collection = await self.apg.ap_get_object(
427 parent_data.get("object", {}), 427 parent_data.get("object", {}),
428 "replies" 428 "replies"
429 ) 429 )
430 except Exception as e: 430 except Exception as e:
431 raise error.StanzaError( 431 raise error.StanzaError(
432 "item-not-found", 432 "item-not-found",
433 text=e 433 text=e
434 ) 434 )
435 else: 435 else:
436 actor_data = await self.apg.getAPActorDataFromAccount(ap_account) 436 actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)
437 collection = await self.apg.apGetObject(actor_data, collection_name) 437 collection = await self.apg.ap_get_object(actor_data, collection_name)
438 if not collection: 438 if not collection:
439 raise error.StanzaError( 439 raise error.StanzaError(
440 "item-not-found", 440 "item-not-found",
441 text=f"No collection found for node {node!r} (account: {ap_account})" 441 text=f"No collection found for node {node!r} (account: {ap_account})"
442 ) 442 )
443 443
444 kwargs["parser"] = parser 444 kwargs["parser"] = parser
445 return await self.apg.getAPItems(collection, **kwargs) 445 return await self.apg.get_ap_items(collection, **kwargs)
446 446
447 @ensure_deferred 447 @ensure_deferred
448 async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): 448 async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
449 raise error.StanzaError("forbidden") 449 raise error.StanzaError("forbidden")
450 450
457 # being internal. 457 # being internal.
458 if nodeIdentifier == self.apg._m.namespace: 458 if nodeIdentifier == self.apg._m.namespace:
459 sub_state = SubscriptionState.PENDING 459 sub_state = SubscriptionState.PENDING
460 else: 460 else:
461 sub_state = SubscriptionState.SUBSCRIBED 461 sub_state = SubscriptionState.SUBSCRIBED
462 node = await self.host.memory.storage.getPubsubNode( 462 node = await self.host.memory.storage.get_pubsub_node(
463 client, service, nodeIdentifier, with_subscriptions=True 463 client, service, nodeIdentifier, with_subscriptions=True
464 ) 464 )
465 if node is None: 465 if node is None:
466 node = await self.host.memory.storage.setPubsubNode( 466 node = await self.host.memory.storage.set_pubsub_node(
467 client, 467 client,
468 service, 468 service,
469 nodeIdentifier, 469 nodeIdentifier,
470 ) 470 )
471 subscription = None 471 subscription = None
508 ) 508 )
509 509
510 if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace): 510 if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace):
511 # if we subscribe to microblog or events node, we follow the corresponding 511 # if we subscribe to microblog or events node, we follow the corresponding
512 # account 512 # account
513 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( 513 req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox(
514 requestor, service 514 requestor, service
515 ) 515 )
516 516
517 data = self.apg.create_activity("Follow", req_actor_id, recip_actor_id) 517 data = self.apg.create_activity("Follow", req_actor_id, recip_actor_id)
518 518
519 resp = await self.apg.signAndPost(inbox, req_actor_id, data) 519 resp = await self.apg.sign_and_post(inbox, req_actor_id, data)
520 if resp.code >= 300: 520 if resp.code >= 300:
521 text = await resp.text() 521 text = await resp.text()
522 raise error.StanzaError("service-unavailable", text=text) 522 raise error.StanzaError("service-unavailable", text=text)
523 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") 523 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
524 524
525 @ensure_deferred 525 @ensure_deferred
526 async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): 526 async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
527 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( 527 req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox(
528 requestor, service 528 requestor, service
529 ) 529 )
530 data = self.apg.create_activity( 530 data = self.apg.create_activity(
531 "Undo", 531 "Undo",
532 req_actor_id, 532 req_actor_id,
535 req_actor_id, 535 req_actor_id,
536 recip_actor_id 536 recip_actor_id
537 ) 537 )
538 ) 538 )
539 539
540 resp = await self.apg.signAndPost(inbox, req_actor_id, data) 540 resp = await self.apg.sign_and_post(inbox, req_actor_id, data)
541 if resp.code >= 300: 541 if resp.code >= 300:
542 text = await resp.text() 542 text = await resp.text()
543 raise error.StanzaError("service-unavailable", text=text) 543 raise error.StanzaError("service-unavailable", text=text)
544 544
545 def getConfigurationOptions(self): 545 def getConfigurationOptions(self):