Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0313.py @ 3541:888109774673
core: various changes and fixes to work with new storage and D-Bus bridge:
- fixes coroutines handling in various places
- fixes types which are not serialised by Tx DBus
- XEP-0384: call storage methods in main thread in XEP: Python OMEMO's Promise use thread
which prevent the use of AsyncIO loop. To work around that, callLater is used to launch
storage method in main thread. This is a temporary workaround, as Python OMEMO should
get rid of Promise implementation and threads soon.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 03 Jun 2021 15:21:43 +0200 |
parents | be6d91572633 |
children | 813595f88612 |
comparison
equal
deleted
inserted
replaced
3540:aa58451b77ba | 3541:888109774673 |
---|---|
73 host.bridge.addMethod( | 73 host.bridge.addMethod( |
74 "MAMGet", ".plugin", in_sign='sss', | 74 "MAMGet", ".plugin", in_sign='sss', |
75 out_sign='(a(sdssa{ss}a{ss}ss)ss)', method=self._getArchives, | 75 out_sign='(a(sdssa{ss}a{ss}ss)ss)', method=self._getArchives, |
76 async_=True) | 76 async_=True) |
77 | 77 |
78 @defer.inlineCallbacks | 78 async def resume(self, client): |
79 def resume(self, client): | |
80 """Retrieve one2one messages received since the last we have in local storage""" | 79 """Retrieve one2one messages received since the last we have in local storage""" |
81 stanza_id_data = yield self.host.memory.storage.getPrivates( | 80 stanza_id_data = await self.host.memory.storage.getPrivates( |
82 mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile) | 81 mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile) |
83 stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID) | 82 stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID) |
84 if stanza_id is None: | 83 if stanza_id is None: |
85 log.info("can't retrieve last stanza ID, checking history") | 84 log.info("can't retrieve last stanza ID, checking history") |
86 last_mess = yield self.host.memory.historyGet( | 85 last_mess = await self.host.memory.historyGet( |
87 None, None, limit=1, filters={'not_types': C.MESS_TYPE_GROUPCHAT, | 86 None, None, limit=1, filters={'not_types': C.MESS_TYPE_GROUPCHAT, |
88 'last_stanza_id': True}, | 87 'last_stanza_id': True}, |
89 profile=client.profile) | 88 profile=client.profile) |
90 if not last_mess: | 89 if not last_mess: |
91 log.info(_("It seems that we have no MAM history yet")) | 90 log.info(_("It seems that we have no MAM history yet")) |
96 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) | 95 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) |
97 mam_req = mam.MAMRequest(rsm_=rsm_req) | 96 mam_req = mam.MAMRequest(rsm_=rsm_req) |
98 complete = False | 97 complete = False |
99 count = 0 | 98 count = 0 |
100 while not complete: | 99 while not complete: |
101 mam_data = yield self.getArchives(client, mam_req, | 100 mam_data = await self.getArchives(client, mam_req, |
102 service=client.jid.userhostJID()) | 101 service=client.jid.userhostJID()) |
103 elt_list, rsm_response, mam_response = mam_data | 102 elt_list, rsm_response, mam_response = mam_data |
104 complete = mam_response["complete"] | 103 complete = mam_response["complete"] |
105 # we update MAM request for next iteration | 104 # we update MAM request for next iteration |
106 mam_req.rsm.after = rsm_response.last | 105 mam_req.rsm.after = rsm_response.last |
139 from_jid=from_jid.full(), xml=mess_elt.toXml())) | 138 from_jid=from_jid.full(), xml=mess_elt.toXml())) |
140 continue | 139 continue |
141 # adding message to history | 140 # adding message to history |
142 mess_data = client.messageProt.parseMessage(fwd_message_elt) | 141 mess_data = client.messageProt.parseMessage(fwd_message_elt) |
143 try: | 142 try: |
144 yield client.messageProt.addToHistory(mess_data) | 143 await client.messageProt.addToHistory(mess_data) |
145 except exceptions.CancelError as e: | 144 except exceptions.CancelError as e: |
146 log.warning( | 145 log.warning( |
147 "message has not been added to history: {e}".format(e=e)) | 146 "message has not been added to history: {e}".format(e=e)) |
148 except Exception as e: | 147 except Exception as e: |
149 log.error( | 148 log.error( |
154 log.info(_("We have received no message while offline")) | 153 log.info(_("We have received no message while offline")) |
155 else: | 154 else: |
156 log.info(_("We have received {num_mess} message(s) while offline.") | 155 log.info(_("We have received {num_mess} message(s) while offline.") |
157 .format(num_mess=count)) | 156 .format(num_mess=count)) |
158 | 157 |
159 def profileConnected(self, client): | 158 async def profileConnected(self, client): |
160 return self.resume(client) | 159 await self.resume(client) |
161 | 160 |
162 def getHandler(self, client): | 161 def getHandler(self, client): |
163 mam_client = client._mam = SatMAMClient(self) | 162 mam_client = client._mam = SatMAMClient(self) |
164 return mam_client | 163 return mam_client |
165 | 164 |