--- title: Simulation de réforme pour toutes les CSG : Calcul de toutes les CSG keywords: fastai sidebar: home_sidebar nb_path: "notebooks/analyses/csg_50_usage_copules.ipynb" ---
{% raw %}
{% endraw %} {% raw %}
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
{% endraw %} {% raw %}
import random

import pandas as pd
from leximpact_socio_fisca_simu_etat.config import Configuration

config = Configuration(project_name="leximpact-prepare-data")
{% endraw %} {% raw %}
from leximpact_socio_fisca_simu_etat.csg_simu import create_simulation
from openfisca_france import FranceTaxBenefitSystem
{% endraw %} {% raw %}
# from leximpact_simulation_engine.simulate_pop_from_reform import (  # type: ignore
#     TBS_DEFAULT,
#     annee_de_calcul,
#     simulation,
# )
{% endraw %}

I - OUTILS

{% raw %}
def calcul_distrib_from_copules(value_ref, dict_copules):
    """
    Implementation d'une fonction probabilistique qui associe une valeur estimée pour une variable VAR selon       la valeur d'une variable de référence VAR_REF , en utilisant des copules calculés à partir des                 distributions de VAR1 et VAR_REF.
    Par exemple, VAR1 = Rk et VAR_REF = RFR

    Dans quel bucket est value_ref ? ===> Nous donne la distrib de VAR
    value_ref :  une valeur de VAR_REF pour laquelle on cherche à attribuer une valeur de VAR
    dictionnaire_fichier
    """

    for bucket in dict_copules:
        if bucket["lower_bound"] <= value_ref < bucket["upper_bound"]:
            break

    probazero = bucket["nb_people"]["zero"] / sum(bucket["nb_people"].values())

    # On genere une variable aleatoire entre 0 et 1
    randunif = random.random()
    # Si cette proba est <= à la probabilité que Rk = 0
    if randunif <= probazero:
        return 0
    # Si on n'est pas dans ce cas
    randunif = (randunif - probazero) / (1 - probazero)

    randinnbgens = randunif * bucket["nb_people"]["nonzero"]

    # Générer un rk à partir de cette distrib (Monte-Carlo probablement)
    sommepass = 0  # nombre de gens dans les buckets déjà dépassés. Quand on a passé le nombre de gens qu'on veut, on a fini de générer !
    for bucket_rk in bucket["nonzerobuckets"]:
        sommepass += bucket_rk[0]
        if sommepass > randinnbgens:
            return bucket_rk[1] / bucket_rk[0]
{% endraw %}

Passage des individus aux foyers fiscaux

A noter : le wprm est un facteur indiquant le "poids" mathématique du foyer par rapport à la population française. Il est le même pour tout les individus d'un foyer, il ne faut donc pas le sommer quand on regroupe les individus par foyer, mais le garder tel quel.

{% raw %}
def individus_to_foyer_fiscaux(sample_individus):
    """
    Regroupe un échantillon par individus en échantillon par foyers fiscaux
    """
    # On regroupe les individus de sample_pop_individus en foyers fiscaux par leur idfoy
    sample_foyers_fiscaux = sample_individus.groupby(
        ["idfoy", "wprm"], as_index=False
    ).sum()

    return sample_foyers_fiscaux
{% endraw %} {% raw %}
def foyer_fiscaux_to_individus(
    sample_ind_origin, sample_ff_to_merge, cols_declarant_principal
):
    """
    Ajoute les données d'un échantillon de foyers fiscaux dans un échantillon par individus
    """
    # On fusionne les nouvelles valeurs dans l'échantillon originel par individus
    sample_individus_new = pd.merge(
        sample_ind_origin,
        sample_ff_to_merge,
        left_on="idfoy",
        right_on="idfoy",
        indicator=False,
    )

    # On supprime les valeurs qui ne s'appliquent qu'au déclarant principal pour les autres déclarants
    for col in cols_declarant_principal:
        sample_individus_new.loc[sample_individus_new["quifoy"] != 0, col] = 0

    return sample_individus_new
{% endraw %} {% raw %}
def convert_erfr_to_openfisca(data):
    # Traduction des roles attribués au format openfisca
    data["quimenof"] = "enfant"
    data.loc[data["quifoy"] == 1, "quimenof"] = "conjoint"
    data.loc[data["quifoy"] == 0, "quimenof"] = "personne_de_reference"

    data["quifoyof"] = "personne_a_charge"
    data.loc[data["quifoy"] == 1, "quifoyof"] = "conjoint"
    data.loc[data["quifoy"] == 0, "quifoyof"] = "declarant_principal"

    data["quifamof"] = "enfant"
    data.loc[data["quifam"] == 1, "quifamof"] = "conjoint"
    data.loc[data["quifam"] == 0, "quifamof"] = "demandeur"
    return data
{% endraw %}

