import os import argparse import concurrent.futures import mail as ml from datetime import date, datetime import dotenv import ovh import fetcher as ft from urllib.request import urlretrieve import logging from logging.handlers import TimedRotatingFileHandler import traceback import sqlite3 import time as tm def init(): global logger # --- Configuration du logging --- logging.addLevelName(logging.DEBUG, "DÉBOGAGE") logging.addLevelName(logging.INFO, "INFO") logging.addLevelName(logging.WARNING, "AVERTISSEMENT") logging.addLevelName(logging.ERROR, "ERREUR") os.makedirs(PATH_LOG, exist_ok=True) logger = logging.getLogger(os.path.join(PATH_LOG, "ovh")) logger.setLevel(logging.INFO) formatter = logging.Formatter( fmt="%(asctime)s | %(levelname)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) # Console ch = logging.StreamHandler() ch.setFormatter(formatter) logger.addHandler(ch) # Fichier fh = TimedRotatingFileHandler( os.path.join(PATH_LOG, "ovh.log"), when="M", interval=1, backupCount=12, encoding="utf-8", ) fh.setFormatter(formatter) logger.addHandler(fh) def get_conn(): """ Ouvre une connexion SQLite vers DB_PATH, crée la table 'bills' si nécessaire, puis retourne la connexion. """ try: logger.debug("Ouverture de la connexion SQLite vers %s", DB_PATH) conn = sqlite3.connect(DB_PATH) logger.debug("Connexion établie, vérification/creation de la table 'bills'") conn.execute(""" CREATE TABLE IF NOT EXISTS bills ( bill_id TEXT PRIMARY KEY, bill_year INT )""") conn.execute(""" CREATE TABLE IF NOT EXISTS dj_bill ( bill_id TEXT PRIMARY KEY, bill_year INT )""") conn.commit() logger.info("Base SQLite initialisée et table 'bills' disponible") return conn except Exception as e: logger.exception("Erreur lors de l'initialisation de la base SQLite: %s", e) raise def send_error_mail(error_msg): try: ml.send_email( subject="[OVH_FACTURES] ERREUR", content=f"
{error_msg}
", email_from=EMAIL, email_password=EMAIL_PASSWORD, smpt_port=SMTP_PORT, smtp_mail_address=SMTP_MAIL_ADDRESS, email_to=EMAIL_TO, ) except Exception: pass def add_entries_to_db(entries: list[tuple[str, int]], conn, table: str): """ Insère en lot des paires (bill_id, bill_year) dans la table spécifiée avec gestion de conflit sur bill_id. """ try: logger.debug("Insertion batch dans '%s': %d entrées", table, len(entries)) query = f""" INSERT INTO {table} (bill_id, bill_year) VALUES (?, ?) ON CONFLICT(bill_id) DO NOTHING """ conn.executemany(query, entries) conn.commit() logger.info("Insertion batch dans '%s' validée", table) except Exception as e: logger.exception("Échec d'insertion batch dans '%s': %s", table, e) send_error_mail(traceback.format_exc()) raise def get_entries_from_db(conn, table: str) -> set[str]: """ Récupère l'ensemble des bill_id présents dans la table demandée et les retourne sous forme de set[str]. """ if table not in _ALLOWED_TABLES: raise ValueError(f"Table inconnue: {table}") try: logger.debug("Sélection des bill_id depuis '%s'", table) cursor = conn.execute(f"SELECT bill_id FROM {table}") rows = cursor.fetchall() logger.info("Sélection terminée: %d bill_id récupérés", len(rows)) return {row[0] for row in rows} except Exception as e: logger.exception("Échec de lecture des bill_id depuis '%s': %s", table, e) send_error_mail(traceback.format_exc()) raise def compare_db_to_data(db_data: set[str], data: list[str]) -> list[str]: return [x for x in data if x not in db_data] def indexer(ids: list[str]) -> list[str]: """ Parcourt le répertoire de l'année courante, filtre les factures déjà présentes localement, conserve les factures absentes datées de l'année courante, et enregistre en base celles qui appartiennent à une autre année. Gère explicitement les cas 31/12 (YEAR-1) et 01/01 (YEAR). """ conn = get_conn() logger.info("Indexation des factures pour l'année %s", YEAR) target_dir = os.path.join(PATH_OVH, str(YEAR)) try: ids_already_in = {fn for fn in os.listdir(target_dir) if fn.endswith(".pdf")} except FileNotFoundError: logger.warning("Dossier %s inexistant, aucune facture locale", target_dir) ids_already_in = set() expected_missing = [x for x in ids if f"{x}.pdf" not in ids_already_in] missing = compare_db_to_data(get_entries_from_db(conn, "bills"), expected_missing) logger.info("%d factures absentes détectées", len(missing)) result: list[str] = [] not_valid_year: list[tuple[str, int]] = [] now = datetime.now() boundary_run = (now.month, now.day) in {(12, 31), (1, 1)} bills_downloaded_dj = set() if boundary_run: try: bills_downloaded_dj = set(get_entries_from_db(conn, "dj_bill")) except Exception: bills_downloaded_dj = set() dj_bills: list[tuple[str, date]] = [] for bill_id in missing: try: meta = ft.fetch_invoice_content( bill_id, app_key=APP_KEY, app_secret=APP_SECRET, consumer_key=CONSUMER_KEY, ) except Exception as e: logger.error("Impossible de récupérer le json pour %s : %s", bill_id, e) send_error_mail(traceback.format_exc()) continue try: bill_dt = datetime.fromisoformat(meta["date"]).date() except Exception: logger.error("Date invalide pour %s: %r", bill_id, meta.get("date")) continue if bill_dt.year == YEAR: result.append(bill_id) else: not_valid_year.append((bill_id, bill_dt.year)) if boundary_run: is_dec31_prev = bill_dt == date(YEAR - 1, 12, 31) is_jan1_curr = bill_dt == date(YEAR, 1, 1) if (is_dec31_prev or is_jan1_curr) and bill_id not in bills_downloaded_dj: dj_bills.append((bill_id, bill_dt)) if not_valid_year: add_entries_to_db(not_valid_year, conn, "bills") logger.info( "Ajout de %d entrées hors année %s dans 'bills'", len(not_valid_year), YEAR ) if dj_bills: try: add_entries_to_db(dj_bills, conn, "dj_bill") logger.info( "Ajout de %d factures de bascule (31/12, 01/01) dans 'dj_bill'", len(dj_bills), ) except Exception as e: logger.error("Échec insertion 'dj_bill': %s", e) logger.info("%d factures retenues pour téléchargement", len(result)) return result def get_ids() -> list[str]: """ Interroge l’API OVH et renvoie la liste des IDs de toutes les factures. """ logger.info("Récupération de la liste des factures via API OVH") try: return ft.fetch_api( app_key=APP_KEY, app_secret=APP_SECRET, consumer_key=CONSUMER_KEY, ) except ovh.exceptions.APIError as e: logger.error("Échec récupération des IDs de factures : %s", e) send_error_mail(traceback.format_exc()) raise RuntimeError(f"Échec de la récupération des IDs de factures : {e}") from e def get_bill(bill_id: str) -> dict: """ Récupère, via l’API OVH, les informations détaillées d’une facture (JSON). """ logger.debug("Récupération de la facture %s", bill_id) try: return ft.fetch_invoice_content( bill_id, app_key=APP_KEY, app_secret=APP_SECRET, consumer_key=CONSUMER_KEY, ) except ovh.exceptions.APIError as e: logger.error("Échec récupération de la facture %s : %s", bill_id, e) send_error_mail(traceback.format_exc()) raise RuntimeError( f"Échec de la récupération de la facture {bill_id} : {e}" ) from e def save_pdf(bill: dict) -> None: """ Télécharge le PDF d’une facture dans un sous-dossier par année. Noms de fichiers : .pdf """ year_dir = os.path.join(PATH_OVH, str(datetime.now().year)) os.makedirs(year_dir, exist_ok=True) dest = os.path.join(year_dir, f"{bill['billId']}.pdf") url = bill["pdfUrl"] try: urlretrieve(url, dest) logger.info("Facture %s sauvegardée dans %s", bill["billId"], dest) except Exception as e: logger.error("Impossible de télécharger la facture %s : %s", bill["billId"], e) send_error_mail(traceback.format_exc()) raise if __name__ == "__main__": # Chargement des variables d'environnement (.env) parser = argparse.ArgumentParser() parser.add_argument("-e", "--env", required=True, help="Path of .env file") args = parser.parse_args() dotenv.load_dotenv(args.env) APP_KEY = os.environ["APP_KEY"] APP_SECRET = os.environ["APP_SECRET"] CONSUMER_KEY = os.environ["CONSUMER_KEY"] PATH_OVH = os.environ["OVH_PATH"] PATH_LOG = os.environ["LOG_PATH"] DB_PATH = os.environ["DB_PATH"] EMAIL = os.environ["EMAIL"] EMAIL_PASSWORD = os.environ["EMAIL_PASSWORD"] SMTP_MAIL_ADDRESS = os.environ["SMTP_MAIL_ADDRESS"] SMTP_PORT = os.environ["SMTP_PORT"] EMAIL_TO = os.environ["EMAIL_TO"].strip().split(",") YEAR = datetime.now().year # Année courante (int) _ALLOWED_TABLES = {"bills", "dj_bill"} init() start = tm.time() logger.info("Démarrage du traitement des factures OVH pour %s", YEAR) os.makedirs(os.path.join(PATH_OVH, str(YEAR)), exist_ok=True) ids_candidats = indexer(get_ids()) bills_json = [] bills_str = [] for bill_id in ids_candidats: bills_json.append((bill_id, get_bill(bill_id))) # pdf enregistrement. if bills_json: with concurrent.futures.ThreadPoolExecutor() as ex: futures = [] for b in bills_json: futures.append(ex.submit(save_pdf, b[1])) # tm.sleep(0.1) for f in futures: f.result(timeout=10) for bill_id, bill_payload in bills_json: d = datetime.fromisoformat(bill_payload["date"]).date() bills_str.append((bill_id, f"{d}")) content = ml.construct_html(bills_str) ml.send_email( "Reçu de facture(s)", content, email_from=EMAIL, email_password=EMAIL_PASSWORD, smpt_port=SMTP_PORT, smtp_mail_address=SMTP_MAIL_ADDRESS, email_to=EMAIL_TO, ) logger.info("Traitement terminé : %d factures téléchargées", len(ids_candidats)) end = tm.time() logger.info(f"Runned for {round(end - start, 2)}secs")