---
title: Simulation de réforme CSG au niveau de l'Etat
keywords: fastai
sidebar: home_sidebar
nb_path: "notebooks/analyses/csg_30_simu_dct.ipynb"
---
{% raw %}
{% endraw %}

ATTENTION : Ancienne version avec DCT.csv

Fonctionnement de la simulation de l'Etat sur l'IR

C'est la méthode simuledeciles de handlers/cas_types.py qui est appelée. Elle appelle ensuite simpop_stream, qui appelle CompareOldNew dans Simulation_engine/simulate_pop_from_reform.py.

C'est ce code qui fait la simulation :

# On prend les données de la population entière
        data = DUMMY_DATA
        reform = IncomeTaxReform(TBS, dictreform, PERIOD)
        # On ne crée qu'une simulation, le moteur de calcul ayant précalculé les autres.
        simulation_reform = simulation(PERIOD, data, reform)
        # On n'envoie à simuler que le "apres"
        return compare(PERIOD, {"apres": simulation_reform}, isdecile)

IncomeTaxReform est une classe qui surcharge openfisca_france.model.base.Reform d'OpenFisca pour lui transmettre la réforme.

simulation(period, data, tbs) est une fonction de Simulation_engine/simulate_pop_from_reform.py qui utilise le SimulationBuilder d'OpenFisca

def simulation(period, data, tbs):
    # Traduction des roles attribués au format openfisca
    data["quimenof"] = "enfant"
    data.loc[data["quifoy"] == 1, "quimenof"] = "conjoint"
    data.loc[data["quifoy"] == 0, "quimenof"] = "personne_de_reference"
    [...]
    sb = SimulationBuilder()
    sb.create_entities(tbs)
    sb.declare_person_entity("individu", data.index)
    # Creates openfisca entities and generates grouped
    listentities = {"foy": "foyer_fiscal", "men": "menage", "fam": "famille"}
    instances = {}
    dictionnaire_datagrouped = {"individu": data}
    for ent, ofent in listentities.items():
        persons_ent = data["id" + ent].values
        persons_ent_roles = data["qui" + ent + "of"].values
        ent_ids = data["id" + ent].unique()
        instances[ofent] = sb.declare_entity(ofent, ent_ids)
        sb.join_with_persons(instances[ofent], persons_ent, roles=persons_ent_roles)
        # The following assumes data defined for an entity are the same for all rows in
        # the same entity. Or at least that the first non null value found for an
        # entity will always be the total value for an entity (which is the case for
        # f4ba). These checks are performed in the checkdata function defined below.
        dictionnaire_datagrouped[ofent] = (
            data.groupby("id" + ent, as_index=False).first().sort_values(by="id" + ent)
        )
    # These variables should not be attributed to any OpenFisca Entity
    columns_not_OF_variables = set(        [            ...            "quimenof",        ]    )
    simulation = sb.build(tbs)
    memory_config = MemoryConfig(
        max_memory_occupation=0.95,  # When 95% of the virtual memory is full, switch to disk storage
        priority_variables=["salary", "age"],  # Always store these variables in memory
        variables_to_drop=non_cached_variables,
    )
    simulation.memory_config = memory_config
    # Attribution des variables à la bonne entité OpenFisca
    for colonne in data.columns:
        if colonne not in columns_not_OF_variables:
            # try:
            simulation.set_input(colonne,period,dictionnaire_datagrouped[tbs.get_variable(colonne).entity.key][colonne],
            )
    return simulation, dictionnaire_datagrouped
{% raw %}
from IPython.core.interactiveshell import InteractiveShell

# Display the value of every top-level expression of a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"
{% endraw %} {% raw %}
import pandas as pd
from leximpact_socio_fisca_simu_etat.config import Configuration
from openfisca_core import periods  # type: ignore
from openfisca_core.parameters import ParameterNode  # type: ignore
from openfisca_core.simulation_builder import SimulationBuilder
from openfisca_france import FranceTaxBenefitSystem  # type: ignore
from openfisca_france.model.base import Reform  # type: ignore

# Project configuration object; config.get("DCT") is used further down as the path
# of the CSV file handed to pandas.read_csv.
config = Configuration(project_name="leximpact-prepare-data")
{% endraw %} {% raw %}
import timeit

# from memory_profiler import profile
{% endraw %} {% raw %}
from typing import List, Optional

from pydantic import BaseModel


class CasType(BaseModel):
    # One "cas type": the income amounts of a single fiscal household plus its weight.
    # Documented with comments rather than a docstring so that the schema generated
    # by pydantic (which picks up the class docstring) stays unchanged.

    # Fed to the `salaire_de_base` input of the household's adult.
    revenu_activite: float
    # Fed to the `assiette_csg_revenus_capital` input of the foyer fiscal.
    revenu_capital: float
    # Fed to the `chomage_brut` input of the household's adult.
    revenu_remplacement: float
    # Fed to the `retraite_brute` input of the household's adult.
    revenu_retraite: float
    # Weight of the household, used as a multiplier when summing the CSG over the
    # population. The explicit `= None` keeps the field optional whatever the pydantic
    # major version: 1.x already inferred this default, 2.x would otherwise make the
    # field required although the TabCasType examples omit it.
    wprm: Optional[float] = None

    class Config:
        # Example payload attached to the generated schema.
        schema_extra = {
            "example": {
                "revenu_activite": 50000,
                "revenu_capital": 0,
                "revenu_remplacement": 0,
                "revenu_retraite": 0,
                "wprm": 10,
            }
        }


class TabCasType(BaseModel):
    # Container model holding a list of CasType entries.
    # Documented with comments rather than a docstring so that the schema generated
    # by pydantic stays unchanged.

    # The cas types, in the order in which they are simulated.
    castype: List[CasType]

    class Config:
        # Example payload attached to the generated schema: four cas types, each with
        # 50000 in exactly one income category; `wprm` is omitted from every entry.
        schema_extra = {
            "example": {
                "castype": [
                    {
                        "revenu_activite": 50000,
                        "revenu_capital": 0,
                        "revenu_remplacement": 0,
                        "revenu_retraite": 0,
                    },
                    {
                        "revenu_activite": 0,
                        "revenu_capital": 50000,
                        "revenu_remplacement": 0,
                        "revenu_retraite": 0,
                    },
                    {
                        "revenu_activite": 0,
                        "revenu_capital": 0,
                        "revenu_remplacement": 50000,
                        "revenu_retraite": 0,
                    },
                    {
                        "revenu_activite": 0,
                        "revenu_capital": 0,
                        "revenu_remplacement": 0,
                        "revenu_retraite": 50000,
                    },
                ]
            }
        }


