Mercurial > sat_docs
view scripts/association/update_garradin.py @ 52:4d924e65a4ba
compte-rendu de l'AG ordinaire du 24 aout 2015 à Sankt Augustin
author | souliane <souliane@mailoo.org> |
---|---|
date | Sun, 04 Oct 2015 11:21:02 +0200 |
parents | c34633e3f56a |
children |
line wrap: on
line source
#!/usr/bin/python
# -*- coding: utf-8 -*-

# Scripts to update Garradin database from various CSV inputs.
# Copyright (C) 2015 Adrien Cossa <souliane@mailoo.org>

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import sqlite3
import csv, unicodecsv
import os
import os.path
import tempfile
from glob import glob
from time import strftime
from datetime import datetime
from collections import OrderedDict


### PATHS AND OTHER STUFF TO CONFIGURE ###

# SQLite database file that is read and updated by this script.
DB_FILE = u"./association.sqlite"
# Root directory of the CSV inputs; each *_FILES tuple below is joined to it
# and expanded with glob.
INPUT_PATH = u"../../../sat_private"
APAYER_FILES = (u"apayer", u"SALUT-A-TOI_*.csv")
CM_FILES = (u"credit_mutuel/dernieres_operations", "*_*.csv")
DJANGO_FILES = (u"django", u"sat_website_subscriptions.csv")
# Directory in which the log files are written.
LOG_PATH = u"./OUTPUT"

# Log levels that are echoed to the standard output (every message is written
# to its log file regardless of this setting).
INFO, WARNING = 1, 2
LOG_LEVELS = (INFO, WARNING)


def dirtyGetDateOfFile(path):
    """Extract the date at the end of the filename, just before the extension.

    The last 8 characters of the file name (extension excluded) are parsed as
    a "%Y%m%d" date.

    @param path (unicode): path to a file named like "<anything>YYYYMMDD.<ext>"
    @return: datetime
    @raise ValueError: the end of the file name is not a valid date
    """
    # FIXME: there should be a better way to do that
    # splitext removes the extension whatever its length, instead of assuming
    # that it is made of exactly 3 characters
    stem = os.path.splitext(os.path.basename(path))[0]
    return datetime.strptime(stem[-8:], "%Y%m%d")


### !!! DO NOT MODIFY BELOW THIS LINE !!! ###
### ### DATE AND TIME ### DATE_ORIGIN = u"1970-01-01" DATE_FORMAT = "%Y-%m-%d" TODAY = strftime(DATE_FORMAT) ### RETRIEVE DATABASE INFOS ### class GarradinDB(object): TABLES = ("membres", "cotisations", "cotisations_membres") def __init__(self, cursor): self.cursor = cursor def getTableInfos(self): infos = {} for table in self.TABLES: cursor.execute("PRAGMA table_info(%s)" % table) rows = cursor.fetchall() infos[table] = OrderedDict() for row in rows: # don't use {row[1]: row[2]...} : it messes the order infos[table][row[1]] = row[2] # field name: field type return infos def getLastAutoUpdate(self): """Get the datetime of the last database automatic update. @return: datetime """ self.cursor.execute("SELECT valeur FROM config WHERE cle='last_auto_update'") row = self.cursor.fetchone() try: date = row[0] except TypeError: date = DATE_ORIGIN return datetime.strptime(date, DATE_FORMAT) def setLastAutoUpdate(self, date=TODAY): """Set to date of the last database automatic update to today.""" self.cursor.execute("REPLACE INTO config VALUES ('last_auto_update', ?)", (date,)) con = sqlite3.connect(DB_FILE) with con: cursor = con.cursor() db = GarradinDB(cursor) TABLE_INFOS = db.getTableInfos() LAST_AUTO_UPDATE = db.getLastAutoUpdate() ### LOGGERS ### class Logger(object): def __init__(self, path): if not os.path.isdir(path): os.mkdir(path) basename = "update_garradin_{day}_%s.log".format(day=strftime("%Y%m%d")) self.info_file = os.path.join(path, basename % "info") self.warning_file = os.path.join(path, basename % "warning") def write(self, file, msg): with open(file, 'a') as file: file.write((u"[%s] %s\n" % (strftime("%d/%m/%Y@%H:%m:%S"), msg)).encode("utf-8")) def info(self, msg): self.write(self.info_file, msg) if INFO in LOG_LEVELS: print msg def warning(self, msg): self.write(self.warning_file, msg) if WARNING in LOG_LEVELS: print msg logger = Logger(LOG_PATH) ### CSV DIALECTS ### # FIXME: Python CSV doesn't handle CSV file starting with a BOM class 
my_dialect(csv.excel): """Describe my properties for CSV files.""" delimiter = ';' csv.register_dialect("my_dialect", my_dialect) ### GENERIC ENTRIES' CLASSES ### class Entry(OrderedDict): TYPE_INTEGER = u"INTEGER" TYPE_TEXT = u"TEXT" ID_UNIQUE_KEYS = ("FIELD_NAME", "FIELD_SURNAME", "FIELD_EMAIL") # define unique members ID_EQUAL_KEYS = ID_UNIQUE_KEYS # define equality, not even needed to update the database FIELD_NAME = u"prenom" FIELD_SURNAME = u"nom" FIELD_EMAIL = u"email" FIELDS = [FIELD_NAME, FIELD_SURNAME, FIELD_EMAIL] MATCH_NONE = 0 MATCH_INCOMPLETE = 1 MATCH_UNIQUE = 2 MATCH_EQUAL = 3 matched = MATCH_NONE def __init__(self, entry): """ @param entry(dict or row): entry data """ if isinstance(entry, dict): data = OrderedDict({key: value for key, value in entry.iteritems() if key in self.FIELDS}) if hasattr(entry, "identityDict"): data.update(entry.identityDict(self)) else: data = self.row2dict(entry) OrderedDict.__init__(self, data) def __iter__(self): # XXX: use the order of self.FIELDS and ignore OrderedDict internal order for field in self.FIELDS: if field in self: yield field def row2dict(self, row): """Return a dict representation of a row. @param row (list[unicode]): a row of information @return: OrderedDict {unicode: unicode} with: - key: field name - value: field value """ data = OrderedDict() index = 0 for field in self.FIELDS: if row[index]: data[field] = row[index] index += 1 return data def identityDict(self, cls=None): """Return a dict representing a user identity. @param cls (class): if specified, the result dict will use cls's identity fields for its own keys (instead of self's fields). @return: dict{unicode: unicode} """ if cls is None: cls = self return {getattr(cls, key): self[getattr(self, key)] for key in self.ID_EQUAL_KEYS} def identityMatches(self, entry): """Return a dict compiling the identity matches with another entry. 
@param entry (Entry): another entry @return: dict{unicode: bool} """ return {key: self[getattr(self, key)] == entry[getattr(entry, key)] for key in self.ID_EQUAL_KEYS} def identityMatch(self, entry): """Return a value representing the matching level with another entry. @param entry (Entry): another entry @return: int in (Entry.MATCH_NONE, Entry.MATCH_INCOMPLETE, Entry.MATCH_UNIQUE, Entry.MATCH_EQUAL) """ matches = self.identityMatches(entry) if len([match for match in matches.values() if not match]) == 0: return self.MATCH_EQUAL match_name = matches["FIELD_NAME"] and matches["FIELD_SURNAME"] if match_name and matches["FIELD_EMAIL"]: return self.MATCH_UNIQUE if match_name or matches["FIELD_EMAIL"]: return self.MATCH_INCOMPLETE return self.MATCH_NONE def identityToString(self): """Return a human-readable string representing a user identity. @return: unicode """ return "%s %s <%s>" % (self[self.FIELD_NAME], self[self.FIELD_SURNAME], self[self.FIELD_EMAIL]) def valuesToString(self): """List the values. @return: unicode """ return u", ".join(self.values()) def fields(self): """List the fields to which a value is associated. @return: list[unicode] """ return [field for field in self.FIELDS if field in self] def fieldsToString(self): """List the fields to which a value is associated. @return: unicode """ return u", ".join(self.fields()) def changeset(self): """Return the list of fields and values as needed to update the database. @return: tuple(unicode, list) """ changes = ['%s=?' % field for field in self.fields()] return (u", ".join(changes), self.values()) class GarradinDBEntry(Entry): def __init__(self, cursor, entry): self.cursor = cursor Entry.__init__(self, entry) def insertOrReplace(self): """Insert or replace the entry in the garradin database.""" fields = self.fieldsToString() args = self.values() placeholders = ",".join([u"?" 
for arg in args]) # XXX: "INSERT OR REPLACE" is needed for updating lastrowid, "REPLACE" is not enough request = "INSERT OR REPLACE INTO %s (%s) VALUES (%s)" % (self.TABLE, fields, placeholders) self.cursor.execute(request, args) class CSVEntry(Entry): def date(self): """Return the date associated with this entry. @return: datetime """ try: format = self.DATE_FORMAT except AttributeError: format = DATE_FORMAT return datetime.strptime(self[self.FIELD_DATE], format) ### MEMBERS ENTRIES ### class GarradinDBMember(GarradinDBEntry): TABLE = u"membres" FIELDS = TABLE_INFOS[TABLE].keys() FIELD_ID_CATEGORY = u"id_categorie" FIELD_PASSWORD = u"passe" FIELD_AUTO_UPDATE = u"date_maj_auto" FIELD_ADDRESS = u"adresse" ID_EQUAL_KEYS = Entry.ID_UNIQUE_KEYS + ("FIELD_ADDRESS",) def __init__(self, cursor, entry): GarradinDBEntry.__init__(self, cursor, entry) try: self.pop(self.FIELD_PASSWORD) except KeyError: pass if hasattr(entry, "address"): self[self.FIELD_ADDRESS] = entry.address() def setAutoUpdate(self): """Update the value of the field self.FIELD_AUTO_UPDATE.""" self[self.FIELD_AUTO_UPDATE] = TODAY def values(self): """List the values. 
@return: list """ values = [] for field, value in self.iteritems(): if TABLE_INFOS[self.TABLE].get(field, self.TYPE_TEXT) == self.TYPE_TEXT: value = u"%s" % value.replace('\r', '').replace('\n', '\\n') values.append(unicode(value)) return values class GarradinDBMembership(GarradinDBEntry): TABLE = u"cotisations_membres" FIELDS = TABLE_INFOS[TABLE].keys() FIELD_MEMBER = u"id_membre" FIELD_MEMBERSHIP = u"id_cotisation" FIELD_DATE = u"date" class GarradinDBMembershipType(GarradinDBEntry): TABLE = u"cotisations" FIELDS = TABLE_INFOS[TABLE].keys() FIELD_AMOUNT = u"montant" class GarradinCSVMember(CSVEntry): """Like GarradinDBMember but with a "categorie" field in second position.""" FIELD_ID_CATEGORY = u"id_categorie" FIELD_CATEGORY = u"categorie" FIELD_ADDRESS = u"adresse" FIELDS = GarradinDBMember.FIELDS[:1] + [FIELD_CATEGORY] + GarradinDBMember.FIELDS[1:] def __init__(self, cursor, db_member): """ @param db_member (GarradinDBMember) """ CSVEntry.__init__(self, db_member) if self.FIELD_ID_CATEGORY not in self: self[self.FIELD_ID_CATEGORY] = 1 if self.FIELD_CATEGORY not in self: cursor.execute("SELECT nom FROM membres_categories WHERE id=?", (self[self.FIELD_ID_CATEGORY],)) rows = cursor.fetchone() self[self.FIELD_CATEGORY] = rows[0] class ApayerCSVEntry(CSVEntry): FIELD_NAME = u"Prenom" FIELD_SURNAME = u"Nom" FIELD_EMAIL = u"Courriel" FIELD_AMOUNT = u"Montant" FIELD_DATE = u"DatePaiement" DATE_FORMAT = u"%d/%m/%y" # XXX: there's a BOM at the beginning of the CSV file FIELDS = ["Mnemonique", "TPE", "DatePaiement", "ObjetPaiement", "AutreObjet", "Montant", "Devise", "Reference", "Commentaire", "ReferencBancaire", "NumeroAutorisation", "Nom", "Prenom", "Adresse", "CodePostal", "Ville", "Courriel", "Etat", "MotifRefus", "Cvx", "Vld", "Brand", "Status3DS"] FILTERED_OUT = ("", u"Mnemonique", "TPE", "Devise", "ReferencBancaire", "NumeroAutorisation", "Cvx", "Vld", "Brand", "Status3DS") def address(self): return u"\n".join([self["Adresse"], self["CodePostal"], self["Ville"]]) 
class DjangoCSVEntry(CSVEntry):
    """Entry of the CSV file produced by the django subscription form."""

    FIELD_NAME = u"prenom"
    FIELD_SURNAME = u"nom"
    FIELD_EMAIL = u"courriel"
    FIELD_AMOUNT = u"montant"
    FIELD_DATE = u"date"
    FIELDS = ["date", "prenom", "nom", "adresse", "courriel", "jid", "montant", "moyen_paiement", "reference", "commentaire", "lettre_info", "langue"]


### ENTRIES FOR THE FINANCES ###

class GarradinDBOperation(GarradinDBEntry):
    """Entry of the "compta_journal" table."""

    TABLE = u"compta_journal"


class GarradinCSVOperation(CSVEntry):
    """Financial operation entry using garradin's CSV columns."""

    FIELDS = (u"Numéro mouvement", u"Date", u"Type de mouvement", u"Catégorie", u"Libellé", u"Montant", u"Compte de débit - numéro", u"Compte de débit - libellé", u"Compte de crédit - numéro", u"Compte de crédit - libellé", u"Moyen de paiement", u"Numéro de chèque", u"Numéro de pièce", u"Remarques")


class CmCSVOperation(CSVEntry):
    """Financial operation entry of a credit mutuel CSV file."""

    FIELD_DATE = u"Date d'opération"
    DATE_FORMAT = u"%d/%m/%Y"
    FIELDS = (FIELD_DATE, u"Date de valeur", u"Débit", u"Crédit", u"Libellé", u"Solde")


### CLASSES FOR THE IMPORTS ###

class EntryMatcher(object):
    """Match incoming entries against the members of the database."""

    def __init__(self, cursor):
        """
        @param cursor: database cursor used for every request
        """
        self.cursor = cursor

    def processEntryForCSVExport(self, entry, context):
        """Return the processed entry ready to be exported to CSV, or None
        if the entry is conflicting with another one already existing.

        @param entry (Entry): an Entry instance
        @param context (unicode): context indication for the user
        @return: GarradinCSVMember or None
        """
        # this method is not optimised but it's sensible... don't mess with it.
        tmp_entry = GarradinDBMember(self.cursor, entry)
        base_entry = self.getMatchingEntry(tmp_entry, context)
        if base_entry is None:
            return None
        # the incoming values override the ones already in the database
        base_entry.update(tmp_entry)
        new_entry = GarradinCSVMember(self.cursor, base_entry)
        new_entry.matched = base_entry.matched
        return new_entry

    def getMatchingEntry(self, new_entry, context):
        """Try to get a matching entry, the behavior depends of the found match:
            - no match at all: returns a new empty entry
            - incomplete match: returns None and write a warning in the logs
            - unique or equal match: returns the matching database entry

        The database rows are scanned in order and the first row that matches
        at any level decides the result.

        @param new_entry (GarradinDBMember): entry to be matched
        @param context (unicode): context indication for the user
        @return: GarradinDBMember or None:
        """
        self.cursor.execute("SELECT * FROM %s" % new_entry.TABLE)
        for row in self.cursor.fetchall():
            # the entry needs the database cursor, not the matcher itself
            current_entry = GarradinDBMember(self.cursor, list(row))
            match = current_entry.identityMatch(new_entry)
            if match == Entry.MATCH_NONE:
                continue  # try to match next row
            current_entry.matched = match
            if match == Entry.MATCH_EQUAL:
                return current_entry  # we still need to update the membership
            if match == Entry.MATCH_INCOMPLETE:
                log = logger.warning
                desc = (u"INCOMPLETE", u"A MANUAL check and process is required!")
            elif match == Entry.MATCH_UNIQUE:
                log = logger.info
                desc = (u"unique", u"The database entry will be updated.")
            log(u"A %s match has been found between these two entries from \"%s\" and garradin's \"%s\" table:" % (desc[0], context, current_entry.TABLE))
            log(u" - %s" % new_entry.valuesToString())
            log(u" - %s" % current_entry.valuesToString())
            log(" --> %s" % desc[1])
            return current_entry if match == Entry.MATCH_UNIQUE else None
        return GarradinDBMember(self.cursor, {})


