Skip to content
Snippets Groups Projects
Commit c8b2c17a authored by BENOIT MICHAUD's avatar BENOIT MICHAUD
Browse files

intègre la mensualisation dans la pipeline

parent b92d6b6c
No related branches found
No related tags found
1 merge request!152Integration de la mensualisation
......@@ -3,6 +3,7 @@
from leximpact_prepare_data.pipeline_survey_scenario import PipelineErfsSurveyScenario
from leximpact_prepare_data.pipeline_tax_and_benefit_system import pipeline_tbs
from leximpact_prepare_data.scenario_tools.mensualisation import mensualiser
import os
import click
import pandas as pd
......@@ -49,10 +50,9 @@ def run_pipeline(annee_erfs, annee_pote, annee_de_calcul):
survey_name=f"openfisca_erfs_fpr_{annee_erfs}",
)
# Imputation des variables de revenus du capital (à partir de POTE)
pipeline_survey_scenario.build_imputation(year=str(annee_pote))
pipeline_survey_scenario.build_imputation_loyers()
pipeline_survey_scenario.save_current_survey(
variables=pipeline_survey_scenario.used_as_input_variables,
collection="leximpact",
......@@ -70,6 +70,11 @@ def run_pipeline(annee_erfs, annee_pote, annee_de_calcul):
config_files_directory=config_files_directory,
baseline_tax_benefit_system=pipeline_tbs,
)
# Imputation des loyers
survey_scenario.build_imputation_loyers()
# Boucle annees n, n-1, n-2
for year in [annee_de_calcul, annee_de_calcul - 1, annee_de_calcul - 2]:
variables = survey_scenario.used_as_input_variables
if year in [annee_de_calcul - 1, annee_de_calcul - 2]:
......@@ -94,3 +99,11 @@ def run_pipeline(annee_erfs, annee_pote, annee_de_calcul):
config_files_directory=survey_scenario.data["config_files_directory"],
source_format="parquet",
)
print(f"Mensualisation des données pour l'année {year}.")
mensualiser(
collection=collection,
survey_name=survey_name,
annee_table=year,
config_files_directory=survey_scenario.data["config_files_directory"],
)
......@@ -4,15 +4,17 @@
import pandas as pd
import numpy as np
# import cProfile # test de performance
# from time import time
from leximpact_common_python_libraries.config import Configuration
from openfisca_survey_manager.survey_collections import SurveyCollection
from openfisca_survey_manager.input_dataframe_generator import set_table_in_survey
from collections import Counter
# période
period = 2025
# fichier de config
config = Configuration(project_folder="leximpact-prepare-data")
default_config_files_directory = os.path.join(
config.project_module_path, ".config", "openfisca-survey-manager"
)
# définir les variables à mensualiser
variables = [
......@@ -35,10 +37,12 @@
spr_columns = [f"SPR{str(i).zfill(2)}" for i in range(13)]
def coherence_retraites(df, period):
def coherence_retraites(df, annee_table):
# mettre tous les plus de 70 ans à la retraite
df["70_ans_ou_plus"] = [(period - date.year) >= 70 for date in df["date_naissance"]]
df["70_ans_ou_plus"] = [
(annee_table - date.year) >= 70 for date in df["date_naissance"]
]
# pour les plus de 70 ans, ajouter 3 (=retraite) dans les colonnes SPR
for col in spr_columns:
......@@ -66,14 +70,14 @@ def coherence_retraites(df, period):
return df
def coherence_activite_manquante_vectorise(df, period):
def coherence_activite_manquante_vectorise(df, annee_table):
"""
Version vectorisée de la fonction coherence_activite_manquante.
Traite les activités manquantes en fonction des revenus déclarés.
Args:
df (DataFrame): Le dataframe à traiter
period (int): L'année de référence
annee_table (int): L'année de référence
Returns:
DataFrame: Une copie du dataframe avec les activités mises à jour
......@@ -105,7 +109,7 @@ def coherence_activite_manquante_vectorise(df, period):
# --- CAS 1.1: TOUTES PÉRIODES MANQUANTES ET AUCUN REVENU ---
# Calculer l'âge à partir de la date de naissance
age = period - pd.to_datetime(df["date_naissance"]).dt.year
age = annee_table - pd.to_datetime(df["date_naissance"]).dt.year
# Appliquer les valeurs en une seule opération pour chaque cas
for col in spr_cols:
......@@ -317,7 +321,9 @@ def generate_col_month(row):
return pd.concat([row, activite_annee], axis=0)
def mensualiser_revenus(df):
def mensualiser_revenus(
collection, survey_name, df, dossier_export, annee_table, config_files_directory
):
# générer les colonnes mois_XX avec activite de l'année
df = df.apply(generate_col_month, axis=1)
......@@ -326,22 +332,21 @@ def mensualiser_revenus(df):
cols_mois = [f"mois_{str(i_mois).zfill(2)}" for i_mois in range(1, 13)]
# calculer le nombre de mois :
# - avec salaire
df["nb_mois_avec_salaire"] = (df[cols_mois] == 1).sum(axis=1)
# - avec chomage
df["nb_mois_avec_chomage"] = (df[cols_mois] == 2).sum(axis=1)
# - avec retraite
df["nb_mois_avec_retraite"] = (df[cols_mois] == 3).sum(axis=1)
# --- Boucle sur chaque mois pour calculer le salaire ---
i = 1
for col_mois in cols_mois:
# copier la df avec uniquement noindiv
df_temp = df[["noindiv"]]
df_mois = df.copy()
df_mois = df_mois[["noindiv"]]
# variables activité
cols_activite = [
......@@ -351,7 +356,7 @@ def mensualiser_revenus(df):
"heures_remunerees_volume",
]
for col_activite in cols_activite:
df_temp[col_activite] = np.where(
df_mois[col_activite] = np.where(
df["nb_mois_avec_salaire"] > 0,
(df[col_activite] / df["nb_mois_avec_salaire"]).where(
df[col_mois] == 1, 0
......@@ -362,7 +367,7 @@ def mensualiser_revenus(df):
# variables chômage
cols_chomage = ["chomage_brut"]
for col_chomage in cols_chomage:
df_temp[col_chomage] = np.where(
df_mois[col_chomage] = np.where(
df["nb_mois_avec_chomage"] > 0,
(df[col_chomage] / df["nb_mois_avec_chomage"]).where(
df[col_mois] == 1, 0
......@@ -373,7 +378,7 @@ def mensualiser_revenus(df):
# variables retraite
cols_retraite = ["retraite_brute"]
for col_retraite in cols_retraite:
df_temp[col_retraite] = np.where(
df_mois[col_retraite] = np.where(
df["nb_mois_avec_retraite"] > 0,
(df[col_retraite] / df["nb_mois_avec_retraite"]).where(
df[col_mois] == 1, 0
......@@ -382,37 +387,43 @@ def mensualiser_revenus(df):
)
# catégorie salariée
df_temp["categorie_salarie"] = np.where(
df_mois["categorie_salarie"] = np.where(
df[col_mois] == 1, df["categorie_salarie"], 7
)
# contrat de travail
df_temp["contrat_de_travail"] = np.where(
df_mois["contrat_de_travail"] = np.where(
df[col_mois] == 1, df["contrat_de_travail"], 6
)
# effectif de l'entreprise
df_temp["effectif_entreprise"] = np.where(
df_mois["effectif_entreprise"] = np.where(
df[col_mois] == 1, df["effectif_entreprise"], 0
)
# activité
df_temp["activite"] = match_activite(df[col_mois])
df_mois["activite"] = match_activite(df[col_mois])
# --- Ajuster les horaires temps plein ---
df_temp = heures_temps_plein(df_temp)
df_mois = heures_temps_plein(df_mois)
# --- Enregistrer la table dans la survey ---
# --- Exporter le fichier en parquet ---
temp_nom_fichier = f"individu_2025_{str(i).zfill(2)}.parquet"
df_temp.to_parquet(
f"/home/bmichaud/leximpact/leximpact-prepare-data/leximpact_prepare_data/mensu_export/{temp_nom_fichier}"
nom_fichier_export = f"individu_{annee_table}_{str(i).zfill(2)}"
set_table_in_survey(
df_mois,
entity="individu",
parquet_file=os.path.join(dossier_export, f"{nom_fichier_export}.parquet"),
period=f"{annee_table}-{str(i).zfill(2)}",
collection=collection,
survey_name=survey_name,
config_files_directory=config_files_directory,
)
# incrémenter i
i += 1
# TODO : updater le fichier config.json
def heures_temps_plein(df):
# définir le nombre d'heures et le contrat de travail
......@@ -444,6 +455,9 @@ def heures_temps_plein(df):
),
)
# supprimer la colonne 'remuneration'
df.drop(columns="remuneration", inplace=True)
# contrat de travail
df["contrat_de_travail"] = np.where(
df["heures_remunerees_volume"] == nb_heures_temps_plein,
......@@ -454,58 +468,55 @@ def heures_temps_plein(df):
return df
def mensualiser():
def mensualiser(
collection,
survey_name,
annee_table,
config_files_directory=default_config_files_directory,
):
# --- Importer la table ERFS et sélectionner les colonnes utiles ---
# importer la table ERFS 2021 (colonnes utiles et colonnes SPR)
indiv = pd.read_csv("/mnt/data-in/erfs-fpr/2021/Csv/fpr_indiv_2021.csv", sep=";")
indiv = indiv[["noindiv", "TRIM", "MOIS"] + spr_columns]
indiv[spr_columns] = indiv[spr_columns].replace(np.nan, 9)
erfs_file_path = config.get("RAW_ERFS_FILE_PATH")
df_indiv = pd.read_csv(erfs_file_path, sep=";")
df_indiv = df_indiv[["noindiv", "TRIM", "MOIS"] + spr_columns]
df_indiv[spr_columns] = df_indiv[spr_columns].replace(np.nan, 9)
# --- Importer la table LexImpact d'origine ---
# --- Importer la table LexImpact individu d'origine ---
# fichier de config
config = Configuration(project_folder="leximpact-prepare-data")
config_files_directory = os.path.join(
config.project_module_path, ".config", "openfisca-survey-manager"
)
# importer la table individu
survey_collection = SurveyCollection.load(
config_files_directory=config_files_directory, collection="ines"
config_files_directory=config_files_directory, collection=collection
)
survey_name = "leximpact_2025"
survey = survey_collection.get_survey(survey_name)
table_name = f"individu_{period}"
table_name = f"individu_{annee_table}"
table_leximpact = survey.get_values(table=table_name, ignorecase=True)
dossier_export = survey.parquet_file_path
# --- Merger la table ERFS importée avec la table LexImpact ---
# reconstruction de la table avec SPRXX
df_mens = pd.merge(
indiv,
df_indiv,
table_leximpact[["noindiv", "date_naissance"] + variables],
on="noindiv",
)
# TODO :
# df_mens = df_mens.iloc[0:100]
# --- Mettre en cohérence l'activité
df_mens = coherence_retraites(df_mens, period)
df_mens = coherence_activite_manquante_vectorise(df_mens, period)
df_mens = coherence_retraites(df_mens, annee_table)
df_mens = coherence_activite_manquante_vectorise(df_mens, annee_table)
# --- Mensualiser les revenus et variables catégorielles ---
mensualiser_revenus(df_mens)
mensualiser_revenus(
collection,
survey_name,
df_mens,
dossier_export,
annee_table,
config_files_directory,
)
if __name__ == "__main__":
mensualiser()
# start = time()
# cProfile.run(
# "main_function()",
# "profiling_main_function.prof"
# )
# print(f"Temps d'exécution :{time() - start:.2f} secondes")
mensualiser(collection="leximpact", survey_name="leximpact_2025", annee_table=2025)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment