Files
get_ovh_bills/main.py

339 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import argparse
import concurrent.futures
import mail as ml
from datetime import date, datetime
import dotenv
import ovh
import fetcher as ft
from urllib.request import urlretrieve
import logging
from logging.handlers import TimedRotatingFileHandler
import traceback
import sqlite3
import time as tm
def init():
    """Configure the module-wide ``logger`` (console + rotating file).

    Reads the module globals ``APP_ENV`` and ``PATH_LOG`` and rebinds the
    global ``logger``. Level names are prefixed with ``APP_ENV`` so that each
    log line shows which environment file the run was started with.
    """
    global logger

    # --- Logging configuration ---
    level_labels = (
        (logging.DEBUG, "DÉBOGAGE"),
        (logging.INFO, "INFO"),
        (logging.WARNING, "AVERTISSEMENT"),
        (logging.ERROR, "ERREUR"),
    )
    for level, label in level_labels:
        logging.addLevelName(level, "|".join((APP_ENV, label)))

    os.makedirs(PATH_LOG, exist_ok=True)
    logger = logging.getLogger(os.path.join(PATH_LOG, "ovh"))
    logger.setLevel(logging.INFO)

    line_format = logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    def _attach(handler):
        # Every handler shares the same line format.
        handler.setFormatter(line_format)
        logger.addHandler(handler)

    # Console output
    _attach(logging.StreamHandler())

    # File output.
    # NOTE(review): when="M" means *minutes* for TimedRotatingFileHandler, so
    # this rotates every minute and keeps 12 backups — confirm this is the
    # intended retention.
    _attach(
        TimedRotatingFileHandler(
            os.path.join(PATH_LOG, "ovh.log"),
            when="M",
            interval=1,
            backupCount=12,
            encoding="utf-8",
        )
    )
def get_conn() -> sqlite3.Connection:
    """Open the SQLite database at ``DB_PATH`` and make sure the schema exists.

    Creates the ``bills`` and ``dj_bill`` tables when they are missing, commits,
    and returns the open connection. The caller owns the connection and is
    responsible for closing it.

    Raises:
        Exception: any error raised while connecting or creating the tables is
            logged and re-raised. A connection that was already opened is
            closed first so it does not leak.
    """
    conn = None
    try:
        logger.debug("Ouverture de la connexion SQLite vers %s", DB_PATH)
        conn = sqlite3.connect(DB_PATH)
        logger.debug("Connexion établie, vérification/creation de la table 'bills'")
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS bills (
                bill_id TEXT PRIMARY KEY,
                bill_year INT
            )"""
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS dj_bill (
                bill_id TEXT PRIMARY KEY,
                bill_year INT
            )"""
        )
        conn.commit()
        logger.info("Base SQLite initialisée et table 'bills' disponible")
        return conn
    except Exception as e:
        logger.exception("Erreur lors de l'initialisation de la base SQLite: %s", e)
        # Do not leak a half-initialised connection to the caller's process.
        if conn is not None:
            conn.close()
        raise
def send_error_mail(error_msg):
    """Send a best-effort error notification e-mail.

    The message is embedded in an HTML ``<pre>`` element, so it is HTML-escaped
    first; otherwise traceback fragments such as ``<module>`` would be parsed
    as tags and vanish from the rendered mail.

    This function never raises: a failure to send the mail is only logged, so
    that it cannot mask the original error being reported.

    Args:
        error_msg: text (typically a formatted traceback) to include in the mail.
    """
    # Local import keeps this block self-contained.
    from html import escape

    try:
        ml.send_email(
            subject="[OVH_FACTURES] ERREUR",
            content=f"<pre>{escape(str(error_msg), quote=False)}</pre>",
            email_from=EMAIL,
            email_password=EMAIL_PASSWORD,
            smpt_port=SMTP_PORT,  # keyword spelling is dictated by ml.send_email
            smtp_mail_address=SMTP_MAIL_ADDRESS,
            email_to=EMAIL_TO,
        )
    except Exception:
        # Deliberately swallowed (best effort), but leave a trace in the logs.
        logger.warning("Échec de l'envoi du mail d'erreur", exc_info=True)