class MembershipManager(object):
    """Register the membership fees in the database."""

    def __init__(self, cursor):
        """
        @param cursor: database cursor used for every request
        """
        self.cursor = cursor
        # payments of the members who have no ID yet, indexed by email
        self.memberships = {}

    def setCache(self, email, amount, date):
        """Write a membership in cache for later database update.

        @param email (unicode): email adress of the member
        @param amount (int): amount of the membership fee
        @param date (datetime): date of the subscription
        """
        self.memberships[email] = (amount, date)

    def getMembershipAmount(self, membership_id):
        """Return a membership amount.

        @param membership_id (int): membership ID in the "cotisations" table
        @return: int
        """
        request = "SELECT %s FROM %s WHERE id=?" % (GarradinDBMembershipType.FIELD_AMOUNT, GarradinDBMembershipType.TABLE)
        self.cursor.execute(request, (membership_id,))
        return int(self.cursor.fetchone()[0])

    def getMemberIdentity(self, member_id):
        """Return a human-readable identity.

        @param member_id (int): member ID in the "membres" table
        @return: unicode
        """
        self.cursor.execute("SELECT * FROM %s WHERE id=?" % GarradinDBMember.TABLE, (member_id,))
        # the entry needs the database cursor, not the manager itself
        return GarradinDBMember(self.cursor, self.cursor.fetchone()).identityToString()

    def getMembershipType(self, amount):
        """Return the suitable membership type ID and amount for the given amount.

        It is the membership type with the highest amount that doesn't exceed
        the payment.

        @param amount (int): payment amount
        @return: (int, int)
        """
        request = "SELECT id, MAX({field}) FROM {table} WHERE {field} <= ?".format(field=GarradinDBMembershipType.FIELD_AMOUNT, table=GarradinDBMembershipType.TABLE)
        self.cursor.execute(request, (amount,))
        return self.cursor.fetchone()

    def updateMembershipFromCache(self, member_id, email):
        """Look in the cache to update the membership fee of a member.

        The cached payment is consumed: it is removed from the cache.

        @param member_id (int): member ID in the "membres" table
        @param email(unicode): member email
        """
        membership = self.memberships.pop(email, None)
        if membership:
            amount, date = membership
            self.updateMembership(member_id, amount, date)

    def updateMembership(self, member_id, amount, date_payment):
        """Update the membership fee of a member.

        If the last registered membership is less than 365 days older than the
        payment, both amounts are cumulated and that membership is updated,
        otherwise a new membership is registered.

        @param member_id (int): member ID in the "membres" table
        @param amount (float): amount of the membership fee
        @param date_payment (datetime): date of the payment
        """
        member = self.getMemberIdentity(member_id)
        self.cursor.execute("SELECT * FROM %s WHERE id_membre=?" % GarradinDBMembership.TABLE, (member_id,))
        rows = self.cursor.fetchall()
        entries = [GarradinDBMembership(self.cursor, row) for row in rows]
        entries.sort(key=lambda entry: entry[entry.FIELD_DATE])

        # first we check for inconsistency
        last_date, last_amount = datetime.strptime(DATE_ORIGIN, DATE_FORMAT), 0
        last_entry = None
        for current_entry in entries:
            current_date = datetime.strptime(current_entry[current_entry.FIELD_DATE], DATE_FORMAT)
            current_amount = self.getMembershipAmount(current_entry[current_entry.FIELD_MEMBERSHIP])
            if (current_date - last_date).days < 365:
                logger.warning(u"At least two memberships within less than one year have been registered for %s:" % member)
                logger.warning(u" - € %d on %s" % (last_amount, last_date))
                logger.warning(u" - € %d on %s" % (current_amount, current_date))
                logger.warning(u" --> Please fix it MANUALLY!")
            last_date, last_amount = current_date, current_amount
            last_entry = current_entry

        if last_entry is not None and (date_payment - last_date).days < 365:
            amount += last_amount  # cumulate two payments
            entry = last_entry
        else:
            # the date is stored formatted like DATE_FORMAT: a datetime object
            # would be written with its time part by sqlite3, and could not be
            # parsed with DATE_FORMAT at the next update
            data = {GarradinDBMembership.FIELD_MEMBER: member_id,
                    GarradinDBMembership.FIELD_DATE: date_payment.strftime(DATE_FORMAT)}
            entry = GarradinDBMembership(self.cursor, data)

        # NOTE(review): if no membership type is cheaper than the amount, the
        # request presumably returns (None, None) and the subtraction below
        # fails - to be confirmed against the content of "cotisations"
        m_type_id, m_type_amount = self.getMembershipType(amount)
        delta = amount - m_type_amount
        if delta:
            logger.warning(u"We received € %.2f from %s: a membership fee of € %d will be registered." % (amount, member, m_type_amount))
            logger.warning(u" --> Please MANUALLY register a donation of € %.2f." % delta)
        entry[entry.FIELD_MEMBERSHIP] = m_type_id
        entry.insertOrReplace()


