comparison sat/core/xmpp.py @ 3541:888109774673

core: various changes and fixes to work with new storage and D-Bus bridge: - fixes coroutines handling in various places - fixes types which are not serialised by Tx DBus - XEP-0384: call storage methods in main thread in XEP: Python OMEMO's Promise use thread which prevent the use of AsyncIO loop. To work around that, callLater is used to launch storage method in main thread. This is a temporary workaround, as Python OMEMO should get rid of Promise implementation and threads soon.
author Goffi <goffi@goffi.org>
date Thu, 03 Jun 2021 15:21:43 +0200
parents f9a5b810f14d
children 813595f88612
comparison
equal deleted inserted replaced
3540:aa58451b77ba 3541:888109774673
712 mess_data["message"] or mess_data["subject"] 712 mess_data["message"] or mess_data["subject"]
713 or mess_data["extra"].get(C.MESS_KEY_ATTACHMENTS) 713 or mess_data["extra"].get(C.MESS_KEY_ATTACHMENTS)
714 or mess_data["type"] == C.MESS_TYPE_INFO 714 or mess_data["type"] == C.MESS_TYPE_INFO
715 ) 715 )
716 716
717 def messageAddToHistory(self, data): 717 async def messageAddToHistory(self, data):
718 """Store message into database (for local history) 718 """Store message into database (for local history)
719 719
720 @param data: message data dictionnary 720 @param data: message data dictionnary
721 @param client: profile's client 721 @param client: profile's client
722 """ 722 """
724 # we don't add groupchat message to history, as we get them back 724 # we don't add groupchat message to history, as we get them back
725 # and they will be added then 725 # and they will be added then
726 726
727 # we need a message to store 727 # we need a message to store
728 if self.isMessagePrintable(data): 728 if self.isMessagePrintable(data):
729 self.host_app.memory.addToHistory(self, data) 729 await self.host_app.memory.addToHistory(self, data)
730 else: 730 else:
731 log.warning( 731 log.warning(
732 "No message found" 732 "No message found"
733 ) # empty body should be managed by plugins before this point 733 ) # empty body should be managed by plugins before this point
734 return data 734 return data
874 # we want to be sure that we got the roster 874 # we want to be sure that we got the roster
875 return self.roster.got_roster 875 return self.roster.got_roster
876 876
877 def addPostXmlCallbacks(self, post_xml_treatments): 877 def addPostXmlCallbacks(self, post_xml_treatments):
878 post_xml_treatments.addCallback(self.messageProt.completeAttachments) 878 post_xml_treatments.addCallback(self.messageProt.completeAttachments)
879 post_xml_treatments.addCallback(self.messageAddToHistory) 879 post_xml_treatments.addCallback(
880 lambda ret: defer.ensureDeferred(self.messageAddToHistory(ret))
881 )
880 post_xml_treatments.addCallback(self.messageSendToBridge) 882 post_xml_treatments.addCallback(self.messageSendToBridge)
881 883
882 def send(self, obj): 884 def send(self, obj):
883 # original send method accept string 885 # original send method accept string
884 # but we restrict to domish.Element to make trigger treatments easier 886 # but we restrict to domish.Element to make trigger treatments easier
1059 else: 1061 else:
1060 return start_cb(self) 1062 return start_cb(self)
1061 1063
1062 def addPostXmlCallbacks(self, post_xml_treatments): 1064 def addPostXmlCallbacks(self, post_xml_treatments):
1063 if self.sendHistory: 1065 if self.sendHistory:
1064 post_xml_treatments.addCallback(self.messageAddToHistory) 1066 post_xml_treatments.addCallback(
1067 lambda ret: defer.ensureDeferred(self.messageAddToHistory(ret))
1068 )
1065 1069
1066 def getOwnerFromJid(self, to_jid: jid.JID) -> jid.JID: 1070 def getOwnerFromJid(self, to_jid: jid.JID) -> jid.JID:
1067 """Retrieve "owner" of a component resource from the destination jid of the request 1071 """Retrieve "owner" of a component resource from the destination jid of the request
1068 1072
1069 This method needs plugin XEP-0106 for unescaping, if you use it you must add the 1073 This method needs plugin XEP-0106 for unescaping, if you use it you must add the
1210 if not cont: 1214 if not cont:
1211 return 1215 return
1212 data = self.parseMessage(message_elt) 1216 data = self.parseMessage(message_elt)
1213 post_treat.addCallback(self.completeAttachments) 1217 post_treat.addCallback(self.completeAttachments)
1214 post_treat.addCallback(self.skipEmptyMessage) 1218 post_treat.addCallback(self.skipEmptyMessage)
1215 post_treat.addCallback(self.addToHistory) 1219 post_treat.addCallback(
1220 lambda ret: defer.ensureDeferred(self.addToHistory(ret))
1221 )
1216 post_treat.addCallback(self.bridgeSignal, data) 1222 post_treat.addCallback(self.bridgeSignal, data)
1217 post_treat.addErrback(self.cancelErrorTrap) 1223 post_treat.addErrback(self.cancelErrorTrap)
1218 post_treat.callback(data) 1224 post_treat.callback(data)
1219 1225
1220 def onMessage(self, message_elt): 1226 def onMessage(self, message_elt):
1251 def skipEmptyMessage(self, data): 1257 def skipEmptyMessage(self, data):
1252 if not data["message"] and not data["extra"] and not data["subject"]: 1258 if not data["message"] and not data["extra"] and not data["subject"]:
1253 raise failure.Failure(exceptions.CancelError("Cancelled empty message")) 1259 raise failure.Failure(exceptions.CancelError("Cancelled empty message"))
1254 return data 1260 return data
1255 1261
1256 def addToHistory(self, data): 1262 async def addToHistory(self, data):
1257 if data.pop("history", None) == C.HISTORY_SKIP: 1263 if data.pop("history", None) == C.HISTORY_SKIP:
1258 log.debug("history is skipped as requested") 1264 log.debug("history is skipped as requested")
1259 data["extra"]["history"] = C.HISTORY_SKIP 1265 data["extra"]["history"] = C.HISTORY_SKIP
1260 else: 1266 else:
1261 # we need a message to store 1267 # we need a message to store
1262 if self.parent.isMessagePrintable(data): 1268 if self.parent.isMessagePrintable(data):
1263 return self.host.memory.addToHistory(self.parent, data) 1269 return await self.host.memory.addToHistory(self.parent, data)
1264 else: 1270 else:
1265 log.debug("not storing empty message to history: {data}" 1271 log.debug("not storing empty message to history: {data}"
1266 .format(data=data)) 1272 .format(data=data))
1267 1273
1268 def bridgeSignal(self, __, data): 1274 def bridgeSignal(self, __, data):
1476 except KeyError: 1482 except KeyError:
1477 pass # no previous item registration (or it's been cleared) 1483 pass # no previous item registration (or it's been cleared)
1478 self._jids[entity] = item 1484 self._jids[entity] = item
1479 self._registerItem(item) 1485 self._registerItem(item)
1480 self.host.bridge.newContact( 1486 self.host.bridge.newContact(
1481 entity.full(), self.getAttributes(item), item.groups, self.parent.profile 1487 entity.full(), self.getAttributes(item), list(item.groups),
1488 self.parent.profile
1482 ) 1489 )
1483 1490
1484 def removeReceived(self, request): 1491 def removeReceived(self, request):
1485 entity = request.item.entity 1492 entity = request.item.entity
1486 log.info(_("removing {entity} from roster").format(entity=entity.full())) 1493 log.info(_("removing {entity} from roster").format(entity=entity.full()))