Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Historically, Libervia (formerly SàT) used camelCase, as PEP8 allows for code that
predates PEP8, in order to keep the same coding style as Twisted.
However, snake_case is more readable and it's better to follow PEP8 best practices, so it
was decided to move to full snake_case. Because Libervia has a huge codebase, this
ended up as an ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring by renaming every function and method
(including the bridge) that does not come from Twisted or Wokkel to use snake_case throughout.
This is a massive change, and may result in some bugs.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 08 Apr 2023 13:54:42 +0200 |
parents | 78b5f356900c |
children |
comparison
equal
deleted
inserted
replaced
4036:c4464d7ae97b | 4037:524856bd7b19 |
---|---|
32 from sat.core.core_types import SatXMPPEntity | 32 from sat.core.core_types import SatXMPPEntity |
33 from sat.core.log import getLogger | 33 from sat.core.log import getLogger |
34 from sat.core.constants import Const as C | 34 from sat.core.constants import Const as C |
35 from sat.tools import image | 35 from sat.tools import image |
36 from sat.tools.utils import ensure_deferred | 36 from sat.tools.utils import ensure_deferred |
37 from sat.tools.web import downloadFile | 37 from sat.tools.web import download_file |
38 from sat.memory.sqla_mapping import PubsubSub, SubscriptionState | 38 from sat.memory.sqla_mapping import PubsubSub, SubscriptionState |
39 | 39 |
40 from .constants import ( | 40 from .constants import ( |
41 TYPE_ACTOR, | 41 TYPE_ACTOR, |
42 ST_AVATAR, | 42 ST_AVATAR, |
72 "category": "pubsub", | 72 "category": "pubsub", |
73 "type": "service", | 73 "type": "service", |
74 "name": "Libervia ActivityPub Gateway", | 74 "name": "Libervia ActivityPub Gateway", |
75 } | 75 } |
76 | 76 |
77 async def getAPActorIdsAndInbox( | 77 async def get_ap_actor_ids_and_inbox( |
78 self, | 78 self, |
79 requestor: jid.JID, | 79 requestor: jid.JID, |
80 recipient: jid.JID, | 80 recipient: jid.JID, |
81 ) -> Tuple[str, str, str]: | 81 ) -> Tuple[str, str, str]: |
82 """Get AP actor IDs from requestor and destinee JIDs | 82 """Get AP actor IDs from requestor and destinee JIDs |
90 if not recipient.user: | 90 if not recipient.user: |
91 raise error.StanzaError( | 91 raise error.StanzaError( |
92 "item-not-found", | 92 "item-not-found", |
93 text="No user part specified" | 93 text="No user part specified" |
94 ) | 94 ) |
95 requestor_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost()) | 95 requestor_actor_id = self.apg.build_apurl(TYPE_ACTOR, requestor.userhost()) |
96 recipient_account = self.apg._e.unescape(recipient.user) | 96 recipient_account = self.apg._e.unescape(recipient.user) |
97 recipient_actor_id = await self.apg.getAPActorIdFromAccount(recipient_account) | 97 recipient_actor_id = await self.apg.get_ap_actor_id_from_account(recipient_account) |
98 inbox = await self.apg.getAPInboxFromId(recipient_actor_id, use_shared=False) | 98 inbox = await self.apg.get_ap_inbox_from_id(recipient_actor_id, use_shared=False) |
99 return requestor_actor_id, recipient_actor_id, inbox | 99 return requestor_actor_id, recipient_actor_id, inbox |
100 | 100 |
101 | 101 |
102 @ensure_deferred | 102 @ensure_deferred |
103 async def publish(self, requestor, service, nodeIdentifier, items): | 103 async def publish(self, requestor, service, nodeIdentifier, items): |
104 if self.apg.local_only and not self.apg.isLocal(requestor): | 104 if self.apg.local_only and not self.apg.is_local(requestor): |
105 raise error.StanzaError( | 105 raise error.StanzaError( |
106 "forbidden", | 106 "forbidden", |
107 "Only local users can publish on this gateway." | 107 "Only local users can publish on this gateway." |
108 ) | 108 ) |
109 if not service.user: | 109 if not service.user: |
116 raise error.StanzaError( | 116 raise error.StanzaError( |
117 "bad-request", | 117 "bad-request", |
118 f"{ap_account!r} is not a valid ActivityPub actor account." | 118 f"{ap_account!r} is not a valid ActivityPub actor account." |
119 ) | 119 ) |
120 | 120 |
121 client = self.apg.client.getVirtualClient(requestor) | 121 client = self.apg.client.get_virtual_client(requestor) |
122 if self.apg._pa.isAttachmentNode(nodeIdentifier): | 122 if self.apg._pa.is_attachment_node(nodeIdentifier): |
123 await self.apg.convertAndPostAttachments( | 123 await self.apg.convert_and_post_attachments( |
124 client, ap_account, service, nodeIdentifier, items, publisher=requestor | 124 client, ap_account, service, nodeIdentifier, items, publisher=requestor |
125 ) | 125 ) |
126 else: | 126 else: |
127 await self.apg.convertAndPostItems( | 127 await self.apg.convert_and_post_items( |
128 client, ap_account, service, nodeIdentifier, items | 128 client, ap_account, service, nodeIdentifier, items |
129 ) | 129 ) |
130 cached_node = await self.host.memory.storage.getPubsubNode( | 130 cached_node = await self.host.memory.storage.get_pubsub_node( |
131 client, service, nodeIdentifier, with_subscriptions=True, create=True | 131 client, service, nodeIdentifier, with_subscriptions=True, create=True |
132 ) | 132 ) |
133 await self.host.memory.storage.cachePubsubItems( | 133 await self.host.memory.storage.cache_pubsub_items( |
134 client, | 134 client, |
135 cached_node, | 135 cached_node, |
136 items | 136 items |
137 ) | 137 ) |
138 for subscription in cached_node.subscriptions: | 138 for subscription in cached_node.subscriptions: |
142 service, | 142 service, |
143 nodeIdentifier, | 143 nodeIdentifier, |
144 [(subscription.subscriber, None, items)] | 144 [(subscription.subscriber, None, items)] |
145 ) | 145 ) |
146 | 146 |
147 async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: | 147 async def ap_following_2_elt(self, ap_item: dict) -> domish.Element: |
148 """Convert actor ID from following collection to XMPP item""" | 148 """Convert actor ID from following collection to XMPP item""" |
149 actor_id = ap_item["id"] | 149 actor_id = ap_item["id"] |
150 actor_jid = await self.apg.getJIDFromId(actor_id) | 150 actor_jid = await self.apg.get_jid_from_id(actor_id) |
151 subscription_elt = self.apg._pps.buildSubscriptionElt( | 151 subscription_elt = self.apg._pps.build_subscription_elt( |
152 self.apg._m.namespace, actor_jid | 152 self.apg._m.namespace, actor_jid |
153 ) | 153 ) |
154 item_elt = pubsub.Item(id=actor_id, payload=subscription_elt) | 154 item_elt = pubsub.Item(id=actor_id, payload=subscription_elt) |
155 return item_elt | 155 return item_elt |
156 | 156 |
157 async def apFollower2Elt(self, ap_item: dict) -> domish.Element: | 157 async def ap_follower_2_elt(self, ap_item: dict) -> domish.Element: |
158 """Convert actor ID from followers collection to XMPP item""" | 158 """Convert actor ID from followers collection to XMPP item""" |
159 actor_id = ap_item["id"] | 159 actor_id = ap_item["id"] |
160 actor_jid = await self.apg.getJIDFromId(actor_id) | 160 actor_jid = await self.apg.get_jid_from_id(actor_id) |
161 subscriber_elt = self.apg._pps.buildSubscriberElt(actor_jid) | 161 subscriber_elt = self.apg._pps.build_subscriber_elt(actor_jid) |
162 item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt) | 162 item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt) |
163 return item_elt | 163 return item_elt |
164 | 164 |
165 async def generateVCard(self, ap_account: str) -> domish.Element: | 165 async def generate_v_card(self, ap_account: str) -> domish.Element: |
166 """Generate vCard4 (XEP-0292) item element from ap_account's metadata""" | 166 """Generate vCard4 (XEP-0292) item element from ap_account's metadata""" |
167 actor_data = await self.apg.getAPActorDataFromAccount(ap_account) | 167 actor_data = await self.apg.get_ap_actor_data_from_account(ap_account) |
168 identity_data = {} | 168 identity_data = {} |
169 | 169 |
170 summary = actor_data.get("summary") | 170 summary = actor_data.get("summary") |
171 # summary is HTML, we have to convert it to text | 171 # summary is HTML, we have to convert it to text |
172 if summary: | 172 if summary: |
179 | 179 |
180 for field in ("name", "preferredUsername"): | 180 for field in ("name", "preferredUsername"): |
181 value = actor_data.get(field) | 181 value = actor_data.get(field) |
182 if value: | 182 if value: |
183 identity_data.setdefault("nicknames", []).append(value) | 183 identity_data.setdefault("nicknames", []).append(value) |
184 vcard_elt = self.apg._v.dict2VCard(identity_data) | 184 vcard_elt = self.apg._v.dict_2_v_card(identity_data) |
185 item_elt = domish.Element((pubsub.NS_PUBSUB, "item")) | 185 item_elt = domish.Element((pubsub.NS_PUBSUB, "item")) |
186 item_elt.addChild(vcard_elt) | 186 item_elt.addChild(vcard_elt) |
187 item_elt["id"] = self.apg._p.ID_SINGLETON | 187 item_elt["id"] = self.apg._p.ID_SINGLETON |
188 return item_elt | 188 return item_elt |
189 | 189 |
190 async def getAvatarData( | 190 async def get_avatar_data( |
191 self, | 191 self, |
192 client: SatXMPPEntity, | 192 client: SatXMPPEntity, |
193 ap_account: str | 193 ap_account: str |
194 ) -> Dict[str, Any]: | 194 ) -> Dict[str, Any]: |
195 """Retrieve actor's avatar if any, cache it and file actor_data | 195 """Retrieve actor's avatar if any, cache it and file actor_data |
196 | 196 |
197 ``cache_uid``, `path``` and ``media_type`` keys are always files | 197 ``cache_uid``, `path``` and ``media_type`` keys are always files |
198 ``base64`` key is only filled if the file was not already in cache | 198 ``base64`` key is only filled if the file was not already in cache |
199 """ | 199 """ |
200 actor_data = await self.apg.getAPActorDataFromAccount(ap_account) | 200 actor_data = await self.apg.get_ap_actor_data_from_account(ap_account) |
201 | 201 |
202 for icon in await self.apg.apGetList(actor_data, "icon"): | 202 for icon in await self.apg.ap_get_list(actor_data, "icon"): |
203 url = icon.get("url") | 203 url = icon.get("url") |
204 if icon["type"] != "Image" or not url: | 204 if icon["type"] != "Image" or not url: |
205 continue | 205 continue |
206 parsed_url = urlparse(url) | 206 parsed_url = urlparse(url) |
207 if not parsed_url.scheme in ("http", "https"): | 207 if not parsed_url.scheme in ("http", "https"): |
219 cache_uid = await client._ap_storage.get(key) | 219 cache_uid = await client._ap_storage.get(key) |
220 | 220 |
221 if cache_uid is None: | 221 if cache_uid is None: |
222 cache = None | 222 cache = None |
223 else: | 223 else: |
224 cache = self.apg.host.common_cache.getMetadata(cache_uid) | 224 cache = self.apg.host.common_cache.get_metadata(cache_uid) |
225 | 225 |
226 if cache is None: | 226 if cache is None: |
227 with tempfile.TemporaryDirectory() as dir_name: | 227 with tempfile.TemporaryDirectory() as dir_name: |
228 dest_path = Path(dir_name, filename) | 228 dest_path = Path(dir_name, filename) |
229 await downloadFile(url, dest_path, max_size=MAX_AVATAR_SIZE) | 229 await download_file(url, dest_path, max_size=MAX_AVATAR_SIZE) |
230 avatar_data = { | 230 avatar_data = { |
231 "path": dest_path, | 231 "path": dest_path, |
232 "filename": filename, | 232 "filename": filename, |
233 'media_type': image.guess_type(dest_path), | 233 'media_type': image.guess_type(dest_path), |
234 } | 234 } |
235 | 235 |
236 await self.apg._i.cacheAvatar( | 236 await self.apg._i.cache_avatar( |
237 self.apg.IMPORT_NAME, | 237 self.apg.IMPORT_NAME, |
238 avatar_data | 238 avatar_data |
239 ) | 239 ) |
240 else: | 240 else: |
241 avatar_data = { | 241 avatar_data = { |
244 "media_type": cache["mime_type"] | 244 "media_type": cache["mime_type"] |
245 } | 245 } |
246 | 246 |
247 return avatar_data | 247 return avatar_data |
248 | 248 |
249 async def generateAvatarMetadata( | 249 async def generate_avatar_metadata( |
250 self, | 250 self, |
251 client: SatXMPPEntity, | 251 client: SatXMPPEntity, |
252 ap_account: str | 252 ap_account: str |
253 ) -> domish.Element: | 253 ) -> domish.Element: |
254 """Generate the metadata element for user avatar | 254 """Generate the metadata element for user avatar |
255 | 255 |
256 @raise StanzaError("item-not-found"): no avatar is present in actor data (in | 256 @raise StanzaError("item-not-found"): no avatar is present in actor data (in |
257 ``icon`` field) | 257 ``icon`` field) |
258 """ | 258 """ |
259 avatar_data = await self.getAvatarData(client, ap_account) | 259 avatar_data = await self.get_avatar_data(client, ap_account) |
260 return self.apg._a.buildItemMetadataElt(avatar_data) | 260 return self.apg._a.build_item_metadata_elt(avatar_data) |
261 | 261 |
262 def _blockingB64EncodeAvatar(self, avatar_data: Dict[str, Any]) -> None: | 262 def _blocking_b_6_4_encode_avatar(self, avatar_data: Dict[str, Any]) -> None: |
263 with avatar_data["path"].open("rb") as f: | 263 with avatar_data["path"].open("rb") as f: |
264 avatar_data["base64"] = b64encode(f.read()).decode() | 264 avatar_data["base64"] = b64encode(f.read()).decode() |
265 | 265 |
266 async def generateAvatarData( | 266 async def generate_avatar_data( |
267 self, | 267 self, |
268 client: SatXMPPEntity, | 268 client: SatXMPPEntity, |
269 ap_account: str, | 269 ap_account: str, |
270 itemIdentifiers: Optional[List[str]], | 270 itemIdentifiers: Optional[List[str]], |
271 ) -> domish.Element: | 271 ) -> domish.Element: |
272 """Generate the data element for user avatar | 272 """Generate the data element for user avatar |
273 | 273 |
274 @raise StanzaError("item-not-found"): no avatar cached with requested ID | 274 @raise StanzaError("item-not-found"): no avatar cached with requested ID |
275 """ | 275 """ |
276 if not itemIdentifiers: | 276 if not itemIdentifiers: |
277 avatar_data = await self.getAvatarData(client, ap_account) | 277 avatar_data = await self.get_avatar_data(client, ap_account) |
278 if "base64" not in avatar_data: | 278 if "base64" not in avatar_data: |
279 await threads.deferToThread(self._blockingB64EncodeAvatar, avatar_data) | 279 await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data) |
280 else: | 280 else: |
281 if len(itemIdentifiers) > 1: | 281 if len(itemIdentifiers) > 1: |
282 # only a single item ID is supported | 282 # only a single item ID is supported |
283 raise error.StanzaError("item-not-found") | 283 raise error.StanzaError("item-not-found") |
284 item_id = itemIdentifiers[0] | 284 item_id = itemIdentifiers[0] |
285 # just to be sure that that we don't have an empty string | 285 # just to be sure that that we don't have an empty string |
286 assert item_id | 286 assert item_id |
287 cache_data = self.apg.host.common_cache.getMetadata(item_id) | 287 cache_data = self.apg.host.common_cache.get_metadata(item_id) |
288 if cache_data is None: | 288 if cache_data is None: |
289 raise error.StanzaError("item-not-found") | 289 raise error.StanzaError("item-not-found") |
290 avatar_data = { | 290 avatar_data = { |
291 "cache_uid": item_id, | 291 "cache_uid": item_id, |
292 "path": cache_data["path"] | 292 "path": cache_data["path"] |
293 } | 293 } |
294 await threads.deferToThread(self._blockingB64EncodeAvatar, avatar_data) | 294 await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data) |
295 | 295 |
296 return self.apg._a.buildItemDataElt(avatar_data) | 296 return self.apg._a.build_item_data_elt(avatar_data) |
297 | 297 |
298 @ensure_deferred | 298 @ensure_deferred |
299 async def items( | 299 async def items( |
300 self, | 300 self, |
301 requestor: jid.JID, | 301 requestor: jid.JID, |
318 client = self.apg.client | 318 client = self.apg.client |
319 kwargs = {} | 319 kwargs = {} |
320 | 320 |
321 if node == self.apg._pps.subscriptions_node: | 321 if node == self.apg._pps.subscriptions_node: |
322 collection_name = "following" | 322 collection_name = "following" |
323 parser = self.apFollowing2Elt | 323 parser = self.ap_following_2_elt |
324 kwargs["only_ids"] = True | 324 kwargs["only_ids"] = True |
325 use_cache = False | 325 use_cache = False |
326 elif node.startswith(self.apg._pps.subscribers_node_prefix): | 326 elif node.startswith(self.apg._pps.subscribers_node_prefix): |
327 collection_name = "followers" | 327 collection_name = "followers" |
328 parser = self.apFollower2Elt | 328 parser = self.ap_follower_2_elt |
329 kwargs["only_ids"] = True | 329 kwargs["only_ids"] = True |
330 use_cache = False | 330 use_cache = False |
331 elif node == self.apg._v.node: | 331 elif node == self.apg._v.node: |
332 # vCard4 request | 332 # vCard4 request |
333 item_elt = await self.generateVCard(ap_account) | 333 item_elt = await self.generate_v_card(ap_account) |
334 return [item_elt], None | 334 return [item_elt], None |
335 elif node == self.apg._a.namespace_metadata: | 335 elif node == self.apg._a.namespace_metadata: |
336 item_elt = await self.generateAvatarMetadata(self.apg.client, ap_account) | 336 item_elt = await self.generate_avatar_metadata(self.apg.client, ap_account) |
337 return [item_elt], None | 337 return [item_elt], None |
338 elif node == self.apg._a.namespace_data: | 338 elif node == self.apg._a.namespace_data: |
339 item_elt = await self.generateAvatarData( | 339 item_elt = await self.generate_avatar_data( |
340 self.apg.client, ap_account, itemIdentifiers | 340 self.apg.client, ap_account, itemIdentifiers |
341 ) | 341 ) |
342 return [item_elt], None | 342 return [item_elt], None |
343 elif self.apg._pa.isAttachmentNode(node): | 343 elif self.apg._pa.is_attachment_node(node): |
344 use_cache = True | 344 use_cache = True |
345 # we check cache here because we emit an item-not-found error if the node is | 345 # we check cache here because we emit an item-not-found error if the node is |
346 # not in cache, as we are not dealing with real AP items | 346 # not in cache, as we are not dealing with real AP items |
347 cached_node = await self.host.memory.storage.getPubsubNode( | 347 cached_node = await self.host.memory.storage.get_pubsub_node( |
348 client, service, node | 348 client, service, node |
349 ) | 349 ) |
350 if cached_node is None: | 350 if cached_node is None: |
351 raise error.StanzaError("item-not-found") | 351 raise error.StanzaError("item-not-found") |
352 else: | 352 else: |
363 collection_name = "outbox" | 363 collection_name = "outbox" |
364 use_cache = True | 364 use_cache = True |
365 | 365 |
366 if use_cache: | 366 if use_cache: |
367 if cached_node is None: | 367 if cached_node is None: |
368 cached_node = await self.host.memory.storage.getPubsubNode( | 368 cached_node = await self.host.memory.storage.get_pubsub_node( |
369 client, service, node | 369 client, service, node |
370 ) | 370 ) |
371 # TODO: check if node is synchronised | 371 # TODO: check if node is synchronised |
372 if cached_node is not None: | 372 if cached_node is not None: |
373 # the node is cached, we return items from cache | 373 # the node is cached, we return items from cache |
374 log.debug(f"node {node!r} from {service} is in cache") | 374 log.debug(f"node {node!r} from {service} is in cache") |
375 pubsub_items, metadata = await self.apg._c.getItemsFromCache( | 375 pubsub_items, metadata = await self.apg._c.get_items_from_cache( |
376 client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req | 376 client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req |
377 ) | 377 ) |
378 try: | 378 try: |
379 rsm_resp = rsm.RSMResponse(**metadata["rsm"]) | 379 rsm_resp = rsm.RSMResponse(**metadata["rsm"]) |
380 except KeyError: | 380 except KeyError: |
382 return [i.data for i in pubsub_items], rsm_resp | 382 return [i.data for i in pubsub_items], rsm_resp |
383 | 383 |
384 if itemIdentifiers: | 384 if itemIdentifiers: |
385 items = [] | 385 items = [] |
386 for item_id in itemIdentifiers: | 386 for item_id in itemIdentifiers: |
387 item_data = await self.apg.apGet(item_id) | 387 item_data = await self.apg.ap_get(item_id) |
388 item_elt = await parser(item_data) | 388 item_elt = await parser(item_data) |
389 items.append(item_elt) | 389 items.append(item_elt) |
390 return items, None | 390 return items, None |
391 else: | 391 else: |
392 if rsm_req is None: | 392 if rsm_req is None: |
417 | 417 |
418 log.info( | 418 log.info( |
419 f"No cache found for node {node} at {service} (AP account {ap_account}), " | 419 f"No cache found for node {node} at {service} (AP account {ap_account}), " |
420 "using Collection Paging to RSM translation" | 420 "using Collection Paging to RSM translation" |
421 ) | 421 ) |
422 if self.apg._m.isCommentNode(node): | 422 if self.apg._m.is_comment_node(node): |
423 parent_item = self.apg._m.getParentItem(node) | 423 parent_item = self.apg._m.get_parent_item(node) |
424 try: | 424 try: |
425 parent_data = await self.apg.apGet(parent_item) | 425 parent_data = await self.apg.ap_get(parent_item) |
426 collection = await self.apg.apGetObject( | 426 collection = await self.apg.ap_get_object( |
427 parent_data.get("object", {}), | 427 parent_data.get("object", {}), |
428 "replies" | 428 "replies" |
429 ) | 429 ) |
430 except Exception as e: | 430 except Exception as e: |
431 raise error.StanzaError( | 431 raise error.StanzaError( |
432 "item-not-found", | 432 "item-not-found", |
433 text=e | 433 text=e |
434 ) | 434 ) |
435 else: | 435 else: |
436 actor_data = await self.apg.getAPActorDataFromAccount(ap_account) | 436 actor_data = await self.apg.get_ap_actor_data_from_account(ap_account) |
437 collection = await self.apg.apGetObject(actor_data, collection_name) | 437 collection = await self.apg.ap_get_object(actor_data, collection_name) |
438 if not collection: | 438 if not collection: |
439 raise error.StanzaError( | 439 raise error.StanzaError( |
440 "item-not-found", | 440 "item-not-found", |
441 text=f"No collection found for node {node!r} (account: {ap_account})" | 441 text=f"No collection found for node {node!r} (account: {ap_account})" |
442 ) | 442 ) |
443 | 443 |
444 kwargs["parser"] = parser | 444 kwargs["parser"] = parser |
445 return await self.apg.getAPItems(collection, **kwargs) | 445 return await self.apg.get_ap_items(collection, **kwargs) |
446 | 446 |
447 @ensure_deferred | 447 @ensure_deferred |
448 async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): | 448 async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): |
449 raise error.StanzaError("forbidden") | 449 raise error.StanzaError("forbidden") |
450 | 450 |
457 # being internal. | 457 # being internal. |
458 if nodeIdentifier == self.apg._m.namespace: | 458 if nodeIdentifier == self.apg._m.namespace: |
459 sub_state = SubscriptionState.PENDING | 459 sub_state = SubscriptionState.PENDING |
460 else: | 460 else: |
461 sub_state = SubscriptionState.SUBSCRIBED | 461 sub_state = SubscriptionState.SUBSCRIBED |
462 node = await self.host.memory.storage.getPubsubNode( | 462 node = await self.host.memory.storage.get_pubsub_node( |
463 client, service, nodeIdentifier, with_subscriptions=True | 463 client, service, nodeIdentifier, with_subscriptions=True |
464 ) | 464 ) |
465 if node is None: | 465 if node is None: |
466 node = await self.host.memory.storage.setPubsubNode( | 466 node = await self.host.memory.storage.set_pubsub_node( |
467 client, | 467 client, |
468 service, | 468 service, |
469 nodeIdentifier, | 469 nodeIdentifier, |
470 ) | 470 ) |
471 subscription = None | 471 subscription = None |
508 ) | 508 ) |
509 | 509 |
510 if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace): | 510 if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace): |
511 # if we subscribe to microblog or events node, we follow the corresponding | 511 # if we subscribe to microblog or events node, we follow the corresponding |
512 # account | 512 # account |
513 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( | 513 req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox( |
514 requestor, service | 514 requestor, service |
515 ) | 515 ) |
516 | 516 |
517 data = self.apg.create_activity("Follow", req_actor_id, recip_actor_id) | 517 data = self.apg.create_activity("Follow", req_actor_id, recip_actor_id) |
518 | 518 |
519 resp = await self.apg.signAndPost(inbox, req_actor_id, data) | 519 resp = await self.apg.sign_and_post(inbox, req_actor_id, data) |
520 if resp.code >= 300: | 520 if resp.code >= 300: |
521 text = await resp.text() | 521 text = await resp.text() |
522 raise error.StanzaError("service-unavailable", text=text) | 522 raise error.StanzaError("service-unavailable", text=text) |
523 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") | 523 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") |
524 | 524 |
525 @ensure_deferred | 525 @ensure_deferred |
526 async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): | 526 async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): |
527 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( | 527 req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox( |
528 requestor, service | 528 requestor, service |
529 ) | 529 ) |
530 data = self.apg.create_activity( | 530 data = self.apg.create_activity( |
531 "Undo", | 531 "Undo", |
532 req_actor_id, | 532 req_actor_id, |
535 req_actor_id, | 535 req_actor_id, |
536 recip_actor_id | 536 recip_actor_id |
537 ) | 537 ) |
538 ) | 538 ) |
539 | 539 |
540 resp = await self.apg.signAndPost(inbox, req_actor_id, data) | 540 resp = await self.apg.sign_and_post(inbox, req_actor_id, data) |
541 if resp.code >= 300: | 541 if resp.code >= 300: |
542 text = await resp.text() | 542 text = await resp.text() |
543 raise error.StanzaError("service-unavailable", text=text) | 543 raise error.StanzaError("service-unavailable", text=text) |
544 | 544 |
545 def getConfigurationOptions(self): | 545 def getConfigurationOptions(self): |