class MembersImporter(object):
    """Base class to import members from a CSV file.

    The subclasses must define CSV_ENTRY_CLASS, AGE_LIMIT and CONTEXT.
    """

    def __init__(self, cursor):
        """
        @param cursor: database cursor used for every request
        """
        self.cursor = cursor
        self.fails = []  # entries rejected by checkEntry
        self.membership_manager = MembershipManager(cursor)

    def fixRowFromLeadingEqualSign(self, row):
        """Fix leading equal sign in CSV field since it is not handled by Python.

        The values looking like ="something" are replaced in place by
        something.

        @param row (dict{unicode: unicode}): row data
        """
        for field, value in row.iteritems():
            if value.startswith('="') and value.endswith('"'):
                row[field] = value[2:-1]

    def importFromCSV(self, input_csv, output_csv=None):
        """Import members from a CSV to garradin.

        @param input_csv (unicode): path to the input CSV file
        @param output_csv (unicode): path to store intermediate Garradin CSV,
            a temporary file is used and then removed if it's not specified
        """
        if output_csv:
            tmp_csv = output_csv
        else:
            # mkstemp returns an open file descriptor that we must close
            # ourselves, since the file is reopened by name below
            tmp_fd, tmp_csv = tempfile.mkstemp()
            os.close(tmp_fd)
        matcher = EntryMatcher(self.cursor)

        with open(input_csv, 'r') as csv_file:
            reader = unicodecsv.DictReader(csv_file, dialect="my_dialect")
            logger.info("Processing %s..." % input_csv)
            with open(tmp_csv, 'w') as tmp_file:
                writer = unicodecsv.DictWriter(tmp_file, GarradinCSVMember.FIELDS, dialect="my_dialect")
                for row in reader:
                    input_entry = self.CSV_ENTRY_CLASS(row)
                    date = input_entry.date()
                    if (LAST_AUTO_UPDATE - date).days > self.AGE_LIMIT:
                        continue  # skip older entries
                    self.fixRowFromLeadingEqualSign(input_entry)
                    if not self.checkEntry(input_entry):
                        self.fails.append(input_entry)
                        continue
                    entry = matcher.processEntryForCSVExport(input_entry, self.CONTEXT)
                    if not entry:
                        continue
                    # an equal member needs no update, but its membership does
                    if entry.matched != Entry.MATCH_EQUAL:
                        writer.writerow(entry)
                    self.extraProcessEntry(input_entry, entry)

        self.printFails()
        self.importFromGarradinCSV(tmp_csv)
        if not output_csv:
            os.remove(tmp_csv)

    def checkEntry(self, input_entry):
        """Tell if an input entry must be imported, to be overridden.

        @param input_entry (CSVEntry): entry read from the input file
        @return: bool
        """
        return True

    def printFails(self):
        """Report the entries rejected by checkEntry, to be overridden."""
        pass

    def extraProcessEntry(self, input_entry, entry):
        """Hook called for each imported entry, to be overridden.

        @param input_entry (CSVEntry): entry read from the input file
        @param entry (GarradinCSVMember): matching entry for garradin
        """
        pass

    def importFromGarradinCSV(self, csv_file):
        """Import a garradin CSV to the database.

        @param csv_file (unicode): file to import
        """
        with open(csv_file, 'r') as garradin_file:
            reader = unicodecsv.reader(garradin_file, dialect="my_dialect")
            for row in reader:
                row.pop(1)  # remove the category name
                db_entry = GarradinDBMember(self.cursor, row)
                db_entry.setAutoUpdate()
                db_entry.insertOrReplace()
                self.membership_manager.updateMembershipFromCache(self.cursor.lastrowid, db_entry[db_entry.FIELD_EMAIL])
                action = "updated" if db_entry.get('id', None) else "added"
                logger.warning("The member %s has been %s." % (db_entry.identityToString(), action))


