comparison sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3844:65e5718e7710

component AP gateway: `Announce` activity implementation: `Announce` and `Undo` of `Announce` are now implemented and converted to suitable XEP-0277 "repeat" items, or retract. rel 370
author Goffi <goffi@goffi.org>
date Thu, 14 Jul 2022 12:55:30 +0200
parents 381340b9a9ee
children cc13efdd8360
comparison
equal deleted inserted replaced
3843:17c757bd74bc 3844:65e5718e7710
33 33
34 from sat.core import exceptions 34 from sat.core import exceptions
35 from sat.core.constants import Const as C 35 from sat.core.constants import Const as C
36 from sat.core.i18n import _ 36 from sat.core.i18n import _
37 from sat.core.log import getLogger 37 from sat.core.log import getLogger
38 from sat.tools.common import date_utils 38 from sat.tools.common import date_utils, uri
39 from sat.memory.sqla_mapping import SubscriptionState 39 from sat.memory.sqla_mapping import SubscriptionState
40 40
41 from .constants import ( 41 from .constants import (
42 CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, 42 CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
43 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, 43 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED,
127 type_ = obj.get("type") 127 type_ = obj.get("type")
128 actor = await self.apg.apGetSenderActor(obj) 128 actor = await self.apg.apGetSenderActor(obj)
129 if actor != signing_actor: 129 if actor != signing_actor:
130 log.warning(f"ignoring object not attributed to signing actor: {data}") 130 log.warning(f"ignoring object not attributed to signing actor: {data}")
131 continue 131 continue
132 try:
133 target_account = obj["object"]
134 except KeyError:
135 log.warning(f'ignoring invalid object, missing "object" key: {data}')
136 continue
137 if not self.apg.isLocalURL(target_account):
138 log.warning(f"ignoring unfollow request to non local actor: {data}")
139 continue
140 132
141 if type_ == "Follow": 133 if type_ == "Follow":
134 try:
135 target_account = obj["object"]
136 except KeyError:
137 log.warning(f'ignoring invalid object, missing "object" key: {data}')
138 continue
139 if not self.apg.isLocalURL(target_account):
140 log.warning(f"ignoring unfollow request to non local actor: {data}")
141 continue
142 await self.apg._p.unsubscribe( 142 await self.apg._p.unsubscribe(
143 client, 143 client,
144 account_jid, 144 account_jid,
145 node, 145 node,
146 sender=client.jid, 146 sender=client.jid,
147 ) 147 )
148 elif type_ == "Announce":
149 # we can use directly the Announce object, as only the "id" field is
150 # needed
151 await self.apg.newAPDeleteItem(client, None, node, obj)
148 else: 152 else:
149 log.warning(f"Unmanaged undo type: {type_!r}") 153 log.warning(f"Unmanaged undo type: {type_!r}")
150 154
151 async def handleFollowActivity( 155 async def handleFollowActivity(
152 self, 156 self,
241 node: Optional[str], 245 node: Optional[str],
242 ap_account: Optional[str], 246 ap_account: Optional[str],
243 ap_url: str, 247 ap_url: str,
244 signing_actor: str 248 signing_actor: str
245 ): 249 ):
246 digest = request.getHeader("digest")
247 if digest in self._seen_digest:
248 log.debug(f"Ignoring duplicated request (digest: {digest!r})")
249 return
250 self._seen_digest.append(digest)
251 if node is None: 250 if node is None:
252 node = self.apg._m.namespace 251 node = self.apg._m.namespace
253 client = await self.apg.getVirtualClient(signing_actor) 252 client = await self.apg.getVirtualClient(signing_actor)
254 objects = await self.apg.apGetList(data, "object") 253 objects = await self.apg.apGetList(data, "object")
255 for obj in objects: 254 for obj in objects:
256 await self.apg.newAPDeleteItem(client, account_jid, node, obj) 255 await self.apg.newAPDeleteItem(client, account_jid, node, obj)
257 256
258 async def handleCreateActivity( 257 async def handleNewAPItems(
259 self, 258 self,
260 request: "HTTPRequest", 259 request: "HTTPRequest",
261 data: dict, 260 data: dict,
262 account_jid: Optional[jid.JID], 261 account_jid: Optional[jid.JID],
263 node: Optional[str], 262 node: Optional[str],
264 ap_account: Optional[str], 263 signing_actor: str,
265 ap_url: str, 264 repeated: bool = False,
266 signing_actor: str
267 ): 265 ):
268 digest = request.getHeader("digest") 266 """Helper method to handle workflow for new AP items
269 if digest in self._seen_digest: 267
270 log.debug(f"Ignoring duplicated request (digest: {digest!r})") 268 accept globally the same parameter as for handleCreateActivity
271 return 269 @param repeated: if True, the item is an item republished from somewhere else
272 self._seen_digest.append(digest) 270 """
271 if "_repeated" in data:
272 log.error(
273 '"_repeated" field already present in given AP item, this should not '
274 f"happen. Ignoring object from {signing_actor}\n{data}"
275 )
276 raise exceptions.DataError("unexpected field in item")
273 if node is None: 277 if node is None:
274 node = self.apg._m.namespace 278 node = self.apg._m.namespace
275 client = await self.apg.getVirtualClient(signing_actor) 279 client = await self.apg.getVirtualClient(signing_actor)
276 objects = await self.apg.apGetList(data, "object") 280 objects = await self.apg.apGetList(data, "object")
277 for obj in objects: 281 for obj in objects:
278 sender = await self.apg.apGetSenderActor(obj) 282 sender = await self.apg.apGetSenderActor(obj)
279 if sender != signing_actor: 283 if repeated:
280 log.warning( 284 # we don't check sender when item is repeated, as it should be different
281 "Ignoring object not attributed to signing actor: {obj}" 285 # from post author in this case
282 ) 286 sender_jid = await self.apg.getJIDFromId(sender)
287 repeater_jid = await self.apg.getJIDFromId(signing_actor)
288
289 obj["_repeated"] = {
290 "by": repeater_jid.full(),
291 "at": data.get("published"),
292 "uri": uri.buildXMPPUri(
293 "pubsub",
294 path=sender_jid.full(),
295 node=self.apg._m.namespace,
296 item=obj["id"]
297 )
298 }
299 # we must use activity's id and targets, not the original item ones
300 for field in ("id", "to", "bto", "cc", "bcc"):
301 obj[field] = data.get(field)
283 else: 302 else:
284 await self.apg.newAPItem(client, account_jid, node, obj) 303 if sender != signing_actor:
304 log.warning(
305 "Ignoring object not attributed to signing actor: {obj}"
306 )
307 continue
308 await self.apg.newAPItem(client, account_jid, node, obj)
309
310 async def handleCreateActivity(
311 self,
312 request: "HTTPRequest",
313 data: dict,
314 account_jid: Optional[jid.JID],
315 node: Optional[str],
316 ap_account: Optional[str],
317 ap_url: str,
318 signing_actor: str
319 ):
320 await self.handleNewAPItems(request, data, account_jid, node, signing_actor)
321
322 async def handleAnnounceActivity(
323 self,
324 request: "HTTPRequest",
325 data: dict,
326 account_jid: Optional[jid.JID],
327 node: Optional[str],
328 ap_account: Optional[str],
329 ap_url: str,
330 signing_actor: str
331 ):
332 # we create a new item
333 await self.handleNewAPItems(
334 request,
335 data,
336 account_jid,
337 node,
338 signing_actor,
339 repeated=True
340 )
285 341
286 async def APActorRequest( 342 async def APActorRequest(
287 self, 343 self,
288 request: "HTTPRequest", 344 request: "HTTPRequest",
289 account_jid: jid.JID, 345 account_jid: jid.JID,
678 f"invalid signature: {e}" 734 f"invalid signature: {e}"
679 ) 735 )
680 request.finish() 736 request.finish()
681 return 737 return
682 738
739 request.setResponseCode(http.ACCEPTED)
740
741 digest = request.getHeader("digest")
742 if digest in self._seen_digest:
743 log.debug(f"Ignoring duplicated request (digest: {digest!r})")
744 request.finish()
745 return
746 self._seen_digest.append(digest)
747
683 # default response code, may be changed, e.g. in case of exception 748 # default response code, may be changed, e.g. in case of exception
684 request.setResponseCode(http.ACCEPTED)
685 try: 749 try:
686 return await self.APRequest(request, signing_actor) 750 return await self.APRequest(request, signing_actor)
687 except Exception as e: 751 except Exception as e:
688 self._onRequestError(failure.Failure(e), request) 752 self._onRequestError(failure.Failure(e), request)
689 753