mirror of
https://github.com/Fare-spec/get_ovh_bills.git
synced 2025-12-07 10:20:36 +00:00
339 lines
11 KiB
Python
339 lines
11 KiB
Python
import os
|
||
import argparse
|
||
import concurrent.futures
|
||
import mail as ml
|
||
from datetime import date, datetime
|
||
import dotenv
|
||
import ovh
|
||
import fetcher as ft
|
||
from urllib.request import urlretrieve
|
||
import logging
|
||
from logging.handlers import TimedRotatingFileHandler
|
||
import traceback
|
||
import sqlite3
|
||
import time as tm
|
||
|
||
|
||
def init():
    """Set up the module-level ``logger`` with console and file output.

    Relabels the DEBUG/INFO/WARNING/ERROR level names with an ``APP_ENV``
    prefix, makes sure the ``PATH_LOG`` directory exists, then attaches a
    stream handler and a ``TimedRotatingFileHandler`` writing to
    ``PATH_LOG/ovh.log``. Reads the globals ``APP_ENV`` and ``PATH_LOG`` and
    binds the global ``logger``.
    """
    global logger

    # --- Logging configuration ---
    level_labels = (
        (logging.DEBUG, "DÉBOGAGE"),
        (logging.INFO, "INFO"),
        (logging.WARNING, "AVERTISSEMENT"),
        (logging.ERROR, "ERREUR"),
    )
    for level, label in level_labels:
        logging.addLevelName(level, APP_ENV + " | " + label)

    os.makedirs(PATH_LOG, exist_ok=True)
    logger = logging.getLogger(os.path.join(PATH_LOG, "ovh"))
    logger.setLevel(logging.INFO)

    line_format = logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # Console output
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(line_format)
    logger.addHandler(console_handler)

    # File output
    # NOTE(review): when="M" means *minutes* for TimedRotatingFileHandler, so
    # with backupCount=12 only about twelve minutes of history are kept.
    # Confirm this is intended (the handler has no monthly mode).
    log_file = os.path.join(PATH_LOG, "ovh.log")
    file_handler = TimedRotatingFileHandler(
        log_file,
        when="M",
        interval=1,
        backupCount=12,
        encoding="utf-8",
    )
    file_handler.setFormatter(line_format)
    logger.addHandler(file_handler)
|
||
|
||
|
||
def get_conn():
    """Open the SQLite database at ``DB_PATH`` and ensure its schema exists.

    Creates the ``bills`` and ``dj_bill`` tables when they are missing
    (both have a ``bill_id`` TEXT primary key and a ``bill_year`` INT column).

    Returns:
        An open ``sqlite3.Connection``. The caller is responsible for
        closing it.

    Raises:
        Exception: any error raised while connecting or creating the tables
            is logged and re-raised. If the connection had already been
            opened it is closed first so it does not leak.
    """
    conn = None
    try:
        logger.debug("Ouverture de la connexion SQLite vers %s", DB_PATH)
        conn = sqlite3.connect(DB_PATH)
        logger.debug("Connexion établie, vérification/creation de la table 'bills'")
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS bills (
                bill_id TEXT PRIMARY KEY,
                bill_year INT
            )"""
        )

        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS dj_bill (
                bill_id TEXT PRIMARY KEY,
                bill_year INT
            )"""
        )
        conn.commit()
        logger.info("Base SQLite initialisée et table 'bills' disponible")
        return conn
    except Exception as e:
        logger.exception("Erreur lors de l'initialisation de la base SQLite: %s", e)
        # Do not leak a half-initialised connection: nobody will receive it.
        if conn is not None:
            conn.close()
        raise
|
||
|
||
|
||
def send_error_mail(error_msg):
    """Send a best-effort error notification e-mail.

    The message is wrapped in a ``<pre>`` element and sent through
    ``ml.send_email`` using the SMTP settings held in module globals.

    Args:
        error_msg: text to report (typically ``traceback.format_exc()``).

    This function never raises: a failure to send is logged and swallowed so
    that error reporting cannot mask the error being reported.
    """
    import html  # local import keeps this block self-contained

    try:
        ml.send_email(
            subject="[OVH_FACTURES] ERREUR",
            # Tracebacks contain text such as "<module>"; unescaped, an HTML
            # renderer would treat it as a tag and drop it from the message.
            content=f"<pre>{html.escape(str(error_msg), quote=False)}</pre>",
            email_from=EMAIL,
            email_password=EMAIL_PASSWORD,
            smpt_port=SMTP_PORT,  # keyword spelled as in the call the file already makes
            smtp_mail_address=SMTP_MAIL_ADDRESS,
            email_to=EMAIL_TO,
        )
    except Exception:
        # Still best effort, but leave a trace instead of failing silently.
        logger.exception("Échec de l'envoi du mail d'erreur")
|
||
|
||
|
||
def add_entries_to_db(entries: list[tuple[str, int]], conn, table: str):
    """Batch-insert ``(bill_id, bill_year)`` pairs into ``table``.

    Rows whose ``bill_id`` already exists are skipped
    (``ON CONFLICT(bill_id) DO NOTHING``). The batch is committed on success.

    Args:
        entries: ``(bill_id, bill_year)`` pairs to insert.
        conn: open SQLite connection.
        table: target table; must be one of ``_ALLOWED_TABLES``.

    Raises:
        ValueError: if ``table`` is not an allowed table name.
        Exception: any insertion error is logged, mailed, rolled back and
            re-raised.
    """
    # The table name is interpolated into the SQL text (identifiers cannot be
    # bound as parameters), so accept only known tables, exactly as
    # get_entries_from_db does.
    if table not in _ALLOWED_TABLES:
        raise ValueError(f"Table inconnue: {table}")
    try:
        logger.debug("Insertion batch dans '%s': %d entrées", table, len(entries))
        query = f"""
            INSERT INTO {table} (bill_id, bill_year)
            VALUES (?, ?)
            ON CONFLICT(bill_id) DO NOTHING
        """
        conn.executemany(query, entries)
        conn.commit()
        logger.info("Insertion batch dans '%s' validée", table)
    except Exception as e:
        logger.exception("Échec d'insertion batch dans '%s': %s", table, e)
        send_error_mail(traceback.format_exc())
        # Discard the partially applied batch so a later commit on the same
        # connection cannot persist it by accident.
        try:
            conn.rollback()
        except sqlite3.Error as rollback_err:
            logger.warning("Rollback impossible sur '%s': %s", table, rollback_err)
        raise
|
||
|
||
|
||
def get_entries_from_db(conn, table: str) -> set[str]:
    """Return every ``bill_id`` stored in ``table`` as a set.

    Args:
        conn: open SQLite connection.
        table: table to read; must be one of ``_ALLOWED_TABLES``.

    Raises:
        ValueError: if ``table`` is not an allowed table name.
        Exception: any read error is logged, mailed and re-raised.
    """
    # Guard first: the table name ends up inside the SQL text below.
    if table not in _ALLOWED_TABLES:
        raise ValueError(f"Table inconnue: {table}")

    try:
        logger.debug("Sélection des bill_id depuis '%s'", table)
        fetched = conn.execute(f"SELECT bill_id FROM {table}").fetchall()
        logger.info("Sélection terminée: %d bill_id récupérés", len(fetched))
        known_ids = set()
        for record in fetched:
            known_ids.add(record[0])
        return known_ids
    except Exception as e:
        logger.exception("Échec de lecture des bill_id depuis '%s': %s", table, e)
        send_error_mail(traceback.format_exc())
        raise
|
||
|
||
|
||
def compare_db_to_data(db_data: set[str], data: list[str]) -> list[str]:
    """Return the items of ``data`` that are absent from ``db_data``.

    The order of ``data`` is preserved and duplicates are kept.
    """
    unseen: list[str] = []
    for candidate in data:
        if candidate in db_data:
            continue
        unseen.append(candidate)
    return unseen