def add_entries_to_db(entries: list[tuple[str, int]], conn, table: str):
    """Batch-insert ``(bill_id, bill_year)`` pairs into ``table``.

    Rows whose ``bill_id`` already exists are silently skipped
    (``ON CONFLICT(bill_id) DO NOTHING``).

    Args:
        entries: pairs to insert, in ``(bill_id, bill_year)`` order.
        conn: open SQLite connection.
        table: target table; must be one of ``_ALLOWED_TABLES`` because the
            name is interpolated into the SQL text.

    Raises:
        ValueError: if ``table`` is not an allowed table name.
        Exception: any database error is logged, reported by e-mail and
            re-raised; the transaction is rolled back in that case.
    """
    # Same whitelist as get_entries_from_db: identifiers cannot be bound as
    # SQL parameters, so the name must be validated before interpolation.
    if table not in _ALLOWED_TABLES:
        raise ValueError(f"Table inconnue: {table}")
    try:
        logger.debug("Insertion batch dans '%s': %d entrées", table, len(entries))
        query = f"""
            INSERT INTO {table} (bill_id, bill_year)
            VALUES (?, ?)
            ON CONFLICT(bill_id) DO NOTHING
        """
        # The connection context manager commits on success and rolls back
        # if executemany raises, so no partial batch is left pending.
        with conn:
            conn.executemany(query, entries)
        logger.info("Insertion batch dans '%s' validée", table)
    except Exception as e:
        logger.exception("Échec d'insertion batch dans '%s': %s", table, e)
        send_error_mail(traceback.format_exc())
        raise
def get_entries_from_db(conn, table: str) -> set[str]:
    """Return every ``bill_id`` stored in ``table`` as a set.

    Args:
        conn: open SQLite connection.
        table: table to read; must be one of ``_ALLOWED_TABLES``.

    Raises:
        ValueError: if ``table`` is not an allowed table name.
        Exception: any read error is logged, reported by e-mail and re-raised.
    """
    if table not in _ALLOWED_TABLES:
        raise ValueError(f"Table inconnue: {table}")

    try:
        logger.debug("Sélection des bill_id depuis '%s'", table)
        fetched = conn.execute(f"SELECT bill_id FROM {table}").fetchall()
        logger.info("Sélection terminée: %d bill_id récupérés", len(fetched))

        # Each row holds a single column: the bill_id.
        bill_ids = set()
        for record in fetched:
            bill_ids.add(record[0])
        return bill_ids
    except Exception as exc:
        logger.exception("Échec de lecture des bill_id depuis '%s': %s", table, exc)
        send_error_mail(traceback.format_exc())
        raise
def compare_db_to_data(db_data: set[str], data: list[str]) -> list[str]:
    """Return the items of ``data`` that are absent from ``db_data``.

    The order of ``data`` is preserved and duplicates are kept.
    """
    unseen: list[str] = []
    for candidate in data:
        if candidate in db_data:
            continue
        unseen.append(candidate)
    return unseen
def indexer(ids: list[str]) -> list[str]:
    """Select which invoice IDs still have to be downloaded for ``YEAR``.

    Steps:
      1. Drop IDs whose ``<id>.pdf`` already exists in ``<PATH_OVH>/<YEAR>/ovh``.
      2. Drop IDs already recorded in the ``bills`` table.
      3. Fetch the metadata of each remaining invoice; keep those dated in
         ``YEAR`` and record the others in ``bills`` with their actual year.
      4. When the script runs on 31/12 or 01/01, invoices dated
         31/12 of ``YEAR - 1`` or 01/01 of ``YEAR`` that are not yet in
         ``dj_bill`` are additionally recorded there.

    Invoices whose metadata cannot be fetched or whose date cannot be parsed
    are logged and skipped.

    Args:
        ids: all invoice IDs returned by the API.

    Returns:
        The IDs retained for download, in the order they appear in ``ids``.
    """
    conn = get_conn()
    try:
        return _select_bills_to_download(ids, conn)
    finally:
        # The connection is only needed during indexing; always release it.
        conn.close()