class ApayerImporter(MembersImporter):
    """Import the members and their payments from "apayer" CSV files."""

    CSV_ENTRY_CLASS = ApayerCSVEntry
    # since we receive the CSV every Sunday, operations can be one week old
    # without having been processed yet, even if you do a daily auto update
    AGE_LIMIT = 14
    CONTEXT = u"apayer"

    STATE_ACCEPTED = u"Paiement accepté (payé)"

    def checkEntry(self, input_entry):
        """Tell if the payment has been accepted.

        @param input_entry (ApayerCSVEntry): entry read from the input file
        @return: bool
        """
        return input_entry["Etat"] == self.STATE_ACCEPTED

    def printFails(self):
        """Log the operations that have not been accepted."""
        if not self.fails:
            return
        logger.warning(u"The following \"apayer\" operations have NOT been completed, you MAY want to contact the users:")
        for row in self.fails:
            fields = ["%s: %s" % (key, value) for key, value in row.iteritems() if key not in ApayerCSVEntry.FILTERED_OUT and value]
            logger.warning(u" - %s" % ", ".join(fields))

    def extraProcessEntry(self, input_entry, entry):
        """Register the payment as a membership fee.

        @param input_entry (ApayerCSVEntry): entry read from the input file
        @param entry (GarradinCSVMember): matching entry for garradin
        """
        # update membership fee
        date = input_entry.date()
        amount = float(input_entry[input_entry.FIELD_AMOUNT])
        if entry.get("id", None):
            self.membership_manager.updateMembership(entry["id"], amount, date)
        else:  # we don't know the member ID yet
            self.membership_manager.setCache(entry[entry.FIELD_EMAIL], amount, date)


