--- title: Simulation de réforme CSG au niveau de l'Etat keywords: fastai sidebar: home_sidebar nb_path: "notebooks/analyses/csg_30_simu_dct.ipynb" ---
C'est la méthode simuledeciles de handlers/cas_types.py qui est appelée.
Elle appel ensuite simpop_stream qui appel CompareOldNew dans Simulation_engine/simulate_pop_from_reform.py.
C'est ce code qui fait la simulation :
# On prend les données de la population entière
data = DUMMY_DATA
reform = IncomeTaxReform(TBS, dictreform, PERIOD)
# On ne crée qu'une simulation, le moteur de calcul ayant précalculé les autres.
simulation_reform = simulation(PERIOD, data, reform)
# On n'envoie à simuler que le "apres"
return compare(PERIOD, {"apres": simulation_reform}, isdecile)
IncomeTaxReform est une class qui surcharge openfisca_france.model.base.Reform d'OpenFisca pour lui transmettre la réforme.
simulation(period, data, tbs) est une fonction de Simulation_engine/simulate_pop_from_reform.py qui utilise le SimulationBuilder d'OpenFisca
def simulation(period, data, tbs):
# Traduction des roles attribués au format openfisca
data["quimenof"] = "enfant"
data.loc[data["quifoy"] == 1, "quimenof"] = "conjoint"
data.loc[data["quifoy"] == 0, "quimenof"] = "personne_de_reference"
[...]
sb = SimulationBuilder()
sb.create_entities(tbs)
sb.declare_person_entity("individu", data.index)
# Creates openfisca entities and generates grouped
listentities = {"foy": "foyer_fiscal", "men": "menage", "fam": "famille"}
instances = {}
dictionnaire_datagrouped = {"individu": data}
for ent, ofent in listentities.items():
persons_ent = data["id" + ent].values
persons_ent_roles = data["qui" + ent + "of"].values
ent_ids = data["id" + ent].unique()
instances[ofent] = sb.declare_entity(ofent, ent_ids)
sb.join_with_persons(instances[ofent], persons_ent, roles=persons_ent_roles)
# The following ssumes data defined for an entity are the same for all rows in
# the same entity. Or at least that the first non null value found for an
# entity will always be the total value for an entity (which is the case for
# f4ba). These checks are performed in the checkdata function defined below.
dictionnaire_datagrouped[ofent] = (
data.groupby("id" + ent, as_index=False).first().sort_values(by="id" + ent)
)
# These variables should not be attributed to any OpenFisca Entity
columns_not_OF_variables = set( [ ... "quimenof", ] )
simulation = sb.build(tbs)
memory_config = MemoryConfig(
max_memory_occupation=0.95, # When 95% of the virtual memory is full, switch to disk storage
priority_variables=["salary", "age"], # Always store these variables in memory
variables_to_drop=non_cached_variables,
)
simulation.memory_config = memory_config
# Attribution des variables à la bonne entité OpenFisca
for colonne in data.columns:
if colonne not in columns_not_OF_variables:
# try:
simulation.set_input(colonne,period,dictionnaire_datagrouped[tbs.get_variable(colonne).entity.key][colonne],
)
return simulation, dictionnaire_datagrouped
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import pandas as pd
from leximpact_socio_fisca_simu_etat.config import Configuration
from openfisca_core import periods # type: ignore
from openfisca_core.parameters import ParameterNode # type: ignore
from openfisca_core.simulation_builder import SimulationBuilder
from openfisca_france import FranceTaxBenefitSystem # type: ignore
from openfisca_france.model.base import Reform # type: ignore
config = Configuration(project_name="leximpact-prepare-data")
import timeit
# from memory_profiler import profile
from typing import List, Optional
from pydantic import BaseModel
class CasType(BaseModel):
revenu_activite: float
revenu_capital: float
revenu_remplacement: float
revenu_retraite: float
wprm: Optional[float]
class Config:
schema_extra = {
"example": {
"revenu_activite": 50000,
"revenu_capital": 0,
"revenu_remplacement": 0,
"revenu_retraite": 0,
"wprm": 10,
}
}
class TabCasType(BaseModel):
castype: List[CasType]
class Config:
schema_extra = {
"example": {
"castype": [
{
"revenu_activite": 50000,
"revenu_capital": 0,
"revenu_remplacement": 0,
"revenu_retraite": 0,
},
{
"revenu_activite": 0,
"revenu_capital": 50000,
"revenu_remplacement": 0,
"revenu_retraite": 0,
},
{
"revenu_activite": 0,
"revenu_capital": 0,
"revenu_remplacement": 50000,
"revenu_retraite": 0,
},
{
"revenu_activite": 0,
"revenu_capital": 0,
"revenu_remplacement": 0,
"revenu_retraite": 50000,
},
]
}
}
def TabloCasTypeToSituations(tct: List[CasType]):
return {
"familles": {
f"Famille {i}": {"parents": [f"Adulte {i}"], "enfants": []}
for i in range(len(tct))
},
"foyers_fiscaux": {
f"Foyer fiscal {i}": {
"declarants": [f"Adulte {i}"],
"personnes_a_charge": [],
"assiette_csg_revenus_capital": {"2021": d.revenu_capital},
}
for i, d in enumerate(tct)
},
"individus": {
f"Adulte {i}": {
"salaire_de_base": {"2021": d.revenu_activite},
"chomage_brut": {"2021": d.revenu_remplacement},
"retraite_brute": {"2021": d.revenu_retraite},
}
for i, d in enumerate(tct)
},
"menages": {
f"Menage {i}": {
"personne_de_reference": [f"Adulte {i}"],
"conjoint": [],
"enfants": [],
}
for i, d in enumerate(tct)
},
}
!ls
def csv_to_list_castype(filename):
data = pd.read_csv(filename)
d = []
for idfoy in set(data["idfoy"].values):
revenu_salarie = sum(data[data["idfoy"] == idfoy]["salaire_de_base"].values)
rev_retraite = sum(data[data["idfoy"] == idfoy]["retraite_brute"].values)
rev_capital = sum(data[data["idfoy"] == idfoy]["f4ba"].values)
rev_rempl = sum(data[data["idfoy"] == idfoy]["chomage_brut"].values)
wprm = data[data["idfoy"] == idfoy]["wprm"].values[0]
mon_cas_type = CasType(
revenu_activite=revenu_salarie,
revenu_capital=rev_capital,
revenu_remplacement=rev_rempl,
revenu_retraite=rev_retraite,
wprm=wprm,
)
d += [mon_cas_type]
# print("Nombre de foyer de cas type : ", len(d))
return d
def compute_csg(tct, tax_benefit_system):
# print('debut')
situation = TabloCasTypeToSituations(tct.castype)
print("\tSituation - Nombre de foyers : ", len(situation["foyers_fiscaux"]))
print("\tSituation - Nombre d'individus' : ", len(situation["individus"]))
simulation_builder = SimulationBuilder()
simulation = simulation_builder.build_from_entities(tax_benefit_system, situation)
# print('Simu ready_')
value = simulation.calculate_add("csg", "2021")
# population = simulation.get_variable_population('csg')
# entity_count = simulation_builder.entity_counts[population.entity.plural]
# print(f"Calculated : {entity_count} {value}")
return value
tax_benefit_system = FranceTaxBenefitSystem()
lct = csv_to_list_castype(config.get("DCT"))
mes_csg = compute_csg(TabCasType(castype=lct), tax_benefit_system)
for csg in mes_csg:
print("csg", float(csg))
df = pd.read_csv(config.get("DCT"))
df.columns
df["idfoy"].value_counts()
df["idfoy"].unique()
lct = csv_to_list_castype(config.get("DCT"))
mes_csg = compute_csg(TabCasType(castype=lct), tax_benefit_system)
print("lct", lct, "\n")
print("mes_csg", mes_csg, "\n")
sum(mes_csg[i] * lct[i].wprm for i in range(len(lct)))
class ReformCSG(BaseModel):
csg_activite_imposable_taux: float
csg_activite_deductible_taux: float
class Config:
schema_extra = {
"example": {
"csg_activite_imposable_taux": 0.068,
"csg_activite_deductible_taux": 0.024,
}
}
class CSGReform(Reform):
def __init__(
self, tbs: FranceTaxBenefitSystem, payload: ReformCSG, period: str
) -> None:
self.payload = payload
self.instant = periods.instant(period)
self.period = periods.period("year:1900:200")
super().__init__(tbs)
def modifier(self, parameters: ParameterNode) -> ParameterNode:
# openfisca-france/openfisca_france/parameters/prelevements_sociaux/contributions_sociales/csg/
parameters.prelevements_sociaux.contributions_sociales.csg.activite.imposable.taux.update(
period=self.period, value=self.payload.csg_activite_imposable_taux
)
parameters.prelevements_sociaux.contributions_sociales.csg.activite.deductible.taux.update(
period=self.period, value=self.payload.csg_activite_deductible_taux
)
return parameters
def apply(self) -> None:
self.modify_parameters(modifier_function=self.modifier)
def compute_reform(reform: ReformCSG):
"""
:reform: OpenFisca parameters to change.
"""
# Monitor CPU and RAM
# timeit.timeit('"-".join(str(n) for n in range(100))', number=10000)
debut = timeit.default_timer()
tax_benefit_system = CSGReform(FranceTaxBenefitSystem(), reform, "2020")
print(
f"Temps de création de la réforme : {timeit.default_timer() - debut} secondes"
)
debut_load_csv = timeit.default_timer()
lct = csv_to_list_castype(config.get("DCT"))
for i in range(10): # 10 -> 6 144 foyers
lct += lct
print(
f"Temps de création des faux castypes : {timeit.default_timer() - debut_load_csv} secondes"
)
print(
f"Temps de traitement avant calcul : {timeit.default_timer() - debut} secondes"
)
debut_compute = timeit.default_timer()
mes_csg = compute_csg(TabCasType(castype=lct), tax_benefit_system)
print(
f"Temps de calcul sur la population : {timeit.default_timer() - debut_compute} secondes"
)
montant_total = sum(mes_csg[i] * lct[i].wprm for i in range(len(lct)))
print(
f"Temps de traitement total pour {len(lct)} foyers : {timeit.default_timer() - debut} secondes"
)
# print("lct", lct[0])
# print("mes_csg", mes_csg)
return montant_total
reform = ReformCSG(
csg_activite_imposable_taux=0.068, csg_activite_deductible_taux=0.024
)
resultat = compute_reform(reform)
print(f"Montant de la CSG pour {reform}: {resultat:,}")
reform = ReformCSG(csg_activite_imposable_taux=0.0, csg_activite_deductible_taux=0.0)
resultat = compute_reform(reform)
print(f"Montant de la CSG pour {reform}: {resultat:,}")
ParameterNode
import inspect
def get_user_attributes(cls):
boring = dir(type("dummy", (object,), {}))
return [item for item in inspect.getmembers(cls) if item[0] not in boring]
get_user_attributes(ParameterNode)
!pip list | grep Open