--- title: Pipeline de préparation d'une base de données 2021 keywords: fastai sidebar: home_sidebar nb_path: "notebooks/retraitement_erfs-fpr/pipeline_prepare_data.ipynb" ---
Ce notebook gère toute la pipeline de création de notre base de données à partir de l'ERFS-FPR, selon les étapes décrites ci-dessous. Avant tout, nous rappelons que nous utilisons les dates suivantes:
ANNEXES
A. Vérification d'agrégats
B. Calculs de la CSG
C. Calculs de CRDS
from time import time
start_notebook = time()
import gc
import json
import unittest
import numpy as np
import pandas as pd
from openfisca_france import FranceTaxBenefitSystem
TBS = FranceTaxBenefitSystem()
from leximpact_socio_fisca_simu_etat.config import Configuration
from leximpact_socio_fisca_simu_etat.logger import logger as log
from matplotlib import pyplot as plt
tc = unittest.TestCase()
config = Configuration(project_folder="leximpact-prepare-data")
from leximpact_prepare_data.aging_tools import (
bruitage,
calib_initiale_ff,
inflation_economique,
inflation_foyers,
)
from leximpact_prepare_data.calib_and_copules import reduce_bucket_number
from leximpact_prepare_data.calibration_tools import (
ajout_gens_en_haut,
calibration,
calibration_quantiles,
compare_distributions,
distrib_to_quantiles,
)
from leximpact_prepare_data.copules_add_var import (
convert_to_openfisca,
get_ratios,
integration_data_ff,
)
from leximpact_prepare_data.enlargement import enlarge
from leximpact_prepare_data.monte_carlo import *
from leximpact_prepare_data.reduce_data import remove_useless_variables
# Import des modules de calcul spécifiques
from leximpact_prepare_data.toolbase import (
compute_var_in_ff,
create_simulation,
foyers_fiscaux_to_individus,
individus_to_foyers_fiscaux,
)
log.debug(config)
annee_erfs = config.get("YEAR_ERFS")
annee_pote = config.get("YEAR_POTE")
annee_de_calcul = config.get("YEAR_COMPUTATION")
wanted = {}
erfs_filename = config.get("ERFS") + "erfs_flat_" + config.get("YEAR_ERFS") + ".h5"
erfs = pd.read_hdf(erfs_filename) # Une ligne par individu
# On supprime cette colonne étonnante
erfs.drop("taux_csg_remplacement", axis=1, inplace=True)
# On remplace les nans par des 0 (seule la colone f4ba en comporte)
erfs = erfs.fillna(0)
tc.assertEqual(erfs.isna().sum().sum(), 0)
erfs.columns
print(
"L'ERFS en Foyers est un échantillon de ",
individus_to_foyers_fiscaux(erfs)["idfoy"].nunique(),
" foyers fiscaux",
)
print(
"Il représente une population de ",
individus_to_foyers_fiscaux(erfs)["wprm"].sum(),
"foyers fiscaux",
)
tc.assertEqual(
erfs["idfoy"].nunique(), individus_to_foyers_fiscaux(erfs)["idfoy"].nunique()
)
erfs = calib_initiale_ff(erfs, annee_erfs)
erfsff = individus_to_foyers_fiscaux(erfs)
tc.assertEqual(erfs["idfoy"].nunique(), erfsff["idfoy"].nunique())
print(
"On a désormais un total de ",
erfsff["wprm"].sum(),
" foyers fiscaux représentés par cette base issue de l'ERFS-FPR 2018",
)
pipeline_tracker = pd.DataFrame(
columns=[
"init",
"enlarge",
"post_calib_RFR",
"post_calib",
"infl_eco19",
"infl_ff19",
"infl_eco21",
"infl_ff21",
"final",
]
)
# Mise en forme
pd.set_option("display.max_colwidth", 80)
pd.options.display.float_format = "{:,.7f}".format
# Données d'intérêt
data = ["f4ba", "chomage_brut", "salaire_de_base", "retraite_brute"]
# Initialisation
# Taille de la base
pipeline_tracker.loc["Len_ind", "init"] = round(len(erfs["wprm"]))
pipeline_tracker.loc["Nb_foyers", "init"] = round((erfsff["wprm"]).sum())
pipeline_tracker.loc["Len_ff", "init"] = round(len(erfsff["wprm"]))
# Variables
for var in data:
pipeline_tracker.loc[var, "init"] = round((erfs["wprm"] * erfs[var]).sum())
del erfsff
pipeline_tracker
outfile = (
config.get("DATA_OUT") + "01_erfs_reduced_ind" + config.get("YEAR_ERFS") + ".h5"
)
# Variables d'intérêt
# name_variables = ("rfr", "irpp", "nbptr")
# , 'csg', 'csg_deductible_chomage', 'imposable_chomage',
# 'csg_deductible_retraite', 'csg_imposable_retraite', 'csg_deductible_salaire', 'csg_imposable_salaire',
# 'csg_non_salarie', 'csg_patrimoine_deductible_ir', 'csg_revenus_capital',
# 'assiette_csg', 'assiette_csg_crds_non_salarie', 'assiette_csg_non_abattue', 'assiette_csg_plus_values', 'crds_non_salarie')
# Calcul des variables inutiles
# list_useless = remove_useless_variables(
# input_h5=erfs,
# outfile_path=outfile,
# name_variables=name_variables,
# PERIOD=annee_erfs,
# )
# print("List of variables that were removed : ", list_useless)
# config.get("DATA_OUT") + "01_erfs_reduced_ind" + config.get("YEAR_ERFS") + ".h5"
# )
# print(outfile)
outfile_path = config.get("DATA_OUT") + "01_erfs_red_" + config.get("YEAR_ERFS") + ".h5"
erfs.to_hdf(outfile_path, key="input", mode="w")
del erfs
Pour le moment, on décide de ne pas utiliser le process de reduce_data pour garder un maximum de colonnes pour affiner les calculs OpenFisca
# erfs_r = pd.read_hdf(config.get("DATA_OUT") + "01_erfs_reduced_" + config.get("YEAR_ERFS") + ".h5")
erfs_filename = (
config.get("DATA_OUT") + "01_erfs_red_" + config.get("YEAR_ERFS") + ".h5"
)
erfs_r = pd.read_hdf(erfs_filename) # Une ligne par individu
if "taux_csg_remplacement" in erfs_r.columns:
erfs_r.drop("taux_csg_remplacement", axis=1, inplace=True)
len_av = len(erfs_r.columns)
for col in erfs_r:
if (col not in ["date_naissance"]) and (erfs_r[col].sum() == 0):
erfs_r.drop(col, axis=1, inplace=True)
print(col)
print("On a retiré ", len_av - len(erfs_r.columns), " colonnes vides")
tc.assertEqual(
erfs_r["idfoy"].nunique(), individus_to_foyers_fiscaux(erfs_r)["idfoy"].nunique()
)
N = 3
erfs_enlarged = enlarge(erfs_r, N)
tc.assertEqual(
erfs_enlarged["idfoy"].nunique(),
individus_to_foyers_fiscaux(erfs_enlarged)["idfoy"].nunique(),
)
for var in data:
pipeline_tracker.loc[var, "enlarge"] = round(
(erfs_enlarged["wprm"] * erfs_enlarged[var]).sum()
)
# Taille de la base
pipeline_tracker.loc["Len_ind", "enlarge"] = round(len(erfs_enlarged["wprm"]))
erfsff = individus_to_foyers_fiscaux(erfs_enlarged)
pipeline_tracker.loc["Nb_foyers", "enlarge"] = round((erfsff["wprm"]).sum())
pipeline_tracker.loc["Len_ff", "enlarge"] = round(len(erfsff["wprm"]))
del erfsff
pipeline_tracker
outfile_path = (
config.get("DATA_OUT") + "02_erfs_enlarged_ind" + config.get("YEAR_ERFS") + ".h5"
)
erfs_enlarged.to_hdf(outfile_path, key="input", mode="w")
del erfs_enlarged
erfs_enlarged_av = pd.read_hdf(
config.get("DATA_OUT") + "02_erfs_enlarged_ind" + config.get("YEAR_ERFS") + ".h5"
)
erfs_enlarged_av.columns
%%capture
var_name = "f4ba"
# Calibration
erfs_cal_ff, erfs_enlarged, Distribs_f4ba, plot_f4ba = calibration(
erfs_enlarged_av, var_name, annee_erfs, annee_erfs
)
# Sauvegarde de l'agrégat de référence
wanted[var_name] = Distribs_f4ba[1].df["sum"].sum()
# A-t'on ajouté des gens dans la base?
if erfs_enlarged["idfoy"].nunique() > erfs_enlarged_av["idfoy"].nunique():
new_ppl = True
elif erfs_enlarged["idfoy"].nunique() == erfs_enlarged_av["idfoy"].nunique():
new_ppl = False
else:
erfs_enlarged["idfoy"].nunique() < erfs_enlarged_av["idfoy"].nunique()
raise Exception("Il y a eu une erreur dans la calibration de ", var_name)
print("On a calibré ", var_name)
plt.figure().clear()
plot_f4ba
plt.show()
# Variable
pipeline_tracker.loc["f4ba", "post_calib"] = round(
(erfs_enlarged["wprm"] * erfs_enlarged["f4ba"]).sum()
)
# Taille de la base
pipeline_tracker.loc["Len_ind", "post_calib"] = round(len(erfs_enlarged["wprm"]))
erfsff = individus_to_foyers_fiscaux(erfs_enlarged)
pipeline_tracker.loc["Nb_foyers", "post_calib"] = round((erfsff["wprm"]).sum())
pipeline_tracker.loc["Len_ff", "post_calib"] = round(len(erfsff["wprm"]))
pipeline_tracker
my_simu, dico = create_simulation(data=erfs_enlarged, tbs=TBS, period=annee_erfs)
var_list = ["rfr"]
cols_declarant_principal = ["rfr"]
# Calcul du RFR
erfs_03_rfr_ind = compute_var_in_ff(
my_simu, annee_erfs, erfs_enlarged, var_list, cols_declarant_principal
)
tc.assertEqual(
erfs_03_rfr_ind["idfoy"].nunique(),
individus_to_foyers_fiscaux(erfs_03_rfr_ind)["idfoy"].nunique(),
)
# Données d'intérêt
data.append("rfr")
# Initialisation
pipeline_tracker.loc["rfr", "init"] = round(
(erfs_03_rfr_ind["wprm"] * erfs_03_rfr_ind["rfr"]).sum()
)
pipeline_tracker
outfile_path = (
config.get("DATA_OUT") + "03_erfs_rfr_ind" + config.get("YEAR_ERFS") + ".h5"
)
erfs_03_rfr_ind.to_hdf(outfile_path, key="input", mode="w")
del erfs_03_rfr_ind
gc.collect()
erfs_03_rfr_ind_av = pd.read_hdf(
config.get("DATA_OUT") + "03_erfs_rfr_ind" + config.get("YEAR_ERFS") + ".h5"
)
file = config.get("CALIB") + "CalibPote-" + config.get("YEAR_ERFS") + "-revkire.json"
with open(file) as f:
calib = json.load(f)
%%capture
var_name = "rfr"
# Calibration
erfs_cal_ff, erfs_03_rfr_ind, Distribs, plot_rfr = calibration(
erfs_03_rfr_ind_av, var_name, annee_erfs, annee_erfs
)
# Sauvegarde de l'agrégat de référence
wanted[var_name] = Distribs[1].df["sum"].sum()
print(wanted[var_name])
# A-t'on ajouté des gens dans la base?
if erfs_cal_ff["idfoy"].nunique() > erfs_03_rfr_ind_av["idfoy"].nunique():
new_ppl = True
elif erfs_cal_ff["idfoy"].nunique() == erfs_03_rfr_ind_av["idfoy"].nunique():
new_ppl = False
else:
erfs_cal_ff["idfoy"].nunique() < erfs_03_rfr_ind_av["idfoy"].nunique()
raise Exception("Il y a eu une erreur dans la calibration de ", var_name)
print("On a calibré ", var_name)
plt.figure().clear()
plot_rfr
plt.show()
# Variable
pipeline_tracker.loc["rfr", "post_calib_RFR"] = round(
(erfs_03_rfr_ind["wprm"] * erfs_03_rfr_ind["rfr"]).sum()
)
pipeline_tracker.loc["Len_ind", "post_calib_RFR"] = round(len(erfs_03_rfr_ind["wprm"]))
erfsff = individus_to_foyers_fiscaux(erfs_03_rfr_ind)
pipeline_tracker.loc["Nb_foyers", "post_calib_RFR"] = round((erfsff["wprm"]).sum())
pipeline_tracker.loc["Len_ff", "post_calib_RFR"] = round(len(erfsff["wprm"]))
pipeline_tracker
outfile_path = (
config.get("DATA_OUT") + "03_erfs_rfr_cal_ind" + config.get("YEAR_ERFS") + ".h5"
)
erfs_03_rfr_ind.to_hdf(outfile_path, key="input", mode="w")
del erfs_cal_ff
del erfs_03_rfr_ind
del erfsff
gc.collect()
erfs_cal_ind = pd.read_hdf(
config.get("DATA_OUT") + "03_erfs_rfr_cal_ind" + config.get("YEAR_ERFS") + ".h5"
)
erfs_cal_ff = individus_to_foyers_fiscaux(erfs_cal_ind)
# Année de départ: année de la base de départ
year_start = int(config.get("YEAR_ERFS"))
# Année de fin : année de la base POTE pour import des copules
year_end = int(config.get("YEAR_POTE"))
print("On vieillit la base de ", year_start, " à ", year_end)
# erfs_cal_ff.columns
cols_to_inflate = [
"chomage_brut",
"pensions_alimentaires_percues",
"retraite_brute",
"rag",
"ric",
"rnc",
"salaire_de_base",
"f4ba",
"rfr",
]
erfs_inflated_ff = inflation_economique(
erfs_cal_ff, cols_to_inflate, year_start, year_end
)
print(
"Evolution du salaire de base avec l'inflation : ",
100
* (erfs_inflated_ff["salaire_de_base"].sum() - erfs_cal_ff["salaire_de_base"].sum())
/ erfs_cal_ff["salaire_de_base"].sum(),
"%",
)
[Attention] Ici il faut impérativement convertir la base en individus avant de procéder à l'inflation du nombre de foyers En effet, comme on repasse en base individus avec un merge sur ['idfoy', 'wprm'], le merge (ff vers individus) n'aurait pas lieu après la modification des poids
# On ajoute les valeurs dans la base individus
cols_declarant_principal = cols_to_inflate
to_update = cols_to_inflate
erfs_inflated_ind = foyers_fiscaux_to_individus(
erfs_cal_ind,
erfs_inflated_ff,
to_update,
cols_declarant_principal,
)
erfs_inflated_ind.columns
print("Somme des WPRM", (erfs_inflated_ind["wprm"]).sum())
# Initialisation
for var in data:
pipeline_tracker.loc[var, "infl_eco19"] = round(
(erfs_inflated_ind["wprm"] * erfs_inflated_ind[var]).sum()
)
# Taille de la base
pipeline_tracker.loc["Len_ind", "infl_eco19"] = round(len(erfs_inflated_ind["wprm"]))
erfsff = individus_to_foyers_fiscaux(erfs_inflated_ind)
pipeline_tracker.loc["Nb_foyers", "infl_eco19"] = round((erfsff["wprm"]).sum())
pipeline_tracker.loc["Len_ff", "infl_eco19"] = round(len(erfsff["wprm"]))
del erfsff
pipeline_tracker
# à faire sur une base en individus !!
erfs_inflate_foyers_ff, erfs_inflate_foyers_ind = inflation_foyers(
erfs_inflated_ind, year_start, year_end
)
print(
"Somme des RFR apres inflation des foyers",
(erfs_inflate_foyers_ff["rfr"] * erfs_inflate_foyers_ff["wprm"]).sum(),
" pour une somme wanted : ",
wanted["rfr"],
)
tc.assertEqual(
erfs_inflate_foyers_ind["idfoy"].nunique(),
erfs_inflate_foyers_ff["idfoy"].nunique(),
)
# Variables
for var in data:
pipeline_tracker.loc[var, "infl_ff19"] = round(
(erfs_inflate_foyers_ind["wprm"] * erfs_inflate_foyers_ind[var]).sum()
)
# Taille de la base
pipeline_tracker.loc["Len_ind", "infl_ff19"] = round(
len(erfs_inflate_foyers_ind["wprm"])
)
erfsff = individus_to_foyers_fiscaux(erfs_inflate_foyers_ind)
pipeline_tracker.loc["Nb_foyers", "infl_ff19"] = round((erfsff["wprm"]).sum())
pipeline_tracker.loc["Len_ff", "infl_ff19"] = round(len(erfsff["wprm"]))
del erfsff
pipeline_tracker
outfile_path = (
config.get("DATA_OUT")
+ "05_erfs_ind"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
erfs_inflate_foyers_ind.to_hdf(outfile_path, key="input", mode="w")
del erfs_inflate_foyers_ind
gc.collect()
# Export
outfile_path = (
config.get("DATA_OUT")
+ "05_erfs_ff"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
erfs_inflate_foyers_ff.to_hdf(outfile_path, key="input", mode="w")
del erfs_inflate_foyers_ff
gc.collect()
erfs_inflated_ff = pd.read_hdf(
config.get("DATA_OUT")
+ "05_erfs_ff"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
erfs_inflate_foyers_ind = pd.read_hdf(
config.get("DATA_OUT")
+ "05_erfs_ind"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
data_to_process = [
{
"file": "ExportCopule-2019-assiette_csg_revenus_capital.json",
"column_name": "pote_rev_capital",
"col_name_export_pote": "assiette_csg_revenus_capital",
},
]
# {
# "file": "ExportCopule-2019-retraites.json",
# "column_name": "pote_retraite",
# "col_name_export_pote": "retraites",
# },
# TODO : chômage
nb_tirage = 10
erfs_copules_ff, data_to_process = integration_data_ff(
erfs_inflated_ff, data_to_process, nb_tirage
)
erfs_copules_ff.head()
new_columns = [data["column_name"] for data in data_to_process]
# On sépare l'échantillon de foyers fiscaux en individus
sample_ff_to_merge = erfs_copules_ff[["idfoy"] + new_columns]
cols_declarant_principal = new_columns
to_update = new_columns
erfs_copules_ind = foyers_fiscaux_to_individus(
erfs_inflate_foyers_ind,
sample_ff_to_merge,
to_update,
cols_declarant_principal,
)
erfs_copules_ind_of = convert_to_openfisca(erfs_copules_ind, str(annee_pote))
erfs_copules_ind_of.drop(new_columns, axis=1, inplace=True)
# print(erfs_copules_ind_of.columns)
erfs_copules_ind_of.head()
data_new = [
"revenus_capitaux_prelevement_bareme",
"revenus_capitaux_prelevement_liberatoire",
"revenus_capitaux_prelevement_forfaitaire_unique_ir",
]
for i in data_new:
data.append(i)
# Ajout des valeurs
for var in data_new:
pipeline_tracker.loc[var, "init"] = round(
(erfs_copules_ind_of["wprm"] * erfs_copules_ind_of[var]).sum()
)
pipeline_tracker
outfile_path = (
config.get("DATA_OUT")
+ "07_erfs_copules_ind"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
erfs_copules_ind_of.to_hdf(outfile_path, key="input", mode="w")
# Conversion
erfs_copules_ff = individus_to_foyers_fiscaux(erfs_copules_ind_of)
# Export
outfile_path = (
config.get("DATA_OUT")
+ "07_erfs_copules_ff"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
erfs_copules_ff.to_hdf(outfile_path, key="input", mode="w")
del erfs_copules_ind_of
del erfs_copules_ff
del data_to_process
gc.collect()
erfs_to_cal_ind = pd.read_hdf(
config.get("DATA_OUT")
+ "07_erfs_copules_ind"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
# erfs_to_cal_ind.columns
Dans POTE, on a uniquement le salaire_imposable : c'est à partir de ces données qu'on a généré le fichier de calibration. Or, dans l'ERFS on a la colonne salaire_de_base uniquement.
Il faut donc corriger cela.
On calcule puis calibre le salaire_imposable, puis on recalcule le salaire de base à partir du salaire imposable (car on a besoin du salaire de base pour l'assiette de csg_crds)
# Il se calcule sur une base en individus
my_simu, _ = create_simulation(data=erfs_to_cal_ind, tbs=TBS, period=annee_pote)
to_calc = my_simu.calculate_add("salaire_imposable", annee_pote) # On calcule en annuel
erfs_to_cal_ind["salaire_imposable"] = to_calc
# erfs_to_cal_ind.head()
sum_imp = (erfs_to_cal_ind["salaire_imposable"] * erfs_to_cal_ind["wprm"]).sum()
# print(sum_imp)
sum_base = (erfs_to_cal_ind["salaire_de_base"] * erfs_to_cal_ind["wprm"]).sum()
# print(sum_base)
print("ratio Imposable / Base : ", sum_base / sum_imp)
erfs_to_cal_ind["primes_etc"] = (
erfs_to_cal_ind["salaire_imposable"] - erfs_to_cal_ind["salaire_de_base"]
)
data.append("salaire_imposable")
# Ajout des valeurs
pipeline_tracker.loc["salaire_imposable", "init"] = round(
(erfs_to_cal_ind["wprm"] * erfs_to_cal_ind["salaire_imposable"]).sum()
)
pipeline_tracker
outfile_path = (
config.get("DATA_OUT")
+ "07_erfs_salaire_to_cal_ind"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
erfs_to_cal_ind.to_hdf(outfile_path, key="input")
del erfs_to_cal_ind
gc.collect()
erfs_to_cal_ind = pd.read_hdf(
config.get("DATA_OUT")
+ "07_erfs_salaire_to_cal_ind"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
# erfs_to_cal_ind.columns
erfs_to_cal_ff = individus_to_foyers_fiscaux(erfs_to_cal_ind)
# erfs_to_cal_ff.head()
%%capture
var_name = "salaire_imposable"
# Calibration
erfs_cal_ff, erfs_to_cal_ind, Distribs_sal, plot_sal = calibration(
erfs_to_cal_ind, var_name, annee_erfs, annee_pote
)
# Sauvegarde de l'agrégat de référence
wanted[var_name] = Distribs_sal[1].df["sum"].sum()
print("On a calibré ", var_name)
plot_sal
plt.show()
%%capture
var_name = "retraite_brute"
erfs_to_cal_ff = erfs_cal_ff.copy()
# Calibration
erfs_cal_ff, erfs_to_cal_ind, Distribs_ret, plot_retraite = calibration(
erfs_to_cal_ind, var_name, annee_erfs, annee_pote
)
# Sauvegarde de l'agrégat de référence
wanted[var_name] = Distribs_ret[1].df["sum"].sum()
print("On a calibré ", var_name)
plt.figure().clear()
plot_retraite
plt.show()
var_name = "chomage_brut"
erfs_to_cal_ff = erfs_cal_ff.copy()
# Calibration
erfs_cal_ff, erfs_to_cal_ind, Distribs_chom, plot_chom = calibration(
erfs_to_cal_ind, var_name, annee_erfs, annee_pote
)
# Sauvegarde de l'agrégat de référence
wanted[var_name] = Distribs_chom[2].df["sum"].sum()
print("On a calibré ", var_name)
erfs_cal_ff["idfoy"].nunique()
erfs_to_cal_ff["idfoy"].nunique()
plt.figure().clear()
plot_chom
plt.show()
%%capture
var_name = "revenus_capitaux_prelevement_bareme"
erfs_to_cal_ff = erfs_cal_ff.copy()
# Calibration
erfs_cal_ff, erfs_to_cal_ind, Distribs_rk_pbar, plot_rk_pbar = calibration(
erfs_to_cal_ind, var_name, annee_erfs, annee_pote
)
plot_rk_pbar
# Sauvegarde de l'agrégat de référence
wanted[var_name] = Distribs_rk_pbar[2].df["sum"].sum()
print("On a calibré ", var_name)
plt.figure().clear()
plot_rk_pbar
plt.show()
%%capture
var_name = "revenus_capitaux_prelevement_liberatoire"
erfs_to_cal_ff = erfs_cal_ff.copy()
# Calibration
erfs_cal_ff, erfs_to_cal_ind, Distribs_rk_plib, plot_rk_plib = calibration(
erfs_to_cal_ind, var_name, annee_erfs, annee_pote
)
plot_rk_plib
# Sauvegarde de l'agrégat de référence
wanted[var_name] = Distribs_rk_plib[2].df["sum"].sum()
# A-t'on ajouté des gens dans la base?
if erfs_cal_ff["idfoy"].nunique() > erfs_to_cal_ff["idfoy"].nunique():
new_ppl = True
elif erfs_cal_ff["idfoy"].nunique() == erfs_to_cal_ff["idfoy"].nunique():
new_ppl = False
else:
erfs_cal_ff["idfoy"].nunique() < erfs_to_cal_ff["idfoy"].nunique()
raise Exception("Il y a eu une erreur dans la calibration de ", var_name)
print("On a calibré ", var_name)
plt.figure().clear()
plot_rk_plib
plt.show()
%%capture
var_name = "revenus_capitaux_prelevement_forfaitaire_unique_ir"
erfs_to_cal_ff = erfs_cal_ff.copy()
# Calibration
erfs_cal_ff, erfs_to_cal_ind, Distribs_rk_pfu, plot_rk_pfu = calibration(
erfs_to_cal_ind, var_name, annee_erfs, annee_pote
)
plot_rk_pfu
# Sauvegarde de l'agrégat de référence
wanted[var_name] = Distribs_rk_pfu[2].df["sum"].sum()
print("On a calibré ", var_name)
plt.figure().clear()
plot_rk_pfu
plt.show()
var_list = [
"revenus_capitaux_prelevement_forfaitaire_unique_ir",
"salaire_imposable",
"retraite_brute",
"revenus_capitaux_prelevement_bareme",
"revenus_capitaux_prelevement_liberatoire",
"chomage_brut",
]
# erfs_cal_ind.head()
for var in var_list:
pipeline_tracker.loc[var, "post_calib"] = round(
(erfs_to_cal_ind["wprm"] * erfs_to_cal_ind[var]).sum()
)
# Taille de la base
pipeline_tracker.loc["Len_ind", "post_calib"] = round(len(erfs_to_cal_ind["wprm"]))
erfsff = individus_to_foyers_fiscaux(erfs_to_cal_ind)
pipeline_tracker.loc["Nb_foyers", "post_calib"] = round((erfsff["wprm"]).sum())
pipeline_tracker.loc["Len_ff", "post_calib"] = round(len(erfsff["wprm"]))
del erfsff
pipeline_tracker
outfile_path = (
config.get("DATA_OUT")
+ "08_erfs_salaire_cal_ind"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
erfs_to_cal_ind.to_hdf(outfile_path, key="input")
del erfs_to_cal_ind
gc.collect()
erfs_cal_ind = pd.read_hdf(
config.get("DATA_OUT")
+ "07_erfs_salaire_to_cal_ind"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
erfs_cal_ind.columns
OpenFisca n'arrive pas à calculer le salaire de base à partir du salaire imposable. Pour avoir le salaire de base après calibration, nous utilisons l'approximation suivante:
Avant calibration:
salaire_imposable = salaire_de_base + primes_etc...
Après calibration:
salaire_imposable_cal = salaire_de_base_cal + primes_etc...
Donc on va supposer que:
salaire_de_base_cal = salaire_imposable_cal - primes_etc...
salaire_de_base_cal = salaire_imposable_cal - (salaire_imposable - salaire_de_base)
erfs_cal_ind["salaire_de_base"] = (
erfs_cal_ind["salaire_imposable"] - erfs_cal_ind["primes_etc"]
)
erfs_cal_ind.drop(["primes_etc"], axis=1, inplace=True)
sum_imp = (
erfs_cal_ind["salaire_imposable"] * erfs_cal_ind["wprm"]
).sum() # . * erfs_cal_ind["wprm"]).sum()
print("Somme totale de salaire imposable: ", sum_imp)
print("Somme totale de Salaire Imposable selon pote : ", wanted["salaire_imposable"])
sum_base = (
erfs_cal_ind["salaire_de_base"] * erfs_cal_ind["wprm"]
).sum() # * erfs_cal_ind["wprm"]).sum()
print("Somme totale de salaire de base: ", sum_base)
# print("Ratio salaire imposable / salaire de base :", sum_imp / sum_base)
# Ajout des valeurs
data.append("salaire_de_base")
pipeline_tracker.loc["salaire_de_base", "post_calib"] = round(
(erfs_cal_ind["wprm"] * erfs_cal_ind["salaire_de_base"]).sum()
)
pipeline_tracker
outfile_path = (
config.get("DATA_OUT")
+ "08_erfs_salaire_cal_ind"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
erfs_cal_ind.to_hdf(outfile_path, key="input")
del erfs_cal_ind
gc.collect()
erfs_cal_ind = pd.read_hdf(
config.get("DATA_OUT")
+ "08_erfs_salaire_cal_ind"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
erfs_cal_ind.columns
my_simu, dico = create_simulation(data=erfs_cal_ind, tbs=TBS, period=annee_erfs)
# Paramétrage
var_list = ["rfr"]
cols_declarant_principal = ["rfr"]
# Calcul du RFR
erfs_cal_ind = compute_var_in_ff(
my_simu, annee_erfs, erfs_cal_ind, var_list, cols_declarant_principal
)
tc.assertEqual(
erfs_cal_ind["idfoy"].nunique(),
individus_to_foyers_fiscaux(erfs_cal_ind)["idfoy"].nunique(),
)
# Initialisation
pipeline_tracker.loc["rfr", "post_calib"] = round(
(erfs_cal_ind["wprm"] * erfs_cal_ind["rfr"]).sum()
)
pipeline_tracker
erfs_cal_ind = pd.read_hdf(
config.get("DATA_OUT")
+ "08_erfs_salaire_cal_ind"
+ config.get("YEAR_ERFS")
+ "_inflated_to_"
+ config.get("YEAR_POTE")
+ ".h5"
)
erfs_cal_ind.columns
erfs_cal_ff = individus_to_foyers_fiscaux(erfs_cal_ind)
# On vérifie qu'on ne perd personne en route
tc.assertEqual(len(erfs_cal_ff["idfoy"].unique()), len(erfs_cal_ind["idfoy"].unique()))
# Année de départ: année de la base vieillie
year_start = int(config.get("YEAR_POTE"))
# Année de fin : année de production de la base pour calculs sur l'API
year_end = int(config.get("YEAR_COMPUTATION"))
print("On passe la base de ", year_start, " à ", year_end)
erfs_cal_ind.columns
cols_to_inflate = [
"chomage_brut",
"pensions_alimentaires_percues",
"rag",
"ric",
"rnc",
"salaire_de_base",
"f4ba",
"retraite_brute",
"rfr",
"revenus_capitaux_prelevement_bareme",
"revenus_capitaux_prelevement_liberatoire",
"revenus_capitaux_prelevement_forfaitaire_unique_ir",
"salaire_imposable",
]
erfs_inflated_ff = inflation_economique(
erfs_cal_ff, cols_to_inflate, year_start, year_end
)
# On ajoute les valeurs dans la base individus
cols_declarant_principal = cols_to_inflate
to_update = cols_to_inflate
erfs_inflated_ind = foyers_fiscaux_to_individus(
erfs_cal_ind,
erfs_inflated_ff,
to_update,
cols_declarant_principal,
)
for var in data:
pipeline_tracker.loc[var, "infl_eco21"] = round(
(erfs_inflated_ind["wprm"] * erfs_inflated_ind[var]).sum()
)
# Taille de la base
pipeline_tracker.loc["Len_ind", "infl_eco21"] = round(len(erfs_inflated_ind["wprm"]))
erfsff = individus_to_foyers_fiscaux(erfs_inflated_ind)
pipeline_tracker.loc["Nb_foyers", "infl_eco21"] = round((erfsff["wprm"]).sum())
pipeline_tracker.loc["Len_ff", "infl_eco21"] = round(len(erfsff["wprm"]))
del erfsff
pipeline_tracker
erfs_inflated_ff, erfs_inflate_foyers_ind = inflation_foyers(
erfs_inflated_ind, year_start, year_end
)
for var in data:
pipeline_tracker.loc[var, "infl_ff21"] = round(
(erfs_inflate_foyers_ind["wprm"] * erfs_inflate_foyers_ind[var]).sum()
)
# Taille de la base
pipeline_tracker.loc["Len_ind", "infl_ff21"] = round(
len(erfs_inflate_foyers_ind["wprm"])
)
erfsff = individus_to_foyers_fiscaux(erfs_inflate_foyers_ind)
pipeline_tracker.loc["Nb_foyers", "infl_ff21"] = round((erfsff["wprm"]).sum())
pipeline_tracker.loc["Len_ff", "infl_ff21"] = round(len(erfsff["wprm"]))
del erfsff
pipeline_tracker
erfs_inflate_foyers_ind.drop(["rfr"], axis=1, inplace=True)
erfs_inflate_foyers_ind.columns
cols_declarant_principal = [
"chomage_brut",
"pensions_alimentaires_percues",
"rag",
"ric",
"rnc",
"salaire_de_base",
"f4ba",
"retraite_brute",
"revenus_capitaux_prelevement_bareme",
"revenus_capitaux_prelevement_liberatoire",
"revenus_capitaux_prelevement_forfaitaire_unique_ir",
"salaire_imposable",
]
simulation, _ = create_simulation(
data=erfs_inflate_foyers_ind, tbs=TBS, period=year_end
)
erfs_inflate_foyers_ind = compute_var_in_ff(
simulation,
annee_de_calcul,
erfs_inflate_foyers_ind,
["rfr"],
cols_declarant_principal,
)
print(
"Total de RFR en ",
year_end,
" : ",
f'{(erfs_inflate_foyers_ind["rfr"]*erfs_inflate_foyers_ind["wprm"]).sum():,}',
" €",
)
On bruite la base par souci d'anonymité, en accord avec nos engagements auprès des fournisseurs de données (la DGFiP, pour POTE)
erfs_ind = bruitage(erfs_inflate_foyers_ind)
erfs_ind.columns
erfs_ind.columns
erfs_ind.describe()
# On vérifie qu'il n'y a pas de Nans dans la base (car OpenFisca ne les digère pas)
tc.assertEqual(erfs_ind.isna().sum().sum(), 0)
for col in erfs_ind.columns:
if erfs_ind[col].max() == 0:
print("On supprime cette colonne qui est vide :", col)
erfs_ind = erfs_ind.drop(col, axis=1)
elif erfs_ind[col].max() == np.nan:
print("On supprime cette colonne qui est vide :", col)
erfs_ind = erfs_ind.drop(col, axis=1)
# Ajout des valeurs
for var in data:
pipeline_tracker.loc[var, "final"] = round((erfs_ind["wprm"] * erfs_ind[var]).sum())
# Taille de la base
pipeline_tracker.loc["Len_ind", "final"] = round(len(erfs_ind["wprm"]))
erfsff = individus_to_foyers_fiscaux(erfs_ind)
pipeline_tracker.loc["Nb_foyers", "final"] = round((erfsff["wprm"]).sum())
pipeline_tracker.loc["Len_ff", "final"] = round(len(erfsff["wprm"]))
del erfsff
pipeline_tracker
# Sur le server local GitHub
loc = config.get("DATA_OUT")
# Sur le server ProxMox (inter-repos)
# loc = config.get("ERFS_FINAL_FOR_SIMU")
config.get("ERFS_FINAL_FOR_SIMU")
outfile_path = (
loc
+ "erfs_final_ind_"
+ config.get("YEAR_ERFS")
+ "_aged_to_"
+ config.get("YEAR_COMPUTATION")
+ "_N_"
+ str(N)
+ ".h5"
)
erfs_ind.to_hdf(outfile_path, key="input", mode="w")
outfile_path
erfs_ff = individus_to_foyers_fiscaux(erfs_ind)
outfile_path = (
loc
+ "erfs_final_ff_"
+ config.get("YEAR_ERFS")
+ "_aged_to_"
+ config.get("YEAR_COMPUTATION")
+ "_N_"
+ str(N)
+ ".h5"
)
erfs_ff.to_hdf(outfile_path, key="input", mode="w")
outfile_path
# NB: on prend toujours la base locale
N = 3
erfs_ind = pd.read_hdf(
config.get("DATA_OUT")
+ "erfs_final_ind_"
+ config.get("YEAR_ERFS")
+ "_aged_to_"
+ config.get("YEAR_COMPUTATION")
+ "_N_"
+ str(N)
+ ".h5"
)
nb_foyers = erfs_ind["wprm"].sum()
print("Total de foyers: ", nb_foyers, " agrégat POTE 2019 : 39_331_689 ")
tc.assertGreater(
abs(nb_foyers), 39_331_689
) # 39_331_689 est le Nb de foyers en 2019, la dernière valeur connue
sum_rfr = (erfs_ind["rfr"] * erfs_ind["wprm"]).sum()
print("Total de RFR: ", sum_rfr, " agrégat POTE 2019 : ", wanted["rfr"])
tc.assertGreater(abs(sum_rfr), 0.8 * wanted["rfr"])
sum_salaire_imposable = (erfs_ind["salaire_imposable"] * erfs_ind["wprm"]).sum()
print(
"Total de salaire_imposable ",
sum_salaire_imposable,
" agrégat POTE 2019 : ",
wanted["salaire_imposable"],
)
tc.assertGreater(abs(sum_salaire_imposable), 0.8 * wanted["salaire_imposable"])
sum_retraite = (erfs_ind["retraite_brute"] * erfs_ind["wprm"]).sum()
print(
"Total des retraites: ",
f"{sum_retraite:,}",
"€, vs agrégat POTE : ",
f'{wanted["retraite_brute"]:,}',
)
tc.assertGreater(abs(sum_retraite), 0.8 * wanted["retraite_brute"])
tc.assertGreater(abs(sum_retraite), 250_000_000_000)
tc.assertLess(abs(sum_retraite), 500_000_000_000)
tc.assertGreater(len(erfs_ind), 116_861)
%%capture
# ON GENERE UNE SIMU SUR LA NOUVELLE BASE
my_simu, _ = create_simulation(data=erfs_ind, tbs=TBS, period=annee_de_calcul)
to_calc = my_simu.calculate_add("assiette_csg_abattue", period=annee_de_calcul)
erfs_ind["assiette_csg_abattue"] = to_calc
to_calc = my_simu.calculate_add("assiette_csg_non_abattue", period=annee_de_calcul)
erfs_ind["assiette_csg_non_abattue"] = to_calc
print(
"Assiette de CSG activité en ",
annee_de_calcul,
" : ",
(
(erfs_ind["assiette_csg_non_abattue"] + erfs_ind["assiette_csg_abattue"])
* erfs_ind["wprm"]
).sum(),
)
to_calc = my_simu.calculate_add("csg_imposable_salaire", period=annee_de_calcul)
erfs_ind["csg_imposable_salaire"] = to_calc
to_calc = my_simu.calculate_add("csg_deductible_salaire", period=annee_de_calcul)
erfs_ind["csg_deductible_salaire"] = to_calc
print(
"Somme pondérée de CSG Imposable Activité en ",
annee_de_calcul,
" : " + f'{(erfs_ind["csg_imposable_salaire"] * erfs_ind["wprm"]).sum():,}',
)
print(
"Somme pondérée de CSG Déductible Activité en ",
annee_de_calcul,
" : " + f'{(erfs_ind["csg_deductible_salaire"] * erfs_ind["wprm"]).sum():,}',
)
sum_csg_sal = (erfs_ind["csg_deductible_salaire"] * erfs_ind["wprm"]).sum() + (
erfs_ind["csg_imposable_salaire"] * erfs_ind["wprm"]
).sum()
print(
"Total de CSG activité calculé : ",
f"{sum_csg_sal:,}",
"€ et attendu : ",
(58_807_000_000 + 17_216_000_000),
)
tc.assertLessEqual(-sum_csg_sal, 1.2 * (58_807_000_000 + 17_216_000_000))
tc.assertGreaterEqual(-sum_csg_sal, 0.8 * (58_807_000_000 + 17_216_000_000))
print(
"Somme des retraites post calibration : "
+ f'{ (erfs_ind["wprm"] * erfs_ind["retraite_brute"]).sum() :,}'
+ " €"
)
to_calc = my_simu.calculate_add("csg_imposable_retraite", period=annee_de_calcul)
erfs_ind["csg_imposable_retraite"] = to_calc
# my_simu.tracer.print_computation_log()
print(
"Total de CSG imposable sur les retraites en 2021 : ",
f'{(erfs_ind["csg_imposable_retraite"]*erfs_ind["wprm"]).sum():,}',
)
to_calc = my_simu.calculate_add("csg_deductible_retraite", period=annee_de_calcul)
erfs_ind["csg_deductible_retraite"] = to_calc
print(
"Total de CSG deductible sur les retraites en 2021 : ",
f'{(erfs_ind["csg_deductible_retraite"]*erfs_ind["wprm"]).sum():,}',
)
sum_csg_ret = (erfs_ind["csg_deductible_retraite"] * erfs_ind["wprm"]).sum() + (
erfs_ind["csg_imposable_retraite"] * erfs_ind["wprm"]
).sum()
print(
"Total de CSG Retraites calculé : ",
f"{sum_csg_ret:,}",
"€ et attendu : ",
f"{(21_291_000_000):,}",
)
tc.assertLessEqual(-sum_csg_ret, 1.4 * 21_291_000_000)
tc.assertGreaterEqual(-sum_csg_ret, 0.6 * 21_291_000_000)
print(
"Somme du Chômage post calibration : "
+ f'{ (erfs_ind["wprm"] * erfs_ind["chomage_brut"]).sum() :,}'
+ " €"
)
to_calc = my_simu.calculate_add("csg_imposable_chomage", period=annee_de_calcul)
erfs_ind["csg_imposable_chomage"] = to_calc
# my_simu.tracer.print_computation_log()
print(
"Total de CSG imposable sur le chomage en 2021 : ",
f'{(erfs_ind["csg_imposable_chomage"]*erfs_ind["wprm"]).sum():,}',
)
to_calc = my_simu.calculate_add("csg_deductible_chomage", period=annee_de_calcul)
erfs_ind["csg_deductible_chomage"] = to_calc
print(
"Total de CSG deductible sur le chomage en 2021 : ",
f'{(erfs_ind["csg_deductible_chomage"]*erfs_ind["wprm"]).sum():,}',
)
sum_csg_chom = (erfs_ind["csg_deductible_chomage"] * erfs_ind["wprm"]).sum() + (
erfs_ind["csg_imposable_chomage"] * erfs_ind["wprm"]
).sum()
print(
"Total de CSG Chomage calculé : ",
f"{sum_csg_chom:,}",
"€ et attendu : ",
f"{1_037_000_000:,}",
)
tc.assertLessEqual(-sum_csg_chom, 1.2 * 1_037_000_000)
tc.assertGreaterEqual(-sum_csg_chom, 0.8 * 1_037_000_000)
var_list = ["csg_revenus_capital"]
cols_declarant_principal = ["csg_revenus_capital"]
# Calcul en foyers fiscaux
erfs_ind = compute_var_in_ff(
my_simu,
annee_de_calcul,
erfs_ind,
var_list,
cols_declarant_principal,
)
print(
"Total de CSG sur les revenus du capital en 2021 : ",
f'{(erfs_ind["csg_revenus_capital"]*erfs_ind["wprm"]).sum():,}',
)
sum_csg_rk = (erfs_ind["csg_revenus_capital"] * erfs_ind["wprm"]).sum()
print(
"Total de CSG sur les Revenus du Capital calculé : ",
f"{sum_csg_rk:,}",
"€ et attendu : ",
(12_344_000_000),
)
tc.assertLessEqual(-sum_csg_rk, 1.2 * 12_344_000_000)
tc.assertGreaterEqual(-sum_csg_rk, 0.7 * 12_344_000_000)
to_calc = my_simu.calculate_add("crds_salaire", period=annee_de_calcul)
erfs_ind["crds_salaire"] = to_calc
sum_crds_sal = (erfs_ind["crds_salaire"] * erfs_ind["wprm"]).sum()
print(
"Somme pondérée de CRDS Activité en ",
annee_de_calcul,
" : " + f"{sum_crds_sal:,}",
)
print(
"Total de CRDS sur les Revenus d'Activité calculé : ",
f"{sum_crds_sal:,}",
"€ et attendu : ",
10,
)
# tc.assertLessEqual(-sum_crds_sal, 1.2 * 000_000)
# tc.assertGreaterEqual(-sum_crds_sal, 0.8 * 000_000)
to_calc = my_simu.calculate_add("crds_retraite", period=annee_de_calcul)
erfs_ind["crds_retraite"] = to_calc
sum_crds_ret = (erfs_ind["crds_retraite"] * erfs_ind["wprm"]).sum()
print(
"Somme pondérée de CRDS Retraite en ",
annee_de_calcul,
" : " + f"{sum_crds_ret:,}",
)
print(
"Total de CRDS sur les Retraites calculé : ",
f"{sum_crds_ret:,}",
"€ et attendu : ",
(10),
)
# tc.assertLessEqual(sum_crds_ret, 1.2 * 000_000)
# tc.assertGreaterEqual(sum_crds_ret, 0.8 * 000_000)
to_calc = my_simu.calculate_add("crds_chomage", period=annee_de_calcul)
erfs_ind["crds_chomage"] = to_calc
sum_crds_chom = (erfs_ind["crds_chomage"] * erfs_ind["wprm"]).sum()
print(
"Somme pondérée de CRDS Chômage en ",
annee_de_calcul,
" : " + f"{sum_crds_chom:,}",
)
print(
"Total de CRDS sur le Chômage calculé : ",
f"{sum_crds_chom:,}",
"€ et attendu : ",
(10),
)
# tc.assertLessEqual(sum_crds_chom, 1.2 * 000_000)
# tc.assertGreaterEqual(sum_crds_chom, 0.8 * 000_000)
var_list = ["crds_revenus_capital"]
cols_declarant_principal = ["crds_revenus_capital"]
# Calcul en foyers fiscaux
erfs_ind = compute_var_in_ff(
my_simu,
annee_de_calcul,
erfs_ind,
var_list,
cols_declarant_principal,
)
sum_crds_rk = (erfs_ind["crds_revenus_capital"] * erfs_ind["wprm"]).sum()
print(
"Somme pondérée de CRDS sur les revenus du capital en ",
annee_de_calcul,
" : " + f"{sum_crds_rk:,}",
)
print(
"Total de CRDS sur les Revenus du Capital calculé : ",
f"{sum_crds_rk:,}",
"€ et attendu : ",
(10),
)
# tc.assertLessEqual(sum_crds_rk, 1.2 * 000_000)
# tc.assertGreaterEqual(sum_crds_rk, 0.8 * 000_000)
var_list = ["irpp"]
cols_declarant_principal = ["irpp"]
# Calcul en foyers fiscaux
erfs_ind = compute_var_in_ff(
my_simu, annee_de_calcul, erfs_ind, var_list, cols_declarant_principal
)
sum_irpp = (erfs_ind["irpp"] * erfs_ind["wprm"]).sum()
print(
"Somme pondérée d'impot sur le revenu des personnes physiques en ",
annee_de_calcul,
" : " + f"{sum_irpp:,}",
)
print(
"Total de CRDS sur les Revenus du Capital calculé : ",
f"{sum_irpp:,}",
"€ et attendu : ",
(76_000_000_000),
)
# tc.assertLessEqual(sum_irpp, 1.2 * 76_000_000_000) # Agrégat issu du PLF 2022
# tc.assertGreaterEqual(sum_irpp, 0.8 * 76_000_000_000)
print(f"Temps d'exécution total : {(time()-start_notebook)/60:,.2f} minutes.")