Skip to content
Snippets Groups Projects
Commit b92d6b6c authored by BENOIT MICHAUD's avatar BENOIT MICHAUD
Browse files

nettoie et vectorise la mensualisation des revenus et variables d'activité

parent 228408af
Branches
No related tags found
1 merge request!152Integration de la mensualisation
# --- Imports ----
import os
import pandas as pd
import numpy as np
# import cProfile # test de performance
# from time import time
from leximpact_common_python_libraries.config import Configuration
from openfisca_survey_manager.survey_collections import SurveyCollection
from collections import Counter
# période
period = 2025
# définir les variables à mensualiser
variables = [
"salaire_de_base",
"chomage_brut",
"retraite_brute",
"traitement_indiciaire_brut",
"primes_fonction_publique",
"heures_remunerees_volume",
"categorie_salarie",
"activite",
"contrat_de_travail",
"effectif_entreprise",
"rpns_imposables",
"pensions_alimentaires_percues",
"pensions_invalidite",
]
# définir les noms des colonnes SPR
spr_columns = [f"SPR{str(i).zfill(2)}" for i in range(13)]
def coherence_retraites(df, period):
# mettre tous les plus de 70 ans à la retraite
df["70_ans_ou_plus"] = [(period - date.year) >= 70 for date in df["date_naissance"]]
# pour les plus de 70 ans, ajouter 3 (=retraite) dans les colonnes SPR
for col in spr_columns:
df[col] = np.where(df["70_ans_ou_plus"], 3, df[col])
# pour les plus de 70 ans, ajouter 3 (= retraite) dans la colonne activite
df["activite"] = np.where(df["70_ans_ou_plus"], 7, df[col])
# pour les plus de 70 ans, ajouter 7 (= "non_pertinent") dans la colonne categorie_salarie
df["categorie_salarie"] = np.where(df["70_ans_ou_plus"], 7, df[col])
# boucle : mois spr
for i_spr in range(len(spr_columns) - 2):
# condition : deux mois retraire consécutifs et retraite supérieure à 0
conditions = (
(df[spr_columns[i_spr]] == 3)
& (df[spr_columns[i_spr + 1]] == 3)
& (df["retraite_brute"] > 0)
)
# si la condition est remplie, ajouter retraite(3) au mois suivant les deux précédents
df[spr_columns[i_spr + 2]] = np.where(conditions, 3, df[spr_columns[i_spr + 2]])
return df
def coherence_activite_manquante_vectorise(df, period):
"""
Version vectorisée de la fonction coherence_activite_manquante.
Traite les activités manquantes en fonction des revenus déclarés.
Args:
df (DataFrame): Le dataframe à traiter
period (int): L'année de référence
Returns:
DataFrame: Une copie du dataframe avec les activités mises à jour
"""
# Créer une copie du dataframe
df_result = df.copy()
# Identifier les colonnes SPR
spr_cols = [col for col in df.columns if col.startswith("SPR")]
# --- CRÉER LES MASQUES BOOLÉENS POUR LES DIFFÉRENTS CAS ---
# Indicateurs de revenus
has_salaire = (df["salaire_de_base"] > 0) | (df["traitement_indiciaire_brut"] > 0)
has_chomage = df["chomage_brut"] > 0
has_retraite = df["retraite_brute"] > 0
# Nombre de types de revenus par ligne
nb_revenus = (
has_salaire.astype(int) + has_chomage.astype(int) + has_retraite.astype(int)
)
# Masques pour les différents cas
mask_all_missing = (df[spr_cols] == 9).all(axis=1)
mask_some_missing = ~mask_all_missing & (df[spr_cols] == 9).any(axis=1)
mask_no_income = mask_all_missing & (nb_revenus == 0)
mask_with_income = mask_all_missing & (nb_revenus > 0)
# --- CAS 1.1: TOUTES PÉRIODES MANQUANTES ET AUCUN REVENU ---
# Calculer l'âge à partir de la date de naissance
age = period - pd.to_datetime(df["date_naissance"]).dt.year
# Appliquer les valeurs en une seule opération pour chaque cas
for col in spr_cols:
# Si ≤ 16 ans : étudiant (5), sinon inactif (7)
df_result.loc[mask_no_income & (age <= 16), col] = 5 # étudiant
df_result.loc[mask_no_income & (age > 16), col] = 7 # inactif
# --- PRÉPARATION POUR LES CAS AVEC REVENUS ---
# Vérifier quels types d'activités sont déjà renseignés
has_activite_salaire = df_result[spr_cols].eq(1).any(axis=1)
has_activite_chomage = df_result[spr_cols].eq(2).any(axis=1)
has_activite_retraite = df_result[spr_cols].eq(3).any(axis=1)
# Identifier les activités manquantes par rapport aux revenus
need_salaire = has_salaire & ~has_activite_salaire
need_chomage = has_chomage & ~has_activite_chomage
need_retraite = has_retraite & ~has_activite_retraite
# --- CAS 1.2: TOUTES PÉRIODES MANQUANTES AVEC REVENUS ---
# Identifier les lignes concernées et leurs revenus
case1_2_rows = df_result[mask_with_income].index
for idx in case1_2_rows:
# Variables pour le traitement de cette ligne
activities_needed = []
# Déterminer les activités à ajouter par ordre de priorité
if need_retraite.loc[idx]:
activities_needed.append(3) # retraite
if need_chomage.loc[idx]:
activities_needed.append(2) # chômage
if need_salaire.loc[idx]:
activities_needed.append(1) # salaire
if activities_needed:
# Répartir les valeurs de manière équilibrée sur les colonnes SPR
n_chunks = len(activities_needed)
chunk_size = len(spr_cols) // n_chunks
remainder = len(spr_cols) % n_chunks
start_idx = 0
for i, activity in enumerate(activities_needed):
# Déterminer le nombre de colonnes pour cette activité
cols_count = chunk_size + (1 if i < remainder else 0)
end_idx = start_idx + cols_count
# Assigner l'activité aux colonnes correspondantes
for j in range(start_idx, end_idx):
if j < len(spr_cols):
df_result.loc[idx, spr_cols[j]] = activity
start_idx = end_idx
# --- CAS 2: CERTAINES PÉRIODES MANQUANTES ---
# Traiter chaque ligne avec des périodes partiellement manquantes
case2_rows = df_result[mask_some_missing].index
for idx in case2_rows:
row = df_result.loc[idx]
missing_indices = [i for i, col in enumerate(spr_cols) if row[col] == 9]
missing_cols = [spr_cols[i] for i in missing_indices]
# Cas 2.1: Des revenus sans activités correspondantes
row_need_retraite = need_retraite.loc[idx]
row_need_chomage = need_chomage.loc[idx]
row_need_salaire = need_salaire.loc[idx]
nb_to_add = row_need_retraite + row_need_chomage + row_need_salaire
if nb_to_add > 0 and missing_cols:
# Déterminer l'activité principale basée sur le revenu max
revenus = {
"salaire": row["salaire_de_base"] + row["traitement_indiciaire_brut"],
"chomage": row["chomage_brut"],
"retraite": row["retraite_brute"],
}
# Associer chaque type de revenu à son code d'activité
rev_to_act = {"salaire": 1, "chomage": 2, "retraite": 3}
# Trouver le revenu principal
revenu_max = max(revenus, key=revenus.get)
activite_principale = rev_to_act[revenu_max]
# Allouer plus de périodes au revenu principal si beaucoup de périodes manquantes
revenu_principal_extra = 0
if len(missing_cols) > 3:
revenu_principal_extra = 1
nb_to_add += 1
# Préparer les activités à assigner par ordre de priorité
activities_to_assign = []
if row_need_retraite:
activities_to_assign.append(3)
if revenu_principal_extra:
activities_to_assign.append(activite_principale)
if row_need_chomage:
activities_to_assign.append(2)
if row_need_salaire:
activities_to_assign.append(1)
# Répartir les activités sur les périodes manquantes
if activities_to_assign:
chunks = np.array_split(
missing_indices,
min(len(activities_to_assign), len(missing_indices)),
)
for i, chunk in enumerate(chunks):
if i < len(activities_to_assign):
for col_idx in chunk:
df_result.loc[idx, spr_cols[col_idx]] = (
activities_to_assign[i]
)
# Cas 2.2: Compléter les périodes restantes basées sur les périodes non-manquantes
# Récupérer les périodes qui sont encore manquantes après attribution
still_missing = [
i for i, col in enumerate(spr_cols) if df_result.loc[idx, col] == 9
]
for i in still_missing:
col = spr_cols[i]
if i == len(spr_cols) - 1: # SPR00 (fin d'année)
# Compléter par l'activité précédente non-manquante
for j in range(i - 1, -1, -1):
prev_val = df_result.loc[idx, spr_cols[j]]
if prev_val != 9:
df_result.loc[idx, col] = prev_val
break
else:
# Compléter par l'activité majoritaire des périodes suivantes
suivantes = [
df_result.loc[idx, spr_cols[j]]
for j in range(i + 1, len(spr_cols))
if df_result.loc[idx, spr_cols[j]] != 9
]
if suivantes:
majoritaire = Counter(suivantes).most_common(1)[0][0]
df_result.loc[idx, col] = majoritaire
return df_result
def match_activite(colonne_activite):
# prend en entrée une colonne d'activité SPRXX ERFS et renvoie son équivalent pour openfisca
# mapping
mapping = {
1: 0, # activité
2: 1, # chomeurs
3: 3, # retraités
4: 4, # inactif
5: 2, # étudiants
6: 4, # inactif
7: 4, # inactif
9: 4, # pas de réponse donc inactif ? => en complétant le calendrier pas besoin de se poser la question
}
return colonne_activite.map(mapping)
def diviser_liste(liste, n):
# Calculer la taille de chaque partie
taille = len(liste) // n
reste = len(liste) % n
# Initialiser les indices de début et de fin
debut = 0
parties = []
for i in range(n):
# Calculer la taille de cette partie
fin = debut + taille + (1 if i < reste else 0)
# Ajouter la partie à la liste des parties
parties.append(liste[debut:fin])
# Mettre à jour l'indice de début
debut = fin
return parties
def generate_col_month(row):
# sélectionner les colonnes SPR pertinentes à partir du mois d'enquête
if 1 <= row["MOIS"] <= 12:
mois_temp = [
f"SPR{str(max(row['MOIS'] - i - 1, 0)).zfill(2)}" for i in range(12)
]
else:
mois_temp = ["SPR00"] * 12
# extraire les valeurs d'activité
activite_annee = row[mois_temp]
# renommer les index de la série activite_annee en mois_XX
activite_annee.index = [f"mois_{str(i_mois).zfill(2)}" for i_mois in range(1, 13)]
return pd.concat([row, activite_annee], axis=0)
def mensualiser_revenus(df):
# générer les colonnes mois_XX avec activite de l'année
df = df.apply(generate_col_month, axis=1)
# noms des colonnes mois_XX
cols_mois = [f"mois_{str(i_mois).zfill(2)}" for i_mois in range(1, 13)]
# calculer le nombre de mois :
# - avec salaire
df["nb_mois_avec_salaire"] = (df[cols_mois] == 1).sum(axis=1)
# - avec chomage
df["nb_mois_avec_chomage"] = (df[cols_mois] == 2).sum(axis=1)
# - avec retraite
df["nb_mois_avec_retraite"] = (df[cols_mois] == 3).sum(axis=1)
# --- Boucle sur chaque mois pour calculer le salaire ---
i = 1
for col_mois in cols_mois:
# copier la df avec uniquement noindiv
df_temp = df[["noindiv"]]
# variables activité
cols_activite = [
"salaire_de_base",
"traitement_indiciaire_brut",
"primes_fonction_publique",
"heures_remunerees_volume",
]
for col_activite in cols_activite:
df_temp[col_activite] = np.where(
df["nb_mois_avec_salaire"] > 0,
(df[col_activite] / df["nb_mois_avec_salaire"]).where(
df[col_mois] == 1, 0
),
df[col_activite] / 12,
)
# variables chômage
cols_chomage = ["chomage_brut"]
for col_chomage in cols_chomage:
df_temp[col_chomage] = np.where(
df["nb_mois_avec_chomage"] > 0,
(df[col_chomage] / df["nb_mois_avec_chomage"]).where(
df[col_mois] == 1, 0
),
df[col_chomage] / 12,
)
# variables retraite
cols_retraite = ["retraite_brute"]
for col_retraite in cols_retraite:
df_temp[col_retraite] = np.where(
df["nb_mois_avec_retraite"] > 0,
(df[col_retraite] / df["nb_mois_avec_retraite"]).where(
df[col_mois] == 1, 0
),
df[col_retraite] / 12,
)
# catégorie salariée
df_temp["categorie_salarie"] = np.where(
df[col_mois] == 1, df["categorie_salarie"], 7
)
# contrat de travail
df_temp["contrat_de_travail"] = np.where(
df[col_mois] == 1, df["contrat_de_travail"], 6
)
# effectif de l'entreprise
df_temp["effectif_entreprise"] = np.where(
df[col_mois] == 1, df["effectif_entreprise"], 0
)
# activité
df_temp["activite"] = match_activite(df[col_mois])
# --- Ajuster les horaires temps plein ---
df_temp = heures_temps_plein(df_temp)
# --- Exporter le fichier en parquet ---
temp_nom_fichier = f"individu_2025_{str(i).zfill(2)}.parquet"
df_temp.to_parquet(
f"/home/bmichaud/leximpact/leximpact-prepare-data/leximpact_prepare_data/mensu_export/{temp_nom_fichier}"
)
# incrémenter i
i += 1
# TODO : updater le fichier config.json
def heures_temps_plein(df):
# définir le nombre d'heures et le contrat de travail
nb_heures_temps_plein = 151.67
# créer colonne rémunération
df["remuneration"] = (
df["salaire_de_base"]
+ df["traitement_indiciaire_brut"]
+ df["primes_fonction_publique"]
)
# nombre d'heures
df["heures_remunerees_volume"] = np.where(
# si la rémunération est égale à 0, le nombre d'heures est nul
df["remuneration"] == 0,
0,
np.where(
# si le nombre d'heures est égal à 0 (et rémunération > 0), alors on met le nb_heures_temps_plein
df["heures_remunerees_volume"] == 0,
nb_heures_temps_plein,
np.where(
# si le nombre d'heures dépasse le nb_heures_temps_plein, on fait un max(nb_heures_temps_plein)
df["heures_remunerees_volume"] > nb_heures_temps_plein,
nb_heures_temps_plein,
df["heures_remunerees_volume"],
),
),
)
# contrat de travail
df["contrat_de_travail"] = np.where(
df["heures_remunerees_volume"] == nb_heures_temps_plein,
0,
df["contrat_de_travail"],
)
return df
def mensualiser():
# --- Importer la table ERFS et sélectionner les colonnes utiles ---
# importer la table ERFS 2021 (colonnes utiles et colonnes SPR)
indiv = pd.read_csv("/mnt/data-in/erfs-fpr/2021/Csv/fpr_indiv_2021.csv", sep=";")
indiv = indiv[["noindiv", "TRIM", "MOIS"] + spr_columns]
indiv[spr_columns] = indiv[spr_columns].replace(np.nan, 9)
# --- Importer la table LexImpact d'origine ---
# fichier de config
config = Configuration(project_folder="leximpact-prepare-data")
config_files_directory = os.path.join(
config.project_module_path, ".config", "openfisca-survey-manager"
)
# importer la table individu
survey_collection = SurveyCollection.load(
config_files_directory=config_files_directory, collection="ines"
)
survey_name = "leximpact_2025"
survey = survey_collection.get_survey(survey_name)
table_name = f"individu_{period}"
table_leximpact = survey.get_values(table=table_name, ignorecase=True)
# --- Merger la table ERFS importée avec la table LexImpact ---
# reconstruction de la table avec SPRXX
df_mens = pd.merge(
indiv,
table_leximpact[["noindiv", "date_naissance"] + variables],
on="noindiv",
)
# TODO :
# df_mens = df_mens.iloc[0:100]
# --- Mettre en cohérence l'activité
df_mens = coherence_retraites(df_mens, period)
df_mens = coherence_activite_manquante_vectorise(df_mens, period)
# --- Mensualiser les revenus et variables catégorielles ---
mensualiser_revenus(df_mens)
if __name__ == "__main__":
mensualiser()
# start = time()
# cProfile.run(
# "main_function()",
# "profiling_main_function.prof"
# )
# print(f"Temps d'exécution :{time() - start:.2f} secondes")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment