annotate sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3764:125c7043b277

comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:

this patch implements those major features:

- `publish` is implemented on virtual pubsub service, thus XMPP entities can now publish to AP using this service
- replies to XMPP items are managed
- `inReplyTo` is filled when converting XMPP items to AP objects
- `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to XMPP's (un)subscribe. On subscription, AP actor's `outbox` collection is converted to XMPP and put in cache. Subscriptions are always public.
- `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription (which should be XEP-0465, but the XEP is not yet published at the time of commit), in both directions.
- new helper methods to check if an URL is local and to get JID from actor ID

doc will follow to explain behaviour

rel 365
author Goffi <goffi@goffi.org>
date Fri, 13 May 2022 19:12:33 +0200
parents a8c7e5cef0cb
children 865167c34b82
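
The node-to-collection mapping described in the commit message can be sketched as below. This is a minimal standalone illustration, not the gateway's code: `node_to_collection` is a hypothetical helper, and the three node-name constants are assumed placeholder values (the gateway reads the real ones from its plugins).

```python
from typing import Tuple

# Assumed placeholder values, for illustration only; the gateway takes the
# real ones from its plugins (self.apg._pps and self.apg._m).
SUBSCRIPTIONS_NODE = "urn:xmpp:pps:subscriptions:0"
SUBSCRIBERS_NODE_PREFIX = "urn:xmpp:pps:subscribers:0"
MICROBLOG_NS = "urn:xmpp:microblog:0"


def node_to_collection(node: str) -> Tuple[str, bool]:
    """Return the AP collection name for a pubsub node and whether cache is used.

    Hypothetical helper following the same dispatch order as ``items``:
    exact match on the subscriptions node, then prefix match on the
    subscribers node, then the microblog namespace.
    """
    if node == SUBSCRIPTIONS_NODE:
        return "following", False
    if node.startswith(SUBSCRIBERS_NODE_PREFIX):
        return "followers", False
    if node.startswith(MICROBLOG_NS):
        return "outbox", True
    raise ValueError(f"unsupported node: {node!r}")
```

Only the `outbox` case is served from cache in this sketch, which matches the `use_cache` flags set in `items`.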
#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Tuple, List, Union

from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from wokkel import rsm, pubsub, disco

from sat.core.i18n import _
from sat.core import exceptions
from sat.core.log import getLogger
from sat.core.constants import Const as C
from sat.tools.utils import ensure_deferred
from sat.memory.sqla_mapping import PubsubSub, SubscriptionState

from .constants import (
    TYPE_ACTOR,
)


log = getLogger(__name__)

# all nodes have the same config
NODE_CONFIG = [
    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
    {"var": "pubsub#max_items", "value": "max"},
    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
]

NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG}
NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG}
for c in NODE_CONFIG:
    NODE_OPTIONS[c["var"]].update(
        {k: v for k, v in c.items() if k not in ("var", "value")}
    )


class APPubsubService(rsm.PubSubService):
    """Pubsub service for XMPP requests"""

    def __init__(self, apg):
        super(APPubsubService, self).__init__()
        self.host = apg.host
        self.apg = apg
        self.discoIdentity = {
            "category": "pubsub",
            "type": "service",
            "name": "Libervia ActivityPub Gateway",
        }

    async def getAPActorIdsAndInbox(
        self,
        requestor: jid.JID,
        recipient: jid.JID,
    ) -> Tuple[str, str, str]:
        """Get AP actor IDs from requestor and recipient JIDs

        @param requestor: XMPP entity doing a request to an AP actor via the gateway
        @param recipient: JID mapping an AP actor via the gateway
        @return: requestor actor ID, recipient actor ID and recipient inbox
        @raise error.StanzaError: "item-not-found" is raised if no user part is
            specified in recipient
        """
        if not recipient.user:
            raise error.StanzaError(
                "item-not-found",
                text="No user part specified"
            )
        requestor_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost())
        recipient_account = self.apg._e.unescape(recipient.user)
        recipient_actor_id = await self.apg.getAPActorIdFromAccount(recipient_account)
        inbox = await self.apg.getAPInboxFromId(recipient_actor_id)
        return requestor_actor_id, recipient_actor_id, inbox

    @ensure_deferred
    async def publish(self, requestor, service, nodeIdentifier, items):
        if self.apg.local_only and not self.apg.isLocal(requestor):
            raise error.StanzaError(
                "forbidden",
                text="Only local users can publish on this gateway."
            )
        if not service.user:
            raise error.StanzaError(
                "bad-request",
                text="You must specify an ActivityPub actor account in JID user part."
            )
        ap_account = self.apg._e.unescape(service.user)
        if ap_account.count("@") != 1:
            raise error.StanzaError(
                "bad-request",
                text=f"{ap_account!r} is not a valid ActivityPub actor account."
            )

        client = self.apg.client.getVirtualClient(requestor)
        await self.apg.convertAndPostItems(
            client, ap_account, service, nodeIdentifier, items
        )

    async def apFollowing2Elt(self, ap_item: dict) -> domish.Element:
        """Convert actor ID from following collection to XMPP item"""
        actor_id = ap_item["id"]
        actor_jid = await self.apg.getJIDFromId(actor_id)
        subscription_elt = self.apg._pps.buildSubscriptionElt(
            self.apg._m.namespace, actor_jid
        )
        item_elt = pubsub.Item(id=actor_id, payload=subscription_elt)
        return item_elt

    async def apFollower2Elt(self, ap_item: dict) -> domish.Element:
        """Convert actor ID from followers collection to XMPP item"""
        actor_id = ap_item["id"]
        actor_jid = await self.apg.getJIDFromId(actor_id)
        subscriber_elt = self.apg._pps.buildSubscriberElt(actor_jid)
        item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt)
        return item_elt

    @ensure_deferred
    async def items(
        self,
        requestor: jid.JID,
        service: jid.JID,
        node: str,
        maxItems: Optional[int],
        itemIdentifiers: Optional[List[str]],
        rsm_req: Optional[rsm.RSMRequest]
    ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
        if not service.user:
            return [], None
        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
        if ap_account.count("@") != 1:
            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
            return [], None

        kwargs = {}

        if node == self.apg._pps.subscriptions_node:
            collection_name = "following"
            parser = self.apFollowing2Elt
            kwargs["only_ids"] = True
            use_cache = False
        elif node.startswith(self.apg._pps.subscribers_node_prefix):
            collection_name = "followers"
            parser = self.apFollower2Elt
            kwargs["only_ids"] = True
            use_cache = False
        else:
            if not node.startswith(self.apg._m.namespace):
                raise error.StanzaError(
                    "feature-not-implemented",
                    text=f"AP Gateway {C.APP_VERSION} only supports "
                    f"{self.apg._m.namespace} node for now"
                )
            collection_name = "outbox"
            parser = self.apg.apItem2Elt
            use_cache = True

3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
175 client = self.apg.client
3764
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
176 if use_cache:
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
177 cached_node = await self.host.memory.storage.getPubsubNode(
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
178 client, service, node
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
179 )
3764
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
180 # TODO: check if node is synchronised
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
181 if cached_node is not None:
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
182 # the node is cached: return its items from the cache
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
183 log.debug(f"node {node!r} from {service} is in cache")
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
184 pubsub_items, metadata = await self.apg._c.getItemsFromCache(
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
185 client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
186 )
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
187 try:
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
188 rsm_resp = rsm.RSMResponse(**metadata["rsm"])
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
189 except KeyError:
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
190 rsm_resp = None
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
191 return [i.data for i in pubsub_items], rsm_resp
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
192
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
193 if itemIdentifiers:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
194 items = []
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
195 for item_id in itemIdentifiers:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
196 item_data = await self.apg.apGet(item_id)
3764
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
197 item_elt = await parser(item_data)
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
198 items.append(item_elt)
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
199 return items, None
3728
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
200 else:
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
201 if rsm_req is None:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
202 if maxItems is None:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
203 maxItems = 20
3764
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
204 kwargs.update({
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
205 "max_items": maxItems,
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
206 "chronological_pagination": False,
3764
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
207 })
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
208 else:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
209 if len(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
210 [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
211 if v is not None]
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
212 ) > 1:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
213 raise error.StanzaError(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
214 "bad-request",
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
215 text="You can't use after, before and index at the same time"
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
216 )
3764
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
217 kwargs.update({"max_items": rsm_req.max})
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
218 if rsm_req.after is not None:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
219 kwargs["after_id"] = rsm_req.after
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
220 elif rsm_req.before is not None:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
221 kwargs["chronological_pagination"] = False
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
222 if rsm_req.before != "":
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
223 kwargs["after_id"] = rsm_req.before
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
224 elif rsm_req.index is not None:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
225 kwargs["start_index"] = rsm_req.index
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
226
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
227 log.info(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
228 f"No cache found for node {node} at {service} (AP account {ap_account}), "
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
229 "using Collection Paging to RSM translation"
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
230 )
3764
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
231 if self.apg._m.isCommentNode(node):
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
232 parent_item = self.apg._m.getParentItem(node)
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
233 try:
3764
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
234 parent_data = await self.apg.apGet(parent_item)
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
235 collection = await self.apg.apGetObject(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
236 parent_data.get("object", {}),
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
237 "replies"
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
238 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
239 except Exception as e:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
240 raise error.StanzaError(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
241 "item-not-found",
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
242 text=str(e)
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
243 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
244 else:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
245 actor_data = await self.apg.getAPActorDataFromAccount(ap_account)
3764
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
246 collection = await self.apg.apGetObject(actor_data, collection_name)
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
247 if not collection:
3728
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
248 raise error.StanzaError(
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
249 "item-not-found",
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
250 text=f"No collection found for node {node!r} (account: {ap_account})"
3728
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
251 )
3764
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
252
125c7043b277 comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
Goffi <goffi@goffi.org>
parents: 3745
diff changeset
253 kwargs["parser"] = parser
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
254 return await self.apg.getAPItems(collection, **kwargs)
3728
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
255
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
256 @ensure_deferred
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
257 async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
258 raise NotImplementedError
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
259
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
260 @ensure_deferred
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
261 async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
262 # TODO: handle comment nodes
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
263 client = self.apg.client
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
264 node = await self.host.memory.storage.getPubsubNode(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
265 client, service, nodeIdentifier, with_subscriptions=True
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
266 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
267 if node is None:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
268 node = await self.host.memory.storage.setPubsubNode(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
269 client,
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
270 service,
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
271 nodeIdentifier,
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
272 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
273 subscription = None
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
274 else:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
275 try:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
276 subscription = next(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
277 s for s in node.subscriptions
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
278 if s.subscriber == requestor.userhostJID()
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
279 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
280 except StopIteration:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
281 subscription = None
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
282
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
283 if subscription is None:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
284 subscription = PubsubSub(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
285 subscriber=requestor.userhostJID(),
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
286 state=SubscriptionState.PENDING
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
287 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
288 node.subscriptions.append(subscription)
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
289 await self.host.memory.storage.add(node)
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
290 else:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
291 if subscription.state is None:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
292 subscription.state = SubscriptionState.PENDING
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
293 await self.host.memory.storage.add(node)
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
294 elif subscription.state == SubscriptionState.SUBSCRIBED:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
295 log.info(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
296 f"{requestor.userhostJID()} already has a subscription to {node!r} "
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
297 f"at {service}. Doing the request anyway."
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
298 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
299 elif subscription.state == SubscriptionState.PENDING:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
300 log.info(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
301 f"{requestor.userhostJID()} already has a pending subscription to "
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
302 f"{node!r} at {service}. Doing the request anyway."
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
303 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
304 else:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
305 raise exceptions.InternalError(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
306 f"unmanaged subscription state: {subscription.state}"
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
307 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
308
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
309 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
310 requestor, service
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
311 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
312
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
313 data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id)
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
314
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
315 resp = await self.apg.signAndPost(inbox, req_actor_id, data)
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
316 if resp.code >= 400:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
317 text = await resp.text()
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
318 raise error.StanzaError("service-unavailable", text=text)
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
319 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
320
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
321 @ensure_deferred
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
322 async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
323 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
324 requestor, service
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
325 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
326 data = self.apg.createActivity(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
327 "Undo",
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
328 req_actor_id,
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
329 self.apg.createActivity(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
330 "Follow",
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
331 req_actor_id,
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
332 recip_actor_id
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
333 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
334 )
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
335
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
336 resp = await self.apg.signAndPost(inbox, req_actor_id, data)
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
337 if resp.code >= 400:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
338 text = await resp.text()
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
339 raise error.StanzaError("service-unavailable", text=text)
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
340
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
341 def getConfigurationOptions(self):
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
342 return NODE_OPTIONS
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
343
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
344 def getConfiguration(
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
345 self,
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
346 requestor: jid.JID,
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
347 service: jid.JID,
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
348 nodeIdentifier: str
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
349 ) -> defer.Deferred:
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
350 return defer.succeed(NODE_CONFIG_VALUES)
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
351
3728
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
352 def getNodeInfo(
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
353 self,
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
354 requestor: jid.JID,
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
355 service: jid.JID,
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
356 nodeIdentifier: str,
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
357 pep: bool = False,
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
358 recipient: Optional[jid.JID] = None
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
359 ) -> Optional[dict]:
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
360 if not nodeIdentifier:
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
361 return None
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
362 info = {
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
363 "type": "leaf",
3745
a8c7e5cef0cb comp AP gateway: signature checking, caching and threads management:
Goffi <goffi@goffi.org>
parents: 3729
diff changeset
364 "meta-data": NODE_CONFIG
3728
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
365 }
b15644cae50d component AP gateway: JID/node ⟺ AP outbox conversion:
Goffi <goffi@goffi.org>
parents: 3684
diff changeset
366 return info