Mercurial > libervia-backend
comparison sat/core/xmpp.py @ 3541:888109774673
core: various changes and fixes to work with new storage and D-Bus bridge:
- fixes coroutine handling in various places
- fixes types which are not serialised by Tx DBus
- XEP-0384: call storage methods in main thread in XEP: Python OMEMO's Promise uses threads,
which prevents the use of the AsyncIO loop. To work around that, callLater is used to launch
storage methods in the main thread. This is a temporary workaround, as Python OMEMO should
get rid of its Promise implementation and threads soon.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 03 Jun 2021 15:21:43 +0200 |
parents | f9a5b810f14d |
children | 813595f88612 |
comparison
equal
deleted
inserted
replaced
3540:aa58451b77ba | 3541:888109774673 |
---|---|
712 mess_data["message"] or mess_data["subject"] | 712 mess_data["message"] or mess_data["subject"] |
713 or mess_data["extra"].get(C.MESS_KEY_ATTACHMENTS) | 713 or mess_data["extra"].get(C.MESS_KEY_ATTACHMENTS) |
714 or mess_data["type"] == C.MESS_TYPE_INFO | 714 or mess_data["type"] == C.MESS_TYPE_INFO |
715 ) | 715 ) |
716 | 716 |
717 def messageAddToHistory(self, data): | 717 async def messageAddToHistory(self, data): |
718 """Store message into database (for local history) | 718 """Store message into database (for local history) |
719 | 719 |
720 @param data: message data dictionnary | 720 @param data: message data dictionnary |
721 @param client: profile's client | 721 @param client: profile's client |
722 """ | 722 """ |
724 # we don't add groupchat message to history, as we get them back | 724 # we don't add groupchat message to history, as we get them back |
725 # and they will be added then | 725 # and they will be added then |
726 | 726 |
727 # we need a message to store | 727 # we need a message to store |
728 if self.isMessagePrintable(data): | 728 if self.isMessagePrintable(data): |
729 self.host_app.memory.addToHistory(self, data) | 729 await self.host_app.memory.addToHistory(self, data) |
730 else: | 730 else: |
731 log.warning( | 731 log.warning( |
732 "No message found" | 732 "No message found" |
733 ) # empty body should be managed by plugins before this point | 733 ) # empty body should be managed by plugins before this point |
734 return data | 734 return data |
874 # we want to be sure that we got the roster | 874 # we want to be sure that we got the roster |
875 return self.roster.got_roster | 875 return self.roster.got_roster |
876 | 876 |
877 def addPostXmlCallbacks(self, post_xml_treatments): | 877 def addPostXmlCallbacks(self, post_xml_treatments): |
878 post_xml_treatments.addCallback(self.messageProt.completeAttachments) | 878 post_xml_treatments.addCallback(self.messageProt.completeAttachments) |
879 post_xml_treatments.addCallback(self.messageAddToHistory) | 879 post_xml_treatments.addCallback( |
880 lambda ret: defer.ensureDeferred(self.messageAddToHistory(ret)) | |
881 ) | |
880 post_xml_treatments.addCallback(self.messageSendToBridge) | 882 post_xml_treatments.addCallback(self.messageSendToBridge) |
881 | 883 |
882 def send(self, obj): | 884 def send(self, obj): |
883 # original send method accept string | 885 # original send method accept string |
884 # but we restrict to domish.Element to make trigger treatments easier | 886 # but we restrict to domish.Element to make trigger treatments easier |
1059 else: | 1061 else: |
1060 return start_cb(self) | 1062 return start_cb(self) |
1061 | 1063 |
1062 def addPostXmlCallbacks(self, post_xml_treatments): | 1064 def addPostXmlCallbacks(self, post_xml_treatments): |
1063 if self.sendHistory: | 1065 if self.sendHistory: |
1064 post_xml_treatments.addCallback(self.messageAddToHistory) | 1066 post_xml_treatments.addCallback( |
1067 lambda ret: defer.ensureDeferred(self.messageAddToHistory(ret)) | |
1068 ) | |
1065 | 1069 |
1066 def getOwnerFromJid(self, to_jid: jid.JID) -> jid.JID: | 1070 def getOwnerFromJid(self, to_jid: jid.JID) -> jid.JID: |
1067 """Retrieve "owner" of a component resource from the destination jid of the request | 1071 """Retrieve "owner" of a component resource from the destination jid of the request |
1068 | 1072 |
1069 This method needs plugin XEP-0106 for unescaping, if you use it you must add the | 1073 This method needs plugin XEP-0106 for unescaping, if you use it you must add the |
1210 if not cont: | 1214 if not cont: |
1211 return | 1215 return |
1212 data = self.parseMessage(message_elt) | 1216 data = self.parseMessage(message_elt) |
1213 post_treat.addCallback(self.completeAttachments) | 1217 post_treat.addCallback(self.completeAttachments) |
1214 post_treat.addCallback(self.skipEmptyMessage) | 1218 post_treat.addCallback(self.skipEmptyMessage) |
1215 post_treat.addCallback(self.addToHistory) | 1219 post_treat.addCallback( |
1220 lambda ret: defer.ensureDeferred(self.addToHistory(ret)) | |
1221 ) | |
1216 post_treat.addCallback(self.bridgeSignal, data) | 1222 post_treat.addCallback(self.bridgeSignal, data) |
1217 post_treat.addErrback(self.cancelErrorTrap) | 1223 post_treat.addErrback(self.cancelErrorTrap) |
1218 post_treat.callback(data) | 1224 post_treat.callback(data) |
1219 | 1225 |
1220 def onMessage(self, message_elt): | 1226 def onMessage(self, message_elt): |
1251 def skipEmptyMessage(self, data): | 1257 def skipEmptyMessage(self, data): |
1252 if not data["message"] and not data["extra"] and not data["subject"]: | 1258 if not data["message"] and not data["extra"] and not data["subject"]: |
1253 raise failure.Failure(exceptions.CancelError("Cancelled empty message")) | 1259 raise failure.Failure(exceptions.CancelError("Cancelled empty message")) |
1254 return data | 1260 return data |
1255 | 1261 |
1256 def addToHistory(self, data): | 1262 async def addToHistory(self, data): |
1257 if data.pop("history", None) == C.HISTORY_SKIP: | 1263 if data.pop("history", None) == C.HISTORY_SKIP: |
1258 log.debug("history is skipped as requested") | 1264 log.debug("history is skipped as requested") |
1259 data["extra"]["history"] = C.HISTORY_SKIP | 1265 data["extra"]["history"] = C.HISTORY_SKIP |
1260 else: | 1266 else: |
1261 # we need a message to store | 1267 # we need a message to store |
1262 if self.parent.isMessagePrintable(data): | 1268 if self.parent.isMessagePrintable(data): |
1263 return self.host.memory.addToHistory(self.parent, data) | 1269 return await self.host.memory.addToHistory(self.parent, data) |
1264 else: | 1270 else: |
1265 log.debug("not storing empty message to history: {data}" | 1271 log.debug("not storing empty message to history: {data}" |
1266 .format(data=data)) | 1272 .format(data=data)) |
1267 | 1273 |
1268 def bridgeSignal(self, __, data): | 1274 def bridgeSignal(self, __, data): |
1476 except KeyError: | 1482 except KeyError: |
1477 pass # no previous item registration (or it's been cleared) | 1483 pass # no previous item registration (or it's been cleared) |
1478 self._jids[entity] = item | 1484 self._jids[entity] = item |
1479 self._registerItem(item) | 1485 self._registerItem(item) |
1480 self.host.bridge.newContact( | 1486 self.host.bridge.newContact( |
1481 entity.full(), self.getAttributes(item), item.groups, self.parent.profile | 1487 entity.full(), self.getAttributes(item), list(item.groups), |
1488 self.parent.profile | |
1482 ) | 1489 ) |
1483 | 1490 |
1484 def removeReceived(self, request): | 1491 def removeReceived(self, request): |
1485 entity = request.item.entity | 1492 entity = request.item.entity |
1486 log.info(_("removing {entity} from roster").format(entity=entity.full())) | 1493 log.info(_("removing {entity} from roster").format(entity=entity.full())) |