def TabloCasTypeToSituations(tct: "List[CasType]", period: str = "2021") -> dict:
    """Build an OpenFisca situation dictionary from a list of cas types.

    Every cas type becomes one single-adult household: the individu ``Adulte i`` is
    the only parent of ``Famille i``, the only declarant of ``Foyer fiscal i`` and the
    personne_de_reference of ``Menage i``; there is no child, dependant or spouse.

    :param tct: cas types. Only ``revenu_activite``, ``revenu_capital``,
        ``revenu_remplacement`` and ``revenu_retraite`` are read; ``wprm`` is ignored.
    :param period: period key under which every amount is declared. Defaults to
        "2021", the value that was previously hard-coded.
    :return: dict with the keys "familles", "foyers_fiscaux", "individus" and
        "menages", as passed to ``SimulationBuilder.build_from_entities``.
    """
    familles = {}
    foyers_fiscaux = {}
    individus = {}
    menages = {}
    # Single pass: the four entity tables share the same index i.
    for i, cas in enumerate(tct):
        adulte = f"Adulte {i}"
        familles[f"Famille {i}"] = {"parents": [adulte], "enfants": []}
        foyers_fiscaux[f"Foyer fiscal {i}"] = {
            "declarants": [adulte],
            "personnes_a_charge": [],
            # Capital income is declared at foyer level, not on the individu.
            "assiette_csg_revenus_capital": {period: cas.revenu_capital},
        }
        individus[adulte] = {
            "salaire_de_base": {period: cas.revenu_activite},
            "chomage_brut": {period: cas.revenu_remplacement},
            "retraite_brute": {period: cas.revenu_retraite},
        }
        menages[f"Menage {i}"] = {
            "personne_de_reference": [adulte],
            "conjoint": [],
            "enfants": [],
        }
    return {
        "familles": familles,
        "foyers_fiscaux": foyers_fiscaux,
        "individus": individus,
        "menages": menages,
    }
{% endraw %} {% raw %}
!ls
{% endraw %} {% raw %}
def csv_to_list_castype(filename) -> "List[CasType]":
    """Read a CSV of individuals and aggregate it into one CasType per foyer fiscal.

    The file must provide the columns ``idfoy``, ``salaire_de_base``,
    ``retraite_brute``, ``f4ba``, ``chomage_brut`` and ``wprm``. Rows sharing the same
    ``idfoy`` are merged: the four income columns are summed and ``wprm`` is taken
    from the first row of the household.

    :param filename: path (or any source accepted by ``pandas.read_csv``) of the CSV.
    :return: one CasType per distinct ``idfoy``, ordered by ascending ``idfoy``.
        Rows whose ``idfoy`` is missing are left out by the grouping.
    """
    data = pd.read_csv(filename)
    # One groupby pass instead of five full-table boolean filters per household;
    # the default sorted grouping also makes the output order stable, where iterating
    # a set of ids did not guarantee any order.
    # NOTE(review): f4ba is summed over the rows of the household; if the file repeats
    # the household total on every row this over-counts — confirm against the data.
    return [
        CasType(
            revenu_activite=sum(foyer["salaire_de_base"].values),
            revenu_capital=sum(foyer["f4ba"].values),
            revenu_remplacement=sum(foyer["chomage_brut"].values),
            revenu_retraite=sum(foyer["retraite_brute"].values),
            wprm=foyer["wprm"].values[0],
        )
        for _, foyer in data.groupby("idfoy")
    ]
{% endraw %} {% raw %}
def compute_csg(tct, tax_benefit_system):
    """Compute the 2021 CSG for every cas type of *tct*.

    :param tct: object whose ``castype`` attribute is the list of cas types.
    :param tax_benefit_system: tax-benefit system (reformed or not) used to build
        the simulation.
    :return: result of ``calculate_add("csg", "2021")`` on the built simulation.
    """
    situations = TabloCasTypeToSituations(tct.castype)
    nb_foyers = len(situations["foyers_fiscaux"])
    nb_individus = len(situations["individus"])
    print("\tSituation - Nombre de foyers : ", nb_foyers)
    print("\tSituation - Nombre d'individus' : ", nb_individus)
    builder = SimulationBuilder()
    simu = builder.build_from_entities(tax_benefit_system, situations)
    return simu.calculate_add("csg", "2021")
{% endraw %} {% raw %}
# Baseline run: CSG of every cas type of the DCT file, computed with a
# FranceTaxBenefitSystem to which no reform is applied.
tax_benefit_system = FranceTaxBenefitSystem()
lct = csv_to_list_castype(config.get("DCT"))
mes_csg = compute_csg(TabCasType(castype=lct), tax_benefit_system)
{% endraw %} {% raw %}
# Print each computed CSG amount, converted to a plain Python float.
for csg in mes_csg:
    print("csg", float(csg))
{% endraw %} {% raw %}
# Reload the raw DCT file to inspect its columns and the number of rows per idfoy.
df = pd.read_csv(config.get("DCT"))
df.columns
df["idfoy"].value_counts()
{% endraw %} {% raw %}
# Distinct idfoy values present in the file.
df["idfoy"].unique()
{% endraw %}

Montant CSG sur une population

{% raw %}
# Recompute the cas types and their CSG, then display the weighted total:
# each CSG amount is multiplied by the wprm of the cas type at the same index.
lct = csv_to_list_castype(config.get("DCT"))
mes_csg = compute_csg(TabCasType(castype=lct), tax_benefit_system)
print("lct", lct, "\n")
print("mes_csg", mes_csg, "\n")
sum(mes_csg[i] * lct[i].wprm for i in range(len(lct)))
{% endraw %}

Réforme de la CSG