II - Import des données POTE dans notre échantillon

1 - Echantillon de population

{% raw %}
sample_pop_ind_initial = pd.read_hdf(config.get("SAMPLE_POP"))  # Une ligne par personne
sample_pop_ind_initial.columns
{% endraw %} {% raw %}
# Pour cela on utilise le dummy_data_final qui sort de la Madhinette et on lui calcule la colonne RFR
# Cette colonne RFR sera la variable de référence sur laquelle nous allons caler toutes nos données POTE

annee_de_calcul = "2021"
sample_pop_ind_initial_of = convert_erfr_to_openfisca(sample_pop_ind_initial)
# On lance une simulation OpenFisca
my_simu, dico = create_simulation(
    data=sample_pop_ind_initial_of, tbs=FranceTaxBenefitSystem(), period=annee_de_calcul
)
# On doit regrouper les personnes en foyers fiscaux pour calculer le RFR. On doit convertir l'échantillon:
sample_pop_ff = individus_to_foyer_fiscaux(sample_pop_ind_initial_of)

# On calcule une colonne de RFR pour notre échantillon
to_calc = my_simu.calculate("rfr", annee_de_calcul)
sample_pop_ff["rfr"] = to_calc
{% endraw %} {% raw %}
!ls -lrt {config.get("COPULE_FOLDER")}
{% endraw %}

Chargement des agregats de POTE

{% raw %}
liste_des_variables_csg = pd.read_csv(
    config.get("DATA_IN") + "liste_des_variables_csg-POTE_2019.csv"
)
pd.options.display.float_format = "{:,.7f}".format
liste_des_variables_csg
{% endraw %} {% raw %}
def get_aggregate(df, variable_name):
    return df.loc[df["nom_variable"] == variable_name, "somme_en_euros"].iloc[0]
{% endraw %} {% raw %}
(
    get_aggregate(liste_des_variables_csg, "rev_k_plvt_bareme")
    + get_aggregate(liste_des_variables_csg, "rev_k_plvt_lib")
    + get_aggregate(liste_des_variables_csg, "rev_k_plvt_forf_unik")
)
{% endraw %}

2 - Ajout des données dans l'échantillon par foyers fiscaux

