comparison sat/plugins/plugin_comp_file_sharing.py @ 3541:888109774673

core: various changes and fixes to work with the new storage and D-Bus bridge: - fix coroutine handling in various places - fix types which are not serialised by Tx DBus - XEP-0384: call storage methods in the main thread: Python OMEMO's Promise implementation uses threads, which prevents the use of the AsyncIO loop. To work around that, callLater is used to launch storage methods in the main thread. This is a temporary workaround, as Python OMEMO should get rid of its Promise implementation and threads soon.
author Goffi <goffi@goffi.org>
date Thu, 03 Jun 2021 15:21:43 +0200
parents ab72b8ac3bd2
children 742e466fa000
comparison
equal deleted inserted replaced
3540:aa58451b77ba 3541:888109774673
29 from sat.core.constants import Const as C 29 from sat.core.constants import Const as C
30 from sat.core import exceptions 30 from sat.core import exceptions
31 from sat.core.log import getLogger 31 from sat.core.log import getLogger
32 from sat.tools import stream 32 from sat.tools import stream
33 from sat.tools import video 33 from sat.tools import video
34 from sat.tools.utils import ensure_deferred
34 from sat.tools.common import regex 35 from sat.tools.common import regex
35 from sat.tools.common import uri 36 from sat.tools.common import uri
36 from sat.tools.common import files_utils 37 from sat.tools.common import files_utils
37 from sat.tools.common import utils 38 from sat.tools.common import utils
38 from sat.tools.common import tls 39 from sat.tools.common import tls
485 log.warning(_("Can't get thumbnail for {final_path}: {e}").format( 486 log.warning(_("Can't get thumbnail for {final_path}: {e}").format(
486 final_path=final_path, e=e)) 487 final_path=final_path, e=e))
487 else: 488 else:
488 await self.generate_thumbnails(extra, thumb_path) 489 await self.generate_thumbnails(extra, thumb_path)
489 490
490 self.host.memory.setFile( 491 await self.host.memory.setFile(
491 client, 492 client,
492 name=name, 493 name=name,
493 version="", 494 version="",
494 file_hash=file_hash, 495 file_hash=file_hash,
495 hash_algo=HASH_ALGO, 496 hash_algo=HASH_ALGO,
544 self._f.openFileWrite( 545 self._f.openFileWrite(
545 client, file_tmp_path, transfer_data, file_data, stream_object 546 client, file_tmp_path, transfer_data, file_data, stream_object
546 ) 547 )
547 return False, defer.succeed(True) 548 return False, defer.succeed(True)
548 549
549 @defer.inlineCallbacks 550 async def _retrieveFiles(
550 def _retrieveFiles(
551 self, client, session, content_data, content_name, file_data, file_elt 551 self, client, session, content_data, content_name, file_data, file_elt
552 ): 552 ):
553 """This method retrieve a file on request, and send if after checking permissions""" 553 """This method retrieve a file on request, and send if after checking permissions"""
554 peer_jid = session["peer_jid"] 554 peer_jid = session["peer_jid"]
555 if session['local_jid'].user: 555 if session['local_jid'].user:
556 owner = client.getOwnerFromJid(session['local_jid']) 556 owner = client.getOwnerFromJid(session['local_jid'])
557 else: 557 else:
558 owner = peer_jid 558 owner = peer_jid
559 try: 559 try:
560 found_files = yield self.host.memory.getFiles( 560 found_files = await self.host.memory.getFiles(
561 client, 561 client,
562 peer_jid=peer_jid, 562 peer_jid=peer_jid,
563 name=file_data.get("name"), 563 name=file_data.get("name"),
564 file_hash=file_data.get("file_hash"), 564 file_hash=file_data.get("file_hash"),
565 hash_algo=file_data.get("hash_algo"), 565 hash_algo=file_data.get("hash_algo"),
573 log.warning( 573 log.warning(
574 _("{peer_jid} is trying to access an unauthorized file: {name}").format( 574 _("{peer_jid} is trying to access an unauthorized file: {name}").format(
575 peer_jid=peer_jid, name=file_data.get("name") 575 peer_jid=peer_jid, name=file_data.get("name")
576 ) 576 )
577 ) 577 )
578 defer.returnValue(False) 578 return False
579 579
580 if not found_files: 580 if not found_files:
581 log.warning( 581 log.warning(
582 _("no matching file found ({file_data})").format(file_data=file_data) 582 _("no matching file found ({file_data})").format(file_data=file_data)
583 ) 583 )
584 defer.returnValue(False) 584 return False
585 585
586 # we only use the first found file 586 # we only use the first found file
587 found_file = found_files[0] 587 found_file = found_files[0]
588 if found_file['type'] != C.FILE_TYPE_FILE: 588 if found_file['type'] != C.FILE_TYPE_FILE:
589 raise TypeError("a file was expected, type is {type_}".format( 589 raise TypeError("a file was expected, type is {type_}".format(
605 file_path, 605 file_path,
606 uid=self._jf.getProgressId(session, content_name), 606 uid=self._jf.getProgressId(session, content_name),
607 size=size, 607 size=size,
608 data_cb=lambda data: hasher.update(data), 608 data_cb=lambda data: hasher.update(data),
609 ) 609 )
610 defer.returnValue(True) 610 return True
611 611
612 def _fileSendingRequestTrigger( 612 def _fileSendingRequestTrigger(
613 self, client, session, content_data, content_name, file_data, file_elt 613 self, client, session, content_data, content_name, file_data, file_elt
614 ): 614 ):
615 if not client.is_component: 615 if not client.is_component:
616 return True, None 616 return True, None
617 else: 617 else:
618 return ( 618 return (
619 False, 619 False,
620 self._retrieveFiles( 620 defer.ensureDeferred(self._retrieveFiles(
621 client, session, content_data, content_name, file_data, file_elt 621 client, session, content_data, content_name, file_data, file_elt
622 ), 622 )),
623 ) 623 )
624 624
625 ## HTTP Upload ## 625 ## HTTP Upload ##
626 626
627 def _purge_slot(self, upload_id): 627 def _purge_slot(self, upload_id):
755 file_id = nodeIdentifier[len(COMMENT_NODE_PREFIX) :] 755 file_id = nodeIdentifier[len(COMMENT_NODE_PREFIX) :]
756 if not file_id: 756 if not file_id:
757 raise error.StanzaError("item-not-found") 757 raise error.StanzaError("item-not-found")
758 return file_id 758 return file_id
759 759
async def getFileData(self, requestor, nodeIdentifier):
    """Retrieve the data of the single file linked to a node

    @param requestor: entity asking for the file, given to memory.getFiles
    @param nodeIdentifier: node name from which the file id is extracted
    @return (dict): data of the matching file
    @raise error.StanzaError: "item-not-found" if the file can't be found or
        can't be accessed by the requestor
    @raise error.InternalError: more than one file is matching the id
    """
    wanted_id = self._getFileId(nodeIdentifier)
    try:
        matches = await self.host.memory.getFiles(self.parent, requestor, wanted_id)
    except (exceptions.NotFound, exceptions.PermissionError):
        # NotFound and PermissionError are deliberately not differentiated,
        # otherwise we would leak information on existing files
        raise error.StanzaError("item-not-found")

    if not matches:
        raise error.StanzaError("item-not-found")
    if len(matches) > 1:
        raise error.InternalError("there should be only one file")
    return matches[0]
774 773
775 def commentsUpdate(self, extra, new_comments, peer_jid): 774 def commentsUpdate(self, extra, new_comments, peer_jid):
776 """update comments (replace or insert new_comments) 775 """update comments (replace or insert new_comments)
777 776
778 @param extra(dict): extra data to update 777 @param extra(dict): extra data to update
823 iq_elt = item_elt 822 iq_elt = item_elt
824 while iq_elt.parent != None: 823 while iq_elt.parent != None:
825 iq_elt = iq_elt.parent 824 iq_elt = iq_elt.parent
826 return iq_elt["from"] 825 return iq_elt["from"]
827 826
@ensure_deferred
async def publish(self, requestor, service, nodeIdentifier, items):
    """Store published items as comments of the file linked to the node

    @param requestor: entity publishing the items
    @param service: not used in this method
    @param nodeIdentifier: node name, used to retrieve the file
    @param items: published item elements; each one is stored as a
        (item id, sender, serialised XML) tuple
    @raise error.StanzaError: "not-authorized" if the update of the file is
        refused with a PermissionError
    """
    # the file is retrieved a first time to check authorisations
    file_data = await self.getFileData(requestor, nodeIdentifier)
    file_id = file_data["id"]
    new_comments = [
        (item_elt["id"], self._getFrom(item_elt), item_elt.toXml())
        for item_elt in items
    ]
    # peer_jid is None when the requestor is the owner of the file
    requestor_is_owner = requestor.userhostJID() == file_data["owner"]
    peer_jid = None if requestor_is_owner else requestor.userhost()
    update_cb = partial(
        self.commentsUpdate, new_comments=new_comments, peer_jid=peer_jid
    )
    try:
        await self.host.memory.fileUpdate(file_id, "extra", update_cb)
    except exceptions.PermissionError:
        raise error.StanzaError("not-authorized")
843 842
@ensure_deferred
async def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers):
    """Retrieve the comments of the file linked to the node

    @param requestor: entity asking for the items
    @param service: not used in this method
    @param nodeIdentifier: node name, used to retrieve the file
    @param maxItems: not used in this method
    @param itemIdentifiers: ids of the comments to return;
        if empty or None, all comments are returned
    @return (list): comments parsed as XML elements
    """
    file_data = await self.getFileData(requestor, nodeIdentifier)
    # each stored comment is a sequence: its first element is the item id,
    # its third element is the serialised XML of the item
    comments = file_data["extra"].get("comments", [])
    if itemIdentifiers:
        comments = [c for c in comments if c[0] in itemIdentifiers]
    # this is a native coroutine: the result is given with a plain "return",
    # defer.returnValue is only meant for inlineCallbacks generators
    return [generic.parseXml(c[2]) for c in comments]
854 853
855 @defer.inlineCallbacks 854 @ensure_deferred
856 def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): 855 async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
857 file_data = yield self.getFileData(requestor, nodeIdentifier) 856 file_data = await self.getFileData(requestor, nodeIdentifier)
858 file_id = file_data["id"] 857 file_id = file_data["id"]
859 try: 858 try:
860 comments = file_data["extra"]["comments"] 859 comments = file_data["extra"]["comments"]
861 except KeyError: 860 except KeyError:
862 raise error.StanzaError("item-not-found") 861 raise error.StanzaError("item-not-found")
877 if requestor.userhostJID() != file_data["owner"]: 876 if requestor.userhostJID() != file_data["owner"]:
878 if not all([c[1] == requestor.userhost() for c in to_remove]): 877 if not all([c[1] == requestor.userhost() for c in to_remove]):
879 raise error.StanzaError("not-authorized") 878 raise error.StanzaError("not-authorized")
880 879
881 remove_cb = partial(self.commentsDelete, comments=to_remove) 880 remove_cb = partial(self.commentsDelete, comments=to_remove)
882 yield self.host.memory.fileUpdate(file_id, "extra", remove_cb) 881 await self.host.memory.fileUpdate(file_id, "extra", remove_cb)