{% raw %}
class ReformCSG(BaseModel):
    # Parameters of a CSG reform: the two rates that CSGReform writes into the
    # OpenFisca parameter tree.
    # Documented with comments rather than a docstring so that the schema generated
    # by pydantic stays unchanged.

    # New value for ...contributions_sociales.csg.activite.imposable.taux
    csg_activite_imposable_taux: float
    # New value for ...contributions_sociales.csg.activite.deductible.taux
    csg_activite_deductible_taux: float

    class Config:
        # Example payload attached to the generated schema.
        schema_extra = {
            "example": {
                "csg_activite_imposable_taux": 0.068,
                "csg_activite_deductible_taux": 0.024,
            }
        }
{% endraw %} {% raw %}
class CSGReform(Reform):
    """OpenFisca reform overriding the two CSG rates applied to activity income."""

    def __init__(
        self, tbs: FranceTaxBenefitSystem, payload: ReformCSG, period: str
    ) -> None:
        """Store the reform payload, then initialise the underlying Reform.

        :param tbs: tax-benefit system the reform is built on.
        :param payload: the new rates.
        :param period: converted to an instant and kept in ``self.instant``; the
            parameter update itself always uses the fixed span "year:1900:200".
        """
        # The attributes are assigned before delegating to Reform.__init__;
        # presumably the base initialiser triggers apply(), which needs them —
        # confirm against openfisca_core.
        self.payload = payload
        self.instant = periods.instant(period)
        self.period = periods.period("year:1900:200")
        super().__init__(tbs)

    def modifier(self, parameters: ParameterNode) -> ParameterNode:
        """Write the payload rates into *parameters* and return the same tree."""
        # Parameter files live under
        # openfisca-france/openfisca_france/parameters/prelevements_sociaux/contributions_sociales/csg/
        csg_activite = (
            parameters.prelevements_sociaux.contributions_sociales.csg.activite
        )
        taux_imposable = self.payload.csg_activite_imposable_taux
        taux_deductible = self.payload.csg_activite_deductible_taux
        csg_activite.imposable.taux.update(period=self.period, value=taux_imposable)
        csg_activite.deductible.taux.update(period=self.period, value=taux_deductible)
        return parameters

    def apply(self) -> None:
        """Register ``modifier`` as the parameter modifier of this reform."""
        self.modify_parameters(modifier_function=self.modifier)


def compute_reform(reform: ReformCSG):
    """Apply *reform*, compute the CSG on an inflated population and time each step.

    The cas types read from the DCT file are duplicated 1024 times to obtain a
    population large enough for timing purposes; durations are printed along the way.

    :param reform: OpenFisca parameters to change.
    :return: sum over the inflated population of each CSG amount multiplied by the
        ``wprm`` of the matching cas type.
    """
    clock = timeit.default_timer

    # Step 1: build the reformed tax-benefit system.
    t_start = clock()
    reformed_tbs = CSGReform(FranceTaxBenefitSystem(), reform, "2020")
    print(f"Temps de création de la réforme : {clock() - t_start} secondes")

    # Step 2: load the cas types and inflate them (ten doublings, i.e. x1024;
    # the original note reads "10 -> 6 144 foyers").
    t_load = clock()
    cas_types = csv_to_list_castype(config.get("DCT"))
    cas_types *= 2 ** 10
    print(f"Temps de création des faux castypes : {clock() - t_load} secondes")
    print(f"Temps de traitement avant calcul : {clock() - t_start} secondes")

    # Step 3: run the simulation on the whole population.
    t_compute = clock()
    csg_par_cas = compute_csg(TabCasType(castype=cas_types), reformed_tbs)
    print(f"Temps de calcul sur la population : {clock() - t_compute} secondes")

    # Step 4: weighted total.
    montant_total = sum(
        csg_par_cas[rang] * cas.wprm for rang, cas in enumerate(cas_types)
    )
    print(
        f"Temps de traitement total pour {len(cas_types)} foyers : {clock() - t_start} secondes"
    )
    return montant_total
{% endraw %} {% raw %}
# Simulate a reform setting the imposable rate to 0.068 and the deductible rate
# to 0.024, then print the resulting total with thousands separators.
reform = ReformCSG(
    csg_activite_imposable_taux=0.068, csg_activite_deductible_taux=0.024
)
resultat = compute_reform(reform)
print(f"Montant de la CSG pour {reform}: {resultat:,}")
{% endraw %} {% raw %}
# Same simulation with both activity rates set to zero.
reform = ReformCSG(csg_activite_imposable_taux=0.0, csg_activite_deductible_taux=0.0)
resultat = compute_reform(reform)
print(f"Montant de la CSG pour {reform}: {resultat:,}")
{% endraw %} {% raw %}
# Display the ParameterNode class itself.
ParameterNode
{% endraw %} {% raw %}
import inspect


def get_user_attributes(cls):
    """Return the members of *cls* that an empty class does not have.

    :param cls: class (or any object) to inspect.
    :return: list of ``(name, value)`` pairs as produced by ``inspect.getmembers``
        (sorted by name), without the names found on a freshly created empty class.
    """
    # Names carried by every empty class; built once as a frozenset so that each
    # membership test is a hash lookup instead of a scan of a list.
    baseline = frozenset(dir(type("dummy", (object,), {})))
    return [member for member in inspect.getmembers(cls) if member[0] not in baseline]
{% endraw %} {% raw %}
# List what ParameterNode defines beyond the attributes of an empty class.
get_user_attributes(ParameterNode)
{% endraw %} {% raw %}
!pip list | grep Open
{% endraw %}