def _select_bills_to_download(ids: list[str], conn) -> list[str]:
    """Do the actual indexing work for ``indexer`` on an open connection."""
    logger.info("Indexation des factures pour l'année %s", YEAR)
    target_dir = os.path.join(PATH_OVH, str(YEAR) + "/ovh")
    try:
        ids_already_in = {fn for fn in os.listdir(target_dir) if fn.endswith(".pdf")}
    except FileNotFoundError:
        logger.warning("Dossier %s inexistant, aucune facture locale", target_dir)
        ids_already_in = set()

    expected_missing = [x for x in ids if f"{x}.pdf" not in ids_already_in]
    missing = compare_db_to_data(get_entries_from_db(conn, "bills"), expected_missing)
    logger.info("%d factures absentes détectées", len(missing))

    result: list[str] = []
    not_valid_year: list[tuple[str, int]] = []

    now = datetime.now()
    boundary_run = (now.month, now.day) in {(12, 31), (1, 1)}
    bills_downloaded_dj: set[str] = set()
    if boundary_run:
        try:
            bills_downloaded_dj = get_entries_from_db(conn, "dj_bill")
        except Exception:
            # Already logged/mailed by get_entries_from_db; treat as empty.
            bills_downloaded_dj = set()

    dj_bills: list[tuple[str, str]] = []
    for bill_id in missing:
        try:
            meta = ft.fetch_invoice_content(
                bill_id,
                app_key=APP_KEY,
                app_secret=APP_SECRET,
                consumer_key=CONSUMER_KEY,
            )
        except Exception as e:
            logger.error("Impossible de récupérer le json pour %s : %s", bill_id, e)
            send_error_mail(traceback.format_exc())
            continue

        try:
            bill_dt = datetime.fromisoformat(meta["date"]).date()
        except Exception:
            # meta may not even be a dict here; do not let the error report
            # itself raise.
            raw_date = meta.get("date") if isinstance(meta, dict) else None
            logger.error("Date invalide pour %s: %r", bill_id, raw_date)
            continue

        if bill_dt.year == YEAR:
            result.append(bill_id)
        else:
            not_valid_year.append((bill_id, bill_dt.year))

        if boundary_run:
            is_dec31_prev = bill_dt == date(YEAR - 1, 12, 31)
            is_jan1_curr = bill_dt == date(YEAR, 1, 1)
            if (is_dec31_prev or is_jan1_curr) and bill_id not in bills_downloaded_dj:
                # Store the ISO string explicitly: this is what sqlite3's
                # implicit date adapter produced, but that adapter is
                # deprecated since Python 3.12.
                # NOTE(review): the column is declared `bill_year INT`;
                # confirm whether the full date or only the year is wanted.
                dj_bills.append((bill_id, bill_dt.isoformat()))

    if not_valid_year:
        add_entries_to_db(not_valid_year, conn, "bills")
        logger.info(
            "Ajout de %d entrées hors année %s dans 'bills'", len(not_valid_year), YEAR
        )

    if dj_bills:
        try:
            add_entries_to_db(dj_bills, conn, "dj_bill")
            logger.info(
                "Ajout de %d factures de bascule (31/12, 01/01) dans 'dj_bill'",
                len(dj_bills),
            )
        except Exception as e:
            # Boundary bookkeeping is best effort; do not abort the run.
            logger.error("Échec insertion 'dj_bill': %s", e)

    logger.info("%d factures retenues pour téléchargement", len(result))
    return result
def get_ids() -> list[str]:
    """Return the IDs of all invoices, as reported by ``ft.fetch_api``.

    Raises:
        RuntimeError: if the OVH client raises an ``APIError``; the failure is
            logged and reported by e-mail first.
    """
    logger.info("Récupération de la liste des factures via API OVH")
    credentials = {
        "app_key": APP_KEY,
        "app_secret": APP_SECRET,
        "consumer_key": CONSUMER_KEY,
    }
    try:
        invoice_ids = ft.fetch_api(**credentials)
    except ovh.exceptions.APIError as exc:
        logger.error("Échec récupération des IDs de factures : %s", exc)
        send_error_mail(traceback.format_exc())
        raise RuntimeError(
            f"Échec de la récupération des IDs de factures : {exc}"
        ) from exc
    return invoice_ids
def get_bill(bill_id: str) -> dict:
    """Return the detailed payload of one invoice via ``ft.fetch_invoice_content``.

    Args:
        bill_id: identifier of the invoice to fetch.

    Raises:
        RuntimeError: if the OVH client raises an ``APIError``; the failure is
            logged and reported by e-mail first.
    """
    logger.debug("Récupération de la facture %s", bill_id)
    credentials = {
        "app_key": APP_KEY,
        "app_secret": APP_SECRET,
        "consumer_key": CONSUMER_KEY,
    }
    try:
        payload = ft.fetch_invoice_content(bill_id, **credentials)
    except ovh.exceptions.APIError as exc:
        logger.error("Échec récupération de la facture %s : %s", bill_id, exc)
        send_error_mail(traceback.format_exc())
        raise RuntimeError(
            f"Échec de la récupération de la facture {bill_id} : {exc}"
        ) from exc
    return payload
def save_pdf(bill: dict) -> None:
    """Download the PDF of an invoice into ``<PATH_OVH>/<current year>/ovh``.

    The file is named ``<billId>.pdf``. The download goes to a temporary
    ``<billId>.pdf.part`` file that is renamed only once complete, so an
    interrupted download never leaves a truncated ``.pdf`` behind (a truncated
    file would otherwise be mistaken for an already-downloaded invoice on the
    next run).

    Args:
        bill: invoice payload; the keys ``billId`` and ``pdfUrl`` are read.

    Raises:
        Exception: any download error is logged, reported by e-mail and
            re-raised.
    """
    # NOTE(review): uses the current year at call time, whereas the indexing
    # step uses the YEAR global; the two can differ if a run crosses midnight
    # on 31/12 — confirm which is intended.
    year_dir = os.path.join(PATH_OVH, str(datetime.now().year) + "/ovh")
    os.makedirs(year_dir, exist_ok=True)

    bill_id = bill["billId"]
    dest = os.path.join(year_dir, f"{bill_id}.pdf")
    partial = dest + ".part"
    url = bill["pdfUrl"]
    try:
        urlretrieve(url, partial)
        os.replace(partial, dest)  # atomic rename on the same filesystem
        logger.info("Facture %s sauvegardée dans %s", bill_id, dest)
    except Exception as e:
        logger.error("Impossible de télécharger la facture %s : %s", bill_id, e)
        send_error_mail(traceback.format_exc())
        # Best-effort cleanup of the incomplete download.
        try:
            os.remove(partial)
        except OSError:
            pass
        raise
if __name__ == "__main__":
    # Load environment variables from the .env file given on the command line.
    parser = argparse.ArgumentParser()
    parser.add_argument("-e", "--env", required=True, help="Path of .env file")
    args = parser.parse_args()
    dotenv.load_dotenv(args.env)

    # Module-level configuration read by the functions of this file.
    APP_ENV = args.env  # also used as prefix of the log level names
    APP_KEY = os.environ["APP_KEY"]
    APP_SECRET = os.environ["APP_SECRET"]
    CONSUMER_KEY = os.environ["CONSUMER_KEY"]
    PATH_OVH = os.environ["OVH_PATH"]
    PATH_LOG = os.environ["LOG_PATH"]
    DB_PATH = os.environ["DB_PATH"]
    EMAIL = os.environ["EMAIL"]
    EMAIL_PASSWORD = os.environ["EMAIL_PASSWORD"]
    SMTP_MAIL_ADDRESS = os.environ["SMTP_MAIL_ADDRESS"]
    SMTP_PORT = os.environ["SMTP_PORT"]
    # Comma-separated recipients: strip each address individually and drop
    # empty entries so "a@x, b@y," yields ["a@x", "b@y"].
    EMAIL_TO = [
        addr.strip() for addr in os.environ["EMAIL_TO"].split(",") if addr.strip()
    ]
    YEAR = datetime.now().year  # current year (int)
    _ALLOWED_TABLES = {"bills", "dj_bill"}

    init()
    start = tm.time()
    logger.info("Démarrage du traitement des factures OVH pour %s", YEAR)
    os.makedirs(os.path.join(PATH_OVH, str(YEAR)), exist_ok=True)

    ids_candidats = indexer(get_ids())
    bills_json = [(bill_id, get_bill(bill_id)) for bill_id in ids_candidats]

    # Save the PDFs, then send a summary mail.
    if bills_json:
        with concurrent.futures.ThreadPoolExecutor() as ex:
            futures = [ex.submit(save_pdf, payload) for _, payload in bills_json]
            # Any download error (or a wait longer than 10 s on a future)
            # propagates and aborts the run before the summary mail.
            for f in futures:
                f.result(timeout=10)

        bills_str = [
            (bill_id, str(datetime.fromisoformat(payload["date"]).date()))
            for bill_id, payload in bills_json
        ]
        content = ml.construct_html(bills_str)
        ml.send_email(
            "Reçu de facture(s)",
            content,
            email_from=EMAIL,
            email_password=EMAIL_PASSWORD,
            smpt_port=SMTP_PORT,  # keyword spelling is dictated by ml.send_email
            smtp_mail_address=SMTP_MAIL_ADDRESS,
            email_to=EMAIL_TO,
        )

    logger.info("Traitement terminé : %d factures téléchargées", len(ids_candidats))
    end = tm.time()
    logger.info("Runned for %ssecs", round(end - start, 2))