class DjangoImporter(MembersImporter):
    """Import the members from the CSV file of the django form."""

    CSV_ENTRY_CLASS = DjangoCSVEntry
    # if you do a daily auto update, this file is processed everyday
    AGE_LIMIT = 2
    CONTEXT = u"django form"

    def extraProcessEntry(self, input_entry, entry):
        """Cache a membership with a null amount for the new users.

        @param input_entry (DjangoCSVEntry): entry read from the input file
        @param entry (GarradinCSVMember): matching entry for garradin
        """
        date = input_entry.date()
        if not entry.get("id", None):  # new user
            self.membership_manager.setCache(entry[entry.FIELD_EMAIL], 0, date)


class CmImporter(object):
    """Import the financial operations from credit mutuel (not implemented)."""

    AGE_LIMIT = 60

    def importFromCSV(self, csv_file):
        """Import financial operation from credit mutuel CSV to garradin.

        @param csv_file (unicode): path to credit mutuel CSV file
        @raise NotImplementedError: always, the import is not written yet
        """
        raise NotImplementedError  # TODO: update database with entry


### MAIN CLASS ###

class GarradinUpdater(object):
    """Run the imports over the files found in INPUT_PATH."""

    def __init__(self, cursor):
        """
        @param cursor: database cursor used for every request
        """
        self.cursor = cursor

    def importFromApayer(self):
        """Import CSV files from apayer"""
        for apayer_csv in glob(os.path.join(INPUT_PATH, *APAYER_FILES)):
            # the files dated before the last update were already processed
            if dirtyGetDateOfFile(apayer_csv) >= LAST_AUTO_UPDATE:
                ApayerImporter(self.cursor).importFromCSV(apayer_csv)

    def importFromDjango(self):
        """Import all Django CSV files"""
        # FIXME: import JID, subscription to mailing list and comment
        for django_csv in glob(os.path.join(INPUT_PATH, *DJANGO_FILES)):
            DjangoImporter(self.cursor).importFromCSV(django_csv)

    def importFromCm(self):
        """Import all credit mutuel CSV files"""
        for cm_csv in glob(os.path.join(INPUT_PATH, *CM_FILES)):
            if dirtyGetDateOfFile(cm_csv) >= LAST_AUTO_UPDATE:
                CmImporter(self.cursor).importFromCSV(cm_csv)


con = sqlite3.connect(DB_FILE)
with con:
    cursor = con.cursor()
    updater = GarradinUpdater(cursor)
    updater.importFromApayer()
    updater.importFromDjango()
    #updater.importFromCm()
    # NOTE(review): the date is hard-coded here instead of using the default
    # value (today) - looks like a debugging leftover, to be confirmed
    GarradinDB(cursor).setLastAutoUpdate("2015-06-01")