comparison src/plugins/plugin_xep_0095.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfer: refactoring and improvements: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - moved SatFile to a new tools.stream module, as it should be part of core, not a plugin - new IStreamProducer interface, to handle starting a pull producer - new FileStreamObject which creates a stream producer/consumer from a SatFile - plugin pipe no longer uses a unix named pipe, as it complicates things: special care needs to be taken not to block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renamed to jingle stream. - bad connections/errors should be better handled in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly. Fixes bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children
comparison
equal deleted inserted replaced
2488:78c7992a26ed 2489:e2a7bb875957
70 try: 70 try:
71 del self.si_profiles[si_profile] 71 del self.si_profiles[si_profile]
72 except KeyError: 72 except KeyError:
73 log.error(u"Trying to unregister SI profile [{}] which was not registered".format(si_profile)) 73 log.error(u"Trying to unregister SI profile [{}] which was not registered".format(si_profile))
74 74
75 def streamInit(self, iq_elt, profile): 75 def streamInit(self, iq_elt, client):
76 """This method is called on stream initiation (XEP-0095 #3.2) 76 """This method is called on stream initiation (XEP-0095 #3.2)
77 77
78 @param iq_elt: IQ element 78 @param iq_elt: IQ element
79 @param profile: %(doc_profile)s""" 79 """
80 log.info(_("XEP-0095 Stream initiation")) 80 log.info(_("XEP-0095 Stream initiation"))
81 iq_elt.handled = True 81 iq_elt.handled = True
82 si_elt = iq_elt.elements(NS_SI, 'si').next() 82 si_elt = iq_elt.elements(NS_SI, 'si').next()
83 si_id = si_elt['id'] 83 si_id = si_elt['id']
84 si_mime_type = iq_elt.getAttribute('mime-type', 'application/octet-stream') 84 si_mime_type = iq_elt.getAttribute('mime-type', 'application/octet-stream')
85 si_profile = si_elt['profile'] 85 si_profile = si_elt['profile']
86 si_profile_key = si_profile[len(SI_PROFILE_HEADER):] if si_profile.startswith(SI_PROFILE_HEADER) else si_profile 86 si_profile_key = si_profile[len(SI_PROFILE_HEADER):] if si_profile.startswith(SI_PROFILE_HEADER) else si_profile
87 if si_profile_key in self.si_profiles: 87 if si_profile_key in self.si_profiles:
88 #We know this SI profile, we call the callback 88 #We know this SI profile, we call the callback
89 self.si_profiles[si_profile_key](iq_elt, si_id, si_mime_type, si_elt, profile) 89 self.si_profiles[si_profile_key](client, iq_elt, si_id, si_mime_type, si_elt)
90 else: 90 else:
91 #We don't know this profile, we send an error 91 #We don't know this profile, we send an error
92 self.sendError(iq_elt, 'bad-profile', profile) 92 self.sendError(client, iq_elt, 'bad-profile')
93 93
94 def sendError(self, request, condition, profile): 94 def sendError(self, client, request, condition):
95 """Send IQ error as a result 95 """Send IQ error as a result
96 96
97 @param request(domish.Element): original IQ request 97 @param request(domish.Element): original IQ request
98 @param condition(str): error condition 98 @param condition(str): error condition
99 @param profile: %(doc_profile)s
100 """ 99 """
101 client = self.host.getClient(profile)
102 if condition in SI_ERROR_CONDITIONS: 100 if condition in SI_ERROR_CONDITIONS:
103 si_condition = condition 101 si_condition = condition
104 condition = 'bad-request' 102 condition = 'bad-request'
105 else: 103 else:
106 si_condition = None 104 si_condition = None
109 if si_condition is not None: 107 if si_condition is not None:
110 iq_error_elt.error.addElement((NS_SI, si_condition)) 108 iq_error_elt.error.addElement((NS_SI, si_condition))
111 109
112 client.send(iq_error_elt) 110 client.send(iq_error_elt)
113 111
114 def acceptStream(self, iq_elt, feature_elt, misc_elts=None, profile=C.PROF_KEY_NONE): 112 def acceptStream(self, client, iq_elt, feature_elt, misc_elts=None):
115 """Send the accept stream initiation answer 113 """Send the accept stream initiation answer
116 114
117 @param iq_elt(domish.Element): initial SI request 115 @param iq_elt(domish.Element): initial SI request
118 @param feature_elt(domish.Element): 'feature' element containing stream method to use 116 @param feature_elt(domish.Element): 'feature' element containing stream method to use
119 @param misc_elts(list[domish.Element]): list of elements to add 117 @param misc_elts(list[domish.Element]): list of elements to add
120 @param profile: %(doc_profile)s
121 """ 118 """
122 log.info(_("sending stream initiation accept answer")) 119 log.info(_("sending stream initiation accept answer"))
123 if misc_elts is None: 120 if misc_elts is None:
124 misc_elts = [] 121 misc_elts = []
125 client = self.host.getClient(profile)
126 result_elt = xmlstream.toResponse(iq_elt, 'result') 122 result_elt = xmlstream.toResponse(iq_elt, 'result')
127 si_elt = result_elt.addElement((NS_SI, 'si')) 123 si_elt = result_elt.addElement((NS_SI, 'si'))
128 si_elt.addChild(feature_elt) 124 si_elt.addChild(feature_elt)
129 for elt in misc_elts: 125 for elt in misc_elts:
130 si_elt.addChild(elt) 126 si_elt.addChild(elt)
137 log.warning(u"No <si/> element found in result while expected") 133 log.warning(u"No <si/> element found in result while expected")
138 raise exceptions.DataError 134 raise exceptions.DataError
139 return (iq_elt, si_elt) 135 return (iq_elt, si_elt)
140 136
141 137
142 def proposeStream(self, to_jid, si_profile, feature_elt, misc_elts, mime_type='application/octet-stream', profile=C.PROF_KEY_NONE): 138 def proposeStream(self, client, to_jid, si_profile, feature_elt, misc_elts, mime_type='application/octet-stream'):
143 """Propose a stream initiation 139 """Propose a stream initiation
144 140
145 @param to_jid(jid.JID): recipient 141 @param to_jid(jid.JID): recipient
146 @param si_profile(unicode): Stream initiation profile (XEP-0095) 142 @param si_profile(unicode): Stream initiation profile (XEP-0095)
147 @param feature_elt(domish.Element): feature element, according to XEP-0020 143 @param feature_elt(domish.Element): feature element, according to XEP-0020
148 @param misc_elts(list[domish.Element]): list of elements to add 144 @param misc_elts(list[domish.Element]): list of elements to add
149 @param mime_type(unicode): stream mime type 145 @param mime_type(unicode): stream mime type
150 @param profile: %(doc_profile)s
151 @return (tuple): tuple with: 146 @return (tuple): tuple with:
152 - session id (unicode) 147 - session id (unicode)
153 - (D(domish_elt, domish_elt): offer deferred which returl a tuple 148 - (D(domish_elt, domish_elt): offer deferred which returl a tuple
154 with iq_elt and si_elt 149 with iq_elt and si_elt
155 """ 150 """
156 client = self.host.getClient(profile)
157 offer = client.IQ() 151 offer = client.IQ()
158 sid = str(uuid.uuid4()) 152 sid = str(uuid.uuid4())
159 log.debug(_(u"Stream Session ID: %s") % offer["id"]) 153 log.debug(_(u"Stream Session ID: %s") % offer["id"])
160 154
161 offer["from"] = client.jid.full() 155 offer["from"] = client.jid.full()
179 def __init__(self, plugin_parent): 173 def __init__(self, plugin_parent):
180 self.plugin_parent = plugin_parent 174 self.plugin_parent = plugin_parent
181 self.host = plugin_parent.host 175 self.host = plugin_parent.host
182 176
183 def connectionInitialized(self): 177 def connectionInitialized(self):
184 self.xmlstream.addObserver(SI_REQUEST, self.plugin_parent.streamInit, profile=self.parent.profile) 178 self.xmlstream.addObserver(SI_REQUEST, self.plugin_parent.streamInit, client=self.parent)
185 179
186 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 180 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
187 return [disco.DiscoFeature(NS_SI)] + [disco.DiscoFeature(u"http://jabber.org/protocol/si/profile/{}".format(profile_name)) for profile_name in self.plugin_parent.si_profiles] 181 return [disco.DiscoFeature(NS_SI)] + [disco.DiscoFeature(u"http://jabber.org/protocol/si/profile/{}".format(profile_name)) for profile_name in self.plugin_parent.si_profiles]
188 182
189 def getDiscoItems(self, requestor, target, nodeIdentifier=''): 183 def getDiscoItems(self, requestor, target, nodeIdentifier=''):