Mercurial > libervia-backend
diff sat/plugins/plugin_xep_0300.py @ 2624:56f94936df1e
code style reformatting using black
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 27 Jun 2018 20:14:46 +0200 |
parents | 26edcf3a30eb |
children | 003b8b4b56a7 |
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0300.py Wed Jun 27 07:51:29 2018 +0200 +++ b/sat/plugins/plugin_xep_0300.py Wed Jun 27 20:14:46 2018 +0200 @@ -20,6 +20,7 @@ from sat.core.i18n import _ from sat.core.constants import Const as C from sat.core.log import getLogger + log = getLogger(__name__) from sat.core import exceptions from twisted.words.xish import domish @@ -41,27 +42,29 @@ C.PI_PROTOCOLS: ["XEP-0300"], C.PI_MAIN: "XEP_0300", C.PI_HANDLER: "yes", - C.PI_DESCRIPTION: _("""Management of cryptographic hashes""") + C.PI_DESCRIPTION: _("""Management of cryptographic hashes"""), } NS_HASHES = "urn:xmpp:hashes:2" NS_HASHES_FUNCTIONS = u"urn:xmpp:hash-function-text-names:{}" -BUFFER_SIZE = 2**12 -ALGO_DEFAULT = 'sha-256' +BUFFER_SIZE = 2 ** 12 +ALGO_DEFAULT = "sha-256" class XEP_0300(object): # TODO: add blake after moving to Python 3 - ALGOS = OrderedDict(( - (u'md5', hashlib.md5), - (u'sha-1', hashlib.sha1), - (u'sha-256', hashlib.sha256), - (u'sha-512', hashlib.sha512), - )) + ALGOS = OrderedDict( + ( + (u"md5", hashlib.md5), + (u"sha-1", hashlib.sha1), + (u"sha-256", hashlib.sha256), + (u"sha-512", hashlib.sha512), + ) + ) def __init__(self, host): log.info(_("plugin Hashes initialization")) - host.registerNamespace('hashes', NS_HASHES) + host.registerNamespace("hashes", NS_HASHES) def getHandler(self, client): return XEP_0300_handler() @@ -90,11 +93,15 @@ """ client = self.host.getClient(profile) for algo in reversed(XEP_0300.ALGOS): - has_feature = yield self.host.hasFeature(client, NS_HASHES_FUNCTIONS.format(algo), to_jid) + has_feature = yield self.host.hasFeature( + client, NS_HASHES_FUNCTIONS.format(algo), to_jid + ) if has_feature: - log.debug(u"Best hashing algorithm found for {jid}: {algo}".format( - jid=to_jid.full(), - algo=algo)) + log.debug( + u"Best hashing algorithm found for {jid}: {algo}".format( + jid=to_jid.full(), algo=algo + ) + ) defer.returnValue(algo) def _calculateHashBlocking(self, file_obj, hasher): @@ -123,16 +130,18 @@ @param algo(unicode): algorithme to use, must be a key of XEP_0300.ALGOS @return (D(domish.Element)): hash element """ + def hashCalculated(hash_): return self.buildHashElt(hash_, algo) + hasher = self.ALGOS[algo] hash_d = self.calculateHash(file_obj, hasher) hash_d.addCallback(hashCalculated) return hash_d def buildHashUsedElt(self, algo=ALGO_DEFAULT): - hash_used_elt = domish.Element((NS_HASHES, 'hash-used')) - hash_used_elt['algo'] = algo + hash_used_elt = domish.Element((NS_HASHES, "hash-used")) + hash_used_elt["algo"] = algo return hash_used_elt def parseHashUsedElt(self, parent): @@ -144,10 +153,10 @@ @raise exceptions.DataError: the element is invalid """ try: - hash_used_elt = next(parent.elements(NS_HASHES, 'hash-used')) + hash_used_elt = next(parent.elements(NS_HASHES, "hash-used")) except StopIteration: raise exceptions.NotFound - algo = hash_used_elt[u'algo'] + algo = hash_used_elt[u"algo"] if not algo: raise exceptions.DataError return algo @@ -161,10 +170,10 @@ """ assert hash_ assert algo - hash_elt = domish.Element((NS_HASHES, 'hash')) + hash_elt = domish.Element((NS_HASHES, "hash")) if hash_ is not None: hash_elt.addContent(base64.b64encode(hash_)) - hash_elt['algo'] = algo + hash_elt["algo"] = algo return hash_elt def parseHashElt(self, parent): @@ -181,8 +190,8 @@ hash_elt = None best_algo = None best_value = None - for hash_elt in parent.elements(NS_HASHES, 'hash'): - algo = hash_elt.getAttribute('algo') + for hash_elt in parent.elements(NS_HASHES, "hash"): + algo = hash_elt.getAttribute("algo") try: idx = algos.index(algo) except ValueError: @@ -204,9 +213,12 @@ class XEP_0300_handler(XMPPHandler): implements(iwokkel.IDisco) - def getDiscoInfo(self, requestor, target, nodeIdentifier=''): - hash_functions_names = [disco.DiscoFeature(NS_HASHES_FUNCTIONS.format(algo)) for algo in XEP_0300.ALGOS] + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + hash_functions_names = [ + disco.DiscoFeature(NS_HASHES_FUNCTIONS.format(algo)) + for algo in XEP_0300.ALGOS + ] return [disco.DiscoFeature(NS_HASHES)] + hash_functions_names - def getDiscoItems(self, requestor, target, nodeIdentifier=''): + def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []