diff --git a/leximpact_prepare_data/pipeline_survey_scenario.py b/leximpact_prepare_data/pipeline_survey_scenario.py index 3e657cecca15ddf3f449dbacba92bba6a45fdaf7..55c258dc145c691117d8f669dd78ede17f7ba5c9 100644 --- a/leximpact_prepare_data/pipeline_survey_scenario.py +++ b/leximpact_prepare_data/pipeline_survey_scenario.py @@ -9,7 +9,7 @@ from openfisca_core.taxbenefitsystems import TaxBenefitSystem from openfisca_survey_manager.input_dataframe_generator import set_table_in_survey -from openfisca_survey_manager import default_config_files_directory +from openfisca_survey_manager.paths import default_config_files_directory from leximpact_prepare_data.scenario_tools.helpers_survey_scenario import get_copules diff --git a/leximpact_prepare_data/run_pipeline.py b/leximpact_prepare_data/run_pipeline.py index b374e86909279fec532e12b12aaa1be0493a5b31..7a714f985d0033ccb14ad6dc3b1a2120fd6cd83c 100644 --- a/leximpact_prepare_data/run_pipeline.py +++ b/leximpact_prepare_data/run_pipeline.py @@ -5,6 +5,8 @@ from leximpact_prepare_data.pipeline_tax_and_benefit_system import pipeline_tbs import os import click +import pandas as pd +from openfisca_survey_manager.input_dataframe_generator import set_table_in_survey config = Configuration(project_folder="leximpact-prepare-data") aggregates_path = config.get("AGREGATS_PATH") @@ -18,7 +20,7 @@ @click.option( "-erfs", "--annee_erfs", - default=2019, + default=2021, help="ERFS-FPR year", show_default=True, type=int, @@ -29,7 +31,7 @@ @click.option( "-calcul", "--annee_de_calcul", - default=2022, + default=2025, help="POTE year", show_default=True, type=int, @@ -37,22 +39,56 @@ def run_pipeline(annee_erfs, annee_pote, annee_de_calcul): log.debug("Create FranceTaxBenefitSystem") - annee_pote = str(annee_pote) # Instanciation du scenario pipeline_survey_scenario = PipelineErfsSurveyScenario( config_files_directory=config_files_directory, annee_donnees=annee_erfs, - period=annee_de_calcul, + period=annee_pote, baseline_tax_benefit_system=pipeline_tbs, collection="openfisca_erfs_fpr", survey_name=f"openfisca_erfs_fpr_{annee_erfs}", ) - pipeline_survey_scenario.build_imputation(year=annee_pote) + pipeline_survey_scenario.build_imputation(year=str(annee_pote)) pipeline_survey_scenario.save_current_survey( variables=pipeline_survey_scenario.used_as_input_variables, collection="leximpact", - survey_name=f"leximpact_{annee_de_calcul}", + survey_name=f"leximpact_{annee_pote}", period=annee_de_calcul, ) + + del pipeline_survey_scenario + + survey_scenario = PipelineErfsSurveyScenario( + period=annee_de_calcul, + annee_donnees=annee_pote, + collection="leximpact", + survey_name=f"leximpact_{annee_pote}", + config_files_directory=config_files_directory, + baseline_tax_benefit_system=pipeline_tbs, + ) + for year in [annee_de_calcul, annee_de_calcul - 1, annee_de_calcul - 2]: + variables = survey_scenario.used_as_input_variables + if year in [annee_de_calcul - 1, annee_de_calcul - 2]: + variables = variables + ["rfr"] + data_frame_by_entity = survey_scenario.simulations[ + "baseline" + ].create_data_frame_by_entity(variables=variables, period=year, index=True) + + collection = "leximpact" + survey_name = f"leximpact_{annee_de_calcul}" + for entity, input_dataframe in data_frame_by_entity.items(): + assert isinstance(input_dataframe, pd.DataFrame) + print( + f"set_table_in_survey of {entity} for {year} in {collection}.{survey_name}" + ) + set_table_in_survey( + input_dataframe, + entity, + period=year, + collection=collection, + survey_name=survey_name, + config_files_directory=survey_scenario.data["config_files_directory"], + source_format="parquet", + )