Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0095.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfer: refactoring and improvements:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- moved SatFile to a new tools.stream module, as it should be part of core, not a plugin
- new IStreamProducer interface, to handle starting a pull producer
- new FileStreamObject which creates a stream producer/consumer from a SatFile
- plugin pipe no longer uses Unix named pipes, as they complicate things:
special care needs to be taken to avoid blocking, and they are generally not necessary.
Instead a socket is now used, so the plugin has been renamed to jingle stream.
- bad connections/errors should be better handled in the jingle stream plugin, and the code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children |
comparison
equal
deleted
inserted
replaced
2488:78c7992a26ed | 2489:e2a7bb875957 |
---|---|
70 try: | 70 try: |
71 del self.si_profiles[si_profile] | 71 del self.si_profiles[si_profile] |
72 except KeyError: | 72 except KeyError: |
73 log.error(u"Trying to unregister SI profile [{}] which was not registered".format(si_profile)) | 73 log.error(u"Trying to unregister SI profile [{}] which was not registered".format(si_profile)) |
74 | 74 |
75 def streamInit(self, iq_elt, profile): | 75 def streamInit(self, iq_elt, client): |
76 """This method is called on stream initiation (XEP-0095 #3.2) | 76 """This method is called on stream initiation (XEP-0095 #3.2) |
77 | 77 |
78 @param iq_elt: IQ element | 78 @param iq_elt: IQ element |
79 @param profile: %(doc_profile)s""" | 79 """ |
80 log.info(_("XEP-0095 Stream initiation")) | 80 log.info(_("XEP-0095 Stream initiation")) |
81 iq_elt.handled = True | 81 iq_elt.handled = True |
82 si_elt = iq_elt.elements(NS_SI, 'si').next() | 82 si_elt = iq_elt.elements(NS_SI, 'si').next() |
83 si_id = si_elt['id'] | 83 si_id = si_elt['id'] |
84 si_mime_type = iq_elt.getAttribute('mime-type', 'application/octet-stream') | 84 si_mime_type = iq_elt.getAttribute('mime-type', 'application/octet-stream') |
85 si_profile = si_elt['profile'] | 85 si_profile = si_elt['profile'] |
86 si_profile_key = si_profile[len(SI_PROFILE_HEADER):] if si_profile.startswith(SI_PROFILE_HEADER) else si_profile | 86 si_profile_key = si_profile[len(SI_PROFILE_HEADER):] if si_profile.startswith(SI_PROFILE_HEADER) else si_profile |
87 if si_profile_key in self.si_profiles: | 87 if si_profile_key in self.si_profiles: |
88 #We know this SI profile, we call the callback | 88 #We know this SI profile, we call the callback |
89 self.si_profiles[si_profile_key](iq_elt, si_id, si_mime_type, si_elt, profile) | 89 self.si_profiles[si_profile_key](client, iq_elt, si_id, si_mime_type, si_elt) |
90 else: | 90 else: |
91 #We don't know this profile, we send an error | 91 #We don't know this profile, we send an error |
92 self.sendError(iq_elt, 'bad-profile', profile) | 92 self.sendError(client, iq_elt, 'bad-profile') |
93 | 93 |
94 def sendError(self, request, condition, profile): | 94 def sendError(self, client, request, condition): |
95 """Send IQ error as a result | 95 """Send IQ error as a result |
96 | 96 |
97 @param request(domish.Element): original IQ request | 97 @param request(domish.Element): original IQ request |
98 @param condition(str): error condition | 98 @param condition(str): error condition |
99 @param profile: %(doc_profile)s | |
100 """ | 99 """ |
101 client = self.host.getClient(profile) | |
102 if condition in SI_ERROR_CONDITIONS: | 100 if condition in SI_ERROR_CONDITIONS: |
103 si_condition = condition | 101 si_condition = condition |
104 condition = 'bad-request' | 102 condition = 'bad-request' |
105 else: | 103 else: |
106 si_condition = None | 104 si_condition = None |
109 if si_condition is not None: | 107 if si_condition is not None: |
110 iq_error_elt.error.addElement((NS_SI, si_condition)) | 108 iq_error_elt.error.addElement((NS_SI, si_condition)) |
111 | 109 |
112 client.send(iq_error_elt) | 110 client.send(iq_error_elt) |
113 | 111 |
114 def acceptStream(self, iq_elt, feature_elt, misc_elts=None, profile=C.PROF_KEY_NONE): | 112 def acceptStream(self, client, iq_elt, feature_elt, misc_elts=None): |
115 """Send the accept stream initiation answer | 113 """Send the accept stream initiation answer |
116 | 114 |
117 @param iq_elt(domish.Element): initial SI request | 115 @param iq_elt(domish.Element): initial SI request |
118 @param feature_elt(domish.Element): 'feature' element containing stream method to use | 116 @param feature_elt(domish.Element): 'feature' element containing stream method to use |
119 @param misc_elts(list[domish.Element]): list of elements to add | 117 @param misc_elts(list[domish.Element]): list of elements to add |
120 @param profile: %(doc_profile)s | |
121 """ | 118 """ |
122 log.info(_("sending stream initiation accept answer")) | 119 log.info(_("sending stream initiation accept answer")) |
123 if misc_elts is None: | 120 if misc_elts is None: |
124 misc_elts = [] | 121 misc_elts = [] |
125 client = self.host.getClient(profile) | |
126 result_elt = xmlstream.toResponse(iq_elt, 'result') | 122 result_elt = xmlstream.toResponse(iq_elt, 'result') |
127 si_elt = result_elt.addElement((NS_SI, 'si')) | 123 si_elt = result_elt.addElement((NS_SI, 'si')) |
128 si_elt.addChild(feature_elt) | 124 si_elt.addChild(feature_elt) |
129 for elt in misc_elts: | 125 for elt in misc_elts: |
130 si_elt.addChild(elt) | 126 si_elt.addChild(elt) |
137 log.warning(u"No <si/> element found in result while expected") | 133 log.warning(u"No <si/> element found in result while expected") |
138 raise exceptions.DataError | 134 raise exceptions.DataError |
139 return (iq_elt, si_elt) | 135 return (iq_elt, si_elt) |
140 | 136 |
141 | 137 |
142 def proposeStream(self, to_jid, si_profile, feature_elt, misc_elts, mime_type='application/octet-stream', profile=C.PROF_KEY_NONE): | 138 def proposeStream(self, client, to_jid, si_profile, feature_elt, misc_elts, mime_type='application/octet-stream'): |
143 """Propose a stream initiation | 139 """Propose a stream initiation |
144 | 140 |
145 @param to_jid(jid.JID): recipient | 141 @param to_jid(jid.JID): recipient |
146 @param si_profile(unicode): Stream initiation profile (XEP-0095) | 142 @param si_profile(unicode): Stream initiation profile (XEP-0095) |
147 @param feature_elt(domish.Element): feature element, according to XEP-0020 | 143 @param feature_elt(domish.Element): feature element, according to XEP-0020 |
148 @param misc_elts(list[domish.Element]): list of elements to add | 144 @param misc_elts(list[domish.Element]): list of elements to add |
149 @param mime_type(unicode): stream mime type | 145 @param mime_type(unicode): stream mime type |
150 @param profile: %(doc_profile)s | |
151 @return (tuple): tuple with: | 146 @return (tuple): tuple with: |
152 - session id (unicode) | 147 - session id (unicode) |
153 - (D(domish_elt, domish_elt): offer deferred which returl a tuple | 148 - (D(domish_elt, domish_elt): offer deferred which returl a tuple |
154 with iq_elt and si_elt | 149 with iq_elt and si_elt |
155 """ | 150 """ |
156 client = self.host.getClient(profile) | |
157 offer = client.IQ() | 151 offer = client.IQ() |
158 sid = str(uuid.uuid4()) | 152 sid = str(uuid.uuid4()) |
159 log.debug(_(u"Stream Session ID: %s") % offer["id"]) | 153 log.debug(_(u"Stream Session ID: %s") % offer["id"]) |
160 | 154 |
161 offer["from"] = client.jid.full() | 155 offer["from"] = client.jid.full() |
179 def __init__(self, plugin_parent): | 173 def __init__(self, plugin_parent): |
180 self.plugin_parent = plugin_parent | 174 self.plugin_parent = plugin_parent |
181 self.host = plugin_parent.host | 175 self.host = plugin_parent.host |
182 | 176 |
183 def connectionInitialized(self): | 177 def connectionInitialized(self): |
184 self.xmlstream.addObserver(SI_REQUEST, self.plugin_parent.streamInit, profile=self.parent.profile) | 178 self.xmlstream.addObserver(SI_REQUEST, self.plugin_parent.streamInit, client=self.parent) |
185 | 179 |
186 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | 180 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): |
187 return [disco.DiscoFeature(NS_SI)] + [disco.DiscoFeature(u"http://jabber.org/protocol/si/profile/{}".format(profile_name)) for profile_name in self.plugin_parent.si_profiles] | 181 return [disco.DiscoFeature(NS_SI)] + [disco.DiscoFeature(u"http://jabber.org/protocol/si/profile/{}".format(profile_name)) for profile_name in self.plugin_parent.si_profiles] |
188 | 182 |
189 def getDiscoItems(self, requestor, target, nodeIdentifier=''): | 183 def getDiscoItems(self, requestor, target, nodeIdentifier=''): |