{% raw %}
# RENTES VIAGERES Rv = Z1cw + Z1dw (somme des colonnes dans POTE)
# REVENU CATEGORIEL FONCIER RCF = Z4ba + Z4bb + Z4bc + Z4bd (somme des colonnes dans POTE)
# PLUS-VALUES PV = z3vg + z3ua + z3vz (somme des colonnes dans POTE)
# CHOMAGE chom = Z8sw + Z8sx (somme des colonnes dans POTE)
# RETRAITES ret = Z1as + Z1bs + MNIMQG (somme des colonnes dans POTE)
# PRE-RETRAITES Pre-ret = Z8sc (colonne dans POTE)
# TODO : INDEMNITES JOURNALIERES
data_to_process = [
    {
        "file": "20210610ExportCopule-rev_capital_partiel.txt",
        "column_name": "pote_rev_capital",
        "aggregat": get_aggregate(liste_des_variables_csg, "rev_k_plvt_bareme")
        + get_aggregate(liste_des_variables_csg, "rev_k_plvt_lib")
        + get_aggregate(liste_des_variables_csg, "rev_k_plvt_forf_unik"),  # 35570000000
    },
    {
        "file": "20210610ExportCopule-rente_viagere.txt",
        "column_name": "pote_rente_viagere",
        "aggregat": get_aggregate(
            liste_des_variables_csg, "rente_viagere"
        ),  # 869000000
    },
    {
        "file": "20210610ExportCopule-rev_categ_foncier.txt",
        "column_name": "pote_rev_categ_foncier",
        "aggregat": get_aggregate(
            liste_des_variables_csg, "rev_categ_foncier"
        ),  # 48000000000
    },
    {
        "file": "20210610ExportCopule-ass_csg_plus_value.txt",
        "column_name": "pote_plus_values",
        "aggregat": get_aggregate(
            liste_des_variables_csg, "ass_csg_plus_value"
        ),  # 18700000000
    },
    {
        "file": "20210610ExportCopule-chomage.txt",
        "column_name": "pote_chomage",
        "aggregat": get_aggregate(liste_des_variables_csg, "chomage"),  # 35_110_527_952
    },
    {
        "file": "20210610ExportCopule-retraites.txt",
        "column_name": "pote_retraite",
        "aggregat": get_aggregate(
            liste_des_variables_csg, "retraites"
        ),  # 307_254_581_479
    },
    {
        "file": "20210610ExportCopule-pre_retraites.txt",
        "column_name": "pote_pre-retraite",
        "aggregat": 1500000,  #  get_aggregate(liste_des_variables_csg, "pre_retraites")
    },
]


def integration_data_ff(data_to_process):
    for data in data_to_process:
        variable_name = data["column_name"]
        # On charge les copules
        with open(config.get("COPULE_FOLDER") + data["file"]) as fichier:
            contenu = fichier.read()
            dict_copules_RFR_Data = eval(contenu)

        # Chercher la distribution optimale
        # On ajoute une distribution de data dans notre échantillon de population
        sample_pop_ff[variable_name] = sample_pop_ff["rfr"].apply(
            lambda row: calcul_distrib_from_copules(row, dict_copules_RFR_Data)
        )
        # Calcul de l'erreur
        value = (sample_pop_ff[variable_name] * sample_pop_ff["wprm"]).sum()
        expected = data["aggregat"]
        # print(f"On trouve {value:,} € de {variable_name} alors qu'en 2019 c'était {expected:,} €, soit {((expected-value)/expected)*100}%")
        data["error"] = ((expected - value) / expected) * 100

    return sample_pop_ff, data_to_process


sample_pop_ff, data_to_process = integration_data_ff(data_to_process)
sample_pop_ff
data_to_process
{% endraw %}

2 - Séparation de l'échantillon en individus

{% raw %}
new_columns = [data["column_name"] for data in data_to_process]
new_columns
{% endraw %} {% raw %}
sample_ff_to_merge = sample_pop_ff[["idfoy", "rfr"] + new_columns]
cols_declarant_principal = ["rfr"] + new_columns
sample_pop_ind = foyer_fiscaux_to_individus(
    sample_pop_ind_initial, sample_ff_to_merge, cols_declarant_principal
)

print(sample_pop_ind.columns)
{% endraw %}

3 - Analyse des erreurs

TODO

{% raw %}
%%time
# Estimation de la variabilité des résultats due au random state
error_table = []
for i in range(10):
    sample_pop_ff, data_to_process = integration_data_ff(data_to_process)
    errors = {}
    for data in data_to_process:
        errors[data["column_name"]] = data["error"]
    error_table.append(errors)

error_table = pd.DataFrame(error_table)
{% endraw %} {% raw %}
error_table
{% endraw %}

Création des colonnes attendues par OpenFisca

Calcul du ratio du revenu du capital

{% raw %}
def get_ratios():
    pote_rev_capital = (
        get_aggregate(liste_des_variables_csg, "rev_k_plvt_bareme")
        + get_aggregate(liste_des_variables_csg, "rev_k_plvt_lib")
        + get_aggregate(liste_des_variables_csg, "rev_k_plvt_forf_unik")
    )
    return {
        "ratio_revenus_capitaux_prelevement_bareme": get_aggregate(
            liste_des_variables_csg, "rev_k_plvt_bareme"
        )
        / pote_rev_capital,
        "ratio_revenus_capitaux_prelevement_liberatoire": get_aggregate(
            liste_des_variables_csg, "rev_k_plvt_lib"
        )
        / pote_rev_capital,
        "ratio_revenus_capitaux_prelevement_forfaitaire_unique_ir": get_aggregate(
            liste_des_variables_csg, "rev_k_plvt_forf_unik"
        )
        / pote_rev_capital,
    }
{% endraw %} {% raw %}
get_ratios()
sum(get_ratios().values())
{% endraw %} {% raw %}
def convert_to_openfisca(data):
    # On voudrait faire: dummy_data_csg['revenus_capital'] = dummy_data_final_rk['rk']
    revenus_capital = data["pote_rev_capital"]

    # Dans OpenFisca, on calcule rk ainsi: rk = revenus_capitaux_prelevement_bareme + revenus_capitaux_prelevement_liberatoire + revenus_capitaux_prelevement_forfaitaire_unique_ir
    ratios = get_ratios()
    data["revenus_capitaux_prelevement_bareme"] = (
        revenus_capital * ratios["ratio_revenus_capitaux_prelevement_bareme"]
    )
    data["revenus_capitaux_prelevement_liberatoire"] = (
        revenus_capital * ratios["ratio_revenus_capitaux_prelevement_liberatoire"]
    )
    data["revenus_capitaux_prelevement_forfaitaire_unique_ir"] = (
        revenus_capital
        * ratios["ratio_revenus_capitaux_prelevement_forfaitaire_unique_ir"]
    )
    # Rentes Viageres
    data["rente_viagere_titre_onereux_net"] = data["pote_rente_viagere"]
    # Revenu catégoriel foncier
    data["revenu_categoriel_foncier"] = data["pote_rev_categ_foncier"]
    # Plus-values
    data["assiette_csg_plus_values"] = data["pote_plus_values"]

    # On n'ajoute pas chomage_brut ni  retraite_brute car déjà présent dans ERFS_FPR
    #     data["chomage_brut"] = data[
    #         "pote_chomage"
    #     ]  # À vérifier : est-ce que c'est bien chomage brut qu'on calcule a partir de POTE ? OU : y etait-il deja avant, dans l'ERFS-FPR ?
    #     data["retraite_brute"] = data[
    #         "pote_retraite"
    #     ]  # À vérifier : est-ce que c'est bien retraite brute qu'on calcule a partir de POTE ?

    # Traduction des roles attribués au format openfisca
    data["quimenof"] = "enfant"
    data.loc[data["quifoy"] == 1, "quimenof"] = "conjoint"
    data.loc[data["quifoy"] == 0, "quimenof"] = "personne_de_reference"

    data["quifoyof"] = "personne_a_charge"
    data.loc[data["quifoy"] == 1, "quifoyof"] = "conjoint"
    data.loc[data["quifoy"] == 0, "quifoyof"] = "declarant_principal"

    data["quifamof"] = "enfant"
    data.loc[data["quifam"] == 1, "quifamof"] = "conjoint"
    data.loc[data["quifam"] == 0, "quifamof"] = "demandeur"
    return data
{% endraw %} {% raw %}
sample_pop_ind = convert_to_openfisca(sample_pop_ind)
sample_pop_ff = convert_to_openfisca(sample_pop_ff)
{% endraw %}

4 - Export des deux échantillons

{% raw %}
sample_pop_ind.to_csv(config.get("DATA_OUT") + "sample_pop_ind.csv", index=False)
sample_pop_ff.to_csv(config.get("DATA_OUT") + "sample_pop_ff.csv", index=False)
{% endraw %}

III - Calcul de la CSG

{% raw %}
sample_pop_ind.columns
{% endraw %}

1 - CSG du capital

