comparison sat/plugins/plugin_exp_jingle_stream.py @ 3404:26a0af6e32c1

plugin XEP-0166: new trigger point + coroutines + helper methods: - a new `XEP-0166_initiate` async trigger point is available - `initate` is now a coroutine - `jingleSessionInit` in applications and transports handlers are now called using `utils.adDeferred` - new `getApplication` helper method, to retrieve application from its namespace - new `getContentData` helper method to retrieve application and its argument from content
author Goffi <goffi@goffi.org>
date Thu, 12 Nov 2020 14:53:15 +0100
parents 559a625a236b
children be6d91572633
comparison
equal deleted inserted replaced
3403:404d4b29de52 3404:26a0af6e32c1
15 # GNU Affero General Public License for more details. 15 # GNU Affero General Public License for more details.
16 16
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat.core.i18n import _, D_ 20 import errno
21 from sat.core.constants import Const as C 21 from zope import interface
22 from sat.core import exceptions
23 from sat.core.log import getLogger
24
25 log = getLogger(__name__)
26 from sat.tools import xml_tools
27 from sat.tools import stream
28 from twisted.words.xish import domish 22 from twisted.words.xish import domish
29 from twisted.words.protocols.jabber import jid 23 from twisted.words.protocols.jabber import jid
30 from twisted.internet import defer 24 from twisted.internet import defer
31 from twisted.internet import protocol 25 from twisted.internet import protocol
32 from twisted.internet import endpoints 26 from twisted.internet import endpoints
33 from twisted.internet import reactor 27 from twisted.internet import reactor
34 from twisted.internet import error 28 from twisted.internet import error
35 from twisted.internet import interfaces 29 from twisted.internet import interfaces
36 from zope import interface 30 from sat.core.i18n import _, D_
37 import errno 31 from sat.core.constants import Const as C
32 from sat.core import exceptions
33 from sat.core.log import getLogger
34 from sat.tools import xml_tools
35 from sat.tools import stream
36
37
38 log = getLogger(__name__)
38 39
39 NS_STREAM = "http://salut-a-toi.org/protocol/stream" 40 NS_STREAM = "http://salut-a-toi.org/protocol/stream"
40 SECURITY_LIMIT = 30 41 SECURITY_LIMIT = 30
41 START_PORT = 8888 42 START_PORT = 8888
42 43
191 192
192 # jingle callbacks 193 # jingle callbacks
193 194
194 def _streamOut(self, to_jid_s, profile_key): 195 def _streamOut(self, to_jid_s, profile_key):
195 client = self.host.getClient(profile_key) 196 client = self.host.getClient(profile_key)
196 return self.streamOut(client, jid.JID(to_jid_s)) 197 return defer.ensureDeferred(self.streamOut(client, jid.JID(to_jid_s)))
197 198
198 @defer.inlineCallbacks 199 async def streamOut(self, client, to_jid):
199 def streamOut(self, client, to_jid):
200 """send a stream 200 """send a stream
201 201
202 @param peer_jid(jid.JID): recipient 202 @param peer_jid(jid.JID): recipient
203 @return: an unique id to identify the transfer 203 @return: an unique id to identify the transfer
204 """ 204 """
205 port = START_PORT 205 port = START_PORT
206 factory = StreamFactory() 206 factory = StreamFactory()
207 while True: 207 while True:
208 endpoint = endpoints.TCP4ServerEndpoint(reactor, port) 208 endpoint = endpoints.TCP4ServerEndpoint(reactor, port)
209 try: 209 try:
210 port_listening = yield endpoint.listen(factory) 210 port_listening = await endpoint.listen(factory)
211 except error.CannotListenError as e: 211 except error.CannotListenError as e:
212 if e.socketError.errno == errno.EADDRINUSE: 212 if e.socketError.errno == errno.EADDRINUSE:
213 port += 1 213 port += 1
214 else: 214 else:
215 raise e 215 raise e
216 else: 216 else:
217 factory.port_listening = port_listening 217 factory.port_listening = port_listening
218 break 218 break
219 self._j.initiate( 219 # we don't want to wait for IQ result of initiate
220 defer.ensureDeferred(self._j.initiate(
220 client, 221 client,
221 to_jid, 222 to_jid,
222 [ 223 [
223 { 224 {
224 "app_ns": NS_STREAM, 225 "app_ns": NS_STREAM,
225 "senders": self._j.ROLE_INITIATOR, 226 "senders": self._j.ROLE_INITIATOR,
226 "app_kwargs": {"stream_object": factory}, 227 "app_kwargs": {"stream_object": factory},
227 } 228 }
228 ], 229 ],
229 ) 230 ))
230 defer.returnValue(str(port)) 231 return str(port)
231 232
232 def jingleSessionInit(self, client, session, content_name, stream_object): 233 def jingleSessionInit(self, client, session, content_name, stream_object):
233 content_data = session["contents"][content_name] 234 content_data = session["contents"][content_name]
234 application_data = content_data["application_data"] 235 application_data = content_data["application_data"]
235 assert "stream_object" not in application_data 236 assert "stream_object" not in application_data