--- title: Simulation de réforme pour toutes les CSG : Calcul de toutes les CSG keywords: fastai sidebar: home_sidebar nb_path: "notebooks/analyses/csg_50_usage_copules.ipynb" ---
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import random
import pandas as pd
from leximpact_socio_fisca_simu_etat.config import Configuration
config = Configuration(project_name="leximpact-prepare-data")
from leximpact_socio_fisca_simu_etat.csg_simu import create_simulation
from openfisca_france import FranceTaxBenefitSystem
# from leximpact_simulation_engine.simulate_pop_from_reform import ( # type: ignore
# TBS_DEFAULT,
# annee_de_calcul,
# simulation,
# )
def calcul_distrib_from_copules(value_ref, dict_copules):
"""
Implementation d'une fonction probabilistique qui associe une valeur estimée pour une variable VAR selon la valeur d'une variable de référence VAR_REF , en utilisant des copules calculés à partir des distributions de VAR1 et VAR_REF.
Par exemple, VAR1 = Rk et VAR_REF = RFR
Dans quel bucket est value_ref ? ===> Nous donne la distrib de VAR
value_ref : une valeur de VAR_REF pour laquelle on cherche à attribuer une valeur de VAR
dictionnaire_fichier
"""
for bucket in dict_copules:
if bucket["lower_bound"] <= value_ref < bucket["upper_bound"]:
break
probazero = bucket["nb_people"]["zero"] / sum(bucket["nb_people"].values())
# On genere une variable aleatoire entre 0 et 1
randunif = random.random()
# Si cette proba est <= à la probabilité que Rk = 0
if randunif <= probazero:
return 0
# Si on n'est pas dans ce cas
randunif = (randunif - probazero) / (1 - probazero)
randinnbgens = randunif * bucket["nb_people"]["nonzero"]
# Générer un rk à partir de cette distrib (Monte-Carlo probablement)
sommepass = 0 # nombre de gens dans les buckets déjà dépassés. Quand on a passé le nombre de gens qu'on veut, on a fini de générer !
for bucket_rk in bucket["nonzerobuckets"]:
sommepass += bucket_rk[0]
if sommepass > randinnbgens:
return bucket_rk[1] / bucket_rk[0]
A noter : le wprm est un facteur indiquant le "poids" mathématique du foyer par rapport à la population française. Il est le même pour tout les individus d'un foyer, il ne faut donc pas le sommer quand on regroupe les individus par foyer, mais le garder tel quel.
def individus_to_foyer_fiscaux(sample_individus):
"""
Regroupe un échantillon par individus en échantillon par foyers fiscaux
"""
# On regroupe les individus de sample_pop_individus en foyers fiscaux par leur idfoy
sample_foyers_fiscaux = sample_individus.groupby(
["idfoy", "wprm"], as_index=False
).sum()
return sample_foyers_fiscaux
def foyer_fiscaux_to_individus(
sample_ind_origin, sample_ff_to_merge, cols_declarant_principal
):
"""
Ajoute les données d'un échantillon de foyers fiscaux dans un échantillon par individus
"""
# On fusionne les nouvelles valeurs dans l'échantillon originel par individus
sample_individus_new = pd.merge(
sample_ind_origin,
sample_ff_to_merge,
left_on="idfoy",
right_on="idfoy",
indicator=False,
)
# On supprime les valeurs qui ne s'appliquent qu'au déclarant principal pour les autres déclarants
for col in cols_declarant_principal:
sample_individus_new.loc[sample_individus_new["quifoy"] != 0, col] = 0
return sample_individus_new
def convert_erfr_to_openfisca(data):
# Traduction des roles attribués au format openfisca
data["quimenof"] = "enfant"
data.loc[data["quifoy"] == 1, "quimenof"] = "conjoint"
data.loc[data["quifoy"] == 0, "quimenof"] = "personne_de_reference"
data["quifoyof"] = "personne_a_charge"
data.loc[data["quifoy"] == 1, "quifoyof"] = "conjoint"
data.loc[data["quifoy"] == 0, "quifoyof"] = "declarant_principal"
data["quifamof"] = "enfant"
data.loc[data["quifam"] == 1, "quifamof"] = "conjoint"
data.loc[data["quifam"] == 0, "quifamof"] = "demandeur"
return data
sample_pop_ind_initial = pd.read_hdf(config.get("SAMPLE_POP")) # Une ligne par personne
sample_pop_ind_initial.columns
# Pour cela on utilise le dummy_data_final qui sort de la Madhinette et on lui calcule la colonne RFR
# Cette colonne RFR sera la variable de référence sur laquelle nous allons caler toutes nos données POTE
annee_de_calcul = "2021"
sample_pop_ind_initial_of = convert_erfr_to_openfisca(sample_pop_ind_initial)
# On lance une simulation OpenFisca
my_simu, dico = create_simulation(
data=sample_pop_ind_initial_of, tbs=FranceTaxBenefitSystem(), period=annee_de_calcul
)
# On doit regrouper les personnes en foyers fiscaux pour calculer le RFR. On doit convertir l'échantillon:
sample_pop_ff = individus_to_foyer_fiscaux(sample_pop_ind_initial_of)
# On calcule une colonne de RFR pour notre échantillon
to_calc = my_simu.calculate("rfr", annee_de_calcul)
sample_pop_ff["rfr"] = to_calc
!ls -lrt {config.get("COPULE_FOLDER")}
liste_des_variables_csg = pd.read_csv(
config.get("DATA_IN") + "liste_des_variables_csg-POTE_2019.csv"
)
pd.options.display.float_format = "{:,.7f}".format
liste_des_variables_csg
def get_aggregate(df, variable_name):
return df.loc[df["nom_variable"] == variable_name, "somme_en_euros"].iloc[0]
(
get_aggregate(liste_des_variables_csg, "rev_k_plvt_bareme")
+ get_aggregate(liste_des_variables_csg, "rev_k_plvt_lib")
+ get_aggregate(liste_des_variables_csg, "rev_k_plvt_forf_unik")
)
# RENTES VIAGERES Rv = Z1cw + Z1dw (somme des colonnes dans POTE)
# REVENU CATEGORIEL FONCIER RCF = Z4ba + Z4bb + Z4bc + Z4bd (somme des colonnes dans POTE)
# PLUS-VALUES PV = z3vg + z3ua + z3vz (somme des colonnes dans POTE)
# CHOMAGE chom = Z8sw + Z8sx (somme des colonnes dans POTE)
# RETRAITES ret = Z1as + Z1bs + MNIMQG (somme des colonnes dans POTE)
# PRE-RETRAITES Pre-ret = Z8sc (colonne dans POTE)
# TODO : INDEMNITES JOURNALIERES
data_to_process = [
{
"file": "20210610ExportCopule-rev_capital_partiel.txt",
"column_name": "pote_rev_capital",
"aggregat": get_aggregate(liste_des_variables_csg, "rev_k_plvt_bareme")
+ get_aggregate(liste_des_variables_csg, "rev_k_plvt_lib")
+ get_aggregate(liste_des_variables_csg, "rev_k_plvt_forf_unik"), # 35570000000
},
{
"file": "20210610ExportCopule-rente_viagere.txt",
"column_name": "pote_rente_viagere",
"aggregat": get_aggregate(
liste_des_variables_csg, "rente_viagere"
), # 869000000
},
{
"file": "20210610ExportCopule-rev_categ_foncier.txt",
"column_name": "pote_rev_categ_foncier",
"aggregat": get_aggregate(
liste_des_variables_csg, "rev_categ_foncier"
), # 48000000000
},
{
"file": "20210610ExportCopule-ass_csg_plus_value.txt",
"column_name": "pote_plus_values",
"aggregat": get_aggregate(
liste_des_variables_csg, "ass_csg_plus_value"
), # 18700000000
},
{
"file": "20210610ExportCopule-chomage.txt",
"column_name": "pote_chomage",
"aggregat": get_aggregate(liste_des_variables_csg, "chomage"), # 35_110_527_952
},
{
"file": "20210610ExportCopule-retraites.txt",
"column_name": "pote_retraite",
"aggregat": get_aggregate(
liste_des_variables_csg, "retraites"
), # 307_254_581_479
},
{
"file": "20210610ExportCopule-pre_retraites.txt",
"column_name": "pote_pre-retraite",
"aggregat": 1500000, # get_aggregate(liste_des_variables_csg, "pre_retraites")
},
]
def integration_data_ff(data_to_process):
for data in data_to_process:
variable_name = data["column_name"]
# On charge les copules
with open(config.get("COPULE_FOLDER") + data["file"]) as fichier:
contenu = fichier.read()
dict_copules_RFR_Data = eval(contenu)
# Chercher la distribution optimale
# On ajoute une distribution de data dans notre échantillon de population
sample_pop_ff[variable_name] = sample_pop_ff["rfr"].apply(
lambda row: calcul_distrib_from_copules(row, dict_copules_RFR_Data)
)
# Calcul de l'erreur
value = (sample_pop_ff[variable_name] * sample_pop_ff["wprm"]).sum()
expected = data["aggregat"]
# print(f"On trouve {value:,} € de {variable_name} alors qu'en 2019 c'était {expected:,} €, soit {((expected-value)/expected)*100}%")
data["error"] = ((expected - value) / expected) * 100
return sample_pop_ff, data_to_process
sample_pop_ff, data_to_process = integration_data_ff(data_to_process)
sample_pop_ff
data_to_process
new_columns = [data["column_name"] for data in data_to_process]
new_columns
sample_ff_to_merge = sample_pop_ff[["idfoy", "rfr"] + new_columns]
cols_declarant_principal = ["rfr"] + new_columns
sample_pop_ind = foyer_fiscaux_to_individus(
sample_pop_ind_initial, sample_ff_to_merge, cols_declarant_principal
)
print(sample_pop_ind.columns)
%%time
# Estimation de la variabilité des résultats due au random state
error_table = []
for i in range(10):
sample_pop_ff, data_to_process = integration_data_ff(data_to_process)
errors = {}
for data in data_to_process:
errors[data["column_name"]] = data["error"]
error_table.append(errors)
error_table = pd.DataFrame(error_table)
error_table
def get_ratios():
pote_rev_capital = (
get_aggregate(liste_des_variables_csg, "rev_k_plvt_bareme")
+ get_aggregate(liste_des_variables_csg, "rev_k_plvt_lib")
+ get_aggregate(liste_des_variables_csg, "rev_k_plvt_forf_unik")
)
return {
"ratio_revenus_capitaux_prelevement_bareme": get_aggregate(
liste_des_variables_csg, "rev_k_plvt_bareme"
)
/ pote_rev_capital,
"ratio_revenus_capitaux_prelevement_liberatoire": get_aggregate(
liste_des_variables_csg, "rev_k_plvt_lib"
)
/ pote_rev_capital,
"ratio_revenus_capitaux_prelevement_forfaitaire_unique_ir": get_aggregate(
liste_des_variables_csg, "rev_k_plvt_forf_unik"
)
/ pote_rev_capital,
}
get_ratios()
sum(get_ratios().values())
def convert_to_openfisca(data):
# On voudrait faire: dummy_data_csg['revenus_capital'] = dummy_data_final_rk['rk']
revenus_capital = data["pote_rev_capital"]
# Dans OpenFisca, on calcule rk ainsi: rk = revenus_capitaux_prelevement_bareme + revenus_capitaux_prelevement_liberatoire + revenus_capitaux_prelevement_forfaitaire_unique_ir
ratios = get_ratios()
data["revenus_capitaux_prelevement_bareme"] = (
revenus_capital * ratios["ratio_revenus_capitaux_prelevement_bareme"]
)
data["revenus_capitaux_prelevement_liberatoire"] = (
revenus_capital * ratios["ratio_revenus_capitaux_prelevement_liberatoire"]
)
data["revenus_capitaux_prelevement_forfaitaire_unique_ir"] = (
revenus_capital
* ratios["ratio_revenus_capitaux_prelevement_forfaitaire_unique_ir"]
)
# Rentes Viageres
data["rente_viagere_titre_onereux_net"] = data["pote_rente_viagere"]
# Revenu catégoriel foncier
data["revenu_categoriel_foncier"] = data["pote_rev_categ_foncier"]
# Plus-values
data["assiette_csg_plus_values"] = data["pote_plus_values"]
# On n'ajoute pas chomage_brut ni retraite_brute car déjà présent dans ERFS_FPR
# data["chomage_brut"] = data[
# "pote_chomage"
# ] # À vérifier : est-ce que c'est bien chomage brut qu'on calcule a partir de POTE ? OU : y etait-il deja avant, dans l'ERFS-FPR ?
# data["retraite_brute"] = data[
# "pote_retraite"
# ] # À vérifier : est-ce que c'est bien retraite brute qu'on calcule a partir de POTE ?
# Traduction des roles attribués au format openfisca
data["quimenof"] = "enfant"
data.loc[data["quifoy"] == 1, "quimenof"] = "conjoint"
data.loc[data["quifoy"] == 0, "quimenof"] = "personne_de_reference"
data["quifoyof"] = "personne_a_charge"
data.loc[data["quifoy"] == 1, "quifoyof"] = "conjoint"
data.loc[data["quifoy"] == 0, "quifoyof"] = "declarant_principal"
data["quifamof"] = "enfant"
data.loc[data["quifam"] == 1, "quifamof"] = "conjoint"
data.loc[data["quifam"] == 0, "quifamof"] = "demandeur"
return data
sample_pop_ind = convert_to_openfisca(sample_pop_ind)
sample_pop_ff = convert_to_openfisca(sample_pop_ff)
sample_pop_ind.to_csv(config.get("DATA_OUT") + "sample_pop_ind.csv", index=False)
sample_pop_ff.to_csv(config.get("DATA_OUT") + "sample_pop_ff.csv", index=False)
sample_pop_ind.columns
# On regroupe en foyers fiscaux pour calculer les assiettes
sample_pop_ff = individus_to_foyer_fiscaux(sample_pop_ind)
# Assiette du capital
to_calc = my_simu.calculate("assiette_csg_revenus_capital", annee_de_calcul)
sample_pop_ff["assiette_csg_revenus_capital"] = to_calc
#
print(
"assiette_csg_revenus_capital: ",
sum(sample_pop_ff["assiette_csg_revenus_capital"]),
"€",
)
to_calc = my_simu.calculate("csg_revenus_capital", annee_de_calcul)
sample_pop_ff["csg_revenus_capital"] = to_calc
CSG_Capital = to_calc * sample_pop_ff["wprm"]
print(
f"Total de CSG sur les revenus du capital, expected ~13.79 Mds€: {CSG_Capital.sum():,}"
)
print("Erreur", 100 * (CSG_Capital.sum() + 13790000000) / 13790000000, "% \n")
# Ref: p51 de https://www.securite-sociale.fr/files/live/sites/SSFR/files/medias/CCSS/2019/CCSS%20SEPT%2019%20DEF.pdf
to_calc = my_simu.calculate_add("csg_deductible_chomage", annee_de_calcul)
sample_pop_ind["csg_deductible_chomage"] = to_calc
to_calc = my_simu.calculate_add("csg_imposable_chomage", annee_de_calcul)
sample_pop_ind["csg_imposable_chomage"] = to_calc
sample_pop_ind["csg_chomage"] = (
sample_pop_ind["csg_imposable_chomage"] + sample_pop_ind["csg_imposable_chomage"]
)
CSG_chom = sample_pop_ind["csg_chomage"] * sample_pop_ind["wprm"]
print(f"CSG Chomage pondérée, expected ~1.9 Mds€ : {CSG_chom.sum():,}")
print("Erreur", 100 * (CSG_chom.sum() + 1900000000) / 1900000000, "% \n")
to_calc = my_simu.calculate_add("csg_deductible_retraite", annee_de_calcul)
sample_pop_ind["csg_deductible_retraite"] = to_calc
to_calc = my_simu.calculate_add("csg_imposable_retraite", annee_de_calcul)
sample_pop_ind["csg_imposable_retraite"] = to_calc
sample_pop_ind["csg_retraite"] = (
sample_pop_ind["csg_imposable_retraite"] + sample_pop_ind["csg_deductible_retraite"]
)
CSG_retraite = sample_pop_ind["csg_retraite"] * sample_pop_ind["wprm"]
print(f"CSG Chomage pondérée, expected ~ 21.869 Mds€ : {CSG_retraite.sum():,}")
print("Erreur", 100 * (CSG_retraite.sum() + 21869000000) / 21869000000, "% \n")
assert (
abs(
(
sample_pop_ff["revenus_capitaux_prelevement_bareme"] * sample_pop_ff["wprm"]
).sum()
- get_aggregate(liste_des_variables_csg, "rev_k_plvt_bareme")
)
/ get_aggregate(liste_des_variables_csg, "rev_k_plvt_bareme")
< 0.2
)
assert sample_pop_ff["wprm"].sum() > 38_000_000
assert sample_pop_ind["wprm"].sum() > 50_000_000
assert abs(sample_pop_ind["rfr"].sum() - sample_pop_ff["rfr"].sum()) < 1_000
des données finales
sample_pop_ind.to_csv(config.get("DATA_OUT") + "sample_pop_ind_csg.csv", index=False)
sample_pop_ff.to_csv(config.get("DATA_OUT") + "sample_pop_ff_csg.csv", index=False)