--- title: CASD : Conversion de l'extraction SAS keywords: fastai sidebar: home_sidebar nb_path: "notebooks/extractions_base_des_impots/20_Convert_Merge_and_sort.ipynb" ---
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
year = "2019"
year = "2018"
SAS_FILE = (
"C:/Users/Public/Documents/TRAVAIL/csg/data_in/assiette_csg_pote_"
+ year
+ ".sas7bdat"
)
OUT_PARQUET_PATH = (
"C:/Users/Public/Documents/TRAVAIL/csg/data_in/assiettes_csg" + year + ".parquet"
)
IN_PATH = "C:/Users/Public/Documents/TRAVAIL/csg/data_in/"
OUT_PATH = "C:/Users/Public/Documents/TRAVAIL/csg/data_out/"
ARROW_OUT = IN_PATH + "assiettes_pote_brutes_" + year + "-arrow/"
taille_chunk = 1_048_576
from time import time
import pandas as pd
import vaex
from tqdm import tqdm
%%time
# Temps sur CASD : moins de 3 minutes.
st = time()
taille = 1_048_576
dfi = pd.read_sas(SAS_FILE, chunksize=taille_chunk, iterator=True)
dd_values = None
dfs = []
i = 0
for chunk in tqdm(dfi):
#### DEBUG
# i+=1
# if i>5:
# break
#### DEBUG
chunk = chunk.fillna(0)
if dd_values is None:
dd_values = vaex.from_pandas(chunk, copy_index=False)
else:
dd_values = dd_values.concat(vaex.from_pandas(chunk, copy_index=False))
Voir https://www.impots.gouv.fr/portail/files/formulaires/2041-gg/2021/2041-gg_3495.pdf
Sont à déclarer toutes les sommes versées par « Pôle emploi » :
l'allocation d'aide au retour à l'emploi (ARE) ;
allocation temporaire d'attente (ATA) ;
l'allocation de solidarité spécifique (ASS) ;
l'allocation équivalent retraite (AER)...
L’allocation de retour à l’emploi formation (AREF) doit être déclarée avec les revenus d’activité cases 1AJ à 1DJ.
Sont à déclarer :
l'allocation perçue dans le cadre d'une convention de coopération du Fonds national de l'emploi (allocation spéciale FNE) ;
l'allocation de « préretraite progressive » ;
l'allocation de remplacement pour l'emploi (ARPE) ;
l'allocation mensuelle versée dans le cadre des dispositifs de cessation d'activité de certains travailleurs salariés (« CATS ») ;
l'allocation de préretraite amiante ;
l'allocation versée dans le cadre d'un dispositif de préretraite d'entreprises (« préretraite maison »).
Rémunération des membres du Gouvernement, du Conseil économique, social et environnemental et du Conseil constitutionnel.
Indemnités parlementaires (de base et de résidence), y compris pour les députés européens.
Indemnités de fonction des élus locaux en cas d'option pour le régime d'imposition des traitements et salaires.
assert dd_values.get_column_names() == [
"rnsgbd",
"rnsgld",
"revkire",
"Z1aj",
"Z1ap",
"Z1as",
"Z1bj",
"Z1bp",
"Z1bs",
"Z1cj",
"Z1cw",
"Z1dw",
"Z2ch",
"Z2dc",
"Z2dh",
"Z2tr",
"Z3ua",
"Z3vg",
"Z3vz",
"Z4ba",
"Z4bb",
"Z4bc",
"Z4bd",
"Z4be",
"Z6de",
"Z8sc",
"Z8sw",
"Z8sx",
"CICS",
"MNIMQG",
]
def sum_columns(df):
# Pour assiette de CSG
df["revenus_capitaux_prelevement_bareme"] = df.Z2ch
df["revenus_capitaux_prelevement_liberatoire"] = df.Z2dh
df["revenus_capitaux_prelevement_forfaitaire_unique_ir"] = df.Z2dc + df.Z2tr
df["rente_viagere_titre_onereux_net"] = df.Z1cw + df.Z1dw
# Micro-foncier : on applique l'abattement de 30%
df["revenu_categoriel_foncier"] = (
df.Z4ba + (df.Z4be * 0.7) - (df.Z4bb + df.Z4bc + df.Z4bd)
)
# df['rev_categ_foncier4ba']=df.Z4ba
df["assiette_csg_plus_values"] = df.Z3vg + df.Z3ua + df.Z3vz
df["assiette_csg_revenus_capital"] = (
df["revenus_capitaux_prelevement_bareme"]
+ df["revenus_capitaux_prelevement_liberatoire"]
+ df["revenus_capitaux_prelevement_forfaitaire_unique_ir"]
+ df["rente_viagere_titre_onereux_net"]
+ df["revenu_categoriel_foncier"]
+ df["assiette_csg_plus_values"]
)
# Autres
df["retraites"] = df.Z1as + df.Z1bs + df.MNIMQG
df[
"pre_retraites_etranger"
] = df.Z8sc # Attention, seulement de l'étranger, sinon c'est df.Z1ap + df.Z1bp
df["chomage_et_indemnites"] = df.Z8sw + df.Z8sx + df.Z1ap + df.Z1bp
df["rev_salaire"] = df.Z1aj + df.Z1bj + df.Z1cj
# Il faudrait aussi interets_pel_moins_12_ans_cel, mais où le trouver ?
return df
dd_values = sum_columns(dd_values)
def compute_agg(vdf):
sub_total = []
ldf = vdf.shape[0]
for col in tqdm(vdf.get_column_names()):
name = f"{col}_non_zero"
vdf.select(f"{col} > 0", name=name)
lenzero = ldf - int(vdf.count("*", selection=name))
dict_col = {
"name": col,
"nb_line": ldf,
"lenzero": lenzero,
"sum": int(vdf.sum(col)),
"mean": int(vdf.mean(col)),
"pct_zero": lenzero / ldf * 100,
}
sub_total.append(dict_col)
return pd.DataFrame(sub_total)
%%time
# Temps sur CASD pour 41 colonnes : 2min 10s
df = compute_agg(dd_values)
(579282 + 330413) / (28041960914 + 6726148004) * 100
pd.set_option("display.float_format", "{:,}".format)
# Export dans un fichier
df.to_csv(OUT_PATH + "/agregats_des_variables_csg-POTE_" + year + ".csv", index=False)
df
%%time
if year == "2019":
assert dd_values.rev_salaire.sum() == 650_855_163_531
if year == "2018":
assert dd_values.rev_salaire.sum() == 649_078_215_971
col_to_keep = [
"revkire",
"revenus_capitaux_prelevement_bareme",
"revenus_capitaux_prelevement_liberatoire",
"revenus_capitaux_prelevement_forfaitaire_unique_ir",
"rente_viagere_titre_onereux_net",
"revenu_categoriel_foncier",
"assiette_csg_plus_values",
"assiette_csg_revenus_capital",
"retraites",
"pre_retraites_etranger",
"chomage_et_indemnites",
"rev_salaire",
]
vdf = dd_values[col_to_keep]
# vdf
assert vdf.get_column_names() == [
"revkire",
"revenus_capitaux_prelevement_bareme",
"revenus_capitaux_prelevement_liberatoire",
"revenus_capitaux_prelevement_forfaitaire_unique_ir",
"rente_viagere_titre_onereux_net",
"revenu_categoriel_foncier",
"assiette_csg_plus_values",
"assiette_csg_revenus_capital",
"retraites",
"pre_retraites_etranger",
"chomage_et_indemnites",
"rev_salaire",
]
%%time
# Temps sur CASD : 1min 31s
from pathlib import Path
Path(ARROW_OUT).mkdir(parents=True, exist_ok=True)
# rfrs_sorted.export_arrow(ARROW_OUT)
vdf.export_many(
ARROW_OUT + "assiettes_pote_brutes_" + year + "_chunk-{i:02}.arrow",
chunk_size=taille_chunk,
)
del vdf
del dd_values
del chunk
%%time
vdf = vaex.open(ARROW_OUT + "assiettes_pote_brutes_" + year + "_chunk-*.arrow")
vdf.columns
%%time
# Temps sur CASD : 15s mais c'est du lazy : ce n'est pas fait.
vdf_sorted = vdf.sort(by="revkire")
%%time
# Temps sur CASD : 3min 44s
from pathlib import Path
Path(ARROW_OUT).mkdir(parents=True, exist_ok=True)
# rfrs_sorted.export_arrow(ARROW_OUT)
vdf_sorted.export_many(
ARROW_OUT + "assiettes_pote_sorted_" + year + "_chunk-{i:02}.arrow",
chunk_size=taille_chunk,
)
%%time
nbff = vdf_sorted.shape[0]
print(f"Nombre de lignes = Nombre de foyer fiscaux : {nbff:,}")
if year == "fake":
assert nbff == 39_000_000
if year == "2018":
assert nbff == 38_487_937
if year == "2019":
assert nbff == 39_264_696
# On vérifie le tri : 1 m
assert vdf_sorted[0][0] == vdf_sorted.min("revkire")
assert vdf_sorted[-1][0] == vdf_sorted.max("revkire")
assert vdf_sorted[nbff - 9][0] > vdf_sorted[nbff - 10][0]
assert vdf_sorted[nbff - 10][0] > vdf_sorted[nbff - 11][0]
del vdf_sorted
import gc
gc.collect()
# from pathlib import Path
# Path(OUT_PATH+'/assiettes_pote2019').mkdir(parents=True, exist_ok=True)
# files = dd_values.to_hdf(OUT_PATH+'/assiettes_pote2019.*.hdf', '/pote2019')
# Path(OUT_PARQUET).mkdir(parents=True, exist_ok=True)
# files = dd_values.export_parquet(OUT_PARQUET_PATH)
if year == "2019":
assert rfrs.shape[0] == 39264696
if year == "2018":
assert rfrs.shape[0] == 38487937
# input_directory = PATH+'extraction_assiettes_csg/extract-pote.*.hdf'
# rfrs = dd.read_hdf(input_directory, '/pote2019')
# rfrs.columns