Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3844:65e5718e7710
component AP gateway: `Announce` activity implementation:
`Announce` and `Undo` of `Announce` are now implemented and converted to suitable XEP-0277
"repeat" items, or retract.
rel 370
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 14 Jul 2022 12:55:30 +0200 |
parents | 381340b9a9ee |
children | cc13efdd8360 |
comparison
equal
deleted
inserted
replaced
3843:17c757bd74bc | 3844:65e5718e7710 |
---|---|
33 | 33 |
34 from sat.core import exceptions | 34 from sat.core import exceptions |
35 from sat.core.constants import Const as C | 35 from sat.core.constants import Const as C |
36 from sat.core.i18n import _ | 36 from sat.core.i18n import _ |
37 from sat.core.log import getLogger | 37 from sat.core.log import getLogger |
38 from sat.tools.common import date_utils | 38 from sat.tools.common import date_utils, uri |
39 from sat.memory.sqla_mapping import SubscriptionState | 39 from sat.memory.sqla_mapping import SubscriptionState |
40 | 40 |
41 from .constants import ( | 41 from .constants import ( |
42 CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, | 42 CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, |
43 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, | 43 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, |
127 type_ = obj.get("type") | 127 type_ = obj.get("type") |
128 actor = await self.apg.apGetSenderActor(obj) | 128 actor = await self.apg.apGetSenderActor(obj) |
129 if actor != signing_actor: | 129 if actor != signing_actor: |
130 log.warning(f"ignoring object not attributed to signing actor: {data}") | 130 log.warning(f"ignoring object not attributed to signing actor: {data}") |
131 continue | 131 continue |
132 try: | |
133 target_account = obj["object"] | |
134 except KeyError: | |
135 log.warning(f'ignoring invalid object, missing "object" key: {data}') | |
136 continue | |
137 if not self.apg.isLocalURL(target_account): | |
138 log.warning(f"ignoring unfollow request to non local actor: {data}") | |
139 continue | |
140 | 132 |
141 if type_ == "Follow": | 133 if type_ == "Follow": |
134 try: | |
135 target_account = obj["object"] | |
136 except KeyError: | |
137 log.warning(f'ignoring invalid object, missing "object" key: {data}') | |
138 continue | |
139 if not self.apg.isLocalURL(target_account): | |
140 log.warning(f"ignoring unfollow request to non local actor: {data}") | |
141 continue | |
142 await self.apg._p.unsubscribe( | 142 await self.apg._p.unsubscribe( |
143 client, | 143 client, |
144 account_jid, | 144 account_jid, |
145 node, | 145 node, |
146 sender=client.jid, | 146 sender=client.jid, |
147 ) | 147 ) |
148 elif type_ == "Announce": | |
149 # we can use directly the Announce object, as only the "id" field is | |
150 # needed | |
151 await self.apg.newAPDeleteItem(client, None, node, obj) | |
148 else: | 152 else: |
149 log.warning(f"Unmanaged undo type: {type_!r}") | 153 log.warning(f"Unmanaged undo type: {type_!r}") |
150 | 154 |
151 async def handleFollowActivity( | 155 async def handleFollowActivity( |
152 self, | 156 self, |
241 node: Optional[str], | 245 node: Optional[str], |
242 ap_account: Optional[str], | 246 ap_account: Optional[str], |
243 ap_url: str, | 247 ap_url: str, |
244 signing_actor: str | 248 signing_actor: str |
245 ): | 249 ): |
246 digest = request.getHeader("digest") | |
247 if digest in self._seen_digest: | |
248 log.debug(f"Ignoring duplicated request (digest: {digest!r})") | |
249 return | |
250 self._seen_digest.append(digest) | |
251 if node is None: | 250 if node is None: |
252 node = self.apg._m.namespace | 251 node = self.apg._m.namespace |
253 client = await self.apg.getVirtualClient(signing_actor) | 252 client = await self.apg.getVirtualClient(signing_actor) |
254 objects = await self.apg.apGetList(data, "object") | 253 objects = await self.apg.apGetList(data, "object") |
255 for obj in objects: | 254 for obj in objects: |
256 await self.apg.newAPDeleteItem(client, account_jid, node, obj) | 255 await self.apg.newAPDeleteItem(client, account_jid, node, obj) |
257 | 256 |
258 async def handleCreateActivity( | 257 async def handleNewAPItems( |
259 self, | 258 self, |
260 request: "HTTPRequest", | 259 request: "HTTPRequest", |
261 data: dict, | 260 data: dict, |
262 account_jid: Optional[jid.JID], | 261 account_jid: Optional[jid.JID], |
263 node: Optional[str], | 262 node: Optional[str], |
264 ap_account: Optional[str], | 263 signing_actor: str, |
265 ap_url: str, | 264 repeated: bool = False, |
266 signing_actor: str | |
267 ): | 265 ): |
268 digest = request.getHeader("digest") | 266 """Helper method to handle workflow for new AP items |
269 if digest in self._seen_digest: | 267 |
270 log.debug(f"Ignoring duplicated request (digest: {digest!r})") | 268 accept globally the same parameter as for handleCreateActivity |
271 return | 269 @param repeated: if True, the item is an item republished from somewhere else |
272 self._seen_digest.append(digest) | 270 """ |
271 if "_repeated" in data: | |
272 log.error( | |
273 '"_repeated" field already present in given AP item, this should not ' | |
274 f"happen. Ignoring object from {signing_actor}\n{data}" | |
275 ) | |
276 raise exceptions.DataError("unexpected field in item") | |
273 if node is None: | 277 if node is None: |
274 node = self.apg._m.namespace | 278 node = self.apg._m.namespace |
275 client = await self.apg.getVirtualClient(signing_actor) | 279 client = await self.apg.getVirtualClient(signing_actor) |
276 objects = await self.apg.apGetList(data, "object") | 280 objects = await self.apg.apGetList(data, "object") |
277 for obj in objects: | 281 for obj in objects: |
278 sender = await self.apg.apGetSenderActor(obj) | 282 sender = await self.apg.apGetSenderActor(obj) |
279 if sender != signing_actor: | 283 if repeated: |
280 log.warning( | 284 # we don't check sender when item is repeated, as it should be different |
281 "Ignoring object not attributed to signing actor: {obj}" | 285 # from post author in this case |
282 ) | 286 sender_jid = await self.apg.getJIDFromId(sender) |
287 repeater_jid = await self.apg.getJIDFromId(signing_actor) | |
288 | |
289 obj["_repeated"] = { | |
290 "by": repeater_jid.full(), | |
291 "at": data.get("published"), | |
292 "uri": uri.buildXMPPUri( | |
293 "pubsub", | |
294 path=sender_jid.full(), | |
295 node=self.apg._m.namespace, | |
296 item=obj["id"] | |
297 ) | |
298 } | |
299 # we must use activity's id and targets, not the original item ones | |
300 for field in ("id", "to", "bto", "cc", "bcc"): | |
301 obj[field] = data.get(field) | |
283 else: | 302 else: |
284 await self.apg.newAPItem(client, account_jid, node, obj) | 303 if sender != signing_actor: |
304 log.warning( | |
305 "Ignoring object not attributed to signing actor: {obj}" | |
306 ) | |
307 continue | |
308 await self.apg.newAPItem(client, account_jid, node, obj) | |
309 | |
310 async def handleCreateActivity( | |
311 self, | |
312 request: "HTTPRequest", | |
313 data: dict, | |
314 account_jid: Optional[jid.JID], | |
315 node: Optional[str], | |
316 ap_account: Optional[str], | |
317 ap_url: str, | |
318 signing_actor: str | |
319 ): | |
320 await self.handleNewAPItems(request, data, account_jid, node, signing_actor) | |
321 | |
322 async def handleAnnounceActivity( | |
323 self, | |
324 request: "HTTPRequest", | |
325 data: dict, | |
326 account_jid: Optional[jid.JID], | |
327 node: Optional[str], | |
328 ap_account: Optional[str], | |
329 ap_url: str, | |
330 signing_actor: str | |
331 ): | |
332 # we create a new item | |
333 await self.handleNewAPItems( | |
334 request, | |
335 data, | |
336 account_jid, | |
337 node, | |
338 signing_actor, | |
339 repeated=True | |
340 ) | |
285 | 341 |
286 async def APActorRequest( | 342 async def APActorRequest( |
287 self, | 343 self, |
288 request: "HTTPRequest", | 344 request: "HTTPRequest", |
289 account_jid: jid.JID, | 345 account_jid: jid.JID, |
678 f"invalid signature: {e}" | 734 f"invalid signature: {e}" |
679 ) | 735 ) |
680 request.finish() | 736 request.finish() |
681 return | 737 return |
682 | 738 |
739 request.setResponseCode(http.ACCEPTED) | |
740 | |
741 digest = request.getHeader("digest") | |
742 if digest in self._seen_digest: | |
743 log.debug(f"Ignoring duplicated request (digest: {digest!r})") | |
744 request.finish() | |
745 return | |
746 self._seen_digest.append(digest) | |
747 | |
683 # default response code, may be changed, e.g. in case of exception | 748 # default response code, may be changed, e.g. in case of exception |
684 request.setResponseCode(http.ACCEPTED) | |
685 try: | 749 try: |
686 return await self.APRequest(request, signing_actor) | 750 return await self.APRequest(request, signing_actor) |
687 except Exception as e: | 751 except Exception as e: |
688 self._onRequestError(failure.Failure(e), request) | 752 self._onRequestError(failure.Failure(e), request) |
689 | 753 |