|
||
|
||
|
||
def indexer(ids: list[str]) -> list[str]:
    """Select the bill IDs that still have to be downloaded for ``YEAR``.

    Steps:
      1. Drop IDs whose ``<id>.pdf`` already exists in ``PATH_OVH/<YEAR>/ovh``.
      2. Drop IDs already recorded in the ``bills`` table (bills previously
         found to be dated in another year).
      3. Fetch the metadata of each remaining bill: keep those dated in
         ``YEAR``, record the others in ``bills``.
      4. When the run happens on 31/12 or 01/01, additionally record bills
         dated 31/12 of ``YEAR - 1`` or 01/01 of ``YEAR`` in ``dj_bill``
         (unless already present there).

    Bills whose metadata cannot be fetched or whose date cannot be parsed are
    logged and skipped.

    Args:
        ids: bill IDs returned by the API.

    Returns:
        The IDs to download, in the order they appear in ``ids``.
    """
    conn = get_conn()
    try:
        logger.info("Indexation des factures pour l'année %s", YEAR)

        target_dir = os.path.join(PATH_OVH, str(YEAR), "ovh")
        try:
            ids_already_in = {
                fn for fn in os.listdir(target_dir) if fn.endswith(".pdf")
            }
        except FileNotFoundError:
            logger.warning("Dossier %s inexistant, aucune facture locale", target_dir)
            ids_already_in = set()

        expected_missing = [x for x in ids if f"{x}.pdf" not in ids_already_in]
        missing = compare_db_to_data(
            get_entries_from_db(conn, "bills"), expected_missing
        )
        logger.info("%d factures absentes détectées", len(missing))

        result: list[str] = []
        not_valid_year: list[tuple[str, int]] = []

        now = datetime.now()
        boundary_run = (now.month, now.day) in {(12, 31), (1, 1)}

        bills_downloaded_dj: set[str] = set()
        if boundary_run:
            try:
                bills_downloaded_dj = get_entries_from_db(conn, "dj_bill")
            except Exception:
                # get_entries_from_db has already logged and mailed the
                # error; continue as if nothing had been recorded yet.
                bills_downloaded_dj = set()

        # (bill_id, bill_year) pairs, matching the dj_bill schema. Store the
        # year as an int rather than the date object so the INT column gets
        # an integer.
        dj_bills: list[tuple[str, int]] = []

        for bill_id in missing:
            try:
                meta = ft.fetch_invoice_content(
                    bill_id,
                    app_key=APP_KEY,
                    app_secret=APP_SECRET,
                    consumer_key=CONSUMER_KEY,
                )
            except Exception as e:
                logger.error("Impossible de récupérer le json pour %s : %s", bill_id, e)
                send_error_mail(traceback.format_exc())
                continue

            try:
                bill_dt = datetime.fromisoformat(meta["date"]).date()
            except Exception:
                # meta may not be a dict at all; never let the error report
                # itself raise.
                raw_date = meta.get("date") if isinstance(meta, dict) else meta
                logger.error("Date invalide pour %s: %r", bill_id, raw_date)
                continue

            if bill_dt.year == YEAR:
                result.append(bill_id)
            else:
                not_valid_year.append((bill_id, bill_dt.year))

            if boundary_run:
                is_dec31_prev = bill_dt == date(YEAR - 1, 12, 31)
                is_jan1_curr = bill_dt == date(YEAR, 1, 1)
                if (is_dec31_prev or is_jan1_curr) and bill_id not in bills_downloaded_dj:
                    dj_bills.append((bill_id, bill_dt.year))

        if not_valid_year:
            add_entries_to_db(not_valid_year, conn, "bills")
            logger.info(
                "Ajout de %d entrées hors année %s dans 'bills'", len(not_valid_year), YEAR
            )

        if dj_bills:
            try:
                add_entries_to_db(dj_bills, conn, "dj_bill")
                logger.info(
                    "Ajout de %d factures de bascule (31/12, 01/01) dans 'dj_bill'",
                    len(dj_bills),
                )
            except Exception as e:
                # Bookkeeping only: do not abort the run over it.
                logger.error("Échec insertion 'dj_bill': %s", e)

        logger.info("%d factures retenues pour téléchargement", len(result))
        return result
    finally:
        # The connection is local to this function; always release it.
        conn.close()
|
||
|
||
|
||
def get_ids() -> list[str]:
    """Return the bill IDs provided by ``ft.fetch_api`` for the configured OVH account.

    Raises:
        RuntimeError: when the call fails with ``ovh.exceptions.APIError``
            (the error is logged and mailed first).
    """
    logger.info("Récupération de la liste des factures via API OVH")
    try:
        bill_ids = ft.fetch_api(
            app_key=APP_KEY,
            app_secret=APP_SECRET,
            consumer_key=CONSUMER_KEY,
        )
    except ovh.exceptions.APIError as e:
        logger.error("Échec récupération des IDs de factures : %s", e)

        send_error_mail(traceback.format_exc())
        raise RuntimeError(f"Échec de la récupération des IDs de factures : {e}") from e
    else:
        return bill_ids
|
||
|
||
|
||
def get_bill(bill_id: str) -> dict:
    """Return the detailed payload of one bill from ``ft.fetch_invoice_content``.

    Args:
        bill_id: identifier of the bill to fetch.

    Raises:
        RuntimeError: when the call fails with ``ovh.exceptions.APIError``
            (the error is logged and mailed first).
    """
    logger.debug("Récupération de la facture %s", bill_id)
    try:
        payload = ft.fetch_invoice_content(
            bill_id,
            app_key=APP_KEY,
            app_secret=APP_SECRET,
            consumer_key=CONSUMER_KEY,
        )
    except ovh.exceptions.APIError as e:
        logger.error("Échec récupération de la facture %s : %s", bill_id, e)

        send_error_mail(traceback.format_exc())
        raise RuntimeError(
            f"Échec de la récupération de la facture {bill_id} : {e}"
        ) from e
    else:
        return payload
