comparison sat/plugins/plugin_xep_0313.py @ 3541:888109774673

core: various changes and fixes to work with the new storage and the D-Bus bridge: - fixes coroutine handling in various places - fixes types which are not serialised by Tx DBus - XEP-0384: call storage methods in the main thread in XEP: Python OMEMO's Promise uses threads, which prevents the use of the AsyncIO loop. To work around that, callLater is used to launch storage methods in the main thread. This is a temporary workaround, as Python OMEMO should get rid of its Promise implementation and threads soon.
author Goffi <goffi@goffi.org>
date Thu, 03 Jun 2021 15:21:43 +0200
parents be6d91572633
children 813595f88612
comparison
equal deleted inserted replaced
3540:aa58451b77ba 3541:888109774673
73 host.bridge.addMethod( 73 host.bridge.addMethod(
74 "MAMGet", ".plugin", in_sign='sss', 74 "MAMGet", ".plugin", in_sign='sss',
75 out_sign='(a(sdssa{ss}a{ss}ss)ss)', method=self._getArchives, 75 out_sign='(a(sdssa{ss}a{ss}ss)ss)', method=self._getArchives,
76 async_=True) 76 async_=True)
77 77
78 @defer.inlineCallbacks 78 async def resume(self, client):
79 def resume(self, client):
80 """Retrieve one2one messages received since the last we have in local storage""" 79 """Retrieve one2one messages received since the last we have in local storage"""
81 stanza_id_data = yield self.host.memory.storage.getPrivates( 80 stanza_id_data = await self.host.memory.storage.getPrivates(
82 mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile) 81 mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile)
83 stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID) 82 stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID)
84 if stanza_id is None: 83 if stanza_id is None:
85 log.info("can't retrieve last stanza ID, checking history") 84 log.info("can't retrieve last stanza ID, checking history")
86 last_mess = yield self.host.memory.historyGet( 85 last_mess = await self.host.memory.historyGet(
87 None, None, limit=1, filters={'not_types': C.MESS_TYPE_GROUPCHAT, 86 None, None, limit=1, filters={'not_types': C.MESS_TYPE_GROUPCHAT,
88 'last_stanza_id': True}, 87 'last_stanza_id': True},
89 profile=client.profile) 88 profile=client.profile)
90 if not last_mess: 89 if not last_mess:
91 log.info(_("It seems that we have no MAM history yet")) 90 log.info(_("It seems that we have no MAM history yet"))
96 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) 95 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id)
97 mam_req = mam.MAMRequest(rsm_=rsm_req) 96 mam_req = mam.MAMRequest(rsm_=rsm_req)
98 complete = False 97 complete = False
99 count = 0 98 count = 0
100 while not complete: 99 while not complete:
101 mam_data = yield self.getArchives(client, mam_req, 100 mam_data = await self.getArchives(client, mam_req,
102 service=client.jid.userhostJID()) 101 service=client.jid.userhostJID())
103 elt_list, rsm_response, mam_response = mam_data 102 elt_list, rsm_response, mam_response = mam_data
104 complete = mam_response["complete"] 103 complete = mam_response["complete"]
105 # we update MAM request for next iteration 104 # we update MAM request for next iteration
106 mam_req.rsm.after = rsm_response.last 105 mam_req.rsm.after = rsm_response.last
139 from_jid=from_jid.full(), xml=mess_elt.toXml())) 138 from_jid=from_jid.full(), xml=mess_elt.toXml()))
140 continue 139 continue
141 # adding message to history 140 # adding message to history
142 mess_data = client.messageProt.parseMessage(fwd_message_elt) 141 mess_data = client.messageProt.parseMessage(fwd_message_elt)
143 try: 142 try:
144 yield client.messageProt.addToHistory(mess_data) 143 await client.messageProt.addToHistory(mess_data)
145 except exceptions.CancelError as e: 144 except exceptions.CancelError as e:
146 log.warning( 145 log.warning(
147 "message has not been added to history: {e}".format(e=e)) 146 "message has not been added to history: {e}".format(e=e))
148 except Exception as e: 147 except Exception as e:
149 log.error( 148 log.error(
154 log.info(_("We have received no message while offline")) 153 log.info(_("We have received no message while offline"))
155 else: 154 else:
156 log.info(_("We have received {num_mess} message(s) while offline.") 155 log.info(_("We have received {num_mess} message(s) while offline.")
157 .format(num_mess=count)) 156 .format(num_mess=count))
158 157
159 def profileConnected(self, client): 158 async def profileConnected(self, client):
160 return self.resume(client) 159 await self.resume(client)
161 160
162 def getHandler(self, client): 161 def getHandler(self, client):
163 mam_client = client._mam = SatMAMClient(self) 162 mam_client = client._mam = SatMAMClient(self)
164 return mam_client 163 return mam_client
165 164