comparison libervia/backend/plugins/plugin_misc_attach.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 4b842c1fb686
children
comparison
equal deleted inserted replaced
4269:64a85ce8be70 4270:0d7bb4df2343
47 C.PI_HANDLER: "no", 47 C.PI_HANDLER: "no",
48 C.PI_DESCRIPTION: _("""Attachments handler"""), 48 C.PI_DESCRIPTION: _("""Attachments handler"""),
49 } 49 }
50 50
51 51
52 AttachmentHandler = namedtuple('AttachmentHandler', ['can_handle', 'attach', 'priority']) 52 AttachmentHandler = namedtuple("AttachmentHandler", ["can_handle", "attach", "priority"])
53 53
54 54
55 class AttachPlugin: 55 class AttachPlugin:
56 56
57 def __init__(self, host): 57 def __init__(self, host):
58 log.info(_("plugin Attach initialization")) 58 log.info(_("plugin Attach initialization"))
59 self.host = host 59 self.host = host
60 self._u = host.plugins["UPLOAD"] 60 self._u = host.plugins["UPLOAD"]
61 host.trigger.add("sendMessage", self._send_message_trigger) 61 host.trigger.add("sendMessage", self._send_message_trigger)
62 host.trigger.add("sendMessageComponent", self._send_message_trigger) 62 host.trigger.add("sendMessageComponent", self._send_message_trigger)
63 self._attachments_handlers = {'clear': [], 'encrypted': []} 63 self._attachments_handlers = {"clear": [], "encrypted": []}
64 self.register(self.default_can_handle, self.default_attach, False, -1000) 64 self.register(self.default_can_handle, self.default_attach, False, -1000)
65 65
66 def register(self, can_handle, attach, encrypted=False, priority=0): 66 def register(self, can_handle, attach, encrypted=False, priority=0):
67 """Register an attachments handler 67 """Register an attachments handler
68 68
80 @param priority(int): priority of this handler, handler with higher priority will 80 @param priority(int): priority of this handler, handler with higher priority will
81 be tried first 81 be tried first
82 """ 82 """
83 handler = AttachmentHandler(can_handle, attach, priority) 83 handler = AttachmentHandler(can_handle, attach, priority)
84 handlers = ( 84 handlers = (
85 self._attachments_handlers['encrypted'] 85 self._attachments_handlers["encrypted"]
86 if encrypted else self._attachments_handlers['clear'] 86 if encrypted
87 else self._attachments_handlers["clear"]
87 ) 88 )
88 if handler in handlers: 89 if handler in handlers:
89 raise exceptions.InternalError( 90 raise exceptions.InternalError(
90 'Attachment handler has been registered twice, this should never happen' 91 "Attachment handler has been registered twice, this should never happen"
91 ) 92 )
92 93
93 handlers.append(handler) 94 handlers.append(handler)
94 handlers.sort(key=lambda h: h.priority, reverse=True) 95 handlers.sort(key=lambda h: h.priority, reverse=True)
95 log.debug(f"new attachments handler: {handler}") 96 log.debug(f"new attachments handler: {handler}")
110 media_type = attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE] 111 media_type = attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE]
111 except KeyError: 112 except KeyError:
112 media_type = mimetypes.guess_type(path, strict=False)[0] 113 media_type = mimetypes.guess_type(path, strict=False)[0]
113 if media_type is None: 114 if media_type is None:
114 log.warning( 115 log.warning(
115 _("Can't resize attachment of unknown type: {attachment}") 116 _(
116 .format(attachment=attachment)) 117 "Can't resize attachment of unknown type: {attachment}"
118 ).format(attachment=attachment)
119 )
117 continue 120 continue
118 attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE] = media_type 121 attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE] = media_type
119 122
120 main_type = media_type.split('/')[0] 123 main_type = media_type.split("/")[0]
121 if main_type == "image": 124 if main_type == "image":
122 report = image.check(self.host, path) 125 report = image.check(self.host, path)
123 if report['too_large']: 126 if report["too_large"]:
124 tmp_dir = Path(tempfile.mkdtemp()) 127 tmp_dir = Path(tempfile.mkdtemp())
125 tmp_dirs_to_clean.append(tmp_dir) 128 tmp_dirs_to_clean.append(tmp_dir)
126 new_path = tmp_dir / path.name 129 new_path = tmp_dir / path.name
127 await image.resize( 130 await image.resize(
128 path, report["recommended_size"], dest=new_path) 131 path, report["recommended_size"], dest=new_path
132 )
129 attachment["path"] = new_path 133 attachment["path"] = new_path
130 log.info( 134 log.info(
131 _("Attachment {path!r} has been resized at {new_path!r}") 135 _(
132 .format(path=str(path), new_path=str(new_path))) 136 "Attachment {path!r} has been resized at {new_path!r}"
137 ).format(path=str(path), new_path=str(new_path))
138 )
133 else: 139 else:
134 log.warning( 140 log.warning(
135 _("Can't resize attachment of type {main_type!r}: {attachment}") 141 _(
136 .format(main_type=main_type, attachment=attachment)) 142 "Can't resize attachment of type {main_type!r}: {attachment}"
143 ).format(main_type=main_type, attachment=attachment)
144 )
137 145
138 if client.encryption.is_encryption_requested(data): 146 if client.encryption.is_encryption_requested(data):
139 handlers = self._attachments_handlers['encrypted'] 147 handlers = self._attachments_handlers["encrypted"]
140 else: 148 else:
141 handlers = self._attachments_handlers['clear'] 149 handlers = self._attachments_handlers["clear"]
142 150
143 for handler in handlers: 151 for handler in handlers:
144 can_handle = await utils.as_deferred(handler.can_handle, client, data) 152 can_handle = await utils.as_deferred(handler.can_handle, client, data)
145 if can_handle: 153 if can_handle:
146 break 154 break
147 else: 155 else:
148 raise exceptions.NotFound( 156 raise exceptions.NotFound(
149 _("No plugin can handle attachment with {destinee}").format( 157 _("No plugin can handle attachment with {destinee}").format(
150 destinee = data['to'] 158 destinee=data["to"]
151 )) 159 )
160 )
152 161
153 await utils.as_deferred(handler.attach, client, data) 162 await utils.as_deferred(handler.attach, client, data)
154 163
155 for dir_path in tmp_dirs_to_clean: 164 for dir_path in tmp_dirs_to_clean:
156 log.debug(f"Cleaning temporary directory at {dir_path}") 165 log.debug(f"Cleaning temporary directory at {dir_path}")
157 shutil.rmtree(dir_path) 166 shutil.rmtree(dir_path)
158 167
159 return data 168 return data
160 169
161 async def upload_files( 170 async def upload_files(
162 self, 171 self, client: SatXMPPEntity, data: dict, upload_cb: Optional[Callable] = None
163 client: SatXMPPEntity,
164 data: dict,
165 upload_cb: Optional[Callable] = None
166 ): 172 ):
167 """Upload file, and update attachments 173 """Upload file, and update attachments
168 174
169 invalid attachments will be removed 175 invalid attachments will be removed
170 @param client: 176 @param client:
200 log.warning("no path in attachment: {attachment}") 206 log.warning("no path in attachment: {attachment}")
201 to_delete.append(attachment) 207 to_delete.append(attachment)
202 continue 208 continue
203 209
204 if "url" in attachment: 210 if "url" in attachment:
205 url = attachment.pop('url') 211 url = attachment.pop("url")
206 log.warning( 212 log.warning(
207 f"unexpected URL in attachment: {url!r}\nattachment: {attachment}" 213 f"unexpected URL in attachment: {url!r}\nattachment: {attachment}"
208 ) 214 )
209 215
210 try: 216 try:
212 except KeyError: 218 except KeyError:
213 name = attachment["name"] = path.name 219 name = attachment["name"] = path.name
214 220
215 attachment["size"] = path.stat().st_size 221 attachment["size"] = path.stat().st_size
216 222
217 extra = { 223 extra = {"attachment": attachment}
218 "attachment": attachment
219 }
220 progress_id = attachment.pop("progress_id", None) 224 progress_id = attachment.pop("progress_id", None)
221 if progress_id: 225 if progress_id:
222 extra["progress_id"] = progress_id 226 extra["progress_id"] = progress_id
223 check_certificate = self.host.memory.param_get_a( 227 check_certificate = self.host.memory.param_get_a(
224 "check_certificate", "Connection", profile_key=client.profile) 228 "check_certificate", "Connection", profile_key=client.profile
229 )
225 if not check_certificate: 230 if not check_certificate:
226 extra['ignore_tls_errors'] = True 231 extra["ignore_tls_errors"] = True
227 log.warning( 232 log.warning(
228 _("certificate check disabled for upload, this is dangerous!")) 233 _("certificate check disabled for upload, this is dangerous!")
234 )
229 235
230 __, upload_d = await upload_cb( 236 __, upload_d = await upload_cb(
231 client=client, 237 client=client,
232 filepath=path, 238 filepath=path,
233 filename=name, 239 filename=name,
253 259
254 def _attach_files(self, data, client): 260 def _attach_files(self, data, client):
255 return defer.ensureDeferred(self.attach_files(client, data)) 261 return defer.ensureDeferred(self.attach_files(client, data))
256 262
257 def _send_message_trigger( 263 def _send_message_trigger(
258 self, client, mess_data, pre_xml_treatments, post_xml_treatments): 264 self, client, mess_data, pre_xml_treatments, post_xml_treatments
259 if mess_data['extra'].get(C.KEY_ATTACHMENTS): 265 ):
266 if mess_data["extra"].get(C.KEY_ATTACHMENTS):
260 post_xml_treatments.addCallback(self._attach_files, client=client) 267 post_xml_treatments.addCallback(self._attach_files, client=client)
261 return True 268 return True
262 269
263 async def default_can_handle(self, client, data): 270 async def default_can_handle(self, client, data):
264 return True 271 return True
269 body_elt = data["xml"].body 276 body_elt = data["xml"].body
270 if body_elt is None: 277 if body_elt is None:
271 body_elt = data["xml"].addElement("body") 278 body_elt = data["xml"].addElement("body")
272 attachments = data["extra"][C.KEY_ATTACHMENTS] 279 attachments = data["extra"][C.KEY_ATTACHMENTS]
273 if attachments: 280 if attachments:
274 body_links = '\n'.join(a['url'] for a in attachments) 281 body_links = "\n".join(a["url"] for a in attachments)
275 if str(body_elt).strip(): 282 if str(body_elt).strip():
276 # if there is already a body, we add a line feed before the first link 283 # if there is already a body, we add a line feed before the first link
277 body_elt.addContent('\n') 284 body_elt.addContent("\n")
278 body_elt.addContent(body_links) 285 body_elt.addContent(body_links)