{% raw %}
# On regroupe en foyers fiscaux pour calculer les assiettes
sample_pop_ff = individus_to_foyer_fiscaux(sample_pop_ind)

# Assiette du capital
to_calc = my_simu.calculate("assiette_csg_revenus_capital", annee_de_calcul)
sample_pop_ff["assiette_csg_revenus_capital"] = to_calc

#
print(
    "assiette_csg_revenus_capital: ",
    sum(sample_pop_ff["assiette_csg_revenus_capital"]),
    "€",
)
{% endraw %} {% raw %}
to_calc = my_simu.calculate("csg_revenus_capital", annee_de_calcul)
sample_pop_ff["csg_revenus_capital"] = to_calc

CSG_Capital = to_calc * sample_pop_ff["wprm"]

print(
    f"Total de CSG sur les revenus du capital, expected ~13.79 Mds€: {CSG_Capital.sum():,}"
)
print("Erreur", 100 * (CSG_Capital.sum() + 13790000000) / 13790000000, "% \n")
# Ref: p51 de https://www.securite-sociale.fr/files/live/sites/SSFR/files/medias/CCSS/2019/CCSS%20SEPT%2019%20DEF.pdf
{% endraw %}

2 - CSG sur les allocations chômage

{% raw %}
to_calc = my_simu.calculate_add("csg_deductible_chomage", annee_de_calcul)
sample_pop_ind["csg_deductible_chomage"] = to_calc

to_calc = my_simu.calculate_add("csg_imposable_chomage", annee_de_calcul)
sample_pop_ind["csg_imposable_chomage"] = to_calc

sample_pop_ind["csg_chomage"] = (
    sample_pop_ind["csg_imposable_chomage"] + sample_pop_ind["csg_imposable_chomage"]
)

CSG_chom = sample_pop_ind["csg_chomage"] * sample_pop_ind["wprm"]

print(f"CSG Chomage pondérée, expected ~1.9 Mds€ : {CSG_chom.sum():,}")
print("Erreur", 100 * (CSG_chom.sum() + 1900000000) / 1900000000, "% \n")
{% endraw %}

3 - CSG sur les pensions de retraite

{% raw %}
to_calc = my_simu.calculate_add("csg_deductible_retraite", annee_de_calcul)
sample_pop_ind["csg_deductible_retraite"] = to_calc

to_calc = my_simu.calculate_add("csg_imposable_retraite", annee_de_calcul)
sample_pop_ind["csg_imposable_retraite"] = to_calc

sample_pop_ind["csg_retraite"] = (
    sample_pop_ind["csg_imposable_retraite"] + sample_pop_ind["csg_deductible_retraite"]
)

CSG_retraite = sample_pop_ind["csg_retraite"] * sample_pop_ind["wprm"]

print(f"CSG Chomage pondérée, expected ~ 21.869 Mds€ : {CSG_retraite.sum():,}")
print("Erreur", 100 * (CSG_retraite.sum() + 21869000000) / 21869000000, "% \n")
{% endraw %}

4 - CSG sur les Pre-retraites

Vérifications de sécurité

{% raw %}
assert (
    abs(
        (
            sample_pop_ff["revenus_capitaux_prelevement_bareme"] * sample_pop_ff["wprm"]
        ).sum()
        - get_aggregate(liste_des_variables_csg, "rev_k_plvt_bareme")
    )
    / get_aggregate(liste_des_variables_csg, "rev_k_plvt_bareme")
    < 0.2
)
{% endraw %} {% raw %}
assert sample_pop_ff["wprm"].sum() > 38_000_000
{% endraw %} {% raw %}
assert sample_pop_ind["wprm"].sum() > 50_000_000
{% endraw %} {% raw %}
assert abs(sample_pop_ind["rfr"].sum() - sample_pop_ff["rfr"].sum()) < 1_000
{% endraw %}

des données finales

{% raw %}
sample_pop_ind.to_csv(config.get("DATA_OUT") + "sample_pop_ind_csg.csv", index=False)
sample_pop_ff.to_csv(config.get("DATA_OUT") + "sample_pop_ff_csg.csv", index=False)
{% endraw %}