|
||
|
||
|
||
def save_pdf(bill: dict) -> None:
    """Download a bill's PDF into the current year's folder.

    The file ends up at ``PATH_OVH/<current year>/ovh/<billId>.pdf``.

    The download goes to ``<billId>.pdf.part`` first and is renamed only once
    complete. A failed or interrupted download therefore never leaves a
    truncated ``<billId>.pdf`` behind, which matters because the indexer
    treats any existing ``<billId>.pdf`` as already downloaded.

    Args:
        bill: bill payload; the keys ``billId`` and ``pdfUrl`` are read.

    Raises:
        Exception: any download error is logged, mailed and re-raised.
    """
    year_dir = os.path.join(PATH_OVH, str(datetime.now().year), "ovh")
    os.makedirs(year_dir, exist_ok=True)
    dest = os.path.join(year_dir, f"{bill['billId']}.pdf")
    partial = dest + ".part"

    url = bill["pdfUrl"]
    try:
        urlretrieve(url, partial)
        os.replace(partial, dest)
        logger.info("Facture %s sauvegardée dans %s", bill["billId"], dest)
    except Exception as e:
        logger.error("Impossible de télécharger la facture %s : %s", bill["billId"], e)
        send_error_mail(traceback.format_exc())
        # Remove whatever was written before the failure.
        try:
            os.remove(partial)
        except FileNotFoundError:
            # Nothing was written (the failure happened before any data arrived).
            pass
        except OSError as cleanup_err:
            logger.warning("Nettoyage impossible de %s : %s", partial, cleanup_err)
        raise
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: load the configuration, find the bills that are
    # missing for the current year, download their PDFs and send a recap mail.

    # Load environment variables from the .env file given on the command line.
    parser = argparse.ArgumentParser()
    parser.add_argument("-e", "--env", required=True, help="Path of .env file")
    args = parser.parse_args()

    dotenv.load_dotenv(args.env)
    APP_ENV = args.env  # used as a prefix for the log level names
    APP_KEY = os.environ["APP_KEY"]
    APP_SECRET = os.environ["APP_SECRET"]
    CONSUMER_KEY = os.environ["CONSUMER_KEY"]
    PATH_OVH = os.environ["OVH_PATH"]
    PATH_LOG = os.environ["LOG_PATH"]
    DB_PATH = os.environ["DB_PATH"]
    EMAIL = os.environ["EMAIL"]
    EMAIL_PASSWORD = os.environ["EMAIL_PASSWORD"]
    SMTP_MAIL_ADDRESS = os.environ["SMTP_MAIL_ADDRESS"]
    SMTP_PORT = os.environ["SMTP_PORT"]
    # Comma-separated recipients; strip each address so "a@x, b@y" does not
    # yield " b@y", and ignore empty items such as a trailing comma.
    EMAIL_TO = [
        address.strip()
        for address in os.environ["EMAIL_TO"].split(",")
        if address.strip()
    ]
    YEAR = datetime.now().year  # current year (int)
    _ALLOWED_TABLES = {"bills", "dj_bill"}
    init()
    start = tm.time()
    logger.info("Démarrage du traitement des factures OVH pour %s", YEAR)
    os.makedirs(os.path.join(PATH_OVH, str(YEAR)), exist_ok=True)

    ids_candidats = indexer(get_ids())
    bills_json = [(bill_id, get_bill(bill_id)) for bill_id in ids_candidats]

    # Save the PDFs, then send the recap mail.
    if bills_json:
        with concurrent.futures.ThreadPoolExecutor() as ex:
            futures = [ex.submit(save_pdf, payload) for _, payload in bills_json]
            # No timeout on result(): leaving the "with" block waits for every
            # worker anyway, so a timeout cannot protect against a hung
            # download. It could only abort the run on a slow download that
            # would have succeeded. Errors raised by save_pdf still propagate.
            for f in futures:
                f.result()

        bills_str = [
            (bill_id, f"{datetime.fromisoformat(bill_payload['date']).date()}")
            for bill_id, bill_payload in bills_json
        ]
        content = ml.construct_html(bills_str)
        ml.send_email(
            "Reçu de facture(s)",
            content,
            email_from=EMAIL,
            email_password=EMAIL_PASSWORD,
            smpt_port=SMTP_PORT,
            smtp_mail_address=SMTP_MAIL_ADDRESS,
            email_to=EMAIL_TO,
        )
    logger.info("Traitement terminé : %d factures téléchargées", len(ids_candidats))
    end = tm.time()
    logger.info("Runned for %ssecs", round(